initialise repo
[debian/orchestra.git] / src / conductor / registry.go
1 // registry.go
2 //
3 // The Registry provides a 'threadsafe' interface to various global information stores.
4 //
5 // The registry dispatch thread is forbidden from performing any work that is likely to block.
6 // Result channels must be buffered with enough space for the full set of results.
7
8 package main
9
10 import (
11         o "orchestra"
12         "sort"
13         "time"
14         "container/list"
15 )
16
17 // Request Types
18 const (
19         requestAddClient = iota
20         requestGetClient
21         requestDeleteClient
22         requestSyncClients
23
24         requestAddJob
25         requestGetJob
26         requestAddJobResult
27         requestGetJobResult
28         requestGetJobResultNames
29         requestDisqualifyPlayer
30         requestReviewJobStatus
31
32         requestWriteJobUpdate
33         requestWriteJobAll
34
35         requestQueueSize                = 10
36
37         jobLingerTime                   = int64(30e9)
38 )
39
40 type registryRequest struct {
41         operation               int
42         id                      uint64
43         hostname                string
44         hostlist                []string
45         job                     *JobRequest
46         tresp                   *TaskResponse
47         responseChannel         chan *registryResponse
48 }
49
50 type registryResponse struct {
51         success                 bool
52         info                    *ClientInfo
53         tresp                   *TaskResponse
54         names                   []string
55         jobs                    []*JobRequest
56 }
57
58 var (
59         chanRegistryRequest     = make(chan *registryRequest, requestQueueSize)
60         clientList              = make(map[string]*ClientInfo)
61         jobRegister             = make(map[uint64]*JobRequest)
62         expiryChan              <-chan int64
63         expiryJobid             uint64
64         expiryList              *list.List
65
66         expiryLoopFudge         int64 = 10e6; /* 10 ms should be enough fudgefactor */
67 )
68
69 func init() {
70         expiryList = list.New()
71 }
72
73 func regInternalAdd(hostname string) {
74         o.Warn("Registry: New Host \"%s\"", hostname)
75         clientList[hostname] = NewClientInfo()
76         // do this initialisation here since it'll help unmask sequencing errors
77         clientList[hostname].pendingTasks = make(map[uint64]*TaskRequest)
78         clientList[hostname].Player = hostname
79 }
80
81 func regInternalDel(hostname string) {
82         o.Warn("Registry: Deleting Host \"%s\"", hostname)
83         /* remove it from the registry */
84         clientList[hostname] = nil, false
85 }
86
87 func regInternalExpireJob(jobid uint64) {
88         job, exists := jobRegister[jobid]
89         if exists {
90                 if job.State.Finished() {
91                         jobRegister[jobid] = nil, false
92                 } else {
93                         o.Assert("Tried to expire incomplete job.")
94                 }
95         }
96 }
97
98 func regInternalFindNextExpiry() {
99         if expiryChan != nil {
100                 o.Assert("Attempted to Find Next Expiry avenue with expiry timer active.")
101         }
102         // if there's nothing to expire, do nothing.
103         if expiryList.Len() == 0 {
104                 return
105         }
106
107         for expiryChan == nil && expiryList.Len() > 0 {
108                 jobif := expiryList.Remove(expiryList.Front())
109                 req, ok := jobif.(*JobRequest)
110                 if !ok {
111                         o.Assert("item in expiryList not a *JobRequest")
112                 }
113                 if (time.Nanoseconds() + expiryLoopFudge) > req.expirytime {
114                         regInternalExpireJob(req.Id)
115                 } else {
116                         expiryChan = time.After(req.expirytime - time.Nanoseconds())
117                         expiryJobid = req.Id
118                 }
119         }
120 }
121
122 func regInternalMarkJobForExpiry(job *JobRequest) {
123         job.expirytime = time.Nanoseconds() + jobLingerTime
124         expiryList.PushBack(job)
125         // if there is no job pending expiry, feed it into the delay loop
126         if expiryChan == nil {
127                 regInternalFindNextExpiry()
128         }
129 }
130
131 var registryHandlers = map[int] func(*registryRequest, *registryResponse) {
132 requestAddClient:       regintAddClient,
133 requestGetClient:       regintGetClient,
134 requestDeleteClient:    regintDeleteClient,
135 requestSyncClients:     regintSyncClients,
136 requestAddJob:          regintAddJob,
137 requestGetJob:          regintGetJob,
138 requestAddJobResult:    regintAddJobResult,
139 requestGetJobResult:    regintGetJobResult,
140 requestGetJobResultNames:       regintGetJobResultNames,
141 requestDisqualifyPlayer:        regintDisqualifyPlayer,
142 requestReviewJobStatus: regintReviewJobStatus,
143 requestWriteJobUpdate:  regintWriteJobUpdate,
144 requestWriteJobAll:     regintWriteJobAll,
145 }
146
147 func manageRegistry() {
148         for {
149                 select {
150                 case req := <-chanRegistryRequest:
151                         resp := new(registryResponse)
152                         // by default, we failed.
153                         resp.success = false
154                         // find the operation
155                         handler, exists := registryHandlers[req.operation]
156                         if exists {
157                                 handler(req, resp)
158                         }
159                         if req.responseChannel != nil {
160                                 req.responseChannel <- resp
161                         }
162                 case <-expiryChan:
163                         o.Debug("job%d: Expiring Job Record", expiryJobid)
164                         regInternalExpireJob(expiryJobid)
165                         expiryChan = nil
166                         regInternalFindNextExpiry()
167                 }
168         }
169 }
170
171 func StartRegistry() {
172         go manageRegistry()
173 }
174
175 func newRequest(wants_response bool) (req *registryRequest) {
176         req = new(registryRequest)
177         if wants_response {
178                 req.responseChannel = make(chan *registryResponse, 1)
179         }
180
181         return req
182 }
183         
184 func ClientAdd(hostname string) (success bool) {
185         r := newRequest(true)
186         r.operation = requestAddClient
187         r.hostname = hostname
188         chanRegistryRequest <- r
189         resp := <- r.responseChannel
190         
191         return resp.success
192 }
193
194 func regintAddClient(req *registryRequest, resp *registryResponse) {
195         _, exists := clientList[req.hostname]
196         if exists {
197                 resp.success = false
198         } else {
199                 regInternalAdd(req.hostname)
200                 resp.success = true
201         }
202 }
203
204 func ClientDelete(hostname string) (success bool) {
205         r := newRequest(true)
206         r.operation = requestDeleteClient
207         r.hostname = hostname
208         chanRegistryRequest <- r
209         resp := <- r.responseChannel
210         
211         return resp.success
212 }
213
214 func regintDeleteClient(req *registryRequest, resp *registryResponse) {
215         _, exists := clientList[req.hostname]
216         if exists {
217                 resp.success = true
218                 regInternalDel(req.hostname)
219         } else {
220                 resp.success = false
221         }
222 }
223
224 func ClientGet(hostname string) (info *ClientInfo) {
225         r := newRequest(true)
226         r.operation = requestGetClient
227         r.hostname = hostname
228         chanRegistryRequest <- r
229         resp := <- r.responseChannel
230         if resp.success {
231                 return resp.info
232         }
233         return nil
234 }
235
236 func regintGetClient(req *registryRequest, resp *registryResponse) {
237         clinfo, exists := clientList[req.hostname]
238         if exists {
239                 resp.success = true
240                 resp.info = clinfo
241         } else {
242                 resp.success = false
243         }
244 }
245
246 func ClientUpdateKnown(hostnames []string) {
247         /* this is an asynchronous, we feed it into the registry 
248          * and it'll look after itself.
249         */
250         r := newRequest(false)
251         r.operation = requestSyncClients
252         r.hostlist = hostnames
253         chanRegistryRequest <- r
254 }
255
256 func regintSyncClients(req *registryRequest, resp *registryResponse) {
257         // we need to make sure the registered clients matches the
258         // hostlist we're given.
259         //
260         // First, we transform the array into a map
261         newhosts := make(map[string]bool)
262         for k,_ := range req.hostlist {
263                 newhosts[req.hostlist[k]] = true
264         }
265         // now, scan the current list, checking to see if they exist.
266         // Remove them from the newhosts map if they do exist.
267         for k,_ := range clientList {
268                 _, exists := newhosts[k]
269                 if exists {
270                         // remove it from the newhosts map
271                         newhosts[k] = false, false
272                 } else {
273                         regInternalDel(k)
274                 }
275         }
276         // now that we're finished, we should only have new clients in
277         // the newhosts list left.
278         for k,_ := range newhosts {
279                 regInternalAdd(k)
280         }
281         // and we're done.
282 }
283
284 // Add a Job to the registry.  Return true if successful, returns
285 // false if the job is lacking critical information (such as a JobId)
286 // and can't be registered.
287 func JobAdd(job *JobRequest) bool {
288         rr := newRequest(true)
289         rr.operation = requestAddJob
290         rr.job = job
291
292         chanRegistryRequest <- rr
293         resp := <- rr.responseChannel 
294         return resp.success
295 }
296
297 func regintAddJob(req *registryRequest, resp *registryResponse) {
298         if nil == req.job {
299                 return
300         }
301         // ensure that the players are sorted!
302         sort.Strings(req.job.Players)
303         // update the state
304         req.job.updateState()
305         // and register the job
306         _, overwrite := jobRegister[req.job.Id]
307         if !overwrite {
308                 jobRegister[req.job.Id] = req.job
309                 // force a queue update.
310                 req.job.UpdateInSpool()
311                 if req.job.State.Finished() {
312                         regInternalMarkJobForExpiry(req.job)
313                 }
314                 resp.success = true
315         }
316 }
317
318 // Get a Job from the registry.  Returns the job if successful,
319 // returns nil if the job couldn't be found.
320 func JobGet(id uint64) *JobRequest {
321         rr := newRequest(true)
322         rr.operation = requestGetJob
323         rr.id = id
324
325         chanRegistryRequest <- rr
326         resp := <- rr.responseChannel
327         if resp.jobs == nil {
328                 return nil
329         }
330         return resp.jobs[0]
331 }
332
333 func regintGetJob(req *registryRequest, resp *registryResponse) {
334         job, exists := jobRegister[req.id]
335         resp.success = exists
336         if exists {
337                 resp.jobs = make([]*JobRequest, 1)
338                 resp.jobs[0] = job
339         } else {
340                 o.Warn("Received Request for job%d which is not in memory", req.id)
341                 go regintGetJobDeferred(req.id, req.responseChannel)
342                 // mask out the responseChannel so the deferred handler can use it.
343                 req.responseChannel = nil
344         }
345 }
346
347 func regintGetJobDeferred(jobid uint64, responseChannel chan<- *registryResponse) {
348         resp := new(registryResponse)
349         resp.success = false
350         defer func (resp *registryResponse, rChan chan<- *registryResponse) {
351                 rChan <- resp;
352         }(resp, responseChannel)
353
354         req, err := LoadFromFinished(jobid)
355         if err != nil {
356                 o.Warn("Couldn't load job%d from disk.  Doesn't exist?", jobid)
357                 return
358         }
359         // fix up the state, and stuff it back into the system
360         RestoreJobState(req)
361         resp.jobs = make([]*JobRequest, 1)
362         resp.jobs[0] = req
363         resp.success = true
364 }
365
366 // Attach a result to a Job in the Registry
367 //
368 // This exists in order to prevent nasty concurrency problems
369 // when trying to put results back onto the job.  Reading a job is far
370 // less of a problem than writing to it.
371 func JobAddResult(playername string, task *TaskResponse) bool {
372         rr := newRequest(true)
373         rr.operation = requestAddJobResult
374         rr.tresp = task
375         rr.hostname = playername
376         chanRegistryRequest <- rr
377         resp := <- rr.responseChannel
378         return resp.success
379 }
380
381 func regintAddJobResult(req *registryRequest, resp *registryResponse) {
382         job, exists := jobRegister[req.tresp.id]
383         resp.success = exists
384         if exists {
385                 job.Results[req.hostname] = req.tresp
386                 // force a queue update.
387                 job.UpdateInSpool()
388         }
389 }
390
391 // Get a result from the registry
392 func JobGetResult(id uint64, playername string) (tresp *TaskResponse) {
393         rr := newRequest(true)
394         rr.operation = requestGetJobResult
395         rr.id = id
396         rr.hostname = playername
397         chanRegistryRequest <- rr
398         resp := <- rr.responseChannel
399         return resp.tresp
400 }
401
402 func regintGetJobResult(req *registryRequest, resp *registryResponse) {
403         job, exists := jobRegister[req.id]
404         if exists {
405                 result, exists := job.Results[req.hostname]
406                 resp.success = exists
407                 if exists {
408                         resp.tresp = result
409                 }
410         } else {
411                 resp.success = false
412         }
413 }
414
415 // Get a list of names we have results for against a given job.
416 func JobGetResultNames(id uint64) (names []string) {
417         rr := newRequest(true)
418         rr.operation = requestGetJobResultNames
419         rr.id = id
420
421         chanRegistryRequest <- rr
422         resp := <- rr.responseChannel 
423         return resp.names
424 }
425
426 func regintGetJobResultNames(req *registryRequest, resp *registryResponse) {
427         job, exists := jobRegister[req.id]
428         resp.success = exists
429         if exists {
430                 resp.names = make([]string, len(job.Results))
431                 idx := 0
432                 for k, _ := range job.Results {
433                         resp.names[idx] = k
434                         idx++
435                 }
436         }
437 }
438
439 //  Disqualify a player from servicing a job
440 func JobDisqualifyPlayer(id uint64, playername string) bool {
441         rr := newRequest(true)
442         rr.operation = requestDisqualifyPlayer
443         rr.id = id
444         rr.hostname = playername
445
446         chanRegistryRequest <- rr
447         resp := <- rr.responseChannel
448
449         return resp.success
450 }
451
452 func regintDisqualifyPlayer(req *registryRequest, resp *registryResponse) {
453         job, exists := jobRegister[req.id]
454         if exists {
455                 idx := sort.Search(len(job.Players), func(idx int) bool { return job.Players[idx] >= req.hostname })
456                 if (job.Players[idx] == req.hostname) {
457                         resp.success = true
458                         newplayers := make([]string, len(job.Players)-1)
459                         copy(newplayers[0:idx], job.Players[0:idx])
460                         copy(newplayers[idx:len(job.Players)-1], job.Players[idx+1:len(job.Players)])
461                         job.Players = newplayers
462                         job.updateState()
463                         // force a queue update.
464                         job.UpdateInSpool()
465                 } else {
466                         resp.success = false
467                 }
468         } else {
469                 resp.success = false
470         }
471 }
472
473 func JobReviewState(id uint64) bool {
474         rr := newRequest(true)
475         rr.operation = requestReviewJobStatus
476         rr.id = id
477
478         chanRegistryRequest <- rr
479         resp := <- rr.responseChannel
480
481         return resp.success
482 }
483
484 func regintReviewJobStatus(req *registryRequest, resp *registryResponse) {
485         job, exists := jobRegister[req.id]
486         resp.success = exists
487         if exists {
488                 job.updateState()
489                 // force a queue update.
490                 job.UpdateInSpool()
491         }
492 }
493
494 func JobWriteUpdate(id uint64) {
495         rr := newRequest(false)
496         rr.operation = requestWriteJobUpdate
497         rr.id = id
498         chanRegistryRequest <- rr
499 }
500
501 func regintWriteJobUpdate(req *registryRequest, resp *registryResponse) {
502         job, exists := jobRegister[req.id]
503         resp.success = exists
504         if exists {
505                 job.UpdateInSpool()
506         }
507 }
508
509 func JobWriteAll() bool {
510         rr := newRequest(true)
511         rr.operation = requestWriteJobAll
512
513         chanRegistryRequest <- rr
514         resp := <-rr.responseChannel
515
516         return resp.success
517 }
518
519 func regintWriteJobAll(req *registryRequest, resp *registryResponse) {
520         for _, job := range jobRegister {
521                 job.UpdateInSpool()
522         }
523         resp.success = true
524 }
525
526 // Ugh.
527 func (job *JobRequest) updateState() {
528         if job.Results == nil {
529                 o.Assert("job.Results nil for jobid %d", job.Id)
530                 return
531         }
532         was_finished := job.State.Finished()
533         switch job.Scope {
534         case SCOPE_ONEOF:
535                 // look for a success (any success) in the responses
536                 var success bool = false
537                 for host, res := range job.Results {
538                         if res == nil {
539                                 o.Debug("nil result for %s?", host)
540                                 continue
541                         }
542                         if res.State == RESP_FINISHED {
543                                 success = true
544                                 break
545                         }
546                 }
547                 // update the job state based upon these findings
548                 if success {
549                         job.State = JOB_SUCCESSFUL
550                 } else {
551                         if len(job.Players) < 1 {
552                                 job.State = JOB_FAILED
553                         } else {
554                                 job.State = JOB_PENDING
555                         }
556                 }
557         case SCOPE_ALLOF:
558                 var success int = 0
559                 var failed  int = 0
560                 
561                 for pidx := range job.Players {
562                         p := job.Players[pidx]
563                         resp, exists := job.Results[p]
564                         if exists {
565                                 if resp.DidFail() {
566                                         failed++
567                                 } else if resp.State == RESP_FINISHED {
568                                         success++
569                                 }
570                         }
571                 }
572                 if (success + failed) < len(job.Players) {
573                         job.State = JOB_PENDING
574                 } else if success == len(job.Players) {
575                         job.State = JOB_SUCCESSFUL
576                 } else if failed == len(job.Players) {
577                         job.State = JOB_FAILED
578                 } else {
579                         job.State = JOB_FAILED_PARTIAL
580                 }
581         }
582         if !was_finished && job.State.Finished() {
583                 o.Debug("job%d: Finished - Setting Expiry Time", job.Id)
584                 regInternalMarkJobForExpiry(job)
585         }
586 }