// spoolDirectory is the root of the on-disk spool tree. It is set
// exactly once via SetSpoolDirectory and read everywhere else —
// changing this will result in fire. You have been warned.
var spoolDirectory = ""
22 func SetSpoolDirectory(spooldir string) {
23 if spoolDirectory == "" {
24 spoolDirectory = spooldir
26 if spooldir != spoolDirectory {
27 o.Warn("Spool Directory Not Changed.")
32 func GetSpoolDirectory() string {
33 if spoolDirectory == "" {
34 o.Assert("GetSpoolDirectory() called before set")
IdCheckpointSafetySkip = 10e4 // skip this many IDs after an unclean shutdown so none are ever reissued.
46 func checkpointPath() string {
47 return path.Join(spoolDirectory, "last_id.checkpoint")
50 func savePath() string {
51 return path.Join(spoolDirectory, "last_id")
// Body of the last-ID loader: prefer the crash-recovery checkpoint
// file (plus a safety skip) over the clean-shutdown last_id file.
// NOTE(review): the enclosing function's signature and many
// structural lines (if/else braces, returns, file closes) are elided
// in this extract; comments describe only the visible statements.
fh, err := os.Open(checkpointPath())
// we have a checkpoint file. blah.
cbio := bufio.NewReader(fh)
l, err := cbio.ReadString('\n')
// NOTE(review): the ReadString error is overwritten before being
// checked — presumably a failed read yields an unparsable string that
// trips the failure path below; confirm against the full source.
lastId, err = strconv.Atoui64(strings.TrimSpace(l))
o.Fail("Couldn't read Last ID from checkpoint file. Aborting for safety.")
// We can't know how many IDs were issued after the checkpoint was
// written, so skip a margin to guarantee no ID is ever reissued.
lastId += IdCheckpointSafetySkip
// Checkpoint open failed: anything but "file not found" is fatal.
pe, ok := err.(*os.PathError)
if !ok || pe.Error != os.ENOENT {
o.Fail("Found checkpoint file, but couldn't open it: %s", err)
// No checkpoint: fall back to the clean-shutdown last_id file.
// A missing last_id is fine (fresh spool); other errors are fatal.
fh,err := os.Open(savePath())
pe, ok = err.(*os.PathError)
if !ok || pe.Error == os.ENOENT {
o.MightFail(err, "Couldn't open last_id file")
cbio := bufio.NewReader(fh)
l, err := cbio.ReadString('\n')
lastId, err = strconv.Atoui64(strings.TrimSpace(l))
o.Fail("Couldn't read Last ID from last_id. Aborting for safety.")
92 func writeIdCheckpoint() {
93 fh, err := os.OpenFile(checkpointPath(), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
95 o.Warn("Failed to create checkpoint file: %s", err)
99 fmt.Fprintf(fh, "%d\n", lastId)
// Body of the clean-shutdown saver: persist lastId to last_id, then
// drop the now-redundant crash checkpoint.
// NOTE(review): the enclosing function's signature and the
// error-check/close lines are elided in this extract.
fh, err := os.OpenFile(savePath(), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
o.Warn("Failed to create last ID save file: %s", err)
fmt.Fprintf(fh, "%d\n", lastId)
// A successful save supersedes the checkpoint; remove it so the next
// startup takes the clean path instead of applying the safety skip.
os.Remove(checkpointPath())
113 func NextRequestId() uint64 {
114 //FIXME: we should do this periodically, not on every new job.
115 defer writeIdCheckpoint()
116 return atomic.AddUint64(&lastId, 1)
119 func FilenameForJobId(jobid uint64) (fullpath string) {
120 fnbits := make([]string, bucketDepth+1)
121 for i := 0; i < bucketDepth; i++ {
122 fnbits[i] = fmt.Sprintf("%01X", (jobid >> uint(i*4)) & 0xF)
124 fnbits[bucketDepth] = fmt.Sprintf("%016X", jobid)
126 return path.Join(fnbits...)
129 func makeSpoolDirInner(prefix string, depth int) {
130 for i := 0; i < 16; i++ {
131 dirname := path.Join(prefix, fmt.Sprintf("%01X", i))
133 err := os.MkdirAll(dirname, 0700)
134 o.MightFail(err, "Couldn't make directory building spool tree")
136 makeSpoolDirInner(dirname, depth-1)
141 func MakeSpoolDir() {
142 makeSpoolDirInner(path.Join(spoolDirectory, "active"), bucketDepth)
143 makeSpoolDirInner(path.Join(spoolDirectory, "finished"), bucketDepth)
144 os.MkdirAll(path.Join(spoolDirectory, "corrupt"), 0700)
147 func UnlinkNodesForJobId(jobid uint64) {
148 suffix := FilenameForJobId(jobid)
150 os.Remove(path.Join(spoolDirectory, "active", suffix))
151 os.Remove(path.Join(spoolDirectory, "finished", suffix))
154 func shuffleToCorrupted(abspath, reason string) {
155 basename := path.Base(abspath)
156 targetname := path.Join(spoolDirectory, "corrupt", basename)
157 // make sure there's nothing in the target name.
158 os.Remove(targetname)
159 err := os.Rename(abspath, targetname)
160 o.MightFail(err, "Couldn't bin corrupt spoolfile %s", abspath)
161 o.Warn("Moved \"%s\" to corrupted spool: %s", abspath, reason)
// loadSpoolFiles walks one level of the spool tree rooted at dirname.
// At depth > 0 it recurses into single-hex-digit subdirectories; at
// depth == 0 it parses each 16-hex-digit file as a serialized job and
// restores it, binning anything malformed into the corrupt directory.
// NOTE(review): several lines (continue statements, the depth switch,
// closing braces) are elided in this extract; comments below describe
// only the visible statements.
func loadSpoolFiles(dirname string, depth int) {
dh, err := os.Open(dirname)
o.MightFail(err, "Couldn't open %s", dirname)
nodes, err := dh.Readdir(-1)
o.MightFail(err, "Couldn't readdir on %s", dirname)
// NOTE(review): dh does not appear to be closed in the visible code —
// confirm against the full source.
for _, n := range nodes {
abspath := path.Join(dirname, n.Name)
// if not a single character, it's not a spool node.
if len(n.Name) != 1 {
// we're not interested in .
// Decode the name's first code point (pre-Go1 []int(string) yields
// runes) and recurse only into hex-digit-named directories.
nrunes := []int(n.Name)
if unicode.Is(unicode.ASCII_Hex_Digit, nrunes[0]) {
loadSpoolFiles(abspath, depth-1)
o.Warn("Foreign dirent %s found in spool tree", abspath)
// depth == 0 - only interested in files.
for _, n := range nodes {
abspath := path.Join(dirname, n.Name)
// Leaf filenames are the full 16-hex-digit job ID.
if len(n.Name) != 16 {
shuffleToCorrupted(abspath, "Filename incorrect length")
id, err := strconv.Btoui64(n.Name, 16)
shuffleToCorrupted(abspath, "Invalid Filename")
fh, err := os.Open(abspath)
shuffleToCorrupted(abspath, "Couldn't open")
// The deserialised job must agree with its own filename.
jr, err := JobRequestFromReader(fh)
if err != nil || jr.Id != id {
o.Warn("Couldn't parse?! %s", err)
shuffleToCorrupted(abspath, "Parse Failure")
// Add the request to the registry directly.
if !RestoreJobState(jr) {
shuffleToCorrupted(abspath, "Job State Invalid")
// RestoreJobState takes an unmarshall'd job and stuffs it back into
// the in-memory job state, returning false when the job should be
// treated as corrupt. It prunes players, results, and tasks that
// refer to hosts which are no longer authorised.
// NOTE(review): numerous structural lines (braces, continue/return
// statements, redispatch calls) are elided in this extract; the
// comments below cover only the visible statements.
func RestoreJobState(job *JobRequest) bool {
// check the valid players list.
var playersout []string = nil
resultsout := make(map[string]*TaskResponse)
for _, p := range job.Players {
if HostAuthorised(p) {
playersout = append(playersout, p)
// fix the result too.
resout, exists := job.Results[p]
if exists && resout != nil {
resultsout[p] = resout
// remove it so we can sweep it in pass2 for
// results from old hosts that matter.
// (pre-Go1 map-delete syntax; delete(job.Results, p) in modern Go.)
job.Results[p] = nil, false
job.Players = playersout
if len(job.Players) == 0 {
// If there are no players left at this point, discard
// the job as corrupt.
// now, do pass 2 over the remaining results.
for k, v := range job.Results {
// if the results indicate completion, we
// always retain them.
if v.State.Finished() {
// rebind the retained response to its job's id.
resultsout[k].id = job.Id
job.Results = resultsout
// now, check the task data. ONEOF jobs are allowed to
// reset tasks that have never been sent.
var tasksout []*TaskRequest = nil
for _, t := range job.Tasks {
// rebuild the return link
// finished tasks we don't care about.
if t.State.Finished() {
tasksout = append(tasksout, t)
if job.Scope == SCOPE_ONEOF {
// A queued task, or one assigned to a now-unauthorised
// player, is requeued unassigned.
if t.Player != "" && (t.State == TASK_QUEUED || !HostAuthorised(t.Player)) {
t.State = TASK_QUEUED
tasksout = append(tasksout, t)
// Non-ONEOF scope: keep only tasks for authorised players.
if HostAuthorised(t.Player) {
tasksout = append(tasksout, t)
if len(job.Tasks) == 0 {
o.Debug("Empty tasks in deserialised job")
// Tasks should never be empty.
// put the job back into the system.
JobReviewState(job.Id)
if (!job.State.Finished()) {
// now, redispatch anything that's not actually finished.
for _, t := range job.Tasks {
if !t.State.Finished() {
// Startup reload: walk the active spool tree and restore every
// serialized job found there.
// NOTE(review): the enclosing function's signature is outside this
// extract.
dirname := path.Join(spoolDirectory, "active")
loadSpoolFiles(dirname, bucketDepth)