3 // The Registry provides a 'threadsafe' interface to various global information stores.
5 // The registry dispatch thread is forbidden from performing any work that is likely to block.
6 // Result channels must be buffered with enough space for the full set of results.
// Registry operation codes. Each is stored in registryRequest.operation and used
// as the key into registryHandlers to select the handler that services a request.
19 requestAddClient = iota
28 requestGetJobResultNames
29 requestDisqualifyPlayer
30 requestReviewJobStatus
// jobLingerTime is how long a finished job stays in jobRegister before it is
// expired, in nanoseconds (30e9 ns = 30 s). regInternalMarkJobForExpiry adds it
// to time.Nanoseconds().
37 jobLingerTime = int64(30e9)
// registryRequest is one unit of work submitted to the registry goroutine over
// chanRegistryRequest. Its operation field selects the handler in registryHandlers.
40 type registryRequest struct {
// responseChannel receives the registryResponse once the request is handled.
// When it is nil, manageRegistry sends no reply. This covers fire-and-forget
// requests and requests whose reply was handed to another goroutine (see
// regintGetJob).
47 responseChannel chan *registryResponse
// registryResponse carries a handler's results back to the requester.
// manageRegistry allocates one per request, and handlers fill in fields such as
// success, jobs and names.
50 type registryResponse struct {
// Registry state. The regint*/regInternal* handlers read and write these without
// locking. Per the file header, other goroutines are expected to reach them only
// by sending requests on chanRegistryRequest.
//
// chanRegistryRequest: queue of pending requests, buffered to requestQueueSize.
59 chanRegistryRequest = make(chan *registryRequest, requestQueueSize)
// clientList: known clients, keyed by hostname.
60 clientList = make(map[string]*ClientInfo)
// jobRegister: in-memory jobs, keyed by job Id.
61 jobRegister = make(map[uint64]*JobRequest)
// expiryChan: non-nil only while an expiry timer is armed. regInternalFindNextExpiry
// asserts that it is nil before arming a new timer.
62 expiryChan <-chan int64
// expiryLoopFudge: a job due to expire within this many nanoseconds
// (10e6 ns = 10 ms) is expired immediately instead of arming a timer for it.
66 expiryLoopFudge int64 = 10e6; /* 10 ms should be enough fudgefactor */
// expiryList: FIFO of *JobRequest awaiting expiry. Every entry is pushed with
// the same jobLingerTime offset from "now", so the front is always the earliest
// to expire.
70 expiryList = list.New()
// regInternalAdd creates a fresh ClientInfo for hostname and stores it in
// clientList, overwriting any existing entry. The new record's pendingTasks map
// is initialised and its Player is set to hostname. It touches clientList
// without locking, so it is intended to run on the registry goroutine.
73 func regInternalAdd(hostname string) {
74 o.Warn("Registry: New Host \"%s\"", hostname)
75 clientList[hostname] = NewClientInfo()
76 // do this initialisation here since it'll help unmask sequencing errors
77 clientList[hostname].pendingTasks = make(map[uint64]*TaskRequest)
78 clientList[hostname].Player = hostname
// regInternalDel removes hostname from clientList.
81 func regInternalDel(hostname string) {
82 o.Warn("Registry: Deleting Host \"%s\"", hostname)
83 /* remove it from the registry */
// NOTE(review): `m[k] = nil, false` is the pre-Go 1 map-deletion form. Go 1
// replaced it with delete(clientList, hostname).
84 clientList[hostname] = nil, false
// regInternalExpireJob drops a finished job from jobRegister. Expiring a job
// that has not finished is treated as a programming error (o.Assert).
87 func regInternalExpireJob(jobid uint64) {
88 job, exists := jobRegister[jobid]
// NOTE(review): the check on `exists` is not visible in this excerpt. job is nil
// for an unknown jobid, so confirm it is guarded before job.State is read.
90 if job.State.Finished() {
91 jobRegister[jobid] = nil, false
93 o.Assert("Tried to expire incomplete job.")
// regInternalFindNextExpiry arms the expiry timer for the next job in expiryList.
// It may only be called when no timer is armed (expiryChan == nil).
//
// It works through the list from the front:
//   - Entries whose expiry time falls within expiryLoopFudge of now are expired
//     immediately, and the scan continues.
//   - The first entry still in the future gets a time.After timer for the
//     remaining interval. That makes expiryChan non-nil, which ends the loop.
//
// Each entry is removed from expiryList before its deadline is checked, so the
// job a pending timer belongs to is no longer in the list. When the timer fires,
// manageRegistry expires expiryJobid.
98 func regInternalFindNextExpiry() {
99 if expiryChan != nil {
100 o.Assert("Attempted to Find Next Expiry avenue with expiry timer active.")
102 // if there's nothing to expire, do nothing.
103 if expiryList.Len() == 0 {
107 for expiryChan == nil && expiryList.Len() > 0 {
108 jobif := expiryList.Remove(expiryList.Front())
109 req, ok := jobif.(*JobRequest)
111 o.Assert("item in expiryList not a *JobRequest")
// Due now (or within the fudge window): expire without arming a timer.
113 if (time.Nanoseconds() + expiryLoopFudge) > req.expirytime {
114 regInternalExpireJob(req.Id)
116 expiryChan = time.After(req.expirytime - time.Nanoseconds())
// regInternalMarkJobForExpiry schedules job to be dropped from jobRegister
// jobLingerTime nanoseconds from now. It appends the job to expiryList and arms
// the expiry timer if none is pending.
122 func regInternalMarkJobForExpiry(job *JobRequest) {
123 job.expirytime = time.Nanoseconds() + jobLingerTime
124 expiryList.PushBack(job)
125 // if there is no job pending expiry, feed it into the delay loop
126 if expiryChan == nil {
127 regInternalFindNextExpiry()
// registryHandlers maps each registry operation code to the function that
// services it. manageRegistry calls the handler on the registry goroutine with
// the request and a freshly allocated response. Per the file header, handlers
// must not perform work that is likely to block.
131 var registryHandlers = map[int] func(*registryRequest, *registryResponse) {
132 requestAddClient: regintAddClient,
133 requestGetClient: regintGetClient,
134 requestDeleteClient: regintDeleteClient,
135 requestSyncClients: regintSyncClients,
136 requestAddJob: regintAddJob,
137 requestGetJob: regintGetJob,
138 requestAddJobResult: regintAddJobResult,
139 requestGetJobResult: regintGetJobResult,
140 requestGetJobResultNames: regintGetJobResultNames,
141 requestDisqualifyPlayer: regintDisqualifyPlayer,
142 requestReviewJobStatus: regintReviewJobStatus,
143 requestWriteJobUpdate: regintWriteJobUpdate,
144 requestWriteJobAll: regintWriteJobAll,
// manageRegistry is the registry dispatch loop. It does two things:
//   - Serves requests from chanRegistryRequest. For each one it allocates a
//     response, looks up the handler for req.operation in registryHandlers, and
//     sends the filled-in response back if the request has a responseChannel.
//   - Services the expiry timer. When the timer fires, it expires the job
//     identified by expiryJobid and arms the timer for the next pending expiry.
//
// NOTE(review): regInternalFindNextExpiry asserts that expiryChan is nil, so the
// code before that call (not visible in this excerpt) must clear expiryChan.
147 func manageRegistry() {
150 case req := <-chanRegistryRequest:
151 resp := new(registryResponse)
152 // by default, we failed.
154 // find the operation
155 handler, exists := registryHandlers[req.operation]
// A nil responseChannel means no reply is expected from here: either a
// fire-and-forget request, or one whose reply another goroutine will send.
159 if req.responseChannel != nil {
160 req.responseChannel <- resp
163 o.Debug("job%d: Expiring Job Record", expiryJobid)
164 regInternalExpireJob(expiryJobid)
166 regInternalFindNextExpiry()
// StartRegistry starts the registry.
// NOTE(review): the body is not visible in this excerpt. Presumably it launches
// manageRegistry on its own goroutine — confirm.
171 func StartRegistry() {
// newRequest allocates a registryRequest. When a response is wanted, the request
// gets a response channel with a buffer of 1. The registry goroutine's single
// reply therefore never blocks, as the file header requires.
// NOTE(review): the wants_response condition line is outside this excerpt.
// Callers that pass false never read a reply, so responseChannel presumably
// stays nil for them.
175 func newRequest(wants_response bool) (req *registryRequest) {
176 req = new(registryRequest)
178 req.responseChannel = make(chan *registryResponse, 1)
// ClientAdd asks the registry to register hostname, then blocks until the
// registry replies. success is presumably taken from the reply. The line that
// reads it is outside this excerpt.
184 func ClientAdd(hostname string) (success bool) {
185 r := newRequest(true)
186 r.operation = requestAddClient
187 r.hostname = hostname
188 chanRegistryRequest <- r
189 resp := <- r.responseChannel
// regintAddClient handles requestAddClient by registering req.hostname via
// regInternalAdd.
// NOTE(review): the existence check suggests an already-known host is left
// untouched and reported as a failure. The branch lines are not visible here —
// confirm.
194 func regintAddClient(req *registryRequest, resp *registryResponse) {
195 _, exists := clientList[req.hostname]
199 regInternalAdd(req.hostname)
// ClientDelete asks the registry to remove hostname, then blocks until the
// registry replies.
204 func ClientDelete(hostname string) (success bool) {
205 r := newRequest(true)
206 r.operation = requestDeleteClient
207 r.hostname = hostname
208 chanRegistryRequest <- r
209 resp := <- r.responseChannel
// regintDeleteClient handles requestDeleteClient by removing req.hostname via
// regInternalDel. Presumably this happens only when the host is present in
// clientList; the branch lines are not visible here.
214 func regintDeleteClient(req *registryRequest, resp *registryResponse) {
215 _, exists := clientList[req.hostname]
218 regInternalDel(req.hostname)
// ClientGet asks the registry for the ClientInfo registered under hostname and
// blocks until the registry replies.
// NOTE(review): the handling of the reply (for example, an unknown host) is
// outside this excerpt.
224 func ClientGet(hostname string) (info *ClientInfo) {
225 r := newRequest(true)
226 r.operation = requestGetClient
227 r.hostname = hostname
228 chanRegistryRequest <- r
229 resp := <- r.responseChannel
// regintGetClient handles requestGetClient by looking up req.hostname in
// clientList.
236 func regintGetClient(req *registryRequest, resp *registryResponse) {
237 clinfo, exists := clientList[req.hostname]
// ClientUpdateKnown asks the registry to reconcile its client list with
// hostnames (see regintSyncClients). It is fire-and-forget: the request has no
// response channel, and the caller does not wait for it to be processed.
246 func ClientUpdateKnown(hostnames []string) {
247 /* this is an asynchronous, we feed it into the registry
248 * and it'll look after itself.
250 r := newRequest(false)
251 r.operation = requestSyncClients
252 r.hostlist = hostnames
253 chanRegistryRequest <- r
// regintSyncClients handles requestSyncClients by reconciling clientList with
// req.hostlist. It builds a set of the requested hosts and removes from that set
// every host that is already registered. Whatever remains afterwards is a new
// host to add.
// NOTE(review): the handling of registered hosts that are missing from hostlist,
// and the body of the final loop, are not visible in this excerpt.
256 func regintSyncClients(req *registryRequest, resp *registryResponse) {
257 // we need to make sure the registered clients matches the
258 // hostlist we're given.
260 // First, we transform the array into a map
261 newhosts := make(map[string]bool)
262 for k,_ := range req.hostlist {
263 newhosts[req.hostlist[k]] = true
265 // now, scan the current list, checking to see if they exist.
266 // Remove them from the newhosts map if they do exist.
267 for k,_ := range clientList {
268 _, exists := newhosts[k]
270 // remove it from the newhosts map
// NOTE(review): pre-Go 1 map-deletion syntax. Go 1 spells it delete(newhosts, k).
271 newhosts[k] = false, false
276 // now that we're finished, we should only have new clients in
277 // the newhosts list left.
278 for k,_ := range newhosts {
284 // Add a Job to the registry. Return true if successful, returns
285 // false if the job is lacking critical information (such as a JobId)
286 // and can't be registered.
// The request is handled by regintAddJob on the registry goroutine, and the
// caller blocks until the registry replies.
287 func JobAdd(job *JobRequest) bool {
288 rr := newRequest(true)
289 rr.operation = requestAddJob
292 chanRegistryRequest <- rr
293 resp := <- rr.responseChannel
// regintAddJob handles requestAddJob. It:
//   - sorts the job's Players (regintDisqualifyPlayer later binary-searches
//     this slice),
//   - recomputes the job's state,
//   - stores the job in jobRegister under its Id (an existing entry is detected
//     via `overwrite`),
//   - updates the spool.
// A job that is already finished is scheduled for expiry immediately.
// NOTE(review): the validation lines implied by JobAdd's doc comment (for
// example, a missing JobId) are outside this excerpt.
297 func regintAddJob(req *registryRequest, resp *registryResponse) {
301 // ensure that the players are sorted!
302 sort.Strings(req.job.Players)
304 req.job.updateState()
305 // and register the job
306 _, overwrite := jobRegister[req.job.Id]
308 jobRegister[req.job.Id] = req.job
309 // force a queue update.
310 req.job.UpdateInSpool()
311 if req.job.State.Finished() {
312 regInternalMarkJobForExpiry(req.job)
318 // Get a Job from the registry. Returns the job if successful,
319 // returns nil if the job couldn't be found.
// If the job is not in memory, the registry loads it from disk on a separate
// goroutine (see regintGetJob). This call may therefore wait on disk I/O.
320 func JobGet(id uint64) *JobRequest {
321 rr := newRequest(true)
322 rr.operation = requestGetJob
325 chanRegistryRequest <- rr
326 resp := <- rr.responseChannel
327 if resp.jobs == nil {
// regintGetJob handles requestGetJob. A job already in jobRegister is returned
// in resp.jobs. Otherwise:
//   - the lookup is handed to regintGetJobDeferred on a new goroutine, so the
//     disk load cannot block the registry dispatch loop;
//   - req.responseChannel is cleared, so manageRegistry does not send a reply
//     of its own. The deferred goroutine replies instead.
333 func regintGetJob(req *registryRequest, resp *registryResponse) {
334 job, exists := jobRegister[req.id]
335 resp.success = exists
337 resp.jobs = make([]*JobRequest, 1)
340 o.Warn("Received Request for job%d which is not in memory", req.id)
341 go regintGetJobDeferred(req.id, req.responseChannel)
342 // mask out the responseChannel so the deferred handler can use it.
343 req.responseChannel = nil
// regintGetJobDeferred runs on its own goroutine, started by regintGetJob. It
// tries to load jobid from disk via LoadFromFinished. resp and responseChannel
// are captured by a deferred closure, which presumably sends resp on
// responseChannel however the load turns out.
// NOTE(review): the closure body and the "fix up the state" step are not visible
// in this excerpt. This code runs off the registry goroutine, so confirm the job
// is re-registered through chanRegistryRequest rather than by writing
// jobRegister directly.
347 func regintGetJobDeferred(jobid uint64, responseChannel chan<- *registryResponse) {
348 resp := new(registryResponse)
350 defer func (resp *registryResponse, rChan chan<- *registryResponse) {
352 }(resp, responseChannel)
354 req, err := LoadFromFinished(jobid)
356 o.Warn("Couldn't load job%d from disk. Doesn't exist?", jobid)
359 // fix up the state, and stuff it back into the system
361 resp.jobs = make([]*JobRequest, 1)
366 // Attach a result to a Job in the Registry
368 // This exists in order to prevent nasty concurrency problems
369 // when trying to put results back onto the job. Reading a job is far
370 // less of a problem than writing to it.
// playername is the player that produced the result. The target job is
// identified by the task response's id (see regintAddJobResult).
371 func JobAddResult(playername string, task *TaskResponse) bool {
372 rr := newRequest(true)
373 rr.operation = requestAddJobResult
375 rr.hostname = playername
376 chanRegistryRequest <- rr
377 resp := <- rr.responseChannel
// regintAddJobResult handles requestAddJobResult. It stores req.tresp in the
// job's Results map under req.hostname (the reporting player). success is false
// when the job is not in jobRegister.
// NOTE(review): the `exists` guard before the map write is not visible here.
// job is nil for an unknown id.
381 func regintAddJobResult(req *registryRequest, resp *registryResponse) {
382 job, exists := jobRegister[req.tresp.id]
383 resp.success = exists
385 job.Results[req.hostname] = req.tresp
386 // force a queue update.
391 // Get a result from the registry
// Looks up the TaskResponse recorded for playername against job id, blocking
// until the registry replies. The handling of the reply is outside this excerpt.
392 func JobGetResult(id uint64, playername string) (tresp *TaskResponse) {
393 rr := newRequest(true)
394 rr.operation = requestGetJobResult
396 rr.hostname = playername
397 chanRegistryRequest <- rr
398 resp := <- rr.responseChannel
// regintGetJobResult handles requestGetJobResult. It looks up the job, then the
// result stored for req.hostname. success reports whether that result exists.
402 func regintGetJobResult(req *registryRequest, resp *registryResponse) {
403 job, exists := jobRegister[req.id]
405 result, exists := job.Results[req.hostname]
406 resp.success = exists
415 // Get a list of names we have results for against a given job.
// The names are the player names used as keys in the job's Results map. The
// caller blocks until the registry replies.
416 func JobGetResultNames(id uint64) (names []string) {
417 rr := newRequest(true)
418 rr.operation = requestGetJobResultNames
421 chanRegistryRequest <- rr
422 resp := <- rr.responseChannel
// regintGetJobResultNames handles requestGetJobResultNames. It collects the keys
// of the job's Results map (player names) into resp.names, which is pre-sized to
// len(job.Results). success reports whether the job is in jobRegister.
426 func regintGetJobResultNames(req *registryRequest, resp *registryResponse) {
427 job, exists := jobRegister[req.id]
428 resp.success = exists
430 resp.names = make([]string, len(job.Results))
432 for k, _ := range job.Results {
439 // Disqualify a player from servicing a job
// Removes playername from job id's player list (see regintDisqualifyPlayer) and
// blocks until the registry replies.
440 func JobDisqualifyPlayer(id uint64, playername string) bool {
441 rr := newRequest(true)
442 rr.operation = requestDisqualifyPlayer
444 rr.hostname = playername
446 chanRegistryRequest <- rr
447 resp := <- rr.responseChannel
// regintDisqualifyPlayer handles requestDisqualifyPlayer by removing
// req.hostname from the job's Players slice. Players is binary-searched with
// sort.Search, which relies on regintAddJob having sorted it. A new slice is
// built rather than modifying the old one in place.
452 func regintDisqualifyPlayer(req *registryRequest, resp *registryResponse) {
453 job, exists := jobRegister[req.id]
455 idx := sort.Search(len(job.Players), func(idx int) bool { return job.Players[idx] >= req.hostname })
// NOTE(review): BUG — sort.Search returns len(job.Players) when every player
// sorts before req.hostname, or when Players is empty. job.Players[idx] on the
// next line then panics with an index out of range. The condition should be
// `idx < len(job.Players) && job.Players[idx] == req.hostname`.
456 if (job.Players[idx] == req.hostname) {
458 newplayers := make([]string, len(job.Players)-1)
459 copy(newplayers[0:idx], job.Players[0:idx])
460 copy(newplayers[idx:len(job.Players)-1], job.Players[idx+1:len(job.Players)])
461 job.Players = newplayers
463 // force a queue update.
// JobReviewState asks the registry to review the status of job id (see
// regintReviewJobStatus) and blocks until the registry replies.
473 func JobReviewState(id uint64) bool {
474 rr := newRequest(true)
475 rr.operation = requestReviewJobStatus
478 chanRegistryRequest <- rr
479 resp := <- rr.responseChannel
// regintReviewJobStatus handles requestReviewJobStatus for job req.id. success
// reports whether the job is in jobRegister.
// NOTE(review): the lines between the lookup and the queue update are not
// visible here. Presumably they re-run updateState — confirm.
484 func regintReviewJobStatus(req *registryRequest, resp *registryResponse) {
485 job, exists := jobRegister[req.id]
486 resp.success = exists
489 // force a queue update.
// JobWriteUpdate submits a requestWriteJobUpdate for job id. It is
// fire-and-forget: the request has no response channel, and the caller does not
// wait for it to be processed.
494 func JobWriteUpdate(id uint64) {
495 rr := newRequest(false)
496 rr.operation = requestWriteJobUpdate
498 chanRegistryRequest <- rr
// regintWriteJobUpdate handles requestWriteJobUpdate for job req.id. success
// reports whether the job is in jobRegister. The work done on the job is outside
// this excerpt.
501 func regintWriteJobUpdate(req *registryRequest, resp *registryResponse) {
502 job, exists := jobRegister[req.id]
503 resp.success = exists
// JobWriteAll submits a requestWriteJobAll (handled by regintWriteJobAll) and
// blocks until the registry replies.
509 func JobWriteAll() bool {
510 rr := newRequest(true)
511 rr.operation = requestWriteJobAll
513 chanRegistryRequest <- rr
514 resp := <-rr.responseChannel
// regintWriteJobAll handles requestWriteJobAll by visiting every job in
// jobRegister.
// NOTE(review): the per-job work is outside this excerpt. If it performs disk
// I/O, that I/O runs on the registry goroutine, which the file header forbids
// from doing blocking work.
519 func regintWriteJobAll(req *registryRequest, resp *registryResponse) {
520 for _, job := range jobRegister {
527 func (job *JobRequest) updateState() {
528 if job.Results == nil {
529 o.Assert("job.Results nil for jobid %d", job.Id)
532 was_finished := job.State.Finished()
535 // look for a success (any success) in the responses
536 var success bool = false
537 for host, res := range job.Results {
539 o.Debug("nil result for %s?", host)
542 if res.State == RESP_FINISHED {
547 // update the job state based upon these findings
549 job.State = JOB_SUCCESSFUL
551 if len(job.Players) < 1 {
552 job.State = JOB_FAILED
554 job.State = JOB_PENDING
561 for pidx := range job.Players {
562 p := job.Players[pidx]
563 resp, exists := job.Results[p]
567 } else if resp.State == RESP_FINISHED {
572 if (success + failed) < len(job.Players) {
573 job.State = JOB_PENDING
574 } else if success == len(job.Players) {
575 job.State = JOB_SUCCESSFUL
576 } else if failed == len(job.Players) {
577 job.State = JOB_FAILED
579 job.State = JOB_FAILED_PARTIAL
582 if !was_finished && job.State.Finished() {
583 o.Debug("job%d: Finished - Setting Expiry Time", job.Id)
584 regInternalMarkJobForExpiry(job)