+// registry.go
+//
+// The Registry provides a 'threadsafe' interface to various global information stores.
+//
+// The registry dispatch thread is forbidden from performing any work that is likely to block.
+// Result channels must be buffered with enough space for the full set of results.
+
+package main
+
+import (
+ o "orchestra"
+ "sort"
+ "time"
+ "container/list"
+)
+
+// Request Types
+const (
+ requestAddClient = iota
+ requestGetClient
+ requestDeleteClient
+ requestSyncClients
+
+ requestAddJob
+ requestGetJob
+ requestAddJobResult
+ requestGetJobResult
+ requestGetJobResultNames
+ requestDisqualifyPlayer
+ requestReviewJobStatus
+
+ requestWriteJobUpdate
+ requestWriteJobAll
+
+ requestQueueSize = 10
+
+ jobLingerTime = int64(30e9)
+)
+
+type registryRequest struct {
+ operation int
+ id uint64
+ hostname string
+ hostlist []string
+ job *JobRequest
+ tresp *TaskResponse
+ responseChannel chan *registryResponse
+}
+
+type registryResponse struct {
+ success bool
+ info *ClientInfo
+ tresp *TaskResponse
+ names []string
+ jobs []*JobRequest
+}
+
+var (
+ chanRegistryRequest = make(chan *registryRequest, requestQueueSize)
+ clientList = make(map[string]*ClientInfo)
+ jobRegister = make(map[uint64]*JobRequest)
+ expiryChan <-chan int64
+ expiryJobid uint64
+ expiryList *list.List
+
+ expiryLoopFudge int64 = 10e6; /* 10 ms should be enough fudgefactor */
+)
+
+func init() {
+ expiryList = list.New()
+}
+
+func regInternalAdd(hostname string) {
+ o.Warn("Registry: New Host \"%s\"", hostname)
+ clientList[hostname] = NewClientInfo()
+ // do this initialisation here since it'll help unmask sequencing errors
+ clientList[hostname].pendingTasks = make(map[uint64]*TaskRequest)
+ clientList[hostname].Player = hostname
+}
+
+func regInternalDel(hostname string) {
+ o.Warn("Registry: Deleting Host \"%s\"", hostname)
+ /* remove it from the registry */
+ clientList[hostname] = nil, false
+}
+
+func regInternalExpireJob(jobid uint64) {
+ job, exists := jobRegister[jobid]
+ if exists {
+ if job.State.Finished() {
+ jobRegister[jobid] = nil, false
+ } else {
+ o.Assert("Tried to expire incomplete job.")
+ }
+ }
+}
+
+func regInternalFindNextExpiry() {
+ if expiryChan != nil {
+ o.Assert("Attempted to Find Next Expiry avenue with expiry timer active.")
+ }
+ // if there's nothing to expire, do nothing.
+ if expiryList.Len() == 0 {
+ return
+ }
+
+ for expiryChan == nil && expiryList.Len() > 0 {
+ jobif := expiryList.Remove(expiryList.Front())
+ req, ok := jobif.(*JobRequest)
+ if !ok {
+ o.Assert("item in expiryList not a *JobRequest")
+ }
+ if (time.Nanoseconds() + expiryLoopFudge) > req.expirytime {
+ regInternalExpireJob(req.Id)
+ } else {
+ expiryChan = time.After(req.expirytime - time.Nanoseconds())
+ expiryJobid = req.Id
+ }
+ }
+}
+
+func regInternalMarkJobForExpiry(job *JobRequest) {
+ job.expirytime = time.Nanoseconds() + jobLingerTime
+ expiryList.PushBack(job)
+ // if there is no job pending expiry, feed it into the delay loop
+ if expiryChan == nil {
+ regInternalFindNextExpiry()
+ }
+}
+
+var registryHandlers = map[int] func(*registryRequest, *registryResponse) {
+requestAddClient: regintAddClient,
+requestGetClient: regintGetClient,
+requestDeleteClient: regintDeleteClient,
+requestSyncClients: regintSyncClients,
+requestAddJob: regintAddJob,
+requestGetJob: regintGetJob,
+requestAddJobResult: regintAddJobResult,
+requestGetJobResult: regintGetJobResult,
+requestGetJobResultNames: regintGetJobResultNames,
+requestDisqualifyPlayer: regintDisqualifyPlayer,
+requestReviewJobStatus: regintReviewJobStatus,
+requestWriteJobUpdate: regintWriteJobUpdate,
+requestWriteJobAll: regintWriteJobAll,
+}
+
+func manageRegistry() {
+ for {
+ select {
+ case req := <-chanRegistryRequest:
+ resp := new(registryResponse)
+ // by default, we failed.
+ resp.success = false
+ // find the operation
+ handler, exists := registryHandlers[req.operation]
+ if exists {
+ handler(req, resp)
+ }
+ if req.responseChannel != nil {
+ req.responseChannel <- resp
+ }
+ case <-expiryChan:
+ o.Debug("job%d: Expiring Job Record", expiryJobid)
+ regInternalExpireJob(expiryJobid)
+ expiryChan = nil
+ regInternalFindNextExpiry()
+ }
+ }
+}
+
+func StartRegistry() {
+ go manageRegistry()
+}
+
+func newRequest(wants_response bool) (req *registryRequest) {
+ req = new(registryRequest)
+ if wants_response {
+ req.responseChannel = make(chan *registryResponse, 1)
+ }
+
+ return req
+}
+
+func ClientAdd(hostname string) (success bool) {
+ r := newRequest(true)
+ r.operation = requestAddClient
+ r.hostname = hostname
+ chanRegistryRequest <- r
+ resp := <- r.responseChannel
+
+ return resp.success
+}
+
+func regintAddClient(req *registryRequest, resp *registryResponse) {
+ _, exists := clientList[req.hostname]
+ if exists {
+ resp.success = false
+ } else {
+ regInternalAdd(req.hostname)
+ resp.success = true
+ }
+}
+
+func ClientDelete(hostname string) (success bool) {
+ r := newRequest(true)
+ r.operation = requestDeleteClient
+ r.hostname = hostname
+ chanRegistryRequest <- r
+ resp := <- r.responseChannel
+
+ return resp.success
+}
+
+func regintDeleteClient(req *registryRequest, resp *registryResponse) {
+ _, exists := clientList[req.hostname]
+ if exists {
+ resp.success = true
+ regInternalDel(req.hostname)
+ } else {
+ resp.success = false
+ }
+}
+
+func ClientGet(hostname string) (info *ClientInfo) {
+ r := newRequest(true)
+ r.operation = requestGetClient
+ r.hostname = hostname
+ chanRegistryRequest <- r
+ resp := <- r.responseChannel
+ if resp.success {
+ return resp.info
+ }
+ return nil
+}
+
+func regintGetClient(req *registryRequest, resp *registryResponse) {
+ clinfo, exists := clientList[req.hostname]
+ if exists {
+ resp.success = true
+ resp.info = clinfo
+ } else {
+ resp.success = false
+ }
+}
+
+func ClientUpdateKnown(hostnames []string) {
+ /* this is an asynchronous, we feed it into the registry
+ * and it'll look after itself.
+ */
+ r := newRequest(false)
+ r.operation = requestSyncClients
+ r.hostlist = hostnames
+ chanRegistryRequest <- r
+}
+
+func regintSyncClients(req *registryRequest, resp *registryResponse) {
+ // we need to make sure the registered clients matches the
+ // hostlist we're given.
+ //
+ // First, we transform the array into a map
+ newhosts := make(map[string]bool)
+ for k,_ := range req.hostlist {
+ newhosts[req.hostlist[k]] = true
+ }
+ // now, scan the current list, checking to see if they exist.
+ // Remove them from the newhosts map if they do exist.
+ for k,_ := range clientList {
+ _, exists := newhosts[k]
+ if exists {
+ // remove it from the newhosts map
+ newhosts[k] = false, false
+ } else {
+ regInternalDel(k)
+ }
+ }
+ // now that we're finished, we should only have new clients in
+ // the newhosts list left.
+ for k,_ := range newhosts {
+ regInternalAdd(k)
+ }
+ // and we're done.
+}
+
+// Add a Job to the registry. Return true if successful, returns
+// false if the job is lacking critical information (such as a JobId)
+// and can't be registered.
+func JobAdd(job *JobRequest) bool {
+ rr := newRequest(true)
+ rr.operation = requestAddJob
+ rr.job = job
+
+ chanRegistryRequest <- rr
+ resp := <- rr.responseChannel
+ return resp.success
+}
+
+func regintAddJob(req *registryRequest, resp *registryResponse) {
+ if nil == req.job {
+ return
+ }
+ // ensure that the players are sorted!
+ sort.Strings(req.job.Players)
+ // update the state
+ req.job.updateState()
+ // and register the job
+ _, overwrite := jobRegister[req.job.Id]
+ if !overwrite {
+ jobRegister[req.job.Id] = req.job
+ // force a queue update.
+ req.job.UpdateInSpool()
+ if req.job.State.Finished() {
+ regInternalMarkJobForExpiry(req.job)
+ }
+ resp.success = true
+ }
+}
+
+// Get a Job from the registry. Returns the job if successful,
+// returns nil if the job couldn't be found.
+func JobGet(id uint64) *JobRequest {
+ rr := newRequest(true)
+ rr.operation = requestGetJob
+ rr.id = id
+
+ chanRegistryRequest <- rr
+ resp := <- rr.responseChannel
+ if resp.jobs == nil {
+ return nil
+ }
+ return resp.jobs[0]
+}
+
+func regintGetJob(req *registryRequest, resp *registryResponse) {
+ job, exists := jobRegister[req.id]
+ resp.success = exists
+ if exists {
+ resp.jobs = make([]*JobRequest, 1)
+ resp.jobs[0] = job
+ } else {
+ o.Warn("Received Request for job%d which is not in memory", req.id)
+ go regintGetJobDeferred(req.id, req.responseChannel)
+ // mask out the responseChannel so the deferred handler can use it.
+ req.responseChannel = nil
+ }
+}
+
+func regintGetJobDeferred(jobid uint64, responseChannel chan<- *registryResponse) {
+ resp := new(registryResponse)
+ resp.success = false
+ defer func (resp *registryResponse, rChan chan<- *registryResponse) {
+ rChan <- resp;
+ }(resp, responseChannel)
+
+ req, err := LoadFromFinished(jobid)
+ if err != nil {
+ o.Warn("Couldn't load job%d from disk. Doesn't exist?", jobid)
+ return
+ }
+ // fix up the state, and stuff it back into the system
+ RestoreJobState(req)
+ resp.jobs = make([]*JobRequest, 1)
+ resp.jobs[0] = req
+ resp.success = true
+}
+
+// Attach a result to a Job in the Registry
+//
+// This exists in order to prevent nasty concurrency problems
+// when trying to put results back onto the job. Reading a job is far
+// less of a problem than writing to it.
+func JobAddResult(playername string, task *TaskResponse) bool {
+ rr := newRequest(true)
+ rr.operation = requestAddJobResult
+ rr.tresp = task
+ rr.hostname = playername
+ chanRegistryRequest <- rr
+ resp := <- rr.responseChannel
+ return resp.success
+}
+
+func regintAddJobResult(req *registryRequest, resp *registryResponse) {
+ job, exists := jobRegister[req.tresp.id]
+ resp.success = exists
+ if exists {
+ job.Results[req.hostname] = req.tresp
+ // force a queue update.
+ job.UpdateInSpool()
+ }
+}
+
+// Get a result from the registry
+func JobGetResult(id uint64, playername string) (tresp *TaskResponse) {
+ rr := newRequest(true)
+ rr.operation = requestGetJobResult
+ rr.id = id
+ rr.hostname = playername
+ chanRegistryRequest <- rr
+ resp := <- rr.responseChannel
+ return resp.tresp
+}
+
+func regintGetJobResult(req *registryRequest, resp *registryResponse) {
+ job, exists := jobRegister[req.id]
+ if exists {
+ result, exists := job.Results[req.hostname]
+ resp.success = exists
+ if exists {
+ resp.tresp = result
+ }
+ } else {
+ resp.success = false
+ }
+}
+
+// Get a list of names we have results for against a given job.
+func JobGetResultNames(id uint64) (names []string) {
+ rr := newRequest(true)
+ rr.operation = requestGetJobResultNames
+ rr.id = id
+
+ chanRegistryRequest <- rr
+ resp := <- rr.responseChannel
+ return resp.names
+}
+
+func regintGetJobResultNames(req *registryRequest, resp *registryResponse) {
+ job, exists := jobRegister[req.id]
+ resp.success = exists
+ if exists {
+ resp.names = make([]string, len(job.Results))
+ idx := 0
+ for k, _ := range job.Results {
+ resp.names[idx] = k
+ idx++
+ }
+ }
+}
+
+// Disqualify a player from servicing a job
+func JobDisqualifyPlayer(id uint64, playername string) bool {
+ rr := newRequest(true)
+ rr.operation = requestDisqualifyPlayer
+ rr.id = id
+ rr.hostname = playername
+
+ chanRegistryRequest <- rr
+ resp := <- rr.responseChannel
+
+ return resp.success
+}
+
+func regintDisqualifyPlayer(req *registryRequest, resp *registryResponse) {
+ job, exists := jobRegister[req.id]
+ if exists {
+ idx := sort.Search(len(job.Players), func(idx int) bool { return job.Players[idx] >= req.hostname })
+ if (job.Players[idx] == req.hostname) {
+ resp.success = true
+ newplayers := make([]string, len(job.Players)-1)
+ copy(newplayers[0:idx], job.Players[0:idx])
+ copy(newplayers[idx:len(job.Players)-1], job.Players[idx+1:len(job.Players)])
+ job.Players = newplayers
+ job.updateState()
+ // force a queue update.
+ job.UpdateInSpool()
+ } else {
+ resp.success = false
+ }
+ } else {
+ resp.success = false
+ }
+}
+
+func JobReviewState(id uint64) bool {
+ rr := newRequest(true)
+ rr.operation = requestReviewJobStatus
+ rr.id = id
+
+ chanRegistryRequest <- rr
+ resp := <- rr.responseChannel
+
+ return resp.success
+}
+
+func regintReviewJobStatus(req *registryRequest, resp *registryResponse) {
+ job, exists := jobRegister[req.id]
+ resp.success = exists
+ if exists {
+ job.updateState()
+ // force a queue update.
+ job.UpdateInSpool()
+ }
+}
+
+func JobWriteUpdate(id uint64) {
+ rr := newRequest(false)
+ rr.operation = requestWriteJobUpdate
+ rr.id = id
+ chanRegistryRequest <- rr
+}
+
+func regintWriteJobUpdate(req *registryRequest, resp *registryResponse) {
+ job, exists := jobRegister[req.id]
+ resp.success = exists
+ if exists {
+ job.UpdateInSpool()
+ }
+}
+
+func JobWriteAll() bool {
+ rr := newRequest(true)
+ rr.operation = requestWriteJobAll
+
+ chanRegistryRequest <- rr
+ resp := <-rr.responseChannel
+
+ return resp.success
+}
+
+func regintWriteJobAll(req *registryRequest, resp *registryResponse) {
+ for _, job := range jobRegister {
+ job.UpdateInSpool()
+ }
+ resp.success = true
+}
+
+// Ugh.
+func (job *JobRequest) updateState() {
+ if job.Results == nil {
+ o.Assert("job.Results nil for jobid %d", job.Id)
+ return
+ }
+ was_finished := job.State.Finished()
+ switch job.Scope {
+ case SCOPE_ONEOF:
+ // look for a success (any success) in the responses
+ var success bool = false
+ for host, res := range job.Results {
+ if res == nil {
+ o.Debug("nil result for %s?", host)
+ continue
+ }
+ if res.State == RESP_FINISHED {
+ success = true
+ break
+ }
+ }
+ // update the job state based upon these findings
+ if success {
+ job.State = JOB_SUCCESSFUL
+ } else {
+ if len(job.Players) < 1 {
+ job.State = JOB_FAILED
+ } else {
+ job.State = JOB_PENDING
+ }
+ }
+ case SCOPE_ALLOF:
+ var success int = 0
+ var failed int = 0
+
+ for pidx := range job.Players {
+ p := job.Players[pidx]
+ resp, exists := job.Results[p]
+ if exists {
+ if resp.DidFail() {
+ failed++
+ } else if resp.State == RESP_FINISHED {
+ success++
+ }
+ }
+ }
+ if (success + failed) < len(job.Players) {
+ job.State = JOB_PENDING
+ } else if success == len(job.Players) {
+ job.State = JOB_SUCCESSFUL
+ } else if failed == len(job.Players) {
+ job.State = JOB_FAILED
+ } else {
+ job.State = JOB_FAILED_PARTIAL
+ }
+ }
+ if !was_finished && job.State.Finished() {
+ o.Debug("job%d: Finished - Setting Expiry Time", job.Id)
+ regInternalMarkJobForExpiry(job)
+ }
+}