initialise repo
[debian/orchestra.git] / src / player / player.go
1 /* player.go
2 */
3
4 package main
5
6 import (
7         "os"
8         "fmt"
9         "flag"
10         o       "orchestra"
11         "crypto/tls"
12         "crypto/x509"
13         "net"
14         "time"
15         "container/list"
16 )
17
// Timing parameters for the connection and retry machinery.  The delays are
// nanosecond counts: they are added to time.Nanoseconds() or handed to
// time.Sleep / time.After elsewhere in this file.
const (
	// InitialReconnectDelay is the back-off connectMe adopts after a first,
	// undelayed connection attempt.
	InitialReconnectDelay = 5e9

	// MaximumReconnectDelay caps the back-off between connection attempts.
	MaximumReconnectDelay = 300e9

	// ReconnectDelayScale is the factor the back-off is multiplied by after
	// every sleep.
	ReconnectDelayScale = 2

	// KeepaliveDelay is how long the processing loop may sit idle before it
	// sends a Nop to the master.
	KeepaliveDelay = 200e9

	// RetryDelay is how long a transmitted response may stay unacknowledged
	// before it becomes due for retransmission.
	RetryDelay = 5e9
)
25
// NewConnectionInfo is what connectMe posts on newConnection once it has a
// usable link to the master.
type NewConnectionInfo struct {
	// conn is the freshly established connection.
	conn net.Conn
	// timeout is the reconnect back-off (in nanoseconds) that was in effect
	// when the connection succeeded; the processing loop keeps it as the
	// starting delay for the next reconnect.
	timeout int64
}
30
31 var (
32         ConfigFile              = flag.String("config-file", "/etc/orchestra/player.conf", "Path to the configuration file")    
33         DontVerifyPeer          = flag.Bool("dont-verify-peer", false, "Ignore TLS verification for the peer")
34         CertPair tls.Certificate
35         CACertPool *x509.CertPool
36         LocalHostname string    = ""
37         
38         receivedMessage         = make(chan *o.WirePkt)
39         lostConnection          = make(chan int)
40         reloadScores            = make(chan int, 2)
41         pendingQueue            = list.New()
42         unacknowledgedQueue     = list.New()
43         newConnection           = make(chan *NewConnectionInfo)
44         pendingTaskRequest      = false
45         InvalidValueError       = os.NewError("Invalid value")
46 )
47
48 func getNextPendingTask() (task *TaskRequest) {
49         e := pendingQueue.Front()
50         if e != nil {
51                 task, _ = e.Value.(*TaskRequest)
52                 pendingQueue.Remove(e)
53         }
54         return task
55 }
56
57 func appendPendingTask(task *TaskRequest) {
58         pendingTaskRequest = false
59         pendingQueue.PushBack(task)
60 }
61
62 func getNextUnacknowledgedResponse() (resp *TaskResponse) {
63         e := unacknowledgedQueue.Front()
64         if e != nil {
65                 resp, _ = e.Value.(*TaskResponse)
66                 unacknowledgedQueue.Remove(e)
67         }
68         return resp
69 }
70
71 func appendUnacknowledgedResponse(resp *TaskResponse) {
72         resp.RetryTime = time.Nanoseconds() + RetryDelay
73         unacknowledgedQueue.PushBack(resp)
74 }
75
76 func acknowledgeResponse(jobid uint64) {
77         for e := unacknowledgedQueue.Front(); e != nil; e = e.Next() {
78                 resp := e.Value.(*TaskResponse)
79                 if resp.id == jobid {
80                         unacknowledgedQueue.Remove(e)
81                 }
82         }
83 }
84
85 func sendResponse(c net.Conn, resp *TaskResponse) {
86         //FIXME: update retry time on Response
87         o.Debug("Sending Response!")
88         ptr := resp.Encode()
89         p, err := o.Encode(ptr)
90         o.MightFail(err, "Failed to encode response")
91         _, err = p.Send(c)
92         if err != nil {
93                 o.Warn("Transmission error: %s", err)
94                 c.Close()
95                 prequeueResponse(resp)
96                 lostConnection <- 1
97         } else {
98                 appendUnacknowledgedResponse(resp)
99         }
100 }
101
102 func prequeueResponse(resp *TaskResponse) {
103         unacknowledgedQueue.PushFront(resp)
104 }
105
106 func Reader(conn net.Conn) {
107         defer func(l chan int) {
108                 l <- 1
109         }(lostConnection)
110
111         for {
112                 pkt, err := o.Receive(conn)
113                 if (err != nil) {
114                         o.Warn("Error receiving message: %s", err)
115                         break;
116                 }
117                 receivedMessage <- pkt
118         }       
119 }
120
121 func handleNop(c net.Conn, message interface{}) {
122         o.Debug("NOP Received")
123 }
124
125 func handleIllegal(c net.Conn, message interface{}) {
126         o.Fail("Got Illegal Message")
127 }
128
129 func handleRequest(c net.Conn, message interface{}) {
130         o.Debug("Request Recieved.  Decoding!")
131         ptr, ok := message.(*o.ProtoTaskRequest)
132         if !ok {
133                 o.Assert("CC stuffed up - handleRequest got something that wasn't a ProtoTaskRequest.")
134         }
135         task := TaskFromProto(ptr)
136         /* search the registry for the task */
137         o.Debug("Request for Job.ID %d", task.Id)
138         existing := TaskGet(task.Id)
139         if nil != existing {
140                 if (existing.MyResponse.IsFinished()) {
141                         o.Debug("job%d: Resending Response", task.Id)
142                         sendResponse(c, existing.MyResponse)
143                 }
144         } else {
145                 // check to see if we have the score
146                 // add the Job to our Registry
147                 task.MyResponse = NewTaskResponse()
148                 task.MyResponse.id = task.Id
149                 task.MyResponse.State = RESP_PENDING            
150                 TaskAdd(task)
151                 o.Info("Added New Task (Job ID %d) to our local registry", task.Id)
152                 // and then push it onto the pending job list so we know it needs actioning.
153                 appendPendingTask(task)
154         }
155 }
156
157 func handleAck(c net.Conn, message interface{}) {
158         o.Debug("Ack Received")
159         ack, ok := message.(*o.ProtoAcknowledgement)
160         if !ok {
161                 o.Assert("CC stuffed up - handleAck got something that wasn't a ProtoAcknowledgement.")
162         }
163         if ack.Id != nil {
164                 acknowledgeResponse(*ack.Id)
165         }
166 }
167
168
169 var dispatcher  = map[uint8] func(net.Conn, interface{}) {
170         o.TypeNop:              handleNop,
171         o.TypeTaskRequest:      handleRequest,
172         o.TypeAcknowledgement:  handleAck,
173
174         /* P->C only messages, should never appear on the wire to us. */
175         o.TypeIdentifyClient:   handleIllegal,
176         o.TypeReadyForTask:     handleIllegal,
177         o.TypeTaskResponse:     handleIllegal,
178 }
179
180 func connectMe(initialDelay int64) {
181         var backOff int64 = initialDelay
182         for {
183                 // Sleep first.
184                 if backOff > 0 {
185                         o.Info("Sleeping for %d seconds", backOff/1e9)
186                         err := time.Sleep(backOff)
187                         o.MightFail(err, "Couldn't Sleep")
188                         backOff *= ReconnectDelayScale
189                         if backOff > MaximumReconnectDelay {
190                                 backOff = MaximumReconnectDelay
191                         }
192                 } else {
193                         backOff = InitialReconnectDelay
194                 }
195
196                 tconf := &tls.Config{
197                 RootCAs: CACertPool,
198                 }
199                 tconf.Certificates = append(tconf.Certificates, CertPair)
200
201                 // update our local hostname.
202                 LocalHostname = GetStringOpt("player name")
203                 if (LocalHostname == "") {
204                         LocalHostname = o.ProbeHostname()
205                         o.Warn("No hostname provided - probed hostname: %s", LocalHostname)
206                 }
207
208                 masterHostname := GetStringOpt("master")
209
210                 raddr := fmt.Sprintf("%s:%d", masterHostname, 2258)
211                 o.Info("Connecting to %s", raddr)
212                 conn, err := tls.Dial("tcp", raddr, tconf)              
213                 if err == nil && !*DontVerifyPeer {
214                         conn.Handshake()
215                         err = conn.VerifyHostname(masterHostname)
216                 }
217                 if err == nil {
218                         nc := new(NewConnectionInfo)
219                         nc.conn = conn
220                         nc.timeout = backOff
221                         newConnection <- nc
222                         return
223                 }
224                 o.Warn("Couldn't connect to master: %s", err)
225         }
226 }
227
228 func ProcessingLoop() {
229         var     conn                    net.Conn                = nil
230         var     nextRetryResp           *TaskResponse           = nil
231         var     taskCompletionChan      <-chan *TaskResponse    = nil
232         var     connectDelay            int64                   = 0
233         var     doScoreReload           bool                    = false
234         // kick off a new connection attempt.
235         go connectMe(connectDelay)
236
237         // and this is where we spin!
238         for {   
239                 var retryDelay int64 = 0
240                 var retryChan  <-chan int64 = nil
241
242                 if conn != nil {
243                         for nextRetryResp == nil {
244                                 nextRetryResp = getNextUnacknowledgedResponse()
245                                 if nil == nextRetryResp {
246                                         break
247                                 }
248                                 retryDelay = nextRetryResp.RetryTime - time.Nanoseconds()
249                                 if retryDelay < 0 {
250                                         sendResponse(conn, nextRetryResp)
251                                         nextRetryResp = nil
252                                 }
253                         }
254                         if nextRetryResp != nil {
255                                 retryChan = time.After(retryDelay)
256                         }
257                 }
258                 if taskCompletionChan == nil {
259                         nextTask := getNextPendingTask()
260                         if nextTask != nil {
261                                 taskCompletionChan = ExecuteTask(nextTask)
262                         } else {
263                                 if conn != nil && !pendingTaskRequest {
264                                         o.Debug("Asking for trouble")
265                                         p := o.MakeReadyForTask()
266                                         p.Send(conn)
267                                         o.Debug("Sent Request for trouble")
268                                         pendingTaskRequest = true
269                                 }
270                         }
271                 }
272                 select {
273                 // Currently executing job finishes.
274                 case newresp := <- taskCompletionChan:
275                         o.Debug("job%d: Completed with State %s\n", newresp.id, newresp.State)
276                         // preemptively set a retrytime.
277                         newresp.RetryTime = time.Nanoseconds()
278                         // ENOCONN - sub it in as our next retryresponse, and prepend the old one onto the queue.
279                         if nil == conn {
280                                 if nil != nextRetryResp {
281                                         prequeueResponse(nextRetryResp)
282                                 }
283                                 o.Debug("job%d: Queuing Initial Response", newresp.id)
284                                 nextRetryResp = newresp
285                         } else {
286                                 o.Debug("job%d: Sending Initial Response", newresp.id)
287                                 sendResponse(conn, newresp)
288                         }
289                         if doScoreReload {
290                                 o.Info("Performing Deferred score reload")
291                                 LoadScores()
292                                 doScoreReload = false
293                         }
294                         taskCompletionChan = nil
295                 // If the current unacknowledged response needs a retry, send it.
296                 case <-retryChan:
297                         sendResponse(conn, nextRetryResp)
298                         nextRetryResp = nil
299                 // New connection.  Set up the receiver thread and Introduce ourselves.
300                 case nci := <-newConnection:
301                         if conn != nil {
302                                 conn.Close()
303                         }
304                         conn = nci.conn
305                         connectDelay = nci.timeout
306                         pendingTaskRequest = false
307
308                         // start the reader
309                         go Reader(conn)
310                 
311                         /* Introduce ourself */
312                         p := o.MakeIdentifyClient(LocalHostname)
313                         p.Send(conn)
314                 // Lost connection.  Shut downt he connection.
315                 case <-lostConnection:
316                         o.Warn("Lost Connection to Master")
317                         conn.Close()
318                         conn = nil
319                         // restart the connection attempts
320                         go connectMe(connectDelay)
321                 // Message received from master.  Decode and action.
322                 case p := <-receivedMessage:
323                         // because the message could possibly be an ACK, push the next retry response back into the queue so acknowledge can find it.
324                         if nil != nextRetryResp {
325                                 prequeueResponse(nextRetryResp)
326                                 nextRetryResp = nil
327                         }
328                         var upkt interface{} = nil
329                         if p.Length > 0 {
330                                 var err os.Error
331                                 upkt, err = p.Decode()
332                                 o.MightFail(err, "Couldn't decode packet from master")
333                         }
334                         handler, exists := dispatcher[p.Type]
335                         if (exists) {
336                                 connectDelay = 0
337                                 handler(conn, upkt)
338                         } else {
339                                 o.Fail("Unhandled Pkt Type %d", p.Type)
340                         }
341                 // Reload scores
342                 case <-reloadScores:
343                         // fortunately this is actually completely safe as 
344                         // long as nobody's currently executing.
345                         // who'd have thunk it?
346                         if taskCompletionChan == nil {
347                                 o.Info("Reloading scores")
348                                 LoadScores()
349                         } else {
350                                 o.Info("Deferring score reload (execution in progress)")
351                                 doScoreReload = true
352                         }
353                 // Keepalive delay expired.  Send Nop.
354                 case <-time.After(KeepaliveDelay):
355                         if conn == nil {
356                                 break
357                         }
358                         o.Debug("Sending Nop")
359                         p := o.MakeNop()
360                         p.Send(conn)
361                 }
362         }
363 }
364
365 func main() {
366         o.SetLogName("player")
367
368         flag.Parse()
369
370         ConfigLoad()
371         LoadScores()
372         ProcessingLoop()
373 }