initialise repo
[debian/orchestra.git] / src / conductor / job_request.go
1 // job_request.go
2 //
3
4 package main
5
6 import (
7         "sort"
8         "json"
9         "path"
10         "os"
11         "io"
12         o "orchestra"
13 )
14
// JobRequest is the record of a single job.  The exported fields are
// what gets marshalled to JSON (see the struct tags) when the job is
// written to the spool; expirytime is unexported and so is not persisted.
type JobRequest struct {
        // Score is carried in the JSON record under "score".
        // NOTE(review): presumably the name of the score to run - confirm.
        Score           string                          `json:"score"`
        // Scope selects how tasks are generated by MakeTasks: a single
        // task for SCOPE_ONEOF, one task per player for SCOPE_ALLOF.
        Scope           JobScope                        `json:"scope"`
        // Players lists the job's targets.  normalise() sorts it when it
        // holds more than one entry; a job with no players is not Valid().
        Players         []string                        `json:"players"`
        // Id identifies the job and determines its spool filename.
        Id              uint64                          `json:"id"`
        // State decides which spool directory the job is written to:
        // "active" while JOB_PENDING, "finished" otherwise.
        State           JobState                        `json:"state"`
        Params          map[string]string               `json:"params"`
        Tasks           []*TaskRequest                  `json:"tasks"`
        // you need to use the registry to access these - only public for
        // marshalling use.
        Results         map[string]*TaskResponse        `json:"results"`
        // private:

        // Timeout for autoexpiry.  Only valid if
        // job.State.Finished() is true.
        expirytime      int64
}
32
33 func NewJobRequest() (req *JobRequest) {
34         req = new(JobRequest)
35         req.Results = make(map[string]*TaskResponse)
36         return req
37 }
38
39 func JobRequestFromReader(src io.Reader) (req *JobRequest, err os.Error) {
40         req = NewJobRequest()
41         jdec := json.NewDecoder(src)
42
43         err = jdec.Decode(req)
44         if err == nil {
45                 if req.Results == nil {
46                         req.Results = make(map[string]*TaskResponse)
47                 }
48         }
49
50         return req, err
51 }
52
53 func (req *JobRequest) normalise() {
54         if (len(req.Players) > 1) {
55                 /* sort targets so search works */
56                 sort.Strings(req.Players)
57         } else {
58                 if (req.Scope == SCOPE_ONEOF) {
59                         req.Scope = SCOPE_ALLOF
60                 }
61         }
62 }
63
64 func (req *JobRequest) MakeTasks() (tasks []*TaskRequest) {
65         req.normalise()
66
67         var numtasks int
68         
69         switch (req.Scope) {
70         case SCOPE_ONEOF:
71                 numtasks = 1
72         case SCOPE_ALLOF:
73                 numtasks = len(req.Players)
74         }
75         tasks = make([]*TaskRequest, numtasks)
76         
77         for c := 0; c < numtasks; c++ {
78                 t := NewTaskRequest()
79                 t.job = req
80                 if (req.Scope == SCOPE_ALLOF) {
81                         t.Player = req.Players[c]
82                 }
83                 tasks[c] = t
84         }
85         return tasks
86 }
87
88 func (req *JobRequest) Valid() bool {
89         if (len(req.Players) <= 0) {
90                 return false
91         }
92         return true
93 }
94
95 func (req *JobRequest) FilenameForSpool() string {
96         if (req.State == JOB_PENDING) {
97                 return path.Join(GetSpoolDirectory(), "active", FilenameForJobId(req.Id))
98         }
99         return path.Join(GetSpoolDirectory(), "finished", FilenameForJobId(req.Id))
100 }
101
102 // dump the bytestream in buf into the serialisation file for req.
103 func (req *JobRequest) doSerialisation(buf []byte) {
104         // first up, clean up old state.
105         UnlinkNodesForJobId(req.Id)
106         outpath := req.FilenameForSpool()
107         fh, err := os.OpenFile(outpath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600)
108         if err != nil {
109                 o.Warn("Could not create persistence file %s: %s", outpath, err)
110                 return
111         }
112         defer fh.Close()
113         fh.Write(buf)
114 }
115
116 func (req *JobRequest) UpdateInSpool()  {
117         buf, err := json.MarshalIndent(req, "", "  ")
118         o.MightFail(err, "Failed to marshal job %d", req.Id)
119         //FIXME: should try to do this out of the registry's thread.
120         req.doSerialisation(buf)
121 }
122
123 // deserialise the job record from the finished spool
124 func LoadFromFinished(jobid uint64) (req *JobRequest, err os.Error) {
125         fpath := path.Join(GetSpoolDirectory(), "finished", FilenameForJobId(jobid))
126         fh, err := os.Open(fpath)
127         if err != nil {
128                 return nil, err
129         }
130         defer fh.Close()
131         
132         req, err = JobRequestFromReader(fh)
133         return req, err
134 }