initialise repo
[debian/orchestra.git] / src / conductor / client.go
1 /* client.go
2  *
3  * Client Handling
4 */
5
6 package main
7 import (
8         o "orchestra"
9         "net"
10         "time"
11         "os"
12         "crypto/tls"
13         "crypto/x509"
14 )
15
// Timing and queueing parameters for the per-client connection loop.
// The delays are raw nanosecond counts.
const (
	// KeepaliveDelay is how long the client loop may sit idle before it
	// sends a NOP keepalive: 200 seconds.
	KeepaliveDelay = 200e9

	// RetryDelay is how long after a task is sent before it is due to be
	// resent: 10 seconds.  It must be smaller than KeepaliveDelay to
	// avoid a channel race.
	RetryDelay = 10e9

	// OutputQueueDepth is the buffer size of each client's outbound
	// packet queue.  It needs to be large enough that the client loop
	// does not deadlock on itself.
	OutputQueueDepth = 10
)
21
22
23 type ClientInfo struct {
24         Player          string
25         PktOutQ         chan *o.WirePkt
26         PktInQ          chan *o.WirePkt
27         abortQ          chan int
28         TaskQ           chan *TaskRequest
29         connection      net.Conn
30         pendingTasks    map[uint64]*TaskRequest
31 }
32
33 func NewClientInfo() (client *ClientInfo) {
34         client = new(ClientInfo)
35         client.abortQ = make(chan int, 2)
36         client.PktOutQ = make(chan *o.WirePkt, OutputQueueDepth)
37         client.PktInQ = make(chan *o.WirePkt)
38         client.TaskQ = make(chan *TaskRequest)
39
40         return client
41 }
42
43 func (client *ClientInfo) Abort() {
44         PlayerDied(client)
45         reg := ClientGet(client.Player)
46         if reg != nil {
47                 reg.Disassociate()
48         }
49         client.abortQ <- 1;
50 }
51
52 func (client *ClientInfo) Name() (name string) {
53         if client.Player == "" {
54                 return "UNK:" + client.connection.RemoteAddr().String()
55         }
56         return client.Player
57 }
58
59 // Must not be used from inside of handlers.
60 func (client *ClientInfo) Send(p *o.WirePkt) {
61         client.PktOutQ <- p
62 }
63
64 // Can only be used form inside of handlers and the main client loop.
65 func (client *ClientInfo) sendNow(p *o.WirePkt) {
66         _, err := p.Send(client.connection)
67         if err != nil {
68                 o.Warn("Error sending pkt to %s: %s.  Terminating connection.", client.Name(), err)
69                 client.Abort()
70         }
71 }
72
73 func (client *ClientInfo) SendTask(task *TaskRequest) {
74         tr := task.Encode()
75         p, err := o.Encode(tr)
76         o.MightFail(err, "Couldn't encode task for client.")
77         client.Send(p)
78         task.RetryTime = time.Nanoseconds() + RetryDelay
79 }
80
81 func (client *ClientInfo) GotTask(task *TaskRequest) {
82         /* first up, look at the task state */
83         switch (task.State) {
84         case TASK_QUEUED:
85                 fallthrough
86         case TASK_PENDINGRESULT:
87                 /* this is a new task.  We should send it straight */
88                 task.Player = client.Player
89                 task.State = TASK_PENDINGRESULT
90                 client.pendingTasks[task.job.Id] = task
91                 client.SendTask(task)
92                 // request a update to the spool so the PENDING flag is stored.
93                 JobWriteUpdate(task.job.Id)
94         case TASK_FINISHED:
95                 /* discard.  We don't care about tasks that are done. */                
96         }
97 }
98
99 // reset the task state so it can be requeued.
100 func CleanTask(task *TaskRequest) {
101         task.State = TASK_QUEUED
102         task.Player = ""
103 }
104
105 // this merges the state from the registry record into the client it's called against.
106 // it also copies back the active communication channels to the registry record.
107 func (client *ClientInfo) MergeState(regrecord *ClientInfo) {
108         client.Player = regrecord.Player
109         client.pendingTasks = regrecord.pendingTasks
110
111         regrecord.TaskQ = client.TaskQ
112         regrecord.abortQ = client.abortQ
113         regrecord.PktOutQ = client.PktOutQ
114         regrecord.PktInQ = client.PktInQ
115         regrecord.connection = client.connection
116 }
117
118 // Sever the connection state from the client (used against registry records only)
119 func (client *ClientInfo) Disassociate() {
120         client.TaskQ = nil
121         client.abortQ = nil
122         client.PktInQ = nil
123         client.PktOutQ = nil
124         client.connection = nil
125 }
126
127 func handleNop(client *ClientInfo, message interface{}) {
128         o.Debug("Client %s: NOP Received", client.Name())
129 }
130
131 func handleIdentify(client *ClientInfo, message interface{}) {
132         if client.Player != "" {
133                 o.Warn("Client %s: Tried to reintroduce itself. Terminating Connection.", client.Name())
134                 client.Abort()
135                 return
136         }
137         ic, _ := message.(*o.IdentifyClient)
138         o.Info("Client %s: Identified Itself As \"%s\"", client.Name(), *ic.Hostname)
139         client.Player = *ic.Hostname
140         if (!HostAuthorised(client.Player)) {
141                 o.Warn("Client %s: Not Authorised.  Terminating Connection.", client.Name())
142                 client.Abort()
143                 return
144         }
145
146         /* if we're TLS, verify the client's certificate given the name it used */
147         tlsc, ok := client.connection.(*tls.Conn)
148         if ok && !*DontVerifyPeer {
149                 intermediates := x509.NewCertPool()
150
151                 o.Debug("Connection is TLS.")
152                 o.Debug("Checking Connection State")
153                 cs := tlsc.ConnectionState()
154                 vo := x509.VerifyOptions{
155                 Roots: CACertPool,
156                 Intermediates: intermediates,
157                 DNSName: client.Player,
158                 }
159                 if cs.PeerCertificates == nil || cs.PeerCertificates[0] == nil {
160                         o.Warn("Peer didn't provide a certificate. Aborting Connection.")
161                         client.Abort()
162                         return
163                 }
164                 // load any intermediate certificates from the chain
165                 // into the intermediates pool so we can verify that
166                 // the chain can be rebuilt.
167                 //
168                 // All we care is that we can reach an authorised CA.
169                 //
170                 //FIXME: Need CRL handling.
171                 if len(cs.PeerCertificates) > 1 {
172                         for i := 1; i < len(cs.PeerCertificates); i++ {
173                                 intermediates.AddCert(cs.PeerCertificates[i])
174                         }
175                 }
176                 _, err := cs.PeerCertificates[0].Verify(vo)
177                 if err != nil {
178                         o.Warn("couldn't verify client certificate: %s", err)
179                         client.Abort()
180                         return
181                 }
182         }
183         reg := ClientGet(client.Player)
184         if nil == reg {
185                 o.Warn("Couldn't register client %s.  aborting connection.", client.Name())
186                 client.Abort()
187                 return
188         }
189         client.MergeState(reg)
190 }
191
192 func handleReadyForTask(client *ClientInfo, message interface{}) {
193         o.Debug("Client %s: Asked for Job", client.Name())
194         PlayerWaitingForJob(client)
195 }
196
197 func handleIllegal(client *ClientInfo, message interface{}) {
198         o.Warn("Client %s: Sent Illegal Message")
199         client.Abort()
200 }
201
202 func handleResult(client *ClientInfo, message interface{}){
203         jr, _ := message.(*o.ProtoTaskResponse)
204         r := ResponseFromProto(jr)
205         // at this point in time, we only care about terminal
206         // condition codes.  a Job that isn't finished is just
207         // prodding us back to let us know it lives.
208         if r.IsFinished() {
209                 job := JobGet(r.id)
210                 if nil == job {
211                         o.Warn("Client %s: NAcking for Job %d - couldn't find job data.", client.Name(), r.id)
212                         nack := o.MakeNack(r.id)
213                         client.sendNow(nack)
214                 } else {
215                         job := JobGet(r.id)
216                         if job != nil {
217                                 o.Debug("Got Response.  Acking.")
218                                 /* if the job exists, Ack it. */
219                                 ack := o.MakeAck(r.id)
220                                 client.sendNow(ack)
221                         }
222                         // now, we only accept the results if we were
223                         // expecting the results (ie: it was pending)
224                         // and expunge the task information from the
225                         // pending list so we stop bugging the client for it.
226                         task, exists := client.pendingTasks[r.id]
227                         if exists {
228                                 o.Debug("Storing results for Job %d", r.id)
229                                 // store the result.
230                                 if !JobAddResult(client.Player, r) {
231                                         o.Assert("Couldn't add result for pending task")
232                                 }
233
234                                 // next, work out if the job is a retryable failure or not
235                                 var didretry bool = false
236
237                                 if r.DidFail() {
238                                         o.Info("Client %s reports failure for Job %d", client.Name(), r.id)
239                                         if r.CanRetry() {
240                                                 job := JobGet(r.id)
241                                                 if job.Scope == SCOPE_ONEOF {
242                                                         // right, we're finally deep enough to work out what's going on!
243                                                         JobDisqualifyPlayer(r.id, client.Player)
244                                                         if len(job.Players) >= 1 {
245                                                                 // still players left we can try?  then go for it!
246                                                                 CleanTask(task)
247                                                                 DispatchTask(task)
248                                                                 didretry = true
249                                                         }
250                                                 }
251                                         }
252                                 }
253                                 if !didretry {
254                                         // if we didn't retry, the task needs to be marked as finished.
255                                         task.State = TASK_FINISHED
256                                 }
257                                 // update the job state.
258                                 JobReviewState(r.id)
259
260                                 client.pendingTasks[r.id] = nil, false
261                         }
262                 }
263         }
264 }
265
266
267 var dispatcher  = map[uint8] func(*ClientInfo,interface{}) {
268         o.TypeNop:              handleNop,
269         o.TypeIdentifyClient:   handleIdentify,
270         o.TypeReadyForTask:     handleReadyForTask,
271         o.TypeTaskResponse:     handleResult,
272         /* C->P only messages, should never appear on the wire. */
273         o.TypeTaskRequest:      handleIllegal,
274
275 }
276
277 var loopFudge int64 = 10e6; /* 10 ms should be enough fudgefactor */
278 func clientLogic(client *ClientInfo) {
279         loop := true
280         for loop {
281                 var     retryWait <-chan int64 = nil
282                 var     retryTask *TaskRequest = nil
283                 if (client.Player != "") {
284                         var waitTime int64 = 0
285                         var now int64 = 0
286                         cleanPass := false
287                         attempts := 0
288                         for !cleanPass && attempts < 10 {
289                                 /* reset our state for the pass */
290                                 waitTime = 0
291                                 retryTask = nil
292                                 attempts++
293                                 cleanPass = true
294                                 now = time.Nanoseconds() + loopFudge
295                                 // if the client is correctly associated,
296                                 // evaluate all jobs for outstanding retries,
297                                 // and work out when our next retry is due.
298                                 for _,v := range client.pendingTasks {
299                                         if v.RetryTime < now {
300                                                 client.SendTask(v)
301                                                 cleanPass = false
302                                         } else {
303                                                 if waitTime == 0 || v.RetryTime < waitTime {
304                                                         retryTask = v
305                                                         waitTime = v.RetryTime
306                                                 }
307                                         }
308                                 }
309                         }
310                         if (attempts > 10) {
311                                 o.Fail("Couldn't find next timeout without restarting excessively.")
312                         }
313                         if (retryTask != nil) {
314                                 retryWait = time.After(waitTime-time.Nanoseconds())
315                         }
316                 }
317                 select {
318                 case <-retryWait:
319                         client.SendTask(retryTask)
320                 case p := <-client.PktInQ:
321                         /* we've received a packet.  do something with it. */
322                         if client.Player == "" && p.Type != o.TypeIdentifyClient {
323                                 o.Warn("Client %s didn't Identify self - got type %d instead!  Terminating Connection.", client.Name(), p.Type)
324                                 client.Abort()
325                                 break
326                         }
327                         var upkt interface {} = nil
328                         if p.Length > 0 {
329                                 var err os.Error
330
331                                 upkt, err = p.Decode()
332                                 if err != nil {
333                                         o.Warn("Error unmarshalling message from Client %s: %s.  Terminating Connection.", client.Name(), err)
334                                         client.Abort()
335                                         break
336                                 }
337                         }
338                         handler, exists := dispatcher[p.Type]
339                         if (exists) {
340                                 handler(client, upkt)
341                         } else {
342                                 o.Warn("Unhandled Pkt Type %d", p.Type)
343                         }
344                 case p := <-client.PktOutQ:
345                         if p != nil {
346                                 client.sendNow(p)
347                         }
348                 case t := <-client.TaskQ:
349                         client.GotTask(t)
350                 case <-client.abortQ:
351                         o.Debug("Client %s connection has been told to abort!", client.Name())
352                         loop = false
353                 case <-time.After(KeepaliveDelay):
354                         p := o.MakeNop()
355                         o.Debug("Sending Keepalive to %s", client.Name())
356                         _, err := p.Send(client.connection)
357                         if err != nil {
358                                 o.Warn("Error sending pkt to %s: %s.  Terminating Connection.", client.Name(), err)     
359                                 client.Abort()
360                         }
361                 }
362         }
363         client.connection.Close()
364 }
365
366 func clientReceiver(client *ClientInfo) {
367         conn := client.connection
368
369
370         loop := true
371         for loop {
372                 pkt, err := o.Receive(conn)
373                 if nil != err {
374                         o.Warn("Error receiving pkt from %s: %s", conn.RemoteAddr().String(), err)
375                         client.Abort()
376                         client.connection.Close()
377                         loop = false
378                 } else {
379                         client.PktInQ <- pkt
380                 }
381         }
382         o.Debug("Client %s connection reader has exited it's loop!", conn.RemoteAddr().String())
383 }
384
385 /* The Main Server loop calls this method to hand off connections to us */
386 func HandleConnection(conn net.Conn) {
387         /* this is a temporary client info, we substitute it for the real
388          * one once we ID the connection correctly */
389         c := NewClientInfo()
390         c.connection = conn
391         go clientReceiver(c)
392         go clientLogic(c)
393 }