// Reconnect back-off and keepalive timing. connectMe logs backOff/1e9 as
// "seconds", so these values appear to be nanosecond counts (5e9 == 5s).
// Starting/reset value for the reconnect back-off in connectMe.
19 InitialReconnectDelay = 5e9
// Ceiling that connectMe clamps the back-off to.
20 MaximumReconnectDelay = 300e9
// Factor the back-off is multiplied by in connectMe.
21 ReconnectDelayScale = 2
// Passed to time.After in ProcessingLoop's keepalive case.
// NOTE(review): presumably also nanoseconds (200s) - confirm.
22 KeepaliveDelay = 200e9
// NewConnectionInfo is the value carried on the newConnection channel;
// connectMe allocates one and ProcessingLoop reads its timeout field.
// NOTE(review): the field declarations are not visible in this listing.
26 type NewConnectionInfo struct {
// Command-line flag: path of the player configuration file.
32 ConfigFile = flag.String("config-file", "/etc/orchestra/player.conf", "Path to the configuration file")
// Command-line flag: when set, connectMe skips conn.VerifyHostname.
33 DontVerifyPeer = flag.Bool("dont-verify-peer", false, "Ignore TLS verification for the peer")
// Client certificate appended to the TLS config in connectMe.
34 CertPair tls.Certificate
// NOTE(review): no use of CACertPool is visible in this listing; presumably
// it is the CA pool for the TLS config - confirm.
35 CACertPool *x509.CertPool
// Set in connectMe from the "player name" option, or probed if that is empty.
36 LocalHostname string = ""
// Packets received by Reader, consumed by ProcessingLoop.
38 receivedMessage = make(chan *o.WirePkt)
// Consumed by ProcessingLoop to detect a dropped connection.
39 lostConnection = make(chan int)
// Buffered (capacity 2). NOTE(review): sender/receiver are not visible in
// this listing; presumably it triggers the score reload in ProcessingLoop.
40 reloadScores = make(chan int, 2)
// Queue of *TaskRequest values waiting to be executed.
41 pendingQueue = list.New()
// Queue of *TaskResponse values that have not been acknowledged yet.
42 unacknowledgedQueue = list.New()
// Carries the result of a successful connectMe to ProcessingLoop.
43 newConnection = make(chan *NewConnectionInfo)
// True after ProcessingLoop asks the master for work; cleared when a task is
// appended to pendingQueue and when a new connection arrives.
44 pendingTaskRequest = false
45 InvalidValueError = os.NewError("Invalid value")
// getNextPendingTask takes the element at the front of pendingQueue,
// type-asserts its value to *TaskRequest (ignoring the ok result) and removes
// the element from the queue.
// NOTE(review): some lines of this function are missing from this listing
// (presumably an empty-queue guard and the return) - confirm against the
// full file.
48 func getNextPendingTask() (task *TaskRequest) {
49 e := pendingQueue.Front()
51 task, _ = e.Value.(*TaskRequest)
52 pendingQueue.Remove(e)
// appendPendingTask clears the pendingTaskRequest flag and pushes task onto
// the back of pendingQueue.
57 func appendPendingTask(task *TaskRequest) {
58 pendingTaskRequest = false
59 pendingQueue.PushBack(task)
// getNextUnacknowledgedResponse takes the element at the front of
// unacknowledgedQueue, type-asserts its value to *TaskResponse (ignoring the
// ok result) and removes the element from the queue.
// NOTE(review): some lines of this function are missing from this listing
// (presumably an empty-queue guard and the return) - confirm against the
// full file.
62 func getNextUnacknowledgedResponse() (resp *TaskResponse) {
63 e := unacknowledgedQueue.Front()
65 resp, _ = e.Value.(*TaskResponse)
66 unacknowledgedQueue.Remove(e)
// appendUnacknowledgedResponse stamps resp with a retry time of "now plus
// RetryDelay" and pushes it onto the back of unacknowledgedQueue.
// RetryDelay is defined outside this listing.
71 func appendUnacknowledgedResponse(resp *TaskResponse) {
72 resp.RetryTime = time.Nanoseconds() + RetryDelay
73 unacknowledgedQueue.PushBack(resp)
// acknowledgeResponse walks unacknowledgedQueue from front to back, asserting
// each value to *TaskResponse, and removes an element from the queue.
// NOTE(review): the condition guarding the Remove is not visible in this
// listing; presumably it matches the response's id against jobid - confirm.
76 func acknowledgeResponse(jobid uint64) {
77 for e := unacknowledgedQueue.Front(); e != nil; e = e.Next() {
78 resp := e.Value.(*TaskResponse)
80 unacknowledgedQueue.Remove(e)
// sendResponse encodes a response with o.Encode and then either logs a
// transmission error and puts resp back at the front of the unacknowledged
// queue, or appends resp to the back of that queue with a fresh retry time.
// NOTE(review): the construction of ptr, the actual send on c, and the
// if/else structure are not visible in this listing; the error-path vs
// success-path split described above is inferred from the Warn message -
// confirm against the full file.
85 func sendResponse(c net.Conn, resp *TaskResponse) {
86 //FIXME: update retry time on Response
87 o.Debug("Sending Response!")
89 p, err := o.Encode(ptr)
90 o.MightFail(err, "Failed to encode response")
93 o.Warn("Transmission error: %s", err)
95 prequeueResponse(resp)
98 appendUnacknowledgedResponse(resp)
// prequeueResponse pushes resp onto the front of unacknowledgedQueue, so it
// is the next response returned by getNextUnacknowledgedResponse.
102 func prequeueResponse(resp *TaskResponse) {
103 unacknowledgedQueue.PushFront(resp)
// Reader receives packets from conn with o.Receive and forwards them on the
// receivedMessage channel; a receive error is logged as a warning.
// NOTE(review): the body and argument of the deferred closure and the
// surrounding loop are not visible in this listing; presumably the deferred
// call signals lostConnection when Reader exits - confirm.
106 func Reader(conn net.Conn) {
107 defer func(l chan int) {
112 pkt, err := o.Receive(conn)
114 o.Warn("Error receiving message: %s", err)
117 receivedMessage <- pkt
// handleNop is the dispatcher entry for o.TypeNop; it only logs that a NOP
// arrived and uses neither argument.
121 func handleNop(c net.Conn, message interface{}) {
122 o.Debug("NOP Received")
// handleIllegal is the dispatcher entry for packet types the player should
// never receive; it reports the condition through o.Fail.
125 func handleIllegal(c net.Conn, message interface{}) {
126 o.Fail("Got Illegal Message")
// handleRequest is the dispatcher entry for o.TypeTaskRequest. It asserts the
// message to *o.ProtoTaskRequest, converts it with TaskFromProto and looks the
// job up with TaskGet. For a known job whose response is finished, the stored
// response is re-sent. Otherwise a new pending response is attached to the
// task and the task is appended to the pending queue.
// NOTE(review): several branch conditions (the ok check, the nil test on
// existing, and the registration call) are not visible in this listing; the
// known-job vs new-job split above is inferred from the visible statements -
// confirm against the full file.
129 func handleRequest(c net.Conn, message interface{}) {
130 o.Debug("Request Recieved. Decoding!")
131 ptr, ok := message.(*o.ProtoTaskRequest)
133 o.Assert("CC stuffed up - handleRequest got something that wasn't a ProtoTaskRequest.")
135 task := TaskFromProto(ptr)
136 /* search the registry for the task */
137 o.Debug("Request for Job.ID %d", task.Id)
138 existing := TaskGet(task.Id)
140 if (existing.MyResponse.IsFinished()) {
141 o.Debug("job%d: Resending Response", task.Id)
142 sendResponse(c, existing.MyResponse)
145 // check to see if we have the score
146 // add the Job to our Registry
147 task.MyResponse = NewTaskResponse()
148 task.MyResponse.id = task.Id
149 task.MyResponse.State = RESP_PENDING
151 o.Info("Added New Task (Job ID %d) to our local registry", task.Id)
152 // and then push it onto the pending job list so we know it needs actioning.
153 appendPendingTask(task)
// handleAck is the dispatcher entry for o.TypeAcknowledgement. It asserts the
// message to *o.ProtoAcknowledgement and passes the dereferenced Id to
// acknowledgeResponse.
// NOTE(review): the check on ok that guards the o.Assert call is not visible
// in this listing.
157 func handleAck(c net.Conn, message interface{}) {
158 o.Debug("Ack Received")
159 ack, ok := message.(*o.ProtoAcknowledgement)
161 o.Assert("CC stuffed up - handleAck got something that wasn't a ProtoAcknowledgement.")
164 acknowledgeResponse(*ack.Id)
// dispatcher maps a packet type code to the function that handles it.
// ProcessingLoop indexes it with p.Type for each received packet.
169 var dispatcher = map[uint8] func(net.Conn, interface{}) {
170 o.TypeNop: handleNop,
171 o.TypeTaskRequest: handleRequest,
172 o.TypeAcknowledgement: handleAck,
174 /* P->C only messages, should never appear on the wire to us. */
175 o.TypeIdentifyClient: handleIllegal,
176 o.TypeReadyForTask: handleIllegal,
177 o.TypeTaskResponse: handleIllegal,
// connectMe tries to open a TLS connection to the master on port 2258.
// initialDelay seeds the back-off: the visible code sleeps for backOff,
// multiplies it by ReconnectDelayScale and clamps it to
// MaximumReconnectDelay. Unless DontVerifyPeer is set, the peer's hostname is
// verified after a successful dial. On failure a warning is logged.
// NOTE(review): the loop structure, the conditions around the sleep/reset of
// backOff, the remaining tls.Config fields, and how nc is filled in and
// delivered are not visible in this listing; presumably nc is sent on
// newConnection - confirm against the full file.
180 func connectMe(initialDelay int64) {
181 var backOff int64 = initialDelay
185 o.Info("Sleeping for %d seconds", backOff/1e9)
186 err := time.Sleep(backOff)
187 o.MightFail(err, "Couldn't Sleep")
188 backOff *= ReconnectDelayScale
189 if backOff > MaximumReconnectDelay {
190 backOff = MaximumReconnectDelay
193 backOff = InitialReconnectDelay
196 tconf := &tls.Config{
199 tconf.Certificates = append(tconf.Certificates, CertPair)
201 // update our local hostname.
202 LocalHostname = GetStringOpt("player name")
203 if (LocalHostname == "") {
204 LocalHostname = o.ProbeHostname()
205 o.Warn("No hostname provided - probed hostname: %s", LocalHostname)
208 masterHostname := GetStringOpt("master")
210 raddr := fmt.Sprintf("%s:%d", masterHostname, 2258)
211 o.Info("Connecting to %s", raddr)
212 conn, err := tls.Dial("tcp", raddr, tconf)
213 if err == nil && !*DontVerifyPeer {
215 err = conn.VerifyHostname(masterHostname)
218 nc := new(NewConnectionInfo)
224 o.Warn("Couldn't connect to master: %s", err)
// ProcessingLoop is the player's main event loop. It starts a connection
// attempt with connectMe and then services, as visible below: completion of
// the executing task, retries of unacknowledged responses, new connections,
// lost connections, packets from the master, score reloads and keepalive
// expiry.
// NOTE(review): the for/select scaffolding and many branch conditions are not
// visible in this listing; the added comments describe only the statements
// that are shown.
228 func ProcessingLoop() {
229 var conn net.Conn = nil
230 var nextRetryResp *TaskResponse = nil
231 var taskCompletionChan <-chan *TaskResponse = nil
232 var connectDelay int64 = 0
233 var doScoreReload bool = false
234 // kick off a new connection attempt.
235 go connectMe(connectDelay)
237 // and this is where we spin!
239 var retryDelay int64 = 0
240 var retryChan <-chan int64 = nil
// Pull the next unacknowledged response; retryDelay is how far in the future
// its RetryTime lies. NOTE(review): the condition under which it is re-sent
// immediately is not visible - presumably when retryDelay has already
// elapsed.
243 for nextRetryResp == nil {
244 nextRetryResp = getNextUnacknowledgedResponse()
245 if nil == nextRetryResp {
248 retryDelay = nextRetryResp.RetryTime - time.Nanoseconds()
250 sendResponse(conn, nextRetryResp)
// Arm the retry timer for the response currently being held.
254 if nextRetryResp != nil {
255 retryChan = time.After(retryDelay)
// Nothing is executing: try to start the next pending task.
258 if taskCompletionChan == nil {
259 nextTask := getNextPendingTask()
261 taskCompletionChan = ExecuteTask(nextTask)
// Ask the master for work only when connected and no request is outstanding.
263 if conn != nil && !pendingTaskRequest {
264 o.Debug("Asking for trouble")
265 p := o.MakeReadyForTask()
267 o.Debug("Sent Request for trouble")
268 pendingTaskRequest = true
273 // Currently executing job finishes.
274 case newresp := <- taskCompletionChan:
275 o.Debug("job%d: Completed with State %s\n", newresp.id, newresp.State)
276 // preemptively set a retrytime.
277 newresp.RetryTime = time.Nanoseconds()
278 // ENOCONN - sub it in as our next retryresponse, and prepend the old one onto the queue.
280 if nil != nextRetryResp {
281 prequeueResponse(nextRetryResp)
283 o.Debug("job%d: Queuing Initial Response", newresp.id)
284 nextRetryResp = newresp
286 o.Debug("job%d: Sending Initial Response", newresp.id)
287 sendResponse(conn, newresp)
290 o.Info("Performing Deferred score reload")
292 doScoreReload = false
// Mark that no task is executing any more.
294 taskCompletionChan = nil
295 // If the current unacknowledged response needs a retry, send it.
297 sendResponse(conn, nextRetryResp)
299 // New connection. Set up the receiver thread and Introduce ourselves.
300 case nci := <-newConnection:
305 connectDelay = nci.timeout
306 pendingTaskRequest = false
311 /* Introduce ourself */
312 p := o.MakeIdentifyClient(LocalHostname)
314 // Lost connection. Shut down the connection.
315 case <-lostConnection:
316 o.Warn("Lost Connection to Master")
319 // restart the connection attempts
320 go connectMe(connectDelay)
321 // Message received from master. Decode and action.
322 case p := <-receivedMessage:
323 // because the message could possibly be an ACK, push the next retry response back into the queue so acknowledge can find it.
324 if nil != nextRetryResp {
325 prequeueResponse(nextRetryResp)
328 var upkt interface{} = nil
331 upkt, err = p.Decode()
332 o.MightFail(err, "Couldn't decode packet from master")
// Look up the handler for this packet type; unknown types are reported via
// o.Fail.
334 handler, exists := dispatcher[p.Type]
339 o.Fail("Unhandled Pkt Type %d", p.Type)
343 // fortunately this is actually completely safe as
344 // long as nobody's currently executing.
345 // who'd have thunk it?
346 if taskCompletionChan == nil {
347 o.Info("Reloading scores")
350 o.Info("Deferring score reload (execution in progress)")
353 // Keepalive delay expired. Send Nop.
354 case <-time.After(KeepaliveDelay):
358 o.Debug("Sending Nop")
366 o.SetLogName("player")