17 KeepaliveDelay = 200e9 // once every 200 seconds.
18 RetryDelay = 10e9 // retry every 10 seconds. Must be smaller than the keepalive to avoid channel race.
19 OutputQueueDepth = 10 // This needs to be large enough that we don't deadlock on ourself.
23 type ClientInfo struct {
25 PktOutQ chan *o.WirePkt
26 PktInQ chan *o.WirePkt
28 TaskQ chan *TaskRequest
30 pendingTasks map[uint64]*TaskRequest
33 func NewClientInfo() (client *ClientInfo) {
34 client = new(ClientInfo)
35 client.abortQ = make(chan int, 2)
36 client.PktOutQ = make(chan *o.WirePkt, OutputQueueDepth)
37 client.PktInQ = make(chan *o.WirePkt)
38 client.TaskQ = make(chan *TaskRequest)
43 func (client *ClientInfo) Abort() {
45 reg := ClientGet(client.Player)
52 func (client *ClientInfo) Name() (name string) {
53 if client.Player == "" {
54 return "UNK:" + client.connection.RemoteAddr().String()
59 // Must not be used from inside of handlers.
60 func (client *ClientInfo) Send(p *o.WirePkt) {
64 // Can only be used form inside of handlers and the main client loop.
65 func (client *ClientInfo) sendNow(p *o.WirePkt) {
66 _, err := p.Send(client.connection)
68 o.Warn("Error sending pkt to %s: %s. Terminating connection.", client.Name(), err)
73 func (client *ClientInfo) SendTask(task *TaskRequest) {
75 p, err := o.Encode(tr)
76 o.MightFail(err, "Couldn't encode task for client.")
78 task.RetryTime = time.Nanoseconds() + RetryDelay
81 func (client *ClientInfo) GotTask(task *TaskRequest) {
82 /* first up, look at the task state */
86 case TASK_PENDINGRESULT:
87 /* this is a new task. We should send it straight */
88 task.Player = client.Player
89 task.State = TASK_PENDINGRESULT
90 client.pendingTasks[task.job.Id] = task
92 // request a update to the spool so the PENDING flag is stored.
93 JobWriteUpdate(task.job.Id)
95 /* discard. We don't care about tasks that are done. */
99 // reset the task state so it can be requeued.
100 func CleanTask(task *TaskRequest) {
101 task.State = TASK_QUEUED
105 // this merges the state from the registry record into the client it's called against.
106 // it also copies back the active communication channels to the registry record.
107 func (client *ClientInfo) MergeState(regrecord *ClientInfo) {
108 client.Player = regrecord.Player
109 client.pendingTasks = regrecord.pendingTasks
111 regrecord.TaskQ = client.TaskQ
112 regrecord.abortQ = client.abortQ
113 regrecord.PktOutQ = client.PktOutQ
114 regrecord.PktInQ = client.PktInQ
115 regrecord.connection = client.connection
118 // Sever the connection state from the client (used against registry records only)
119 func (client *ClientInfo) Disassociate() {
124 client.connection = nil
127 func handleNop(client *ClientInfo, message interface{}) {
128 o.Debug("Client %s: NOP Received", client.Name())
131 func handleIdentify(client *ClientInfo, message interface{}) {
132 if client.Player != "" {
133 o.Warn("Client %s: Tried to reintroduce itself. Terminating Connection.", client.Name())
137 ic, _ := message.(*o.IdentifyClient)
138 o.Info("Client %s: Identified Itself As \"%s\"", client.Name(), *ic.Hostname)
139 client.Player = *ic.Hostname
140 if (!HostAuthorised(client.Player)) {
141 o.Warn("Client %s: Not Authorised. Terminating Connection.", client.Name())
146 /* if we're TLS, verify the client's certificate given the name it used */
147 tlsc, ok := client.connection.(*tls.Conn)
148 if ok && !*DontVerifyPeer {
149 intermediates := x509.NewCertPool()
151 o.Debug("Connection is TLS.")
152 o.Debug("Checking Connection State")
153 cs := tlsc.ConnectionState()
154 vo := x509.VerifyOptions{
156 Intermediates: intermediates,
157 DNSName: client.Player,
159 if cs.PeerCertificates == nil || cs.PeerCertificates[0] == nil {
160 o.Warn("Peer didn't provide a certificate. Aborting Connection.")
164 // load any intermediate certificates from the chain
165 // into the intermediates pool so we can verify that
166 // the chain can be rebuilt.
168 // All we care is that we can reach an authorised CA.
170 //FIXME: Need CRL handling.
171 if len(cs.PeerCertificates) > 1 {
172 for i := 1; i < len(cs.PeerCertificates); i++ {
173 intermediates.AddCert(cs.PeerCertificates[i])
176 _, err := cs.PeerCertificates[0].Verify(vo)
178 o.Warn("couldn't verify client certificate: %s", err)
183 reg := ClientGet(client.Player)
185 o.Warn("Couldn't register client %s. aborting connection.", client.Name())
189 client.MergeState(reg)
192 func handleReadyForTask(client *ClientInfo, message interface{}) {
193 o.Debug("Client %s: Asked for Job", client.Name())
194 PlayerWaitingForJob(client)
197 func handleIllegal(client *ClientInfo, message interface{}) {
198 o.Warn("Client %s: Sent Illegal Message")
202 func handleResult(client *ClientInfo, message interface{}){
203 jr, _ := message.(*o.ProtoTaskResponse)
204 r := ResponseFromProto(jr)
205 // at this point in time, we only care about terminal
206 // condition codes. a Job that isn't finished is just
207 // prodding us back to let us know it lives.
211 o.Warn("Client %s: NAcking for Job %d - couldn't find job data.", client.Name(), r.id)
212 nack := o.MakeNack(r.id)
217 o.Debug("Got Response. Acking.")
218 /* if the job exists, Ack it. */
219 ack := o.MakeAck(r.id)
222 // now, we only accept the results if we were
223 // expecting the results (ie: it was pending)
224 // and expunge the task information from the
225 // pending list so we stop bugging the client for it.
226 task, exists := client.pendingTasks[r.id]
228 o.Debug("Storing results for Job %d", r.id)
230 if !JobAddResult(client.Player, r) {
231 o.Assert("Couldn't add result for pending task")
234 // next, work out if the job is a retryable failure or not
235 var didretry bool = false
238 o.Info("Client %s reports failure for Job %d", client.Name(), r.id)
241 if job.Scope == SCOPE_ONEOF {
242 // right, we're finally deep enough to work out what's going on!
243 JobDisqualifyPlayer(r.id, client.Player)
244 if len(job.Players) >= 1 {
245 // still players left we can try? then go for it!
254 // if we didn't retry, the task needs to be marked as finished.
255 task.State = TASK_FINISHED
257 // update the job state.
260 client.pendingTasks[r.id] = nil, false
267 var dispatcher = map[uint8] func(*ClientInfo,interface{}) {
268 o.TypeNop: handleNop,
269 o.TypeIdentifyClient: handleIdentify,
270 o.TypeReadyForTask: handleReadyForTask,
271 o.TypeTaskResponse: handleResult,
272 /* C->P only messages, should never appear on the wire. */
273 o.TypeTaskRequest: handleIllegal,
277 var loopFudge int64 = 10e6; /* 10 ms should be enough fudgefactor */
278 func clientLogic(client *ClientInfo) {
281 var retryWait <-chan int64 = nil
282 var retryTask *TaskRequest = nil
283 if (client.Player != "") {
284 var waitTime int64 = 0
288 for !cleanPass && attempts < 10 {
289 /* reset our state for the pass */
294 now = time.Nanoseconds() + loopFudge
295 // if the client is correctly associated,
296 // evaluate all jobs for outstanding retries,
297 // and work out when our next retry is due.
298 for _,v := range client.pendingTasks {
299 if v.RetryTime < now {
303 if waitTime == 0 || v.RetryTime < waitTime {
305 waitTime = v.RetryTime
311 o.Fail("Couldn't find next timeout without restarting excessively.")
313 if (retryTask != nil) {
314 retryWait = time.After(waitTime-time.Nanoseconds())
319 client.SendTask(retryTask)
320 case p := <-client.PktInQ:
321 /* we've received a packet. do something with it. */
322 if client.Player == "" && p.Type != o.TypeIdentifyClient {
323 o.Warn("Client %s didn't Identify self - got type %d instead! Terminating Connection.", client.Name(), p.Type)
327 var upkt interface {} = nil
331 upkt, err = p.Decode()
333 o.Warn("Error unmarshalling message from Client %s: %s. Terminating Connection.", client.Name(), err)
338 handler, exists := dispatcher[p.Type]
340 handler(client, upkt)
342 o.Warn("Unhandled Pkt Type %d", p.Type)
344 case p := <-client.PktOutQ:
348 case t := <-client.TaskQ:
350 case <-client.abortQ:
351 o.Debug("Client %s connection has been told to abort!", client.Name())
353 case <-time.After(KeepaliveDelay):
355 o.Debug("Sending Keepalive to %s", client.Name())
356 _, err := p.Send(client.connection)
358 o.Warn("Error sending pkt to %s: %s. Terminating Connection.", client.Name(), err)
363 client.connection.Close()
366 func clientReceiver(client *ClientInfo) {
367 conn := client.connection
372 pkt, err := o.Receive(conn)
374 o.Warn("Error receiving pkt from %s: %s", conn.RemoteAddr().String(), err)
376 client.connection.Close()
382 o.Debug("Client %s connection reader has exited it's loop!", conn.RemoteAddr().String())
385 /* The Main Server loop calls this method to hand off connections to us */
386 func HandleConnection(conn net.Conn) {
387 /* this is a temporary client info, we substitute it for the real
388 * one once we ID the connection correctly */