reset changelog for debian package
[debian/orchestra.git] / src / conductor / audience.go
1 /* audience.go
2 */
3
4 package main
5
6 import (
7         "io"
8         "json"
9         "net"
10         "os"
11         o "orchestra"
12         "strings"
13 )
14
15 type GenericJsonRequest struct {
16         Op              *string         `json:"op"`
17         Score           *string         `json:"score"`
18         Players         []string        `json:"players"`
19         Scope           *JobScope       `json:"scope"`
20         Params          map[string]string       `json:"params"`
21         Id              *uint64         `json:"id"`
22 }
23
24 type JsonPlayerStatus struct {
25         Status          ResponseState           `json:"status"`
26         Response        map[string]string       `json:"response"`
27 }
28
29 type JsonStatusResponse struct {
30         Status          JobState                        `json:"status"`
31         Players         map[string]*JsonPlayerStatus    `json:"players"`
32 }
33
34 func NewJsonStatusResponse() (jsr *JsonStatusResponse) {
35         jsr = new(JsonStatusResponse)
36         jsr.Players = make(map[string]*JsonPlayerStatus)
37         
38         return jsr
39 }
40
41 func NewJsonPlayerStatus() (jps *JsonPlayerStatus) {
42         jps = new(JsonPlayerStatus)
43         jps.Response = make(map[string]string)
44
45         return jps      
46 }
47
48 func handleAudienceRequest(c net.Conn) {
49         defer c.Close()
50
51         c.SetTimeout(0)
52         r, _ := c.(io.Reader)
53         w, _ := c.(io.Writer)
54         dec := json.NewDecoder(r)
55         enc := json.NewEncoder(w)
56
57         outobj := new(GenericJsonRequest)
58         err := dec.Decode(outobj)
59         if err != nil {
60                 o.Warn("Error decoding JSON talking to audience: %s", err)
61                 return
62         }
63
64         if nil == outobj.Op {
65                 o.Warn("Malformed JSON message talking to audience.  Missing Op")
66                 return
67         }
68         switch *(outobj.Op) {
69         case "status":
70                 if nil == outobj.Id {
71                         o.Warn("Malformed Status message talking to audience. Missing Job ID")
72                         return
73                 }
74                 job := JobGet(*outobj.Id)
75                 jresp := new([2]interface{})
76                 if nil != job {
77                         jresp[0] = "OK"
78                         iresp := NewJsonStatusResponse()
79                         iresp.Status = job.State
80                         resnames := JobGetResultNames(*outobj.Id)
81                         for i := range resnames {
82                                 tr := JobGetResult(*outobj.Id, resnames[i])
83                                 if nil != tr {
84                                         presp := NewJsonPlayerStatus()
85                                         presp.Status = tr.State
86                                         for k,v:=range(tr.Response) {
87                                                 presp.Response[k] = v
88                                         }
89                                         iresp.Players[resnames[i]] = presp
90                                 }
91                 
92                         }
93                         jresp[1] = iresp
94                 } else {
95                         jresp[0] = "Error"
96                         jresp[1] = nil
97                 }
98                 enc.Encode(jresp)
99                 o.Debug("Status...")
100         case "queue":
101                 if nil == outobj.Score {
102                         o.Warn("Malformed Queue message talking to audience. Missing Score")
103                         sendQueueFailureResponse("Missing Score", enc)
104                         return
105                 }
106                 if nil == outobj.Scope {
107                         o.Warn("Malformed Queue message talking to audience. Missing Scope")
108                         sendQueueFailureResponse("Missing Scope", enc)
109                         return
110                 }
111                 if nil == outobj.Players || len(outobj.Players) < 1 {
112                         o.Warn("Malformed Queue message talking to audience. Missing Players")
113                         sendQueueFailureResponse("Missing Players", enc)
114                         return
115                 }
116                 for _, player := range outobj.Players {
117                         if !HostAuthorised(player) {
118                                 o.Warn("Malformed Queue message - unknown player %s specified.", player)
119                                 sendQueueFailureResponse("Invalid Player", enc)
120                                 return
121                         }
122                 }
123                 job := NewRequest()
124                 job.Score = *outobj.Score
125                 job.Scope = *outobj.Scope
126                 job.Players = outobj.Players
127                 job.Params = outobj.Params
128
129                 QueueJob(job)
130                 sendQueueSuccessResponse(job, enc)
131         default:
132                 o.Warn("Unknown operation talking to audience: \"%s\"", *(outobj.Op))
133                 return
134         }
135
136         _ = enc
137 }
138
139 func sendQueueSuccessResponse(job *JobRequest, enc *json.Encoder) {
140         resp := make([]interface{},2)
141         resperr := new(string)
142         *resperr = "OK"
143         resp[0] = resperr
144
145         // this probably looks odd, but all numbers cross through float64 when being json encoded.  d'oh!
146         jobid := new(uint64)
147         *jobid = uint64(job.Id)
148         resp[1] = jobid
149
150         err := enc.Encode(resp)
151         if nil != err {
152                 o.Warn("Couldn't encode response to audience: %s", err)
153         }
154 }
155
156 func sendQueueFailureResponse(reason string, enc *json.Encoder) {
157         resp := make([]interface{},2)
158         resperr := new(string)
159         *resperr = "Error"
160         resp[0] = resperr
161         if reason != "" {
162                 resp[1] = &reason
163         }
164         err := enc.Encode(resp)
165         if nil != err {
166                 o.Warn("Couldn't encode response to audience: %s", err)
167         }
168 }
169
170 func AudienceListener(l net.Listener) {
171         for {
172                 c, err := l.Accept()
173                 if err != nil {
174                         o.Warn("Accept() failed on Audience Listenter.")
175                         break
176                 }
177                 go handleAudienceRequest(c)
178         }
179 }
180
181 func UnixAudienceListener(sockaddr string) {
182         fi, err := os.Stat(sockaddr)
183         if err == nil {
184                 if fi.IsSocket() {
185                         o.Warn("Removing stale socket at %s", sockaddr)
186                         os.Remove(sockaddr)
187                 } else {
188                         o.Fail("%s exists and is not a socket", sockaddr)
189                 }
190         }
191         laddr, err := net.ResolveUnixAddr("unix", sockaddr)
192         o.MightFail(err, "Couldn't resolve audience socket address")
193         l, err := net.ListenUnix("unix", laddr)
194         o.MightFail(err, "Couldn't start audience unixsock listener")
195         // Fudge the permissions on the unixsock!
196         fi, err = os.Stat(sockaddr)
197         if err == nil {
198                 os.Chmod(sockaddr, fi.Mode | 0777)
199         } else {
200                 o.Warn("Couldn't fudge permission on audience socket: %s", err)
201         }
202         
203         // make sure we clean up the unix socket when we die.
204         defer l.Close()
205         defer os.Remove(sockaddr)
206         AudienceListener(l)     
207 }
208
209 func StartAudienceSock() {
210         audienceSockPath := strings.TrimSpace(GetStringOpt("audience socket path"))
211         go UnixAudienceListener(audienceSockPath)
212 }