initialise repo
[debian/orchestra.git] / src / conductor / persistence.go
1 // persistence.go
2 //
3 package main
4
5 import (
6         "fmt"
7         "path"
8         "os"
9         "unicode"
10         "strconv"
11         "sync/atomic"
12         "bufio"
13         "strings"
14         o "orchestra"
15 )
16
// bucketDepth is the number of single-hex-digit directory levels used to
// fan out spool files on disk (see FilenameForJobId / makeSpoolDirInner).
// Existing spool trees are laid out with exactly this depth, so
// changing this will result in fire.  you have been warned.
const bucketDepth = 2

// spoolDirectory is the root of the on-disk spool.  Set once via
// SetSpoolDirectory(); read by all the path helpers below.
var spoolDirectory = ""
21
22 func SetSpoolDirectory(spooldir string) {
23         if spoolDirectory == "" {
24                 spoolDirectory = spooldir
25         } else {
26                 if spooldir != spoolDirectory {
27                         o.Warn("Spool Directory Not Changed.")
28                 }
29         }
30 }
31
32 func GetSpoolDirectory() string {
33         if spoolDirectory == "" {
34                 o.Assert("GetSpoolDirectory() called before set")
35         }
36         return spoolDirectory
37 }
38
39
const (
	IdCheckpointSafetySkip = 10e4  // Skip 10e4 entries if orchestra didn't shutdown cleanly for safety.
)

// lastId is the most recently issued job id.  It is advanced atomically
// by NextRequestId and persisted by writeIdCheckpoint/saveLastId.
var lastId uint64 = 0
45
46 func checkpointPath() string {
47         return path.Join(spoolDirectory, "last_id.checkpoint")
48 }
49
50 func savePath() string {
51         return path.Join(spoolDirectory, "last_id")
52 }
53
// loadLastId initialises the lastId counter from disk at startup.
//
// Two files are consulted: the checkpoint file (written while running;
// its presence implies we did not shut down cleanly, since saveLastId
// removes it) and the last_id save file (written on clean shutdown).
// When recovering from a checkpoint, the counter is advanced by
// IdCheckpointSafetySkip so ids issued after the final checkpoint write
// can never be reissued.
func loadLastId() {
	fh, err := os.Open(checkpointPath())
	if err == nil {
		defer fh.Close()

		// we have a checkpoint file.  blah.
		cbio := bufio.NewReader(fh)
		l, err := cbio.ReadString('\n')
		// NOTE(review): the ReadString error is overwritten on the next
		// line without being checked.  This tolerates a file with no
		// trailing newline; Atoui64 still rejects any garbage content.
		lastId, err = strconv.Atoui64(strings.TrimSpace(l))
		if err != nil {
			o.Fail("Couldn't read Last ID from checkpoint file.  Aborting for safety.")
		}
		// Unclean shutdown: skip forward for safety (see constant doc).
		lastId += IdCheckpointSafetySkip
	} else {
		// Any open failure other than "doesn't exist" is fatal.
		pe, ok := err.(*os.PathError)
		if !ok || pe.Error != os.ENOENT {
			o.Fail("Found checkpoint file, but couldn't open it: %s", err)
		}
		// No checkpoint: fall back to the clean-shutdown save file.
		// (This fh shadows the outer, failed one.)
		fh,err := os.Open(savePath())
		if err != nil {
			pe, ok = err.(*os.PathError)
			if !ok || pe.Error == os.ENOENT {
				// Neither file exists - fresh spool, start at zero.
				lastId = 0;
				return;
			}
			o.MightFail(err, "Couldn't open last_id file")
		}
		defer fh.Close()
		cbio := bufio.NewReader(fh)
		l, err := cbio.ReadString('\n')
		// NOTE(review): ReadString error ignored here too; see above.
		lastId, err = strconv.Atoui64(strings.TrimSpace(l))
		if err != nil {
			o.Fail("Couldn't read Last ID from last_id.  Aborting for safety.")
		}
	}
	// Checkpoint the recovered value immediately so a crash before the
	// next clean save still advances past it on the following start.
	writeIdCheckpoint()
}
91
92 func writeIdCheckpoint() {
93         fh, err := os.OpenFile(checkpointPath(), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
94         if err != nil {
95                 o.Warn("Failed to create checkpoint file: %s", err)
96                 return
97         }
98         defer fh.Close()
99         fmt.Fprintf(fh, "%d\n", lastId)
100 }
101
102 func saveLastId() {
103         fh, err := os.OpenFile(savePath(), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
104         if err != nil {
105                 o.Warn("Failed to create last ID save file: %s", err)
106                 return
107         }
108         defer fh.Close()
109         fmt.Fprintf(fh, "%d\n", lastId)
110         os.Remove(checkpointPath())
111 }
112
113 func NextRequestId() uint64 {
114         //FIXME: we should do this periodically, not on every new job.
115         defer writeIdCheckpoint()
116         return atomic.AddUint64(&lastId, 1)
117 }
118
119 func FilenameForJobId(jobid uint64) (fullpath string) {
120         fnbits := make([]string, bucketDepth+1)
121         for i := 0; i < bucketDepth; i++ {
122                 fnbits[i] = fmt.Sprintf("%01X", (jobid >> uint(i*4)) & 0xF)
123         }
124         fnbits[bucketDepth] = fmt.Sprintf("%016X", jobid)
125
126         return path.Join(fnbits...)
127 }
128
129 func makeSpoolDirInner(prefix string, depth int) {
130         for i := 0; i < 16; i++ {
131                 dirname := path.Join(prefix, fmt.Sprintf("%01X", i))
132                 if (depth == 1) {
133                         err := os.MkdirAll(dirname, 0700)
134                         o.MightFail(err, "Couldn't make directory building spool tree")
135                 } else {
136                         makeSpoolDirInner(dirname, depth-1)
137                 }
138         }
139 }
140
141 func MakeSpoolDir() {
142         makeSpoolDirInner(path.Join(spoolDirectory, "active"), bucketDepth)
143         makeSpoolDirInner(path.Join(spoolDirectory, "finished"), bucketDepth)
144         os.MkdirAll(path.Join(spoolDirectory, "corrupt"), 0700)
145 }
146
147 func UnlinkNodesForJobId(jobid uint64) {
148         suffix := FilenameForJobId(jobid)
149
150         os.Remove(path.Join(spoolDirectory, "active", suffix))
151         os.Remove(path.Join(spoolDirectory, "finished", suffix))
152 }
153
154 func shuffleToCorrupted(abspath, reason string) {
155         basename := path.Base(abspath)
156         targetname := path.Join(spoolDirectory, "corrupt", basename)
157         // make sure there's nothing in the target name.
158         os.Remove(targetname)
159         err := os.Rename(abspath, targetname)
160         o.MightFail(err, "Couldn't bin corrupt spoolfile %s", abspath)
161         o.Warn("Moved \"%s\" to corrupted spool: %s", abspath, reason)
162 }
163
164 func loadSpoolFiles(dirname string, depth int) {
165         dh, err := os.Open(dirname)
166         o.MightFail(err, "Couldn't open %s", dirname)
167         nodes, err := dh.Readdir(-1)
168         o.MightFail(err, "Couldn't readdir on %s", dirname)
169         if depth > 0 {
170                 for _, n := range nodes {
171                         abspath := path.Join(dirname, n.Name)
172                         if n.IsDirectory() {
173                                 // if not a single character, it's not a spool node.
174                                 if len(n.Name) != 1 {
175                                         continue;
176                                 }
177                                 if n.Name == "." {
178                                         // we're not interested in .
179                                         continue;
180                                 }
181                                 nrunes := []int(n.Name)
182                                 if unicode.Is(unicode.ASCII_Hex_Digit, nrunes[0]) {
183                                         loadSpoolFiles(abspath, depth-1)
184                                 } else {
185                                         o.Warn("Foreign dirent %s found in spool tree", abspath)
186                                 }
187                         }
188                 }
189         } else {
190                 // depth == 0 - only interested in files.
191                 for _, n := range nodes {
192                         abspath := path.Join(dirname, n.Name)
193                         if n.IsRegular() {
194                                 if len(n.Name) != 16 {
195                                         shuffleToCorrupted(abspath, "Filename incorrect length")
196                                         continue
197                                 }
198                                 id, err := strconv.Btoui64(n.Name, 16)
199                                 if err != nil {
200                                         shuffleToCorrupted(abspath, "Invalid Filename")
201                                         continue
202                                 }
203                                 fh, err := os.Open(abspath)
204                                 if err != nil {
205                                         shuffleToCorrupted(abspath, "Couldn't open")
206                                         continue
207                                 }
208                                 defer fh.Close()
209                                 jr, err := JobRequestFromReader(fh)
210                                 if err != nil || jr.Id != id {
211                                         o.Warn("Couldn't parse?! %s", err)
212                                         shuffleToCorrupted(abspath, "Parse Failure")
213                                         continue
214                                 }
215                                 // Add the request to the registry directly.
216                                 if !RestoreJobState(jr) {
217                                         shuffleToCorrupted(abspath, "Job State Invalid")
218                                 }
219                         }
220                 }
221         }
222 }
223
// RestoreJobState takes an unmarshall'd job and stuffs it back into the
// job state, returning false if the job should be treated as corrupt.
//
// Pass 1 keeps only still-authorised players and their results; pass 2
// retains completed results from hosts that are no longer authorised.
// Task state is then repaired (ONEOF jobs may re-queue tasks that were
// never sent) and anything unfinished is redispatched.
func RestoreJobState(job *JobRequest) bool {
	// check the valid players list.
	var playersout []string = nil
	resultsout := make(map[string]*TaskResponse)
	for _, p := range job.Players {
		if HostAuthorised(p) {
			playersout = append(playersout, p)
			// fix the result too: rebuild its private id link,
			// which isn't carried in the serialised form we see here.
			resout, exists := job.Results[p]
			if exists && resout != nil {
				resout.id = job.Id
				resultsout[p] = resout
			}
			// remove it so we can sweep it in pass2 for
			// results from old hosts that matter.
			// (old-Go map-delete syntax; modern Go spells this
			// delete(job.Results, p).)
			job.Results[p] = nil, false
		}
	}
	job.Players = playersout
	if len(job.Players) == 0 {
		// If there are no players left at this point, discard
		// the job as corrupt.
		return false
	}
	// now, do pass 2 over the remaining results.
	for k, v := range job.Results {
		if v != nil {
			// if the results indicate completion, we
			// always retain them.
			if v.State.Finished() {
				resultsout[k] = v
				resultsout[k].id = job.Id
			}
		}
	}
	job.Results = resultsout

	// now, check the task data.  ONEOF jobs are allowed to
	// reset tasks that have never been sent.
	var tasksout []*TaskRequest = nil
	for _, t := range job.Tasks {
		// rebuild the return link
		t.job = job
		// finished tasks are retained unchanged.
		if t.State.Finished() {
			tasksout = append(tasksout, t)
			continue
		}
		if job.Scope == SCOPE_ONEOF {
			// Re-queue a claimed task that was never sent, or one
			// claimed by a host that is no longer authorised.
			if t.Player != "" && (t.State == TASK_QUEUED || !HostAuthorised(t.Player)) {
				t.State = TASK_QUEUED
				t.Player = ""
			}
			tasksout = append(tasksout, t)
			continue
		} else {
			// Non-ONEOF scope: drop unfinished tasks assigned to
			// de-authorised hosts.
			if HostAuthorised(t.Player) {
				tasksout = append(tasksout, t)
			}
		}
	}
	job.Tasks = tasksout
	if len(job.Tasks) == 0 {
		o.Debug("Empty tasks in deserialised job")
		// Tasks should never be empty.
		return false
	}
	// put the job back into the system.
	JobAdd(job)
	JobReviewState(job.Id)
	if (!job.State.Finished()) {
		// now, redispatch anything that's not actually finished.
		for _, t := range job.Tasks {
			if !t.State.Finished() {
				DispatchTask(t)
			}
		}
	}
	return true
}
305
306 func LoadState() {
307         loadLastId()
308         dirname := path.Join(spoolDirectory, "active")
309         loadSpoolFiles(dirname, bucketDepth)
310 }
311
// SaveState flushes all jobs to the spool and records the id counter in
// the clean-shutdown save file (removing the running checkpoint).
func SaveState() {
	JobWriteAll()
	saveLastId()
}