// ExecuteTask launches doExecution for task on its own goroutine, handing it
// a freshly made response channel.
// NOTE(review): the return statement and closing brace are not shown in this
// listing; presumably the channel is returned to the caller as the
// receive-only result - confirm against the full file.
12 func ExecuteTask(task *TaskRequest) <-chan *TaskResponse {
// Capacity 1 lets a single send on this channel complete even when no
// receiver is waiting yet.
13 complete := make(chan *TaskResponse, 1)
14 go doExecution(task, complete)
// batchLogger reads lines from errpipe through a buffered reader and writes
// each one to the Info log, tagged with jobid and "STDERR". A read error is
// reported through o.Warn.
// NOTE(review): the loop and the error check around these statements are not
// shown in this listing; presumably it reads until ReadLine fails - confirm.
19 func batchLogger(jobid uint64, errpipe *os.File) {
22 r := bufio.NewReader(errpipe)
// The second result (isPrefix) is discarded, so a line longer than the
// reader's buffer is delivered, and therefore logged, in several pieces.
24 lb, _, err := r.ReadLine()
// NOTE(review): the message names "executionLogger" although this function is
// batchLogger - confirm whether that is intentional.
29 o.Warn("executionLogger failed: %s", err)
32 o.Info("JOB %d:STDERR:%s", jobid, string(lb))
// peSetEnv sets key to value in env, a slice of "key=value" strings, and
// yields the resulting slice (declared return type []string).
// NOTE(review): the definition of mkey, the condition guarding the append and
// the return statement are not shown in this listing; presumably mkey is
// key+"=" and the append only happens when no existing entry matched -
// confirm against the full file.
36 func peSetEnv(env []string, key string, value string) []string {
39 for i, v := range env {
// An entry starting with mkey is overwritten in place, which also modifies
// the caller's backing array.
40 if strings.HasPrefix(v, mkey) {
41 env[i] = key+"="+value
// Adds a new "key=value" entry at the end of env.
47 env = append(env, key+"="+value)
// doExecution runs the score named by task.Score as a child process and
// records the outcome in task.MyResponse.State.
//
// Steps visible in this listing: look up the score in Scores, create a score
// interface with NewScoreInterface, assemble the child's environment and file
// descriptors, start the process, wait for it, and translate its wait status
// into a RESP_* state. task.MyResponse is sent on completionChannel when the
// function returns.
//
// NOTE(review): this listing omits lines (the guarding `if` statements, early
// returns and closing braces are not shown), so the exact control flow
// between the visible statements must be confirmed against the full file.
52 func doExecution(task *TaskRequest, completionChannel chan<- *TaskResponse) {
53 // we must notify the parent when we exit.
// Deferred, so the response is sent however this function returns.
54 defer func(c chan<- *TaskResponse, task *TaskRequest) { c <- task.MyResponse }(completionChannel,task)
56 // first of all, verify that the score exists at all.
57 score, exists := Scores[task.Score]
59 o.Warn("job%d: Request for unknown score \"%s\"", task.Id, task.Score)
60 task.MyResponse.State = RESP_FAILED_UNKNOWN_SCORE
63 si := NewScoreInterface(task)
// Both score-interface failures below are reported as host errors.
65 o.Warn("job%d: Couldn't initialise Score Interface", task.Id)
66 task.MyResponse.State = RESP_FAILED_HOST_ERROR
70 o.Warn("job%d: Couldn't Prepare Score Interface", task.Id)
71 task.MyResponse.State = RESP_FAILED_HOST_ERROR
// eenv supplies the Environment, Files and Arguments used further down.
76 eenv := si.SetupProcess()
// The state is set to running here, before the process has been started.
77 task.MyResponse.State = RESP_RUNNING
// Start from a zero-valued ProcAttr; Env and Files are filled in below.
79 procenv := new(os.ProcAttr)
80 // Build the default environment.
81 procenv.Env = peSetEnv(procenv.Env, "PATH", "/usr/bin:/usr/sbin:/bin:/sbin")
82 procenv.Env = peSetEnv(procenv.Env, "IFS", " \t\n")
83 pwd, err := os.Getwd()
85 task.MyResponse.State = RESP_FAILED_HOST_ERROR
86 o.Warn("job%d: Couldn't resolve PWD: %s", task.Id, err)
89 procenv.Env = peSetEnv(procenv.Env, "PWD", pwd)
90 // copy in the environment overrides
// Applied after the defaults, so a key in eenv.Environment replaces the
// PATH/IFS/PWD values set above.
91 for k, v := range eenv.Environment {
92 procenv.Env = peSetEnv(procenv.Env, k, v)
95 // attach FDs to procenv.
// Three slots: indexes 0 and 1 get /dev/null, index 2 gets the pipe (below).
96 procenv.Files = make([]*os.File, 3)
98 // first off, attach /dev/null to stdin and stdout
99 devNull, err := os.OpenFile(os.DevNull, os.O_RDWR | os.O_APPEND, 0666)
// NOTE(review): o.MightFail is defined elsewhere; presumably it aborts or
// reports when err is non-nil - confirm.
100 o.MightFail(err, "couldn't open DevNull")
101 defer devNull.Close()
102 for i := 0; i < 2; i++ {
103 procenv.Files[i] = devNull
105 // attach STDERR to our logger via pipe.
// lr is the read end (given to batchLogger), lw the write end (child's fd 2).
106 lr, lw, err := os.Pipe()
107 o.MightFail(err, "Couldn't create pipe")
109 // lr will be closed by the logger.
// NOTE(review): no close of lw in this process is visible in this listing; if
// the parent keeps lw open, the reader on lr will not see end-of-file when
// the child exits - confirm against the full file.
110 procenv.Files[2] = lw
111 // check the environment's configuration and allow it to override stdin, stdout, and FDs 3+
112 if nil != eenv.Files {
113 for i := range eenv.Files {
// NOTE(review): the condition choosing between overwriting slot i and
// appending is not shown in this listing - confirm which indexes take which
// branch.
115 procenv.Files[i] = eenv.Files[i]
117 procenv.Files = append(procenv.Files, eenv.Files[i])
// NOTE(review): original line 123 is not shown; args may receive more than
// eenv.Arguments - confirm.
121 var args []string = nil
122 args = append(args, eenv.Arguments...)
124 o.Info("job%d: Executing %s", task.Id, score.Executable)
// The logger goroutine is started before the process so it is already
// reading from the pipe when the child begins writing to stderr.
125 go batchLogger(task.Id, lr)
126 proc, err := os.StartProcess(score.Executable, args, procenv)
128 o.Warn("job%d: Failed to start processs", task.Id)
129 task.MyResponse.State = RESP_FAILED_HOST_ERROR
// NOTE(review): Wait taking an options argument, and a result exposing
// WaitStatus, match the pre-Go 1 os API - confirm the toolchain in use.
132 wm, err := proc.Wait(0)
134 o.Warn("job%d: Error waiting for process", task.Id)
135 task.MyResponse.State = RESP_FAILED_UNKNOWN
136 // Worst of all, we don't even know if we succeeded.
// A status that is neither "signaled" nor "exited" is treated as a
// programming error.
139 if !(wm.WaitStatus.Signaled() || wm.WaitStatus.Exited()) {
140 o.Assert("Non Terminal notification received when not expected.")
// Termination by signal is reported as an unknown failure.
143 if wm.WaitStatus.Signaled() {
144 o.Warn("job%d: Process got signalled", task.Id)
145 task.MyResponse.State = RESP_FAILED_UNKNOWN
// Normal exit: status 0 maps to RESP_FINISHED, anything else to RESP_FAILED.
148 if wm.WaitStatus.Exited() {
149 if 0 == wm.WaitStatus.ExitStatus() {
// Note that the success case is also logged at Warn level.
150 o.Warn("job%d: Process exited OK", task.Id)
151 task.MyResponse.State = RESP_FINISHED
153 o.Warn("job%d: Process exited with failure", task.Id)
154 task.MyResponse.State = RESP_FAILED
158 o.Assert("Should never get here.")