initialise repo
authorSteven McDonald <steven@steven-mcdonald.id.au>
Sun, 25 Sep 2011 13:55:24 +0000 (23:55 +1000)
committerSteven McDonald <steven@steven-mcdonald.id.au>
Sun, 25 Sep 2011 13:55:24 +0000 (23:55 +1000)
71 files changed:
.gitignore [new file with mode: 0644]
LICENSE [new file with mode: 0644]
Makefile [new file with mode: 0644]
README [new file with mode: 0644]
clientlibs/python/README.md [new file with mode: 0644]
clientlibs/python/audience.py [new file with mode: 0644]
debian/changelog [new file with mode: 0644]
debian/compat [new file with mode: 0644]
debian/control [new file with mode: 0644]
debian/orchestra-conductor.conffile [new file with mode: 0644]
debian/orchestra-conductor.default [new file with mode: 0644]
debian/orchestra-conductor.dirs [new file with mode: 0644]
debian/orchestra-conductor.init [new file with mode: 0755]
debian/orchestra-conductor.install [new file with mode: 0644]
debian/orchestra-player-go.conffile [new file with mode: 0644]
debian/orchestra-player-go.default [new file with mode: 0644]
debian/orchestra-player-go.dirs [new file with mode: 0644]
debian/orchestra-player-go.init [new file with mode: 0755]
debian/orchestra-player-go.install [new file with mode: 0644]
debian/rules [new file with mode: 0755]
debian/source/format [new file with mode: 0644]
debian/source/local-options [new file with mode: 0644]
doc/audience_api.txt [new file with mode: 0644]
doc/orchestra.tex [new file with mode: 0644]
doc/score_pipe_interface.txt [new file with mode: 0644]
go-patches/json-unmarshal-immediate.diff [new file with mode: 0644]
go-patches/syslog-auto-reconnect.diff [new file with mode: 0644]
samples/conductor.conf [new file with mode: 0644]
samples/player.conf [new file with mode: 0644]
samples/players [new file with mode: 0644]
src/conductor/Makefile [new file with mode: 0644]
src/conductor/audience.go [new file with mode: 0644]
src/conductor/client.go [new file with mode: 0644]
src/conductor/conductor.go [new file with mode: 0644]
src/conductor/config.go [new file with mode: 0644]
src/conductor/dispatch.go [new file with mode: 0644]
src/conductor/http.go [new file with mode: 0644]
src/conductor/job_request.go [new file with mode: 0644]
src/conductor/job_scope.go [new file with mode: 0644]
src/conductor/job_state.go [new file with mode: 0644]
src/conductor/persistence.go [new file with mode: 0644]
src/conductor/registry.go [new file with mode: 0644]
src/conductor/resp_state.go [new file with mode: 0644]
src/conductor/server.go [new file with mode: 0644]
src/conductor/signal.go [new file with mode: 0644]
src/conductor/task_request.go [new file with mode: 0644]
src/conductor/task_response.go [new file with mode: 0644]
src/conductor/task_state.go [new file with mode: 0644]
src/getstatus/Makefile [new file with mode: 0644]
src/getstatus/getstatus.go [new file with mode: 0644]
src/orchestra/Makefile [new file with mode: 0644]
src/orchestra/marshal.go [new file with mode: 0644]
src/orchestra/orchestra.pb.go [new file with mode: 0644]
src/orchestra/orchestra.proto [new file with mode: 0644]
src/orchestra/shared.go [new file with mode: 0644]
src/orchestra/wire.go [new file with mode: 0644]
src/player/Makefile [new file with mode: 0644]
src/player/config.go [new file with mode: 0644]
src/player/execution.go [new file with mode: 0644]
src/player/if_env.go [new file with mode: 0644]
src/player/if_pipe.go [new file with mode: 0644]
src/player/interface.go [new file with mode: 0644]
src/player/player.go [new file with mode: 0644]
src/player/registry.go [new file with mode: 0644]
src/player/resp_state.go [new file with mode: 0644]
src/player/scores.go [new file with mode: 0644]
src/player/signal.go [new file with mode: 0644]
src/player/task_request.go [new file with mode: 0644]
src/player/task_response.go [new file with mode: 0644]
src/submitjob/Makefile [new file with mode: 0644]
src/submitjob/submitjob.go [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..98ca431
--- /dev/null
@@ -0,0 +1,10 @@
+.\#*
+*~
+\#*
+*.6
+*.8
+
+**/build.out
+**/_obj
+bin/**
+pkg/**
diff --git a/LICENSE b/LICENSE
new file mode 100644 (file)
index 0000000..7eca0f5
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,24 @@
+Copyright (c) 2011, Anchor Systems Pty Ltd
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+      notice, this list of conditions and the following disclaimer in the
+      documentation and/or other materials provided with the distribution.
+    * Neither the name of Anchor Systems Pty Ltd nor the
+      names of its contributors may be used to endorse or promote products
+      derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL ANCHOR SYSTEMS PTY LTD BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Makefile b/Makefile
new file mode 100644 (file)
index 0000000..6f2fd53
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,70 @@
+#
+# version of Orchestra
+#
+VERSION=0.3.0
+
+#
+# packaging revision.
+#
+REVISION=1
+
+# remove at your own peril.
+#
+# This tells goinstall to work against the local directory as the
+# build/source path, and not use the system directories.
+#
+GOPATH=$(PWD)/build-tree:$(PWD)
+GOINSTALL_FLAGS=-dashboard=false -clean=true -u=false -make=false
+
+export GOPATH
+
+all: build
+
+build: build-tree
+       goinstall $(GOINSTALL_FLAGS) conductor
+       goinstall $(GOINSTALL_FLAGS) player
+       goinstall $(GOINSTALL_FLAGS) submitjob
+       goinstall $(GOINSTALL_FLAGS) getstatus
+
+build-tree:
+       mkdir -p build-tree/src
+       mkdir -p build-tree/bin
+       mkdir -p build-tree/pkg
+
+clean:
+       -$(RM) -r build-tree/pkg
+       -$(RM) -r build-tree/bin
+
+distclean:
+       -$(RM) -r build-tree
+
+### NOTE:  Make sure the checkouts have the correct tags in the lines below!
+deps:  distclean build-tree
+       mkdir -p build-tree/src/github.com/kuroneko && cd build-tree/src/github.com/kuroneko && git clone http://github.com/kuroneko/configureit.git && cd configureit && git checkout v0.1
+       mkdir -p build-tree/src/goprotobuf.googlecode.com && cd build-tree/src/goprotobuf.googlecode.com && hg clone -r go.r60 http://goprotobuf.googlecode.com/hg
+
+archive.deps:  deps
+       tar czf ../orchestra-$(VERSION).build-tree.tar.gz -C build-tree --exclude .git --exclude .hg . 
+
+archive.release:       archive.deps
+       git archive --format=tar --prefix=orchestra-$(VERSION)/ v$(VERSION) | gzip -9c > ../orchestra-$(VERSION).tgz
+
+.PHONY : debian debian.orig debian.debian debian.build-tree archive.deps archive.release archive.head
+
+archive.head:
+       git archive --format=tar --prefix=orchestra/ HEAD | gzip -9c > ../orchestra-HEAD.tgz
+
+DEBIAN_VERSION=$(shell dpkg-parsechangelog | grep -e 'Version:' | awk '{ print $$2 }')
+DEBIAN_SRC_VERSION=$(shell echo $(DEBIAN_VERSION) | cut -d- -f 1)
+
+debian:        debian.orig debian.debian debian.build-tree clean
+       cd .. && dpkg-source -b $(PWD)
+
+debian.orig:
+       git archive --format=tar --prefix=orchestra-$(DEBIAN_SRC_VERSION)/ v$(DEBIAN_SRC_VERSION) | gzip -9c > ../orchestra_$(DEBIAN_SRC_VERSION).orig.tar.gz
+
+debian.debian:
+       tar zcf ../orchestra_$(DEBIAN_VERSION).debian.tar.gz -C debian .
+
+debian.build-tree:     deps
+       tar zcf ../orchestra_$(DEBIAN_SRC_VERSION).orig-build-tree.tar.gz -C build-tree --exclude .git --exclude .hg .
diff --git a/README b/README
new file mode 100644 (file)
index 0000000..229f25a
--- /dev/null
+++ b/README
@@ -0,0 +1,124 @@
+A 5 Minute Guide to Orcehstra
+-----------------------------
+
+What is it?
+===========
+
+Orchestra is a series of tools for Getting Shit Run.
+
+It consists of a Conductor, which is the coordinating process, and
+Players, which are the actual daemons running on nodes to do the work.
+
+To prevent arbitrary execution of code, Players can only execute
+predefined scores which have to be installed on them seperately.  You
+can use puppet, cfengine or other configuration management system to
+do this.
+
+Canonically, entities requesting work to be done are known as the
+Audience.
+
+Please read the Orchestra paper (in doc/) for more information.
+
+License
+=======
+
+Copyright (c) 2011, Anchor Systems Pty Ltd
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+      notice, this list of conditions and the following disclaimer in the
+      documentation and/or other materials provided with the distribution.
+    * Neither the name of Anchor Systems Pty Ltd nor the
+      names of its contributors may be used to endorse or promote products
+      derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+Building
+========
+
+Install the Go Release 59 compiler.  Prior to compilation, apply the
+patches in go-patches/ in order to fix a few package bugs.
+
+From the top level directory, run make.  It will use goinstall to
+build the binaries in bin/
+
+Source Layout
+=============
+
+src/    -- All the go sources for the conductor, player, and the 
+          submitjob and getstatus sample implementations.
+
+doc/    -- Documentation about Orchestra and it's implementation.
+
+samples/ -- Sample configuration files.
+
+clientlibs/ -- Sample client libraries for communicating with the
+              Conductor as the Audience.
+
+go-patches/ -- Patches against the Release version of Go to fix
+              critical bugs or missing features encountered when
+              implementing Orchestra
+
+Patches Included
+================
+
+ * json-unmarshal-immediate.diff : Fix for Go Bug #2170
+ * syslog-auto-reconnect.diff : Fix for Go Bug #2264
+
+New In This Release
+===================
+
+v0.3.0:
+ * BUGFIX: Fixed conductor ignoring the last_id checkpoint file after
+     clean shutdowns.
+ * FEATUREFIX: Fix the exported fieldnames in the audience interface 
+     so they no longer contain capitals.  Refactor slightly to reuse
+     state types defined for persistence.
+ * Separation of some of the more esoteric shared code from the common
+     library
+ * Conductor Queue Persistence.
+ * Patches against the Go standard packages. :(
+
+v0.2.0:
+ * First public release.
+
+Known Issues
+============
+
+ * There is no clean up of job data, or persistance of results at this
+   time.  This is intended to be implemented as soon as time permits.
+
+ * getstatus gets back a lot more information than it displays.
+
+ * No efficient 'wait for job' interface yet.  You have to poll the
+   audience interface for completion results for now.  (The polls are,
+   however, stupidly cheap)
+
+ * Disconnect/Reconnect behaviour for players is not particualrly well
+   tested.  Annecdotal evidence suggests that this is relatively
+   robust however.
+
+ * Jobs will be left dangling if a player is removed from the
+   conductor's configuration and the conductor HUP'd whilst there is
+   still work pending for that player.
+
+ * Some of the more advanced score scheduling ideas that we've had
+   remain unimplemented, resulting in Orchestra looking a lot blander
+   than it really is meant to be.
+
+ * There is no support for CRLs yet.
diff --git a/clientlibs/python/README.md b/clientlibs/python/README.md
new file mode 100644 (file)
index 0000000..6af781d
--- /dev/null
@@ -0,0 +1,16 @@
+# pyaudience
+
+This is a very simple interface stub for python to talk to a running
+conductor.
+
+submit_job submits a job.  It does no argument checking (it probably
+should).
+
+get_status gets data for a job.  If successful, it returns the unmarshall'd
+json result.
+
+Both methods throw exceptions if anything goes wrong.  There is a
+ServerError exception type which will be raised if the server
+complains about anything.
+
+For more information about this API, please see doc/audience_api.txt
diff --git a/clientlibs/python/audience.py b/clientlibs/python/audience.py
new file mode 100644 (file)
index 0000000..27106d0
--- /dev/null
@@ -0,0 +1,58 @@
+# audience.py
+
+import socket
+import json
+
+DEFAULT_SOCKET_PATH="/var/spool/orchestra/conductor.sock"
+
+class ServerError(Exception):
+    pass
+
+def submit_job(score, scope, target, args=None, sockname=DEFAULT_SOCKET_PATH):
+    reqObj = {
+        'op': 'queue',
+        'score': score,
+        'scope': scope,
+        'players': None,
+        'params': {}
+    }
+
+    reqObj['players'] = list(target)
+
+    if args is not None:
+        reqObj['params'] = args
+
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    sock.connect(sockname)
+    f = sock.makefile()
+    try:
+        f.write(json.dumps(reqObj))
+        f.flush()
+        resp = json.load(f)
+
+        if resp[0] == 'OK':
+            return resp[1]
+        else:
+            raise ServerError(resp[1])
+    finally:
+        sock.close()
+
+def get_status(jobid, sockname=DEFAULT_SOCKET_PATH):
+    reqObj = {
+        'op': 'status',
+        'id': jobid
+    }
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    sock.connect(sockname)
+    f = sock.makefile()
+    try:
+        f.write(json.dumps(reqObj))
+        f.flush()
+        resp = json.load(f)
+
+        if resp[0] == 'OK':
+            return resp[1]
+        else:
+            raise ServerError(resp[1])
+    finally:
+        sock.close()
diff --git a/debian/changelog b/debian/changelog
new file mode 100644 (file)
index 0000000..83f9462
--- /dev/null
@@ -0,0 +1,59 @@
+orchestra (0.3.0-0anchor1) unstable; urgency=low
+
+  * Update to golang r60.
+  * Update to upstream 0.3.0
+
+ -- Chris Collins <chris.collins@anchor.net.au>  Tue, 20 Sep 2011 14:27:37 +1000
+
+orchestra (0.2.0-0anchor1) unstable; urgency=low
+
+  * Update to golang r59.
+  * New build chain.
+
+ -- Chris Collins <chris.collins@anchor.net.au>  Thu, 04 Aug 2011 15:53:40 +1000
+
+orchestra (0.1.4-1) unstable; urgency=low
+
+  * Fix bug that crept in with fixes for #218.
+
+ -- Chris Collins <chris.collins@anchor.net.au>  Tue, 26 Jul 2011 14:17:58 +1000
+
+orchestra (0.1.3-1) unstable; urgency=low
+
+  * Update job state when disqualifying players. (fixes #208 & #218)
+
+  * Validate requests better.  (fixes #218)
+  
+  * Fix handling of last_id (fixes #216)
+  
+  * Initialise Response Maps correctly (fixes #217)
+  
+  * Filter non-executable files in the scores directory from consideration
+
+ -- Chris Collins <chris.collins@anchor.net.au>  Tue, 26 Jul 2011 12:01:48 +1000
+
+orchestra (0.1.2-2) unstable; urgency=low
+
+  * Add missing /var/spool/orchestra directory to package management.
+
+ -- Chris Collins <chris.collins@anchor.net.au>  Tue, 19 Jul 2011 14:58:13 +1000
+
+orchestra (0.1.2-1) unstable; urgency=low
+
+  * Update for 0.1.2 bugfixes and 'pipe' interface.
+
+ -- Chris Collins <chris.collins@anchor.net.au>  Tue, 19 Jul 2011 14:35:52 +1000
+
+orchestra (0.1.1-1) unstable; urgency=low
+
+  * Update for 0.1.1 bugfixes.
+  
+  * Default to disabled.
+
+ -- Chris Collins <chris.collins@anchor.net.au>  Thu, 07 Jul 2011 16:03:56 +1000
+
+orchestra (0.1-1) unstable; urgency=low
+
+  * Initial Version
+
+ -- Chris Collins <chris.collins@anchor.net.au>  Mon, 04 Jul 2011 11:53:26 +1000
diff --git a/debian/compat b/debian/compat
new file mode 100644 (file)
index 0000000..7f8f011
--- /dev/null
@@ -0,0 +1 @@
+7
diff --git a/debian/control b/debian/control
new file mode 100644 (file)
index 0000000..62319a5
--- /dev/null
@@ -0,0 +1,30 @@
+Source: orchestra
+Maintainer: Chris Collins <chris.collins@anchor.net.au>
+Section: admin
+Priority: extra
+Build-Depends: debhelper (>= 7.0.50~), golang (>= 1:60), golang (<< 1:61)
+Standards-Version: 3.9.1
+
+Package: orchestra-conductor
+Architecture: i386 amd64
+Depends: ssl-cert, ${misc:Depends}
+Description: The Orchestra management server
+ Orchestra is a system for managing distributed job execution over a
+ group of servers.
+ .
+ This package contains the Conductor which manages the request queues
+ and dispatches work to the servers.  It also contains the job
+ submission tools.
+
+Package: orchestra-player-go
+Architecture: i386 amd64
+Description: The Orchestra execution agent
+ Orchestra is a system for managing distributed job execution over a
+ group of servers.
+ .
+ This package contains the go implementation of the Player which
+ retrieves the work queue from the Conductor and performs the actual
+ task execution.
+ .
+ As this particular implemention is written in Go, it only works on
+ x86 architecture systems.
diff --git a/debian/orchestra-conductor.conffile b/debian/orchestra-conductor.conffile
new file mode 100644 (file)
index 0000000..949d739
--- /dev/null
@@ -0,0 +1,2 @@
+/etc/orchestra/player
+/etc/orchestra/conductor.conf
diff --git a/debian/orchestra-conductor.default b/debian/orchestra-conductor.default
new file mode 100644 (file)
index 0000000..fcde44a
--- /dev/null
@@ -0,0 +1,8 @@
+#
+# Defaults for orchestra-conductor
+#
+
+#
+# set ENABLED=yes to enable the conductor.
+#
+ENABLED=no
diff --git a/debian/orchestra-conductor.dirs b/debian/orchestra-conductor.dirs
new file mode 100644 (file)
index 0000000..b3bb146
--- /dev/null
@@ -0,0 +1,4 @@
+/usr/sbin
+/usr/bin
+/etc/orchestra
+/var/spool/orchestra
diff --git a/debian/orchestra-conductor.init b/debian/orchestra-conductor.init
new file mode 100755 (executable)
index 0000000..192bbf7
--- /dev/null
@@ -0,0 +1,66 @@
+#!/bin/sh
+
+### BEGIN INIT INFO
+# Provides:             orchestra-conductor
+# Required-Start:       networking
+# Required-Stop:        networking
+# Default-Start:        2 3 4 5
+# Default-Stop:                0 1 6
+# Short-Description:    Conductor queue manager for Orchestra
+### END INIT INFO
+
+set -e
+
+export PATH="/usr/bin:/bin:/usr/sbin:/sbin"
+
+CONDUCTOR=/usr/sbin/conductor
+
+test -x "${CONDUCTOR}" || exit 0
+
+CONDUCTOR_ARGS=""
+ENABLED=no
+PIDFILE=/var/run/orchestra-conductor.pid
+
+if test -f /etc/default/orchestra-conductor; then
+   . /etc/default/orchestra-conductor
+fi
+
+. /lib/lsb/init-functions
+
+if [ "${ENABLED}" != "yes" ]; then
+    exit 0
+fi
+
+case "$1" in
+    start)
+       log_daemon_msg "Starting the Conductor"
+       if start-stop-daemon --start --quiet --oknodo --background --pidfile "${PIDFILE}" --make-pidfile --exec "${CONDUCTOR}" -- ${CONDUCTOR_ARGS}; then
+           log_end_msg 0
+       else
+           log_end_msg 1
+       fi
+       ;;
+    stop)
+       log_daemon_msg "Stopping the Conductor"
+       if start-stop-daemon --stop --quiet --oknodo --pidfile "${PIDFILE}" -x "${CONDUCTOR}"; then
+           log_end_msg 0
+       else
+           log_end_msg 1
+       fi
+       ;;
+    reload)
+       log_daemon_msg "Asking the Conductor to Reload Configuration"
+       if start-stop-daemon --stop --quiet --signal HUP --pidfile "${PIDFILE}" -x "${CONDUCTOR}"; then
+           log_end_msg 0
+       else
+           log_end_msg 1
+       fi
+       ;;
+    status)
+       status_of_proc -p "${PIDFILE}" "${CONDUCTOR}" conductor && exit 0 || exit $?
+       ;;
+    *)
+       log_action_msg "Usage: /etc/init.d/orchestra-conductor {start|stop|reload|status}"
+       exit 1
+       ;;
+esac
diff --git a/debian/orchestra-conductor.install b/debian/orchestra-conductor.install
new file mode 100644 (file)
index 0000000..c267556
--- /dev/null
@@ -0,0 +1,5 @@
+bin/conductor  /usr/sbin
+bin/getstatus  /usr/bin
+bin/submitjob  /usr/bin
+samples/players        /etc/conductor
+samples/conductor.conf /etc/conductor
\ No newline at end of file
diff --git a/debian/orchestra-player-go.conffile b/debian/orchestra-player-go.conffile
new file mode 100644 (file)
index 0000000..e9ea019
--- /dev/null
@@ -0,0 +1 @@
+/etc/orchestra/player.conf
diff --git a/debian/orchestra-player-go.default b/debian/orchestra-player-go.default
new file mode 100644 (file)
index 0000000..d4c5c35
--- /dev/null
@@ -0,0 +1,8 @@
+#
+# Defaults for orchestra-player-go
+#
+
+#
+# set ENABLED=yes to enable the player.
+#
+ENABLED=no
diff --git a/debian/orchestra-player-go.dirs b/debian/orchestra-player-go.dirs
new file mode 100644 (file)
index 0000000..d175ca6
--- /dev/null
@@ -0,0 +1,3 @@
+/usr/sbin
+/usr/lib/orchestra/scores
+/etc/orchestra
diff --git a/debian/orchestra-player-go.init b/debian/orchestra-player-go.init
new file mode 100755 (executable)
index 0000000..933912a
--- /dev/null
@@ -0,0 +1,67 @@
+#!/bin/sh
+
+### BEGIN INIT INFO
+# Provides:             orchestra-player-go
+# Required-Start:       networking
+# Required-Stop:        networking
+# Default-Start:        2 3 4 5
+# Default-Stop:                0 1 6
+# Short-Description:    Player execuation agent for Orchestra
+### END INIT INFO
+
+set -e
+
+export PATH="/usr/bin:/bin:/usr/sbin:/sbin"
+
+PLAYER=/usr/sbin/player
+
+test -x "${PLAYER}" || exit 0
+
+PLAYER_ARGS=""
+ENABLED=no
+
+PIDFILE=/var/run/orchestra-player-go.pid
+
+if test -f /etc/default/orchestra-player-go; then
+   . /etc/default/orchestra-player-go
+fi
+
+. /lib/lsb/init-functions
+
+if [ "${ENABLED}" != "yes" ]; then
+    exit 0
+fi
+
+case "$1" in
+    start)
+       log_daemon_msg "Starting the Player"
+       if start-stop-daemon --start --quiet --oknodo --background --pidfile "${PIDFILE}" --make-pidfile --exec "${PLAYER}" -- ${PLAYER_ARGS}; then
+           log_end_msg 0
+       else
+           log_end_msg 1
+       fi
+       ;;
+    stop)
+       log_daemon_msg "Stopping the Player"
+       if start-stop-daemon --stop --quiet --oknodo --pidfile "${PIDFILE}" -x "${PLAYER}"; then
+           log_end_msg 0
+       else
+           log_end_msg 1
+       fi
+       ;; 
+    reload)
+       log_daemon_msg "Asking the Player to Reload Configuration"
+       if start-stop-daemon --stop --quiet --signal HUP --pidfile "${PIDFILE}" -x "${PLAYER}"; then
+           log_end_msg 0
+       else
+           log_end_msg 1
+       fi
+       ;;
+    status)
+       status_of_proc -p "${PIDFILE}" "${PLAYER}" player && exit 0 || exit $?
+       ;;
+    *)
+       log_action_msg "Usage: /etc/init.d/orchestra-player-go {start|stop|status}"
+       exit 1
+       ;;
+esac
diff --git a/debian/orchestra-player-go.install b/debian/orchestra-player-go.install
new file mode 100644 (file)
index 0000000..52ba624
--- /dev/null
@@ -0,0 +1,2 @@
+bin/player             /usr/sbin
+samples/player.conf    /etc/conductor
diff --git a/debian/rules b/debian/rules
new file mode 100755 (executable)
index 0000000..33b82a2
--- /dev/null
@@ -0,0 +1,78 @@
+#!/usr/bin/make -f
+# Sample debian/rules that uses debhelper.
+# This file is public domain software, originally written by Joey Hess. 
+#
+# This version is for packages that are architecture dependent.
+
+# Uncomment this to turn on verbose mode.
+#export DH_VERBOSE=1
+
+build: build-stamp
+build-stamp:
+       dh_testdir
+
+       make build
+
+       touch build-stamp
+
+clean:
+       dh_testdir
+       dh_testroot
+       rm -f build-stamp
+
+       make clean
+
+       dh_clean
+
+install: build
+       dh_testdir
+       dh_testroot
+       dh_prep
+       dh_installdirs
+
+       # Add here commands to install the package into debian/<packagename>
+       #$(MAKE) prefix=`pwd`/debian/`dh_listpackages`/usr install
+
+# Build architecture-independent files here.
+binary-indep: build install
+# We have nothing to do by default.
+
+# Build architecture-dependent files here.
+binary-arch: build install
+       dh_testdir
+       dh_testroot
+       dh_installchangelogs
+       dh_installdocs
+       dh_installexamples
+       dh_install
+#      dh_installmenu
+#      dh_installdebconf       
+#      dh_installlogrotate
+#      dh_installemacsen
+#      dh_installcatalogs
+#      dh_installpam
+#      dh_installmime
+       dh_installinit
+#      dh_installcron
+#      dh_installinfo
+#      dh_installwm
+#      dh_installudev
+#      dh_lintian
+#      dh_bugfiles
+#      dh_undocumented
+       dh_installman
+       dh_link
+# Strip baaaad when go programs are involved.
+#      dh_strip
+       dh_compress
+       dh_fixperms
+#      dh_perl
+#      dh_makeshlibs
+       dh_installdeb
+       dh_shlibdeps
+       dh_gencontrol
+       dh_md5sums
+       dh_builddeb
+
+binary: binary-indep binary-arch
+.PHONY: build clean binary-indep binary-arch binary install
diff --git a/debian/source/format b/debian/source/format
new file mode 100644 (file)
index 0000000..163aaf8
--- /dev/null
@@ -0,0 +1 @@
+3.0 (quilt)
diff --git a/debian/source/local-options b/debian/source/local-options
new file mode 100644 (file)
index 0000000..2340d6e
--- /dev/null
@@ -0,0 +1,2 @@
+tar-ignore = .git .hg
+
diff --git a/doc/audience_api.txt b/doc/audience_api.txt
new file mode 100644 (file)
index 0000000..e6c6824
--- /dev/null
@@ -0,0 +1,50 @@
+Audience API:
+
+So:  Our Audience needs a way to do stuff.
+
+All key names (quoted below) are case sensitive.
+
+Overview:
+  Connect to the Unix Socket.
+  Send JSON.
+  Read JSON response back.
+  Close socket.
+
+QUEUE JOB:
+
+Request:
+- dict:
+  - 'op': 'queue'
+  - 'score':  Score Name
+  - 'players':  Array
+    - playername
+  - 'scope': either 'all' or 'one'
+  - 'params': dict
+    - k/v's passed through to job.
+
+Response:
+- array:
+[error, jobid]
+
+GET STATUS:
+Request:
+- dict:
+  - 'op': 'status'
+  - 'id': jobid
+
+Response:
+- array:
+[error, dict]
+
+dict is:
+- 'status': aggregated result "OK/Failure"
+- 'players': dict - individual results
+  - hostname: dict
+    - 'status': individual OK/Failure
+    - 'response': dict
+
+error is 'OK' if successful.  jobid is the JobID if sucessful.
+
+Error is otherwise an error mesage.
+
+
diff --git a/doc/orchestra.tex b/doc/orchestra.tex
new file mode 100644 (file)
index 0000000..63c6c71
--- /dev/null
@@ -0,0 +1,239 @@
+\documentclass[a4paper]{article}
+\usepackage[T1]{fontenc}
+\usepackage{textcomp}
+\usepackage{mathptmx}
+\begin{document}
+\title{Orchestra}
+\author{Christopher Collins}
+\maketitle
+
+\section{Introduction}
+
+Orchestra is a suite for managing the reliable execution of tasks over
+a number of hosts.  It is intended to be simple and secure, leaving
+more complicated coordination and tasks to tools better suited for
+those jobs.
+
+To accomodate the needs of web interfaces and other transactional
+interface methods, the main mode of operation for clients using
+Orchestra is asynchronous with respect to the execution of the jobs.
+
+\section{Operating Principles}
+
+The fundamental ideas behing Orchestra is that a client (the
+``Audience'') requests that a job (the ``Score'') is excuted by one or
+more agents (the ``Players'').  This process is managed by the
+``Conductor'' which mediates between the Audience and the Players.
+
+A request consists of the name of the Score to execute, and a scope of
+execution which defines if the score is to be executed on a single
+player or multiple players, and which players are allowed to service
+the request.
+
+Once a request has been received by a conductor, it is broken down
+into one or more tasks which represent a single execution of the
+requests score on a single machine.  These tasks are communicated to
+the players, which perform the task, and report back outcomes, which
+are then collated by the conductor back into a response.
+
+\section{Scores}
+
+Scores are an executable object, be it a script or binary executable,
+which is optionally accompanied by a configuration file.
+
+In order for a Score to be considered for execution by a Player, it
+must have the executable bit set, and must be in the Score Directory
+({\tt /usr/lib/orchestra/scores} by default).  The filename of the
+Score itself is used to identify it in Job requests.
+
+The optional configuration file contains directives which may tell the
+Player how to behave when executing the score.
+
+Currently, the score configuration file allows you to set which
+communication interface to use when exchanging information with the
+Score, the initial working directory and the initial path for the
+score.
+
+Intended future features includes the ability to have the Player
+change the effective User ID prior to execution of the score.
+
+Because we allow bidirectional communication between Scores and
+Players, we need to establish a mechanism to do this.  The interface
+option controls this, and currently can be set to either ``{\tt env}''
+or ``{\tt pipe}''.
+
+The Player only loads score information at start up, or when prompted
+to by a {\tt SIGHUP}.  Adding or removing files from this directory
+will not change what the Player considers valid, or the parameters for
+executing a score, until a {\tt SIGHUP} has been issued.
+
+\subsection{env Interface}
+
+The ``{\tt env}'' interface is a one-way communication interface which
+allows the Player to communicate paramaters to the Score, but not
+vice-versa.
+
+It does this by preprending ``{\tt ORC\_}'' to the key name, and
+setting it in the environment appropriately.
+
+The ``{\tt env}'' interface connects the Score's {\tt STDIN} and {\tt
+  STDOUT} to {\tt /dev/null}.
+
+If the process exits with a exit status of 0, it is treated as a
+success.  All other outcomes are treated as failures.
+
+\subsection{pipe Interface}
+
+The ``{\tt pipe}'' interface is a two-way communication interface that
+operates in a very similar manner to the {\tt env} interface.
+
+Rather than connecting {\tt STDOUT} to {\tt /dev/null}, it's connected
+to a pipe which it listens on for lines of the format ``{\tt
+  <key>=<value>}''.  When it receives such a line, it is set into the
+Response set, replacing any existing values for {\tt <key>}.
+
+Just as with {\tt env}, if the process exits with a exit status of 0,
+it is treated as a success.  All other outcomes are treated as
+failures.
+
+\section{Audience Requests}
+
+At present, there are only two operations available to the audience:
+
+\begin{itemize}
+\item Submit Job
+\item Query Status of Job
+\end{itemize}
+
+\subsection{Submit Job}
+
+The Submit Job operation allows an audience member to submit a request
+for a score to be executed.
+
+The request contains the Score's name, a scope (``One of'' or ``All
+of''), the valid players list, and a series of key/value parameters to
+be passed to the score.
+
+The Conductor responds to this by either rejecting the request with an
+error, or by returning the ID for this request.
+
+Once accepted, the job is then completed as per the considerations
+listed under ``Job Execution''.
+
+\subsection{Query Status of Job}
+
+The Query Status of Job operation allows an audience member to get the
+status of a previous job sumission.
+
+The request contains the ID for the job to check.
+
+The Conductor responds to this by either rejecting the request with an
+error, or by returning an aggregate status for the Job, and detailed
+individual response data.
+
+The Aggregate status is either a Success, Partial Success or Failure
+status.
+
+With a ``One Of'' job, the aggregate status is simply the outcome from
+the execution attempt.
+
+With an ``All Of'' job, the aggregate status only indicates success or
+failure if the result is unanimous.  Partial Success is returned for
+any mixed outcome jobs.
+
+\section{Job Execution}
+
+When a Job is accepted, it is refactored as a series of single machine
+tasks which are then appened to a FIFO queue.
+
+Players, when idle, send requests for tasks to the Conductor, which
+then drains the first task from the queue which the Player can
+service, and sends the data to that Player, committing the Task to
+that specific player if it has ``One of'' scope.
+
+The Player then attempts to execute a score according to the details
+in the task.  This returns a result upon completion or error, and if
+successful, may also contain a key/value pair set response.  This is
+then reported back to the Conductor.
+
+Score results are either Hard outcomes (such as Success, Failure,
+Internal Error, Host Error During Execution) or Soft outcomes (Error
+starting score, Score unknown).  When a soft failure outcome is
+reported back to the Conductor for a ``One Of'' job, the conductor
+records the result, removes the player from the valid destinations
+list, and reschedules the task for execution at the head of the queue.
+
+In the interest of security, all network communication performed
+between the Conductor and Player is encrypted using TLS.
+
+\section{The Conductor}
+
+The Conductor is the single most imporant process in an Orchestra
+deployment.  It manages the work queue, and services requests from the
+Audience.
+
+Access to the Conductor by Players is controlled by a combination of
+the player list, a simple text file which lists the full hostname of
+all authorised players, and verification of the Player's TLS
+certificate.
+
+\subsection{Audience Interface}
+
+The Conductor listens for audience requests on a Unix Domain Socket.
+The path can be configured via the {\tt conductor.conf} configuration
+file, but defaults to {\tt /var/run/audience.sock}.
+
+The audience socket accepts a connection from any local user, and
+expects the client to send a single JSON encoded object.  It will then
+either send back a JSON encoded response or close the socket
+immediately if it couldn't understand the request.  
+
+It is expected that the service time for such requests is fast enough
+that it will not interfere with web applications polling for job
+execution status.
+
+Further documentation about this interface can be found in {\tt
+  doc/audience\_api.txt}.
+
+\subsection{Status Interface}
+
+The Condcutor implements a very basic human readable status interface
+accessible via HTTP on port 2259.  This interface tells you how many
+tasks are currently pending dispatch, and which hosts are currently
+idle, pending Tasks.
+
+\subsection{Player Interface}
+
+The Player interface is a TCP TLS listener on port 2258 which
+implements the Orchestra Player/Conductor protocol.
+
+The Protocol is a binary protocol that uses a well defined 3 byte
+header combined with message bodies encoded using the Google Protocol
+Buffers toolkit.
+
+The protocol was constructed this way in order to make it easy to
+implement a new Player in another language.  This was to allow for the
+implementation of Orchestra Players on systems that do not have a Go
+compiler.
+
+\section{Security Considerations}
+
+To ensure security, the Players and Conductor mutually authenticate
+each other using X.509 certificates - the players verify that the
+conductor's name matches it's certificate, and that it has a trust
+chain to a trusted Certificate Authority, and the conductor verifies
+that each connecting player's certificate matches who it claims to be
+and that the certificate has a trust chain back to it's trusted
+Certificate Authorities.  This allows both parties to be assured that
+they're communicating with the correct entity.
+
+As the whole concept of Orchestra is to allow the remote execution of
+work, it was an intentional design decision to restrict execution to
+existing scripts or executables on the system which have been
+explicitly configured to be invocable.  The distribution or
+installation of these materials is left for either manual deployment,
+or for other automated management systems to handle (such as Chef, or
+Puppet).
+
+\end{document}
+
diff --git a/doc/score_pipe_interface.txt b/doc/score_pipe_interface.txt
new file mode 100644 (file)
index 0000000..12a5020
--- /dev/null
@@ -0,0 +1,22 @@
+The Score 'pipe' Interface
+==========================
+
+So, there was a 'env' interface which provided K/V data to the score via
+the environment.
+
+That works for sending information to the score, but not for getting
+information back.
+
+To make that step, the 'pipe' interface has been built.
+
+pipe reads line output in stdout from the executing process.  If the
+line contains a '=', it's split at the first '=' for the Key and
+Value, and they are stored in the responses.  All other output is
+ignored.
+
+To use this interface, the score's .conf file should contain:
+----
+interface=pipe
+----
+
+
diff --git a/go-patches/json-unmarshal-immediate.diff b/go-patches/json-unmarshal-immediate.diff
new file mode 100644 (file)
index 0000000..e734ca0
--- /dev/null
@@ -0,0 +1,21 @@
+diff -r bacb4ed5791c src/pkg/json/decode.go
+--- a/src/pkg/json/decode.go   Thu Aug 04 16:38:18 2011 +1000
++++ b/src/pkg/json/decode.go   Mon Aug 22 16:50:36 2011 +1000
+@@ -549,6 +549,17 @@
+       item := d.data[start:d.off]
+       // Check for unmarshaler.
++      // first, check if we can do an immediate unmarshalling.
++      if v.CanAddr() {
++              unmarshaler, ok := v.Addr().Interface().(Unmarshaler)
++              if ok {
++                      err := unmarshaler.UnmarshalJSON(item)
++                      if err != nil {
++                              d.error(err)
++                      }
++                      return
++              }
++      }
+       wantptr := item[0] == 'n' // null
+       unmarshaler, pv := d.indirect(v, wantptr)
+       if unmarshaler != nil {
diff --git a/go-patches/syslog-auto-reconnect.diff b/go-patches/syslog-auto-reconnect.diff
new file mode 100644 (file)
index 0000000..a6ce1b6
--- /dev/null
@@ -0,0 +1,385 @@
+diff -r b0819469a6df src/pkg/syslog/syslog.go
+--- a/src/pkg/syslog/syslog.go Thu Sep 08 12:16:42 2011 +1000
++++ b/src/pkg/syslog/syslog.go Tue Sep 20 11:29:08 2011 +1000
+@@ -17,23 +17,66 @@
+ type Priority int
+ const (
+-      // From /usr/include/sys/syslog.h.
+-      // These are the same on Linux, BSD, and OS X.
+-      LOG_EMERG Priority = iota
+-      LOG_ALERT
+-      LOG_CRIT
+-      LOG_ERR
+-      LOG_WARNING
+-      LOG_NOTICE
+-      LOG_INFO
+-      LOG_DEBUG
++      // these are internal and are used to detect when overrides
++      // are in place.
++      SeverityPresent = Priority(0x100)
++      SeverityMask    = Priority(0x7)
++      FacilityPresent = Priority(0x200)
++      FacilityMask    = Priority(0x1f << 3)
++
++      // From the RFCs.  Names lifted from C.
++      LOG_EMERG   = Priority(0) | SeverityPresent
++      LOG_ALERT   = Priority(1) | SeverityPresent
++      LOG_CRIT    = Priority(2) | SeverityPresent
++      LOG_ERR     = Priority(3) | SeverityPresent
++      LOG_WARNING = Priority(4) | SeverityPresent
++      LOG_NOTICE  = Priority(5) | SeverityPresent
++      LOG_INFO    = Priority(6) | SeverityPresent
++      LOG_DEBUG   = Priority(7) | SeverityPresent
++
++      LOG_KERN     = Priority(0<<3) | FacilityPresent
++      LOG_USER     = Priority(1<<3) | FacilityPresent
++      LOG_MAIL     = Priority(2<<3) | FacilityPresent
++      LOG_DAEMON   = Priority(3<<3) | FacilityPresent
++      LOG_AUTH     = Priority(4<<3) | FacilityPresent
++      LOG_SYSLOG   = Priority(5<<3) | FacilityPresent
++      LOG_LPR      = Priority(6<<3) | FacilityPresent
++      LOG_NEWS     = Priority(7<<3) | FacilityPresent
++      LOG_UUCP     = Priority(8<<3) | FacilityPresent
++      LOG_CRON     = Priority(9<<3) | FacilityPresent
++      LOG_AUTHPRIV = Priority(10<<3) | FacilityPresent
++      LOG_FTP      = Priority(11<<3) | FacilityPresent
++      LOG_LOCAL0   = Priority(16<<3) | FacilityPresent
++      LOG_LOCAL1   = Priority(17<<3) | FacilityPresent
++      LOG_LOCAL2   = Priority(18<<3) | FacilityPresent
++      LOG_LOCAL3   = Priority(19<<3) | FacilityPresent
++      LOG_LOCAL4   = Priority(20<<3) | FacilityPresent
++      LOG_LOCAL5   = Priority(21<<3) | FacilityPresent
++      LOG_LOCAL6   = Priority(22<<3) | FacilityPresent
++      LOG_LOCAL7   = Priority(23<<3) | FacilityPresent
+ )
++// splicePriority takes origPri, and mixes in a severity or facility
++// if present in mixPri.
++func splicePriority(origPri, mixPri Priority) (newPri Priority) {
++      newPri = origPri
++      if (mixPri & SeverityPresent) == SeverityPresent {
++              newPri = (newPri & ^SeverityMask) | (mixPri & SeverityMask)
++      }
++      if (mixPri & FacilityPresent) == FacilityPresent {
++              newPri = (newPri & ^FacilityMask) | (mixPri & FacilityMask)
++      }
++      return newPri
++}
++
+ // A Writer is a connection to a syslog server.
+ type Writer struct {
+       priority Priority
+       prefix   string
+       conn     serverConn
++      // persist the configured peer so we can reconnect when things really catch on fire.
++      network string
++      raddr   string
+ }
+ type serverConn interface {
+@@ -53,38 +96,104 @@
+       return Dial("", "", priority, prefix)
+ }
+-// Dial establishes a connection to a log daemon by connecting
+-// to address raddr on the network net.
++// Dial sets up a connection to a log daemon.  The connection attempt
++// will be deferred until the first log message is sent.
+ // Each write to the returned writer sends a log message with
+ // the given priority and prefix.
++// If the prefix is empty, the binary's name - will be used in it's place.
+ func Dial(network, raddr string, priority Priority, prefix string) (w *Writer, err os.Error) {
+       if prefix == "" {
+               prefix = os.Args[0]
+       }
+-      var conn serverConn
+-      if network == "" {
+-              conn, err = unixSyslog()
++      return &Writer{priority & 0xFF, prefix, nil, network, raddr}, err
++}
++
++// Actually perform a real reconnect, closing any connections that may be open.
++func (w *Writer) Reconnect() (err os.Error) {
++      if w.conn != nil {
++              log.Printf("writer.Reconnect() on old connection.\n")
++              w.conn.close()
++              w.conn = nil
++      }
++      if w.network == "" {
++              w.conn, err = unixSyslog()
+       } else {
+               var c net.Conn
+-              c, err = net.Dial(network, raddr)
+-              conn = netConn{c}
++              c, err = net.Dial(w.network, w.raddr)
++              w.conn = &netConn{c}
+       }
+-      return &Writer{priority, prefix, conn}, err
++      return err
++}
++
++func canRetry(err os.Error) bool {
++      // not an error?  can't retry.
++      if err == nil {
++              return false
++      }
++      oe, ok := err.(*net.OpError)
++      if ok {
++              if oe.Error == os.ECONNREFUSED {
++                      return true
++              }
++      }
++      return false
+ }
+ // Write sends a log message to the syslog daemon.
+-func (w *Writer) Write(b []byte) (int, os.Error) {
+-      if w.priority > LOG_DEBUG || w.priority < LOG_EMERG {
+-              return 0, os.EINVAL
++func (w *Writer) Write(b []byte) (bout int, err os.Error) {
++      if w.conn == nil {
++              err = w.Reconnect()
++              if err != nil {
++                      return 0, err
++              }
+       }
+-      return w.conn.writeBytes(w.priority, w.prefix, b)
++      retried := false
++retry:
++      bout, err = w.conn.writeBytes(w.priority, w.prefix, b)
++
++      if canRetry(err) && !retried {
++              retried = true
++              err = w.Reconnect()
++              if err != nil {
++                      return 0, err
++              }
++              goto retry
++      }
++      return bout, err
+ }
+-func (w *Writer) writeString(p Priority, s string) (int, os.Error) {
+-      return w.conn.writeString(p, w.prefix, s)
++func (w *Writer) writeString(p Priority, s string) (bout int, err os.Error) {
++      msgpriority := splicePriority(w.priority, p)
++      if w.conn == nil {
++              err := w.Reconnect()
++              if err != nil {
++                      return 0, err
++              }
++      }
++      retried := false
++retry:
++      bout, err = w.conn.writeString(msgpriority, w.prefix, s)
++      if canRetry(err) && !retried {
++              log.Printf("Retrying: %s", err)
++              if err == os.ECONNREFUSED {
++                      log.Printf("Hit!")
++              }
++              retried = true
++              err = w.Reconnect()
++              if err != nil {
++                      return 0, err
++              }
++              goto retry
++      }
++      return bout, err
+ }
+-func (w *Writer) Close() os.Error { return w.conn.close() }
++func (w *Writer) Close() os.Error {
++      if w.conn != nil {
++              return w.conn.close()
++      }
++      return nil
++}
+ // Emerg logs a message using the LOG_EMERG priority.
+ func (w *Writer) Emerg(m string) (err os.Error) {
+@@ -124,15 +233,15 @@
+       return err
+ }
+-func (n netConn) writeBytes(p Priority, prefix string, b []byte) (int, os.Error) {
+-      return fmt.Fprintf(n.conn, "<%d>%s: %s\n", p, prefix, b)
++func (n *netConn) writeBytes(p Priority, prefix string, b []byte) (int, os.Error) {
++      return fmt.Fprintf(n.conn, "<%d>%s: %s\n", p&0xFF, prefix, b)
+ }
+-func (n netConn) writeString(p Priority, prefix string, s string) (int, os.Error) {
+-      return fmt.Fprintf(n.conn, "<%d>%s: %s\n", p, prefix, s)
++func (n *netConn) writeString(p Priority, prefix string, s string) (int, os.Error) {
++      return fmt.Fprintf(n.conn, "<%d>%s: %s\n", p&0xFF, prefix, s)
+ }
+-func (n netConn) close() os.Error {
++func (n *netConn) close() os.Error {
+       return n.conn.Close()
+ }
+diff -r b0819469a6df src/pkg/syslog/syslog_test.go
+--- a/src/pkg/syslog/syslog_test.go    Thu Sep 08 12:16:42 2011 +1000
++++ b/src/pkg/syslog/syslog_test.go    Tue Sep 20 11:29:08 2011 +1000
+@@ -7,11 +7,14 @@
+       "io"
+       "log"
+       "net"
++      "os"
+       "testing"
+ )
+ var serverAddr string
++const testSocketAddress = "/tmp/fakelog"
++
+ func runSyslog(c net.PacketConn, done chan<- string) {
+       var buf [4096]byte
+       var rcvd string = ""
+@@ -22,10 +25,12 @@
+               }
+               rcvd += string(buf[0:n])
+       }
++
++      c.Close()
+       done <- rcvd
+ }
+-func startServer(done chan<- string) {
++func startUDPServer(done chan<- string) {
+       c, e := net.ListenPacket("udp", "127.0.0.1:0")
+       if e != nil {
+               log.Fatalf("net.ListenPacket failed udp :0 %v", e)
+@@ -35,6 +40,39 @@
+       go runSyslog(c, done)
+ }
++func runUDPSyslog(c *net.UDPConn, done chan<- string) {
++      var buf [4096]byte
++      var rcvd string = ""
++      for {
++              n, err := c.Read(buf[0:])
++              if err != nil || n == 0 {
++                      break
++              }
++              rcvd += string(buf[0:n])
++      }
++
++      c.Close()
++      done <- rcvd
++}
++
++func startUnixServer(done chan<- string) (ready <-chan int) {
++      syslogready := make(chan int, 1)
++      os.Remove(testSocketAddress)
++      uaddr, e := net.ResolveUnixAddr("unixgram", testSocketAddress)
++      if e != nil {
++              log.Fatalf("net.ResolveUnixAddr failed: %v", e)
++      }
++      c, e := net.ListenUnixgram("unixgram", uaddr)
++      if e != nil {
++              log.Fatalf("net.ListenPacket failed unixgram /tmp/fakelog %v", e)
++      }
++
++      c.SetReadTimeout(1000e6) // 100ms
++      go runUDPSyslog(c, done)
++      syslogready <- 1
++      return syslogready
++}
++
+ func skipNetTest(t *testing.T) bool {
+       if testing.Short() {
+               // Depends on syslog daemon running, and sometimes it's not.
+@@ -44,6 +82,57 @@
+       return false
+ }
++func TestFakeUnixSyslog(t *testing.T) {
++      done := make(chan string)
++      startUnixServer(done)
++      s, err := Dial("unixgram", "/tmp/fakelog", LOG_INFO|LOG_LOCAL0, "syslog_test")
++      if err != nil {
++              t.Fatalf("Dial() failed: %s", err)
++      }
++      err = s.Debug("Moo")
++      if err != nil {
++              t.Fatalf("s.Debug() failed: %s", err)
++      }
++      expected := "<135>syslog_test: Moo\n"
++      rcvd := <-done
++      if rcvd != expected {
++              t.Fatalf("s.Debug() = '%q', but wanted '%q'", rcvd, expected)
++      }
++      s.Close()
++}
++
++func TestFlap(t *testing.T) {
++      done := make(chan string)
++      startUnixServer(done)
++      s, err := Dial("unixgram", "/tmp/fakelog", LOG_INFO|LOG_LOCAL0, "syslog_test")
++      if err != nil {
++              t.Fatalf("Dial() failed: %s", err)
++      }
++      err = s.Debug("Moo")
++      if err != nil {
++              t.Fatalf("s.Debug() failed: %s", err)
++      }
++      expected := "<135>syslog_test: Moo\n"
++      rcvd := <-done
++      if rcvd != expected {
++              t.Fatalf("s.Debug() = '%q', but wanted '%q'", rcvd, expected)
++      }
++      // restart the server.
++      <-startUnixServer(done)
++
++      // and try retransmitting.
++      err = s.Debug("Re-Moo")
++      if err != nil {
++              t.Fatalf("s.Debug() failed: %s", err)
++      }
++      expected = "<135>syslog_test: Re-Moo\n"
++      rcvd = <-done
++      if rcvd != expected {
++              t.Fatalf("s.Info() = '%q', but wanted '%q'", rcvd, expected)
++      }
++      s.Close()
++}
++
+ func TestNew(t *testing.T) {
+       if skipNetTest(t) {
+               return
+@@ -79,8 +168,10 @@
+ func TestUDPDial(t *testing.T) {
+       done := make(chan string)
+-      startServer(done)
+-      l, err := Dial("udp", serverAddr, LOG_INFO, "syslog_test")
++      startUDPServer(done)
++      // it's important that the Dial priority is different to the
++      // actual message sent - this tests priority splicing.
++      l, err := Dial("udp", serverAddr, LOG_DEBUG, "syslog_test")
+       if err != nil {
+               t.Fatalf("syslog.Dial() failed: %s", err)
+       }
+@@ -95,7 +186,7 @@
+ func TestWrite(t *testing.T) {
+       done := make(chan string)
+-      startServer(done)
++      startUDPServer(done)
+       l, err := Dial("udp", serverAddr, LOG_ERR, "syslog_test")
+       if err != nil {
+               t.Fatalf("syslog.Dial() failed: %s", err)
+diff -r b0819469a6df src/pkg/syslog/syslog_unix.go
+--- a/src/pkg/syslog/syslog_unix.go    Thu Sep 08 12:16:42 2011 +1000
++++ b/src/pkg/syslog/syslog_unix.go    Tue Sep 20 11:29:08 2011 +1000
+@@ -23,7 +23,7 @@
+                       if err != nil {
+                               continue
+                       } else {
+-                              return netConn{conn}, nil
++                              return &netConn{conn}, nil
+                       }
+               }
+       }
diff --git a/samples/conductor.conf b/samples/conductor.conf
new file mode 100644 (file)
index 0000000..255deb7
--- /dev/null
@@ -0,0 +1,29 @@
+###
+### conductor.conf
+###
+### System-wide configuration for the conductor
+###
+
+### Specify the TLS private key and certificate
+# x509 private key = /etc/orchestra/conductor_key.pem
+# x509 certificate = /etc/orchestra/conductor_crt.pem
+
+### Specify the trusted CAs for clients.
+###
+### this is a ':' delimited list of pem files.
+# ca certificates =
+
+### Manually specify the conductor's host name.
+# server name = 
+
+### Override the server's bind address
+# bind address =
+
+### Set the path for the audience socket.
+# audience socket path = /var/spool/orchestra/conductor.sock
+
+### Set the directory to store the conductor's state in
+# conductor state path = /var/spool/orchestra
+
+### Set the path of the authorised players file.
+# player file path = /etc/orchestra/players
diff --git a/samples/player.conf b/samples/player.conf
new file mode 100644 (file)
index 0000000..83bc9fa
--- /dev/null
@@ -0,0 +1,31 @@
+###
+### player.conf
+###
+### System-wide configuration for the player
+###
+
+### Specify the TLS private key and certificate
+# x509 private key = /etc/orchestra/player_key.pem
+# x509 certificate = /etc/orchestra/player_crt.pem
+
+### Specify the trusted CAs of your conductor/s
+###
+### this is a ':' delimited list of pem files.
+# ca certificates =
+
+### Manually specify the player's name (typically host name).
+###
+### This must match the CN in the certificates or else the conductor
+### will reject the connection
+# player name = 
+
+### The host name of the conductor to connect to.  This should
+### match the conductor's certificate CN.
+# master = conductor
+
+### Path to the scores directory
+###
+### The specified directory will be searched for executables and
+### matching configuration files.  These will be made available as
+### scores.
+# score directory = /usr/lib/orchestra/scores
diff --git a/samples/players b/samples/players
new file mode 100644 (file)
index 0000000..25d1a09
--- /dev/null
@@ -0,0 +1,6 @@
+# /etc/conductor/players
+#
+# In this file you list the name of all servers you list the name of
+# all servers you're willing to arbitrate jobs for.
+#
+# empty lines and lines starting with '#' are ignored.
\ No newline at end of file
diff --git a/src/conductor/Makefile b/src/conductor/Makefile
new file mode 100644 (file)
index 0000000..55f9b6a
--- /dev/null
@@ -0,0 +1,36 @@
+include $(GOROOT)/src/Make.inc
+
+DEPS=../orchestra
+
+TARG=conductor
+#GCIMPORTS=-I../pkg/_obj
+#LDIMPORTS=-L../pkg/_obj
+
+GOFILES=\
+       conductor.go\
+       dispatch.go\
+       server.go\
+       http.go\
+       registry.go\
+       client.go\
+       signal.go\
+       config.go\
+       audience.go\
+       job_scope.go\
+       job_state.go\
+       resp_state.go\
+       task_state.go\
+       job_request.go\
+       task_response.go\
+
+
+include $(GOROOT)/src/Make.cmd
+
+testkey:       conductor_crt.pem conductor_key.pem
+
+conductor_key.pem:
+       openssl genrsa -out conductor_key.pem
+
+HOSTNAME=$(shell hostname --fqdn)
+conductor_crt.pem:     conductor_key.pem
+       openssl req -new -x509 -key $< -outform pem -days 365 -out $@ -subj "/C=AU/ST=New South Wales/L=Sydney/CN=$(HOSTNAME)/"
diff --git a/src/conductor/audience.go b/src/conductor/audience.go
new file mode 100644 (file)
index 0000000..5aaf95f
--- /dev/null
@@ -0,0 +1,212 @@
+/* audience.go
+*/
+
+package main
+
+import (
+       "io"
+       "json"
+       "net"
+       "os"
+       o "orchestra"
+       "strings"
+)
+
+type GenericJsonRequest struct {
+       Op              *string         `json:"op"`
+       Score           *string         `json:"score"`
+       Players         []string        `json:"players"`
+       Scope           *JobScope       `json:"scope"`
+       Params          map[string]string       `json:"params"`
+       Id              *uint64         `json:"id"`
+}
+
+type JsonPlayerStatus struct {
+       Status          ResponseState           `json:"status"`
+       Response        map[string]string       `json:"response"`
+}
+
+type JsonStatusResponse struct {
+       Status          JobState                        `json:"status"`
+       Players         map[string]*JsonPlayerStatus    `json:"players"`
+}
+
+func NewJsonStatusResponse() (jsr *JsonStatusResponse) {
+       jsr = new(JsonStatusResponse)
+       jsr.Players = make(map[string]*JsonPlayerStatus)
+       
+       return jsr
+}
+
+func NewJsonPlayerStatus() (jps *JsonPlayerStatus) {
+       jps = new(JsonPlayerStatus)
+       jps.Response = make(map[string]string)
+
+       return jps      
+}
+
+func handleAudienceRequest(c net.Conn) {
+       defer c.Close()
+
+       c.SetTimeout(0)
+       r, _ := c.(io.Reader)
+       w, _ := c.(io.Writer)
+       dec := json.NewDecoder(r)
+       enc := json.NewEncoder(w)
+
+       outobj := new(GenericJsonRequest)
+       err := dec.Decode(outobj)
+       if err != nil {
+               o.Warn("Error decoding JSON talking to audience: %s", err)
+               return
+       }
+
+       if nil == outobj.Op {
+               o.Warn("Malformed JSON message talking to audience.  Missing Op")
+               return
+       }
+       switch *(outobj.Op) {
+       case "status":
+               if nil == outobj.Id {
+                       o.Warn("Malformed Status message talking to audience. Missing Job ID")
+                       return
+               }
+               job := JobGet(*outobj.Id)
+               jresp := new([2]interface{})
+               if nil != job {
+                       jresp[0] = "OK"
+                       iresp := NewJsonStatusResponse()
+                       iresp.Status = job.State
+                       resnames := JobGetResultNames(*outobj.Id)
+                       for i := range resnames {
+                               tr := JobGetResult(*outobj.Id, resnames[i])
+                               if nil != tr {
+                                       presp := NewJsonPlayerStatus()
+                                       presp.Status = tr.State
+                                       for k,v:=range(tr.Response) {
+                                               presp.Response[k] = v
+                                       }
+                                       iresp.Players[resnames[i]] = presp
+                               }
+               
+                       }
+                       jresp[1] = iresp
+               } else {
+                       jresp[0] = "Error"
+                       jresp[1] = nil
+               }
+               enc.Encode(jresp)
+               o.Debug("Status...")
+       case "queue":
+               if nil == outobj.Score {
+                       o.Warn("Malformed Queue message talking to audience. Missing Score")
+                       sendQueueFailureResponse("Missing Score", enc)
+                       return
+               }
+               if nil == outobj.Scope {
+                       o.Warn("Malformed Queue message talking to audience. Missing Scope")
+                       sendQueueFailureResponse("Missing Scope", enc)
+                       return
+               }
+               if nil == outobj.Players || len(outobj.Players) < 1 {
+                       o.Warn("Malformed Queue message talking to audience. Missing Players")
+                       sendQueueFailureResponse("Missing Players", enc)
+                       return
+               }
+               for _, player := range outobj.Players {
+                       if !HostAuthorised(player) {
+                               o.Warn("Malformed Queue message - unknown player %s specified.", player)
+                               sendQueueFailureResponse("Invalid Player", enc)
+                               return
+                       }
+               }
+               job := NewRequest()
+               job.Score = *outobj.Score
+               job.Scope = *outobj.Scope
+               job.Players = outobj.Players
+               job.Params = outobj.Params
+
+               QueueJob(job)
+               sendQueueSuccessResponse(job, enc)
+       default:
+               o.Warn("Unknown operation talking to audience: \"%s\"", *(outobj.Op))
+               return
+       }
+
+       _ = enc
+}
+
+func sendQueueSuccessResponse(job *JobRequest, enc *json.Encoder) {
+       resp := make([]interface{},2)
+       resperr := new(string)
+       *resperr = "OK"
+       resp[0] = resperr
+
+       // this probably looks odd, but all numbers cross through float64 when being json encoded.  d'oh!
+       jobid := new(uint64)
+       *jobid = uint64(job.Id)
+       resp[1] = jobid
+
+       err := enc.Encode(resp)
+       if nil != err {
+               o.Warn("Couldn't encode response to audience: %s", err)
+       }
+}
+
+func sendQueueFailureResponse(reason string, enc *json.Encoder) {
+       resp := make([]interface{},2)
+       resperr := new(string)
+       *resperr = "Error"
+       resp[0] = resperr
+       if reason != "" {
+               resp[1] = &reason
+       }
+       err := enc.Encode(resp)
+       if nil != err {
+               o.Warn("Couldn't encode response to audience: %s", err)
+       }
+}
+
+func AudienceListener(l net.Listener) {
+       for {
+               c, err := l.Accept()
+               if err != nil {
+                       o.Warn("Accept() failed on Audience Listenter.")
+                       break
+               }
+               go handleAudienceRequest(c)
+       }
+}
+
+func UnixAudienceListener(sockaddr string) {
+       fi, err := os.Stat(sockaddr)
+       if err == nil {
+               if fi.IsSocket() {
+                       o.Warn("Removing stale socket at %s", sockaddr)
+                       os.Remove(sockaddr)
+               } else {
+                       o.Fail("%s exists and is not a socket", sockaddr)
+               }
+       }
+       laddr, err := net.ResolveUnixAddr("unix", sockaddr)
+       o.MightFail(err, "Couldn't resolve audience socket address")
+       l, err := net.ListenUnix("unix", laddr)
+       o.MightFail(err, "Couldn't start audience unixsock listener")
+       // Fudge the permissions on the unixsock!
+       fi, err = os.Stat(sockaddr)
+       if err == nil {
+               os.Chmod(sockaddr, fi.Mode | 0777)
+       } else {
+               o.Warn("Couldn't fudge permission on audience socket: %s", err)
+       }
+       
+       // make sure we clean up the unix socket when we die.
+       defer l.Close()
+       defer os.Remove(sockaddr)
+       AudienceListener(l)     
+}
+
+func StartAudienceSock() {
+       audienceSockPath := strings.TrimSpace(GetStringOpt("audience socket path"))
+       go UnixAudienceListener(audienceSockPath)
+}
\ No newline at end of file
diff --git a/src/conductor/client.go b/src/conductor/client.go
new file mode 100644 (file)
index 0000000..9dcf2e8
--- /dev/null
@@ -0,0 +1,393 @@
+/* client.go
+ *
+ * Client Handling
+*/
+
+package main
+import (
+       o "orchestra"
+       "net"
+       "time"
+       "os"
+       "crypto/tls"
+       "crypto/x509"
+)
+
+const (
+       KeepaliveDelay =        200e9 // once every 200 seconds.
+       RetryDelay     =        10e9 // retry every 10 seconds.  Must be smaller than the keepalive to avoid channel race.
+       OutputQueueDepth =      10 // This needs to be large enough that we don't deadlock on ourself.
+)
+
+
+type ClientInfo struct {
+       Player          string
+       PktOutQ         chan *o.WirePkt
+       PktInQ          chan *o.WirePkt
+       abortQ          chan int
+       TaskQ           chan *TaskRequest
+       connection      net.Conn
+       pendingTasks    map[uint64]*TaskRequest
+}
+
+func NewClientInfo() (client *ClientInfo) {
+       client = new(ClientInfo)
+       client.abortQ = make(chan int, 2)
+       client.PktOutQ = make(chan *o.WirePkt, OutputQueueDepth)
+       client.PktInQ = make(chan *o.WirePkt)
+       client.TaskQ = make(chan *TaskRequest)
+
+       return client
+}
+
+func (client *ClientInfo) Abort() {
+       PlayerDied(client)
+       reg := ClientGet(client.Player)
+       if reg != nil {
+               reg.Disassociate()
+       }
+       client.abortQ <- 1;
+}
+
+func (client *ClientInfo) Name() (name string) {
+       if client.Player == "" {
+               return "UNK:" + client.connection.RemoteAddr().String()
+       }
+       return client.Player
+}
+
+// Must not be used from inside of handlers.
+func (client *ClientInfo) Send(p *o.WirePkt) {
+       client.PktOutQ <- p
+}
+
+// Can only be used form inside of handlers and the main client loop.
+func (client *ClientInfo) sendNow(p *o.WirePkt) {
+       _, err := p.Send(client.connection)
+       if err != nil {
+               o.Warn("Error sending pkt to %s: %s.  Terminating connection.", client.Name(), err)
+               client.Abort()
+       }
+}
+
+func (client *ClientInfo) SendTask(task *TaskRequest) {
+       tr := task.Encode()
+       p, err := o.Encode(tr)
+       o.MightFail(err, "Couldn't encode task for client.")
+       client.Send(p)
+       task.RetryTime = time.Nanoseconds() + RetryDelay
+}
+
+func (client *ClientInfo) GotTask(task *TaskRequest) {
+       /* first up, look at the task state */
+       switch (task.State) {
+       case TASK_QUEUED:
+               fallthrough
+       case TASK_PENDINGRESULT:
+               /* this is a new task.  We should send it straight */
+               task.Player = client.Player
+               task.State = TASK_PENDINGRESULT
+               client.pendingTasks[task.job.Id] = task
+               client.SendTask(task)
+               // request a update to the spool so the PENDING flag is stored.
+               JobWriteUpdate(task.job.Id)
+       case TASK_FINISHED:
+               /* discard.  We don't care about tasks that are done. */                
+       }
+}
+
+// reset the task state so it can be requeued.
+func CleanTask(task *TaskRequest) {
+       task.State = TASK_QUEUED
+       task.Player = ""
+}
+
+// this merges the state from the registry record into the client it's called against.
+// it also copies back the active communication channels to the registry record.
+func (client *ClientInfo) MergeState(regrecord *ClientInfo) {
+       client.Player = regrecord.Player
+       client.pendingTasks = regrecord.pendingTasks
+
+       regrecord.TaskQ = client.TaskQ
+       regrecord.abortQ = client.abortQ
+       regrecord.PktOutQ = client.PktOutQ
+       regrecord.PktInQ = client.PktInQ
+       regrecord.connection = client.connection
+}
+
+// Sever the connection state from the client (used against registry records only)
+func (client *ClientInfo) Disassociate() {
+       client.TaskQ = nil
+       client.abortQ = nil
+       client.PktInQ = nil
+       client.PktOutQ = nil
+       client.connection = nil
+}
+
+func handleNop(client *ClientInfo, message interface{}) {
+       o.Debug("Client %s: NOP Received", client.Name())
+}
+
+func handleIdentify(client *ClientInfo, message interface{}) {
+       if client.Player != "" {
+               o.Warn("Client %s: Tried to reintroduce itself. Terminating Connection.", client.Name())
+               client.Abort()
+               return
+       }
+       ic, _ := message.(*o.IdentifyClient)
+       o.Info("Client %s: Identified Itself As \"%s\"", client.Name(), *ic.Hostname)
+       client.Player = *ic.Hostname
+       if (!HostAuthorised(client.Player)) {
+               o.Warn("Client %s: Not Authorised.  Terminating Connection.", client.Name())
+               client.Abort()
+               return
+       }
+
+       /* if we're TLS, verify the client's certificate given the name it used */
+       tlsc, ok := client.connection.(*tls.Conn)
+       if ok && !*DontVerifyPeer {
+               intermediates := x509.NewCertPool()
+
+               o.Debug("Connection is TLS.")
+               o.Debug("Checking Connection State")
+               cs := tlsc.ConnectionState()
+               vo := x509.VerifyOptions{
+               Roots: CACertPool,
+               Intermediates: intermediates,
+               DNSName: client.Player,
+               }
+               if cs.PeerCertificates == nil || cs.PeerCertificates[0] == nil {
+                       o.Warn("Peer didn't provide a certificate. Aborting Connection.")
+                       client.Abort()
+                       return
+               }
+               // load any intermediate certificates from the chain
+               // into the intermediates pool so we can verify that
+               // the chain can be rebuilt.
+               //
+               // All we care is that we can reach an authorised CA.
+               //
+               //FIXME: Need CRL handling.
+               if len(cs.PeerCertificates) > 1 {
+                       for i := 1; i < len(cs.PeerCertificates); i++ {
+                               intermediates.AddCert(cs.PeerCertificates[i])
+                       }
+               }
+               _, err := cs.PeerCertificates[0].Verify(vo)
+               if err != nil {
+                       o.Warn("couldn't verify client certificate: %s", err)
+                       client.Abort()
+                       return
+               }
+       }
+       reg := ClientGet(client.Player)
+       if nil == reg {
+               o.Warn("Couldn't register client %s.  aborting connection.", client.Name())
+               client.Abort()
+               return
+       }
+       client.MergeState(reg)
+}
+
+func handleReadyForTask(client *ClientInfo, message interface{}) {
+       o.Debug("Client %s: Asked for Job", client.Name())
+       PlayerWaitingForJob(client)
+}
+
+func handleIllegal(client *ClientInfo, message interface{}) {
+       o.Warn("Client %s: Sent Illegal Message")
+       client.Abort()
+}
+
+func handleResult(client *ClientInfo, message interface{}){
+       jr, _ := message.(*o.ProtoTaskResponse)
+       r := ResponseFromProto(jr)
+       // at this point in time, we only care about terminal
+       // condition codes.  a Job that isn't finished is just
+       // prodding us back to let us know it lives.
+       if r.IsFinished() {
+               job := JobGet(r.id)
+               if nil == job {
+                       o.Warn("Client %s: NAcking for Job %d - couldn't find job data.", client.Name(), r.id)
+                       nack := o.MakeNack(r.id)
+                       client.sendNow(nack)
+               } else {
+                       job := JobGet(r.id)
+                       if job != nil {
+                               o.Debug("Got Response.  Acking.")
+                               /* if the job exists, Ack it. */
+                               ack := o.MakeAck(r.id)
+                               client.sendNow(ack)
+                       }
+                       // now, we only accept the results if we were
+                       // expecting the results (ie: it was pending)
+                       // and expunge the task information from the
+                       // pending list so we stop bugging the client for it.
+                       task, exists := client.pendingTasks[r.id]
+                       if exists {
+                               o.Debug("Storing results for Job %d", r.id)
+                               // store the result.
+                               if !JobAddResult(client.Player, r) {
+                                       o.Assert("Couldn't add result for pending task")
+                               }
+
+                               // next, work out if the job is a retryable failure or not
+                               var didretry bool = false
+
+                               if r.DidFail() {
+                                       o.Info("Client %s reports failure for Job %d", client.Name(), r.id)
+                                       if r.CanRetry() {
+                                               job := JobGet(r.id)
+                                               if job.Scope == SCOPE_ONEOF {
+                                                       // right, we're finally deep enough to work out what's going on!
+                                                       JobDisqualifyPlayer(r.id, client.Player)
+                                                       if len(job.Players) >= 1 {
+                                                               // still players left we can try?  then go for it!
+                                                               CleanTask(task)
+                                                               DispatchTask(task)
+                                                               didretry = true
+                                                       }
+                                               }
+                                       }
+                               }
+                               if !didretry {
+                                       // if we didn't retry, the task needs to be marked as finished.
+                                       task.State = TASK_FINISHED
+                               }
+                               // update the job state.
+                               JobReviewState(r.id)
+
+                               client.pendingTasks[r.id] = nil, false
+                       }
+               }
+       }
+}
+
+
+var dispatcher = map[uint8] func(*ClientInfo,interface{}) {
+       o.TypeNop:              handleNop,
+       o.TypeIdentifyClient:   handleIdentify,
+       o.TypeReadyForTask:     handleReadyForTask,
+       o.TypeTaskResponse:     handleResult,
+       /* C->P only messages, should never appear on the wire. */
+       o.TypeTaskRequest:      handleIllegal,
+
+}
+
+var loopFudge int64 = 10e6; /* 10 ms should be enough fudgefactor */
+func clientLogic(client *ClientInfo) {
+       loop := true
+       for loop {
+               var     retryWait <-chan int64 = nil
+               var     retryTask *TaskRequest = nil
+               if (client.Player != "") {
+                       var waitTime int64 = 0
+                       var now int64 = 0
+                       cleanPass := false
+                       attempts := 0
+                       for !cleanPass && attempts < 10 {
+                               /* reset our state for the pass */
+                               waitTime = 0
+                               retryTask = nil
+                               attempts++
+                               cleanPass = true
+                               now = time.Nanoseconds() + loopFudge
+                               // if the client is correctly associated,
+                               // evaluate all jobs for outstanding retries,
+                               // and work out when our next retry is due.
+                               for _,v := range client.pendingTasks {
+                                       if v.RetryTime < now {
+                                               client.SendTask(v)
+                                               cleanPass = false
+                                       } else {
+                                               if waitTime == 0 || v.RetryTime < waitTime {
+                                                       retryTask = v
+                                                       waitTime = v.RetryTime
+                                               }
+                                       }
+                               }
+                       }
+                       if (attempts > 10) {
+                               o.Fail("Couldn't find next timeout without restarting excessively.")
+                       }
+                       if (retryTask != nil) {
+                               retryWait = time.After(waitTime-time.Nanoseconds())
+                       }
+               }
+               select {
+               case <-retryWait:
+                       client.SendTask(retryTask)
+               case p := <-client.PktInQ:
+                       /* we've received a packet.  do something with it. */
+                       if client.Player == "" && p.Type != o.TypeIdentifyClient {
+                               o.Warn("Client %s didn't Identify self - got type %d instead!  Terminating Connection.", client.Name(), p.Type)
+                               client.Abort()
+                               break
+                       }
+                       var upkt interface {} = nil
+                       if p.Length > 0 {
+                               var err os.Error
+
+                               upkt, err = p.Decode()
+                               if err != nil {
+                                       o.Warn("Error unmarshalling message from Client %s: %s.  Terminating Connection.", client.Name(), err)
+                                       client.Abort()
+                                       break
+                               }
+                       }
+                       handler, exists := dispatcher[p.Type]
+                       if (exists) {
+                               handler(client, upkt)
+                       } else {
+                               o.Warn("Unhandled Pkt Type %d", p.Type)
+                       }
+               case p := <-client.PktOutQ:
+                       if p != nil {
+                               client.sendNow(p)
+                       }
+               case t := <-client.TaskQ:
+                       client.GotTask(t)
+               case <-client.abortQ:
+                       o.Debug("Client %s connection has been told to abort!", client.Name())
+                       loop = false
+               case <-time.After(KeepaliveDelay):
+                       p := o.MakeNop()
+                       o.Debug("Sending Keepalive to %s", client.Name())
+                       _, err := p.Send(client.connection)
+                       if err != nil {
+                               o.Warn("Error sending pkt to %s: %s.  Terminating Connection.", client.Name(), err)     
+                               client.Abort()
+                       }
+               }
+       }
+       client.connection.Close()
+}
+
+func clientReceiver(client *ClientInfo) {
+       conn := client.connection
+
+
+       loop := true
+       for loop {
+               pkt, err := o.Receive(conn)
+               if nil != err {
+                       o.Warn("Error receiving pkt from %s: %s", conn.RemoteAddr().String(), err)
+                       client.Abort()
+                       client.connection.Close()
+                       loop = false
+               } else {
+                       client.PktInQ <- pkt
+               }
+       }
+       o.Debug("Client %s connection reader has exited it's loop!", conn.RemoteAddr().String())
+}
+
+/* The Main Server loop calls this method to hand off connections to us */
+func HandleConnection(conn net.Conn) {
+       /* this is a temporary client info, we substitute it for the real
+        * one once we ID the connection correctly */
+       c := NewClientInfo()
+       c.connection = conn
+       go clientReceiver(c)
+       go clientLogic(c)
+}
diff --git a/src/conductor/conductor.go b/src/conductor/conductor.go
new file mode 100644 (file)
index 0000000..61e6245
--- /dev/null
@@ -0,0 +1,51 @@
+/* conductor.go
+*/
+
+package main
+
+import (
+       "flag"
+       "os"
+       o       "orchestra"
+)
+
+var (
+       ConfigFile = flag.String("config-file", "/etc/orchestra/conductor.conf", "File containing the conductor configuration")
+       DontVerifyPeer          = flag.Bool("dont-verify-peer", false, "Ignore TLS verification for the peer")
+       // this is used a lot for marshalling. I just can't stuff it
+       // anywhere else.
+       InvalidValueError = os.NewError("Invalid value")
+)
+
+
+func main() {
+       o.SetLogName("conductor")
+
+       // parse command line options.
+       flag.Parse()
+
+       // Start the client registry - configuration parsing will block indefinately
+       // if the registry listener isn't working
+       StartRegistry()
+
+       // do an initial configuration load - must happen before
+       // MakeSpoolDir and InitDispatch()
+       ConfigLoad()
+
+       // Build the Spool Tree if necessary
+       MakeSpoolDir()
+
+       // Load any old state we had.
+       LoadState()
+       defer SaveState()
+
+       // start the master dispatch system
+       InitDispatch()
+
+       // start the status listener
+       StartHTTP()
+       // start the audience listener
+       StartAudienceSock()
+       // service TLS requests.
+       ServiceRequests()
+}
diff --git a/src/conductor/config.go b/src/conductor/config.go
new file mode 100644 (file)
index 0000000..1a32da1
--- /dev/null
@@ -0,0 +1,106 @@
+package main
+
+import (
+       "os"
+       "bufio"
+       o "orchestra"
+       "strings"
+       "github.com/kuroneko/configureit"
+)
+
+var configFile *configureit.Config = configureit.New()
+
+func init() {
+       configFile.Add("x509 certificate", configureit.NewStringOption("/etc/orchestra/conductor_crt.pem"))
+       configFile.Add("x509 private key", configureit.NewStringOption("/etc/orchestra/conductor_key.pem"))
+       configFile.Add("ca certificates", configureit.NewPathListOption(nil))
+       configFile.Add("bind address", configureit.NewStringOption(""))
+       configFile.Add("server name", configureit.NewStringOption(""))
+       configFile.Add("audience socket path", configureit.NewStringOption("/var/spool/orchestra/conductor.sock"))
+       configFile.Add("conductor state path", configureit.NewStringOption("/var/spool/orchestra"))
+       configFile.Add("player file path", configureit.NewStringOption("/etc/orchestra/players"))
+}
+
+func GetStringOpt(key string) string {
+       cnode := configFile.Get(key)
+       if cnode == nil {
+               o.Assert("tried to get a configuration option that doesn't exist.")
+       }
+       sopt, ok := cnode.(*configureit.StringOption)
+       if !ok {
+               o.Assert("tried to get a non-string configuration option with GetStringOpt")
+       }
+       return strings.TrimSpace(sopt.Value)
+}
+
+
+func GetCACertList() []string {
+       cnode := configFile.Get("ca certificates")
+       if cnode == nil {
+               o.Assert("tried to get a configuration option that doesn't exist.")
+       }
+       plopt, _ := cnode.(*configureit.PathListOption)
+       return plopt.Values
+}
+
+func ConfigLoad() {
+       // attempt to open the configuration file.
+       fh, err := os.Open(*ConfigFile)
+       if nil == err {
+               defer fh.Close()
+               // reset the config File data, then reload it.
+               configFile.Reset()
+               ierr := configFile.Read(fh, 1)
+               o.MightFail(ierr, "Couldn't parse configuration")
+       } else {
+               o.Warn("Couldn't open configuration file: %s.  Proceeding anyway.", err)
+       }
+
+       playerpath := strings.TrimSpace(GetStringOpt("player file path"))
+       pfh, err := os.Open(playerpath)
+       o.MightFail(err, "Couldn't open \"%s\"", playerpath)
+
+       pbr := bufio.NewReader(pfh)
+
+       ahmap := make(map[string]bool)
+       for err = nil; err == nil; {
+               var lb          []byte
+               var prefix      bool
+
+               lb, prefix, err = pbr.ReadLine()
+
+               if nil == lb {
+                       break;
+               }
+               if prefix {
+                       o.Fail("ConfigLoad: Short Read (prefix only)!")
+               }
+               
+               line := strings.TrimSpace(string(lb))
+               if line == "" {
+                       continue;
+               }
+               if line[0] == '#' {
+                       continue;
+               }
+               ahmap[line] = true
+       }
+       // convert newAuthorisedHosts to a slice
+       authorisedHosts := make([]string, len(ahmap))
+       idx := 0
+       for k,_ := range ahmap {
+               authorisedHosts[idx] = k
+               idx++
+       }
+       ClientUpdateKnown(authorisedHosts)
+
+       // set the spool directory
+       SetSpoolDirectory(GetStringOpt("conductor state path"))
+}
+
+
+func HostAuthorised(hostname string) (r bool) {
+       /* if we haven't loaded the configuration, nobody is authorised */
+       ci := ClientGet(hostname)
+       return ci != nil
+}
diff --git a/src/conductor/dispatch.go b/src/conductor/dispatch.go
new file mode 100644 (file)
index 0000000..945c618
--- /dev/null
@@ -0,0 +1,140 @@
+/* dispatch.go
+*/
+
+package main
+
+import (
+       "container/list"
+       o "orchestra"
+)
+
+func NewRequest() (req *JobRequest) {
+       req = NewJobRequest()
+
+       return req
+}
+
+const messageBuffer = 10
+
+var newJob             = make(chan *JobRequest, messageBuffer)
+var rqTask             = make(chan *TaskRequest, messageBuffer)
+var playerIdle         = make(chan *ClientInfo, messageBuffer)
+var playerDead         = make(chan *ClientInfo, messageBuffer)
+var statusRequest      = make(chan(chan *QueueInformation))
+
+func PlayerWaitingForJob(player *ClientInfo) {
+       playerIdle <- player
+}
+
+func PlayerDied(player *ClientInfo) {
+       playerDead <- player
+}
+
+func DispatchTask(task *TaskRequest) {
+       rqTask <- task
+}
+
+type QueueInformation struct {
+       idlePlayers     []string
+       waitingTasks    int
+}
+
+func DispatchStatus() (waitingTasks int, waitingPlayers []string) {
+       r := make(chan *QueueInformation)
+
+       statusRequest <- r
+       s := <- r
+
+       return s.waitingTasks, s.idlePlayers
+}
+
+func InitDispatch() {
+       go masterDispatch(); // go!
+}
+
+func QueueJob(job *JobRequest) {
+       /* first, allocate the Job it's ID */
+       job.Id = NextRequestId()
+       /* first up, split the job up into it's tasks. */
+       job.Tasks = job.MakeTasks()
+       /* add it to the registry */
+       JobAdd(job)
+       /* an enqueue all of the tasks */
+       for i := range job.Tasks {
+               DispatchTask(job.Tasks[i])
+       }
+}
+
+func masterDispatch() {
+       pq := list.New()
+       tq := list.New()
+
+       for {
+               select {
+               case player := <-playerIdle:
+                       o.Debug("Dispatch: Player")
+                       /* first, scan to see if we have anything for this player */
+                       i := tq.Front()
+                       for {
+                               if (nil == i) {
+                                       /* Out of items! */
+                                       /* Append this player to the waiting players queue */
+                                       pq.PushBack(player)
+                                       break;
+                               }
+                               t,_ := i.Value.(*TaskRequest)
+                               if t.IsTarget(player.Player) {
+                                       /* Found a valid job. Send it to the player, and remove it from our pending 
+                                        * list */
+                                       tq.Remove(i)
+                                       player.TaskQ <- t
+                                       break;
+                               }
+                               i = i.Next()
+                       }
+               case player := <-playerDead:
+                       o.Debug("Dispatch: Dead Player")
+                       for i := pq.Front(); i != nil; i = i.Next() {
+                               p, _ := i.Value.(*ClientInfo)
+                               if player.Player == p.Player {
+                                       pq.Remove(i)
+                                       break;
+                               }
+                       }
+               case task := <-rqTask:
+                       o.Debug("Dispatch: Task")
+                       /* first, scan to see if we have valid pending player for this task */
+                       i := pq.Front()
+                       for {
+                               if (nil == i) {
+                                       /* Out of players! */
+                                       /* Append this task to the waiting tasks queue */
+                                       tq.PushBack(task)
+                                       break;
+                               }
+                               p,_ := i.Value.(*ClientInfo)
+                               if task.IsTarget(p.Player) {
+                                       /* Found it. */
+                                       pq.Remove(i)
+                                       p.TaskQ <- task
+                                       break;
+                               }
+                               i = i.Next();
+                       }
+               case respChan := <-statusRequest:
+                       o.Debug("Status!")
+                       response := new(QueueInformation)
+                       response.waitingTasks = tq.Len()
+                       pqLen := pq.Len()
+                       response.idlePlayers = make([]string, pqLen)
+                       
+                       idx := 0
+                       for i := pq.Front(); i != nil; i = i.Next() {
+                               player,_ := i.Value.(*ClientInfo)
+                               response.idlePlayers[idx] = player.Player
+                               idx++
+                       }
+                       respChan <- response
+               }
+       }
+}
diff --git a/src/conductor/http.go b/src/conductor/http.go
new file mode 100644 (file)
index 0000000..30f4109
--- /dev/null
@@ -0,0 +1,39 @@
+/* http.go
+ *
+ * HTTP status server.
+*/
+
+package main
+
+import (
+       "fmt"
+       "http"
+       "orchestra"
+)
+
+/* default ports are all in server.go */
+
+func StartHTTP() {
+       go httpServer()
+}
+
+func returnStatus(w http.ResponseWriter, r *http.Request) {
+       tasks, players := DispatchStatus()
+       fmt.Fprintf(w, "<p>Tasks Waiting: %d</p>\n", tasks)
+       fmt.Fprintf(w, "<p>Players Idle:</p>\n<ul>\n")
+       var i int
+       for i = 0; i < len(players); i++ {
+               fmt.Fprintf(w, "<li>%s</li>\n", players[i])
+       }
+       if (i == 0) {
+               fmt.Fprintf(w, "<li>none</li>")
+       }
+       fmt.Fprintf(w, "</ul>")
+}
+
+func httpServer() {
+       laddr := fmt.Sprintf(":%d", orchestra.DefaultHTTPPort)
+       http.HandleFunc("/", returnStatus)
+       http.ListenAndServe(laddr, nil)
+}
+
diff --git a/src/conductor/job_request.go b/src/conductor/job_request.go
new file mode 100644 (file)
index 0000000..971b648
--- /dev/null
@@ -0,0 +1,134 @@
+// job_request.go
+//
+
+package main
+
+import (
+       "sort"
+       "json"
+       "path"
+       "os"
+       "io"
+       o "orchestra"
+)
+
+type JobRequest struct {
+       Score           string                          `json:"score"`
+       Scope           JobScope                        `json:"scope"`
+       Players         []string                        `json:"players"`
+       Id              uint64                          `json:"id"`
+       State           JobState                        `json:"state"`          
+       Params          map[string]string               `json:"params"`
+       Tasks           []*TaskRequest                  `json:"tasks"`
+       // you need to use the registry to access these - only public for
+       // marshalling use.
+       Results         map[string]*TaskResponse        `json:"results"`
+       // private:
+
+       // Timeout for autoexpiry.  Only valid if State if
+       // job.State.Finished() is true.
+       expirytime      int64
+}
+
+func NewJobRequest() (req *JobRequest) {
+       req = new(JobRequest)
+       req.Results = make(map[string]*TaskResponse)
+       return req
+}
+
+func JobRequestFromReader(src io.Reader) (req *JobRequest, err os.Error) {
+       req = NewJobRequest()
+       jdec := json.NewDecoder(src)
+
+       err = jdec.Decode(req)
+       if err == nil {
+               if req.Results == nil {
+                       req.Results = make(map[string]*TaskResponse)
+               }
+       }
+
+       return req, err
+}
+
+func (req *JobRequest) normalise() {
+       if (len(req.Players) > 1) {
+               /* sort targets so search works */
+               sort.Strings(req.Players)
+       } else {
+               if (req.Scope == SCOPE_ONEOF) {
+                       req.Scope = SCOPE_ALLOF
+               }
+       }
+}
+
+func (req *JobRequest) MakeTasks() (tasks []*TaskRequest) {
+       req.normalise()
+
+       var numtasks int
+       
+       switch (req.Scope) {
+       case SCOPE_ONEOF:
+               numtasks = 1
+       case SCOPE_ALLOF:
+               numtasks = len(req.Players)
+       }
+       tasks = make([]*TaskRequest, numtasks)
+       
+       for c := 0; c < numtasks; c++ {
+               t := NewTaskRequest()
+               t.job = req
+               if (req.Scope == SCOPE_ALLOF) {
+                       t.Player = req.Players[c]
+               }
+               tasks[c] = t
+       }
+       return tasks
+}
+
+func (req *JobRequest) Valid() bool {
+       if (len(req.Players) <= 0) {
+               return false
+       }
+       return true
+}
+
+func (req *JobRequest) FilenameForSpool() string {
+       if (req.State == JOB_PENDING) {
+               return path.Join(GetSpoolDirectory(), "active", FilenameForJobId(req.Id))
+       }
+       return path.Join(GetSpoolDirectory(), "finished", FilenameForJobId(req.Id))
+}
+
+// dump the bytestream in buf into the serialisation file for req.
+func (req *JobRequest) doSerialisation(buf []byte) {
+       // first up, clean up old state.
+       UnlinkNodesForJobId(req.Id)
+       outpath := req.FilenameForSpool()
+       fh, err := os.OpenFile(outpath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600)
+       if err != nil {
+               o.Warn("Could not create persistence file %s: %s", outpath, err)
+               return
+       }
+       defer fh.Close()
+       fh.Write(buf)
+}
+
+func (req *JobRequest) UpdateInSpool()  {
+       buf, err := json.MarshalIndent(req, "", "  ")
+       o.MightFail(err, "Failed to marshal job %d", req.Id)
+       //FIXME: should try to do this out of the registry's thread.
+       req.doSerialisation(buf)
+}
+
+// deserialise the job record from the finished spool
+func LoadFromFinished(jobid uint64) (req *JobRequest, err os.Error) {
+       fpath := path.Join(GetSpoolDirectory(), "finished", FilenameForJobId(jobid))
+       fh, err := os.Open(fpath)
+       if err != nil {
+               return nil, err
+       }
+       defer fh.Close()
+       
+       req, err = JobRequestFromReader(fh)
+       return req, err
+}
\ No newline at end of file
diff --git a/src/conductor/job_scope.go b/src/conductor/job_scope.go
new file mode 100644 (file)
index 0000000..45270a8
--- /dev/null
@@ -0,0 +1,53 @@
+// job_scope.go
+
+package main
+
+import (
+       "os"
+       "json"
+)
+
+const (
+       SCOPE_INVALID           = JobScope(iota)
+       SCOPE_ONEOF
+       SCOPE_ALLOF
+)
+
+type JobScope int
+
+func (js JobScope) String() (strout string) {
+       switch js {
+       case SCOPE_ONEOF:
+               strout = "one"
+       case SCOPE_ALLOF:
+               strout = "all"
+       default:
+               strout = ""
+       }
+       return strout
+}
+
+func (js JobScope) MarshalJSON() (out []byte, err os.Error) {
+       strout := js.String()
+       if strout != "" {
+               return json.Marshal(strout)
+       }
+       return nil, InvalidValueError
+}
+
+func (js *JobScope) UnmarshalJSON(in []byte) (err os.Error) {
+       var scopestr string
+       err = json.Unmarshal(in, &scopestr)
+       if err != nil {
+               return err
+       }
+       switch scopestr {
+       case "one":
+               *js = SCOPE_ONEOF
+       case "all":
+               *js = SCOPE_ALLOF
+       default:
+               return InvalidValueError
+       }
+       return nil
+}
diff --git a/src/conductor/job_state.go b/src/conductor/job_state.go
new file mode 100644 (file)
index 0000000..171ef47
--- /dev/null
@@ -0,0 +1,76 @@
+// job_state.go
+
+package main
+
+import (
+       "os"
+       "json"
+)
+
+type JobState int
+
+const (
+       JOB_STATE_INVALID       = JobState(iota)
+       // Job is pending resolution
+       JOB_PENDING
+       // Job has completed and has no failures.
+       JOB_SUCCESSFUL
+       // Job has completed and has mixed results.
+       JOB_FAILED_PARTIAL
+       // Job has completed and has completely failed.
+       JOB_FAILED
+)
+
+func (js JobState) Finished() bool {
+       if js == JOB_PENDING {
+               return false
+       }
+       return true
+}
+
+func (js JobState) String() (strout string) {
+       switch js {
+       case JOB_PENDING:
+               strout = "PENDING"
+       case JOB_SUCCESSFUL:
+               strout = "OK"
+       case JOB_FAILED:
+               strout = "FAIL"
+       case JOB_FAILED_PARTIAL:
+               strout = "PARTIAL_FAIL"
+       default:
+               strout = ""
+       }
+       return strout
+
+}
+
+func (js JobState) MarshalJSON() (out []byte, err os.Error) {
+       strout := js.String()
+       if strout != "" {
+               return json.Marshal(strout)
+       }
+       return nil, InvalidValueError
+}
+
+func (js *JobState) UnmarshalJSON(in []byte) (err os.Error) {
+       var statestr string
+       err = json.Unmarshal(in, &statestr)
+       if err != nil {
+               return err
+       }
+       switch statestr {
+       case "PENDING":
+               *js = JOB_PENDING
+       case "OK":
+               *js = JOB_SUCCESSFUL
+       case "FAIL":
+               *js = JOB_FAILED
+       case "PARTIAL_FAIL":
+               *js = JOB_FAILED_PARTIAL
+       default:
+               return InvalidValueError
+       }
+       return nil
+}
+
diff --git a/src/conductor/persistence.go b/src/conductor/persistence.go
new file mode 100644 (file)
index 0000000..7db4d04
--- /dev/null
@@ -0,0 +1,315 @@
+// persistence.go
+//
+package main
+
+import (
+       "fmt"
+       "path"
+       "os"
+       "unicode"
+       "strconv"
+       "sync/atomic"
+       "bufio"
+       "strings"
+       o "orchestra"
+)
+
+// changing this will result in fire.  you have been warned.
+const bucketDepth = 2
+
+var spoolDirectory = ""
+
+func SetSpoolDirectory(spooldir string) {
+       if spoolDirectory == "" {
+               spoolDirectory = spooldir
+       } else {
+               if spooldir != spoolDirectory {
+                       o.Warn("Spool Directory Not Changed.")
+               }
+       }
+}
+
+func GetSpoolDirectory() string {
+       if spoolDirectory == "" {
+               o.Assert("GetSpoolDirectory() called before set")
+       }
+       return spoolDirectory
+}
+
+
+const (
+       IdCheckpointSafetySkip = 10e4  // Skip 10e4 entries if orchestra didn't shutdown cleanly for safety.
+)
+
+var lastId uint64 = 0
+
+func checkpointPath() string {
+       return path.Join(spoolDirectory, "last_id.checkpoint")
+}
+
+func savePath() string {
+       return path.Join(spoolDirectory, "last_id")
+}
+
+func loadLastId() {
+       fh, err := os.Open(checkpointPath())
+       if err == nil {
+               defer fh.Close()
+
+               // we have a checkpoint file.  blah.
+               cbio := bufio.NewReader(fh)
+               l, err := cbio.ReadString('\n')
+               lastId, err = strconv.Atoui64(strings.TrimSpace(l))
+               if err != nil {
+                       o.Fail("Couldn't read Last ID from checkpoint file.  Aborting for safety.")
+               }
+               lastId += IdCheckpointSafetySkip
+       } else {
+               pe, ok := err.(*os.PathError)
+               if !ok || pe.Error != os.ENOENT {
+                       o.Fail("Found checkpoint file, but couldn't open it: %s", err)
+               }
+               fh,err := os.Open(savePath())
+               if err != nil {
+                       pe, ok = err.(*os.PathError)
+                       if !ok || pe.Error == os.ENOENT {
+                               lastId = 0;
+                               return;
+                       }
+                       o.MightFail(err, "Couldn't open last_id file")
+               }
+               defer fh.Close()
+               cbio := bufio.NewReader(fh)
+               l, err := cbio.ReadString('\n')
+               lastId, err = strconv.Atoui64(strings.TrimSpace(l))
+               if err != nil {
+                       o.Fail("Couldn't read Last ID from last_id.  Aborting for safety.")
+               }
+       }
+       writeIdCheckpoint()
+}
+
+func writeIdCheckpoint() {
+       fh, err := os.OpenFile(checkpointPath(), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
+       if err != nil {
+               o.Warn("Failed to create checkpoint file: %s", err)
+               return
+       }
+       defer fh.Close()
+       fmt.Fprintf(fh, "%d\n", lastId)
+}
+
+func saveLastId() {
+       fh, err := os.OpenFile(savePath(), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
+       if err != nil {
+               o.Warn("Failed to create last ID save file: %s", err)
+               return
+       }
+       defer fh.Close()
+       fmt.Fprintf(fh, "%d\n", lastId)
+       os.Remove(checkpointPath())
+}
+
+func NextRequestId() uint64 {
+       //FIXME: we should do this periodically, not on every new job.
+       defer writeIdCheckpoint()
+       return atomic.AddUint64(&lastId, 1)
+}
+
+func FilenameForJobId(jobid uint64) (fullpath string) {
+       fnbits := make([]string, bucketDepth+1)
+       for i := 0; i < bucketDepth; i++ {
+               fnbits[i] = fmt.Sprintf("%01X", (jobid >> uint(i*4)) & 0xF)
+       }
+       fnbits[bucketDepth] = fmt.Sprintf("%016X", jobid)
+
+       return path.Join(fnbits...)
+}
+
+func makeSpoolDirInner(prefix string, depth int) {
+       for i := 0; i < 16; i++ {
+               dirname := path.Join(prefix, fmt.Sprintf("%01X", i))
+               if (depth == 1) {
+                       err := os.MkdirAll(dirname, 0700)
+                       o.MightFail(err, "Couldn't make directory building spool tree")
+               } else {
+                       makeSpoolDirInner(dirname, depth-1)
+               }
+       }
+}
+
+func MakeSpoolDir() {
+       makeSpoolDirInner(path.Join(spoolDirectory, "active"), bucketDepth)
+       makeSpoolDirInner(path.Join(spoolDirectory, "finished"), bucketDepth)
+       os.MkdirAll(path.Join(spoolDirectory, "corrupt"), 0700)
+}
+
+func UnlinkNodesForJobId(jobid uint64) {
+       suffix := FilenameForJobId(jobid)
+
+       os.Remove(path.Join(spoolDirectory, "active", suffix))
+       os.Remove(path.Join(spoolDirectory, "finished", suffix))
+}
+
+func shuffleToCorrupted(abspath, reason string) {
+       basename := path.Base(abspath)
+       targetname := path.Join(spoolDirectory, "corrupt", basename)
+       // make sure there's nothing in the target name.
+       os.Remove(targetname)
+       err := os.Rename(abspath, targetname)
+       o.MightFail(err, "Couldn't bin corrupt spoolfile %s", abspath)
+       o.Warn("Moved \"%s\" to corrupted spool: %s", abspath, reason)
+}
+
+func loadSpoolFiles(dirname string, depth int) {
+       dh, err := os.Open(dirname)
+       o.MightFail(err, "Couldn't open %s", dirname)
+       nodes, err := dh.Readdir(-1)
+       o.MightFail(err, "Couldn't readdir on %s", dirname)
+       if depth > 0 {
+               for _, n := range nodes {
+                       abspath := path.Join(dirname, n.Name)
+                       if n.IsDirectory() {
+                               // if not a single character, it's not a spool node.
+                               if len(n.Name) != 1 {
+                                       continue;
+                               }
+                               if n.Name == "." {
+                                       // we're not interested in .
+                                       continue;
+                               }
+                               nrunes := []int(n.Name)
+                               if unicode.Is(unicode.ASCII_Hex_Digit, nrunes[0]) {
+                                       loadSpoolFiles(abspath, depth-1)
+                               } else {
+                                       o.Warn("Foreign dirent %s found in spool tree", abspath)
+                               }
+                       }
+               }
+       } else {
+               // depth == 0 - only interested in files.
+               for _, n := range nodes {
+                       abspath := path.Join(dirname, n.Name)
+                       if n.IsRegular() {
+                               if len(n.Name) != 16 {
+                                       shuffleToCorrupted(abspath, "Filename incorrect length")
+                                       continue
+                               }
+                               id, err := strconv.Btoui64(n.Name, 16)
+                               if err != nil {
+                                       shuffleToCorrupted(abspath, "Invalid Filename")
+                                       continue
+                               }
+                               fh, err := os.Open(abspath)
+                               if err != nil {
+                                       shuffleToCorrupted(abspath, "Couldn't open")
+                                       continue
+                               }
+                               defer fh.Close()
+                               jr, err := JobRequestFromReader(fh)
+                               if err != nil || jr.Id != id {
+                                       o.Warn("Couldn't parse?! %s", err)
+                                       shuffleToCorrupted(abspath, "Parse Failure")
+                                       continue
+                               }
+                               // Add the request to the registry directly.
+                               if !RestoreJobState(jr) {
+                                       shuffleToCorrupted(abspath, "Job State Invalid")
+                               }
+                       }
+               }
+       }
+}
+
+// This takes an unmarshall'd job and stuffs it back into the job state.
+func RestoreJobState(job *JobRequest) bool {
+       // check the valid players list.
+       var playersout []string = nil
+       resultsout := make(map[string]*TaskResponse)
+       for _, p := range job.Players {
+               if HostAuthorised(p) {
+                       playersout = append(playersout, p)
+                       // fix the result too.
+                       resout, exists := job.Results[p]
+                       if exists && resout != nil {
+                               resout.id = job.Id
+                               resultsout[p] = resout
+                       }
+                       // remove it so we can sweep it in pass2 for
+                       // results from old hosts that matter.
+                       job.Results[p] = nil, false
+               }
+       }
+       job.Players = playersout
+       if len(job.Players) == 0 {
+               // If there are no players left at this point, discard
+               // the job as corrupt.
+               return false
+       }
+       // now, do pass 2 over the remaining results.
+       for k, v := range job.Results {
+               if v != nil {
+                       // if the results indicate completion, we
+                       // always retain them.
+                       if v.State.Finished() {
+                               resultsout[k] = v
+                               resultsout[k].id = job.Id
+                       }
+               }
+       }
+       job.Results = resultsout
+
+       // now, check the task data.  ONEOF jobs are allowed to
+       // reset tasks that have never been sent.
+       var tasksout []*TaskRequest = nil
+       for _, t := range job.Tasks {
+               // rebuild the return link
+               t.job = job
+               // finished tasks we don't care about.
+               if t.State.Finished() {
+                       tasksout = append(tasksout, t)
+                       continue
+               }
+               if job.Scope == SCOPE_ONEOF {
+                       if t.Player != "" && (t.State == TASK_QUEUED || !HostAuthorised(t.Player)) {
+                               t.State = TASK_QUEUED
+                               t.Player = ""
+                       }
+                       tasksout = append(tasksout, t)
+                       continue
+               } else {
+                       if HostAuthorised(t.Player) {
+                               tasksout = append(tasksout, t)
+                       }
+               }
+       }
+       job.Tasks = tasksout
+       if len(job.Tasks) == 0 {
+               o.Debug("Empty tasks in deserialised job")
+               // Tasks should never be empty.
+               return false
+       }
+       // put the job back into the system.
+       JobAdd(job)
+       JobReviewState(job.Id)
+       if (!job.State.Finished()) {
+               // now, redispatch anything that's not actually finished.
+               for _, t := range job.Tasks {
+                       if !t.State.Finished() {
+                               DispatchTask(t)
+                       }
+               }
+       }
+       return true
+}
+
+func LoadState() {
+       loadLastId()
+       dirname := path.Join(spoolDirectory, "active")
+       loadSpoolFiles(dirname, bucketDepth)
+}
+
+func SaveState() {
+       JobWriteAll()
+       saveLastId()
+}
diff --git a/src/conductor/registry.go b/src/conductor/registry.go
new file mode 100644 (file)
index 0000000..184b816
--- /dev/null
@@ -0,0 +1,586 @@
+// registry.go
+//
+// The Registry provides a 'threadsafe' interface to various global information stores.
+//
+// The registry dispatch thread is forbidden from performing any work that is likely to block.
+// Result channels must be buffered with enough space for the full set of results.
+
+package main
+
+import (
+       o "orchestra"
+       "sort"
+       "time"
+       "container/list"
+)
+
+// Request Types
+const (
+       requestAddClient = iota
+       requestGetClient
+       requestDeleteClient
+       requestSyncClients
+
+       requestAddJob
+       requestGetJob
+       requestAddJobResult
+       requestGetJobResult
+       requestGetJobResultNames
+       requestDisqualifyPlayer
+       requestReviewJobStatus
+
+       requestWriteJobUpdate
+       requestWriteJobAll
+
+       requestQueueSize                = 10
+
+       jobLingerTime                   = int64(30e9)
+)
+
+type registryRequest struct {
+       operation               int
+       id                      uint64
+       hostname                string
+       hostlist                []string
+       job                     *JobRequest
+       tresp                   *TaskResponse
+       responseChannel         chan *registryResponse
+}
+
+type registryResponse struct {
+       success                 bool
+       info                    *ClientInfo
+       tresp                   *TaskResponse
+       names                   []string
+       jobs                    []*JobRequest
+}
+
+var (
+       chanRegistryRequest     = make(chan *registryRequest, requestQueueSize)
+       clientList              = make(map[string]*ClientInfo)
+       jobRegister             = make(map[uint64]*JobRequest)
+       expiryChan              <-chan int64
+       expiryJobid             uint64
+       expiryList              *list.List
+
+       expiryLoopFudge         int64 = 10e6; /* 10 ms should be enough fudgefactor */
+)
+
+func init() {
+       expiryList = list.New()
+}
+
+func regInternalAdd(hostname string) {
+       o.Warn("Registry: New Host \"%s\"", hostname)
+       clientList[hostname] = NewClientInfo()
+       // do this initialisation here since it'll help unmask sequencing errors
+       clientList[hostname].pendingTasks = make(map[uint64]*TaskRequest)
+       clientList[hostname].Player = hostname
+}
+
+func regInternalDel(hostname string) {
+       o.Warn("Registry: Deleting Host \"%s\"", hostname)
+       /* remove it from the registry */
+       clientList[hostname] = nil, false
+}
+
+func regInternalExpireJob(jobid uint64) {
+       job, exists := jobRegister[jobid]
+       if exists {
+               if job.State.Finished() {
+                       jobRegister[jobid] = nil, false
+               } else {
+                       o.Assert("Tried to expire incomplete job.")
+               }
+       }
+}
+
+func regInternalFindNextExpiry() {
+       if expiryChan != nil {
+               o.Assert("Attempted to Find Next Expiry avenue with expiry timer active.")
+       }
+       // if there's nothing to expire, do nothing.
+       if expiryList.Len() == 0 {
+               return
+       }
+
+       for expiryChan == nil && expiryList.Len() > 0 {
+               jobif := expiryList.Remove(expiryList.Front())
+               req, ok := jobif.(*JobRequest)
+               if !ok {
+                       o.Assert("item in expiryList not a *JobRequest")
+               }
+               if (time.Nanoseconds() + expiryLoopFudge) > req.expirytime {
+                       regInternalExpireJob(req.Id)
+               } else {
+                       expiryChan = time.After(req.expirytime - time.Nanoseconds())
+                       expiryJobid = req.Id
+               }
+       }
+}
+
+func regInternalMarkJobForExpiry(job *JobRequest) {
+       job.expirytime = time.Nanoseconds() + jobLingerTime
+       expiryList.PushBack(job)
+       // if there is no job pending expiry, feed it into the delay loop
+       if expiryChan == nil {
+               regInternalFindNextExpiry()
+       }
+}
+
+var registryHandlers = map[int] func(*registryRequest, *registryResponse) {
+requestAddClient:      regintAddClient,
+requestGetClient:      regintGetClient,
+requestDeleteClient:   regintDeleteClient,
+requestSyncClients:    regintSyncClients,
+requestAddJob:         regintAddJob,
+requestGetJob:         regintGetJob,
+requestAddJobResult:   regintAddJobResult,
+requestGetJobResult:   regintGetJobResult,
+requestGetJobResultNames:      regintGetJobResultNames,
+requestDisqualifyPlayer:       regintDisqualifyPlayer,
+requestReviewJobStatus:        regintReviewJobStatus,
+requestWriteJobUpdate: regintWriteJobUpdate,
+requestWriteJobAll:    regintWriteJobAll,
+}
+
+func manageRegistry() {
+       for {
+               select {
+               case req := <-chanRegistryRequest:
+                       resp := new(registryResponse)
+                       // by default, we failed.
+                       resp.success = false
+                       // find the operation
+                       handler, exists := registryHandlers[req.operation]
+                       if exists {
+                               handler(req, resp)
+                       }
+                       if req.responseChannel != nil {
+                               req.responseChannel <- resp
+                       }
+               case <-expiryChan:
+                       o.Debug("job%d: Expiring Job Record", expiryJobid)
+                       regInternalExpireJob(expiryJobid)
+                       expiryChan = nil
+                       regInternalFindNextExpiry()
+               }
+       }
+}
+
+func StartRegistry() {
+       go manageRegistry()
+}
+
+func newRequest(wants_response bool) (req *registryRequest) {
+       req = new(registryRequest)
+       if wants_response {
+               req.responseChannel = make(chan *registryResponse, 1)
+       }
+
+       return req
+}
+       
+func ClientAdd(hostname string) (success bool) {
+       r := newRequest(true)
+       r.operation = requestAddClient
+       r.hostname = hostname
+       chanRegistryRequest <- r
+       resp := <- r.responseChannel
+       
+       return resp.success
+}
+
+func regintAddClient(req *registryRequest, resp *registryResponse) {
+       _, exists := clientList[req.hostname]
+       if exists {
+               resp.success = false
+       } else {
+               regInternalAdd(req.hostname)
+               resp.success = true
+       }
+}
+
+func ClientDelete(hostname string) (success bool) {
+       r := newRequest(true)
+       r.operation = requestDeleteClient
+       r.hostname = hostname
+       chanRegistryRequest <- r
+       resp := <- r.responseChannel
+       
+       return resp.success
+}
+
+func regintDeleteClient(req *registryRequest, resp *registryResponse) {
+       _, exists := clientList[req.hostname]
+       if exists {
+               resp.success = true
+               regInternalDel(req.hostname)
+       } else {
+               resp.success = false
+       }
+}
+
+func ClientGet(hostname string) (info *ClientInfo) {
+       r := newRequest(true)
+       r.operation = requestGetClient
+       r.hostname = hostname
+       chanRegistryRequest <- r
+       resp := <- r.responseChannel
+       if resp.success {
+               return resp.info
+       }
+       return nil
+}
+
+func regintGetClient(req *registryRequest, resp *registryResponse) {
+       clinfo, exists := clientList[req.hostname]
+       if exists {
+               resp.success = true
+               resp.info = clinfo
+       } else {
+               resp.success = false
+       }
+}
+
+func ClientUpdateKnown(hostnames []string) {
+       /* this is an asynchronous, we feed it into the registry 
+        * and it'll look after itself.
+       */
+       r := newRequest(false)
+       r.operation = requestSyncClients
+       r.hostlist = hostnames
+       chanRegistryRequest <- r
+}
+
+func regintSyncClients(req *registryRequest, resp *registryResponse) {
+       // we need to make sure the registered clients matches the
+       // hostlist we're given.
+       //
+       // First, we transform the array into a map
+       newhosts := make(map[string]bool)
+       for k,_ := range req.hostlist {
+               newhosts[req.hostlist[k]] = true
+       }
+       // now, scan the current list, checking to see if they exist.
+       // Remove them from the newhosts map if they do exist.
+       for k,_ := range clientList {
+               _, exists := newhosts[k]
+               if exists {
+                       // remove it from the newhosts map
+                       newhosts[k] = false, false
+               } else {
+                       regInternalDel(k)
+               }
+       }
+       // now that we're finished, we should only have new clients in
+       // the newhosts list left.
+       for k,_ := range newhosts {
+               regInternalAdd(k)
+       }
+       // and we're done.
+}
+
+// Add a Job to the registry.  Return true if successful, returns
+// false if the job is lacking critical information (such as a JobId)
+// and can't be registered.
+func JobAdd(job *JobRequest) bool {
+       rr := newRequest(true)
+       rr.operation = requestAddJob
+       rr.job = job
+
+       chanRegistryRequest <- rr
+       resp := <- rr.responseChannel 
+       return resp.success
+}
+
+func regintAddJob(req *registryRequest, resp *registryResponse) {
+       if nil == req.job {
+               return
+       }
+       // ensure that the players are sorted!
+       sort.Strings(req.job.Players)
+       // update the state
+       req.job.updateState()
+       // and register the job
+       _, overwrite := jobRegister[req.job.Id]
+       if !overwrite {
+               jobRegister[req.job.Id] = req.job
+               // force a queue update.
+               req.job.UpdateInSpool()
+               if req.job.State.Finished() {
+                       regInternalMarkJobForExpiry(req.job)
+               }
+               resp.success = true
+       }
+}
+
+// Get a Job from the registry.  Returns the job if successful,
+// returns nil if the job couldn't be found.
+func JobGet(id uint64) *JobRequest {
+       rr := newRequest(true)
+       rr.operation = requestGetJob
+       rr.id = id
+
+       chanRegistryRequest <- rr
+       resp := <- rr.responseChannel
+       if resp.jobs == nil {
+               return nil
+       }
+       return resp.jobs[0]
+}
+
+func regintGetJob(req *registryRequest, resp *registryResponse) {
+       job, exists := jobRegister[req.id]
+       resp.success = exists
+       if exists {
+               resp.jobs = make([]*JobRequest, 1)
+               resp.jobs[0] = job
+       } else {
+               o.Warn("Received Request for job%d which is not in memory", req.id)
+               go regintGetJobDeferred(req.id, req.responseChannel)
+               // mask out the responseChannel so the deferred handler can use it.
+               req.responseChannel = nil
+       }
+}
+
+func regintGetJobDeferred(jobid uint64, responseChannel chan<- *registryResponse) {
+       resp := new(registryResponse)
+       resp.success = false
+       defer func (resp *registryResponse, rChan chan<- *registryResponse) {
+               rChan <- resp;
+       }(resp, responseChannel)
+
+       req, err := LoadFromFinished(jobid)
+       if err != nil {
+               o.Warn("Couldn't load job%d from disk.  Doesn't exist?", jobid)
+               return
+       }
+       // fix up the state, and stuff it back into the system
+       RestoreJobState(req)
+       resp.jobs = make([]*JobRequest, 1)
+       resp.jobs[0] = req
+       resp.success = true
+}
+
+// Attach a result to a Job in the Registry
+//
+// This exists in order to prevent nasty concurrency problems
+// when trying to put results back onto the job.  Reading a job is far
+// less of a problem than writing to it.
+func JobAddResult(playername string, task *TaskResponse) bool {
+       rr := newRequest(true)
+       rr.operation = requestAddJobResult
+       rr.tresp = task
+       rr.hostname = playername
+       chanRegistryRequest <- rr
+       resp := <- rr.responseChannel
+       return resp.success
+}
+
+func regintAddJobResult(req *registryRequest, resp *registryResponse) {
+       job, exists := jobRegister[req.tresp.id]
+       resp.success = exists
+       if exists {
+               job.Results[req.hostname] = req.tresp
+               // force a queue update.
+               job.UpdateInSpool()
+       }
+}
+
+// Get a result from the registry
+func JobGetResult(id uint64, playername string) (tresp *TaskResponse) {
+       rr := newRequest(true)
+       rr.operation = requestGetJobResult
+       rr.id = id
+       rr.hostname = playername
+       chanRegistryRequest <- rr
+       resp := <- rr.responseChannel
+       return resp.tresp
+}
+
+func regintGetJobResult(req *registryRequest, resp *registryResponse) {
+       job, exists := jobRegister[req.id]
+       if exists {
+               result, exists := job.Results[req.hostname]
+               resp.success = exists
+               if exists {
+                       resp.tresp = result
+               }
+       } else {
+               resp.success = false
+       }
+}
+
+// Get a list of names we have results for against a given job.
+func JobGetResultNames(id uint64) (names []string) {
+       rr := newRequest(true)
+       rr.operation = requestGetJobResultNames
+       rr.id = id
+
+       chanRegistryRequest <- rr
+       resp := <- rr.responseChannel 
+       return resp.names
+}
+
+func regintGetJobResultNames(req *registryRequest, resp *registryResponse) {
+       job, exists := jobRegister[req.id]
+       resp.success = exists
+       if exists {
+               resp.names = make([]string, len(job.Results))
+               idx := 0
+               for k, _ := range job.Results {
+                       resp.names[idx] = k
+                       idx++
+               }
+       }
+}
+
+//  Disqualify a player from servicing a job
+func JobDisqualifyPlayer(id uint64, playername string) bool {
+       rr := newRequest(true)
+       rr.operation = requestDisqualifyPlayer
+       rr.id = id
+       rr.hostname = playername
+
+       chanRegistryRequest <- rr
+       resp := <- rr.responseChannel
+
+       return resp.success
+}
+
+func regintDisqualifyPlayer(req *registryRequest, resp *registryResponse) {
+       job, exists := jobRegister[req.id]
+       if exists {
+               idx := sort.Search(len(job.Players), func(idx int) bool { return job.Players[idx] >= req.hostname })
+               if (job.Players[idx] == req.hostname) {
+                       resp.success = true
+                       newplayers := make([]string, len(job.Players)-1)
+                       copy(newplayers[0:idx], job.Players[0:idx])
+                       copy(newplayers[idx:len(job.Players)-1], job.Players[idx+1:len(job.Players)])
+                       job.Players = newplayers
+                       job.updateState()
+                       // force a queue update.
+                       job.UpdateInSpool()
+               } else {
+                       resp.success = false
+               }
+       } else {
+               resp.success = false
+       }
+}
+
+func JobReviewState(id uint64) bool {
+       rr := newRequest(true)
+       rr.operation = requestReviewJobStatus
+       rr.id = id
+
+       chanRegistryRequest <- rr
+       resp := <- rr.responseChannel
+
+       return resp.success
+}
+
+func regintReviewJobStatus(req *registryRequest, resp *registryResponse) {
+       job, exists := jobRegister[req.id]
+       resp.success = exists
+       if exists {
+               job.updateState()
+               // force a queue update.
+               job.UpdateInSpool()
+       }
+}
+
+func JobWriteUpdate(id uint64) {
+       rr := newRequest(false)
+       rr.operation = requestWriteJobUpdate
+       rr.id = id
+       chanRegistryRequest <- rr
+}
+
+func regintWriteJobUpdate(req *registryRequest, resp *registryResponse) {
+       job, exists := jobRegister[req.id]
+       resp.success = exists
+       if exists {
+               job.UpdateInSpool()
+       }
+}
+
+func JobWriteAll() bool {
+       rr := newRequest(true)
+       rr.operation = requestWriteJobAll
+
+       chanRegistryRequest <- rr
+       resp := <-rr.responseChannel
+
+       return resp.success
+}
+
+func regintWriteJobAll(req *registryRequest, resp *registryResponse) {
+       for _, job := range jobRegister {
+               job.UpdateInSpool()
+       }
+       resp.success = true
+}
+
+// Ugh.
+func (job *JobRequest) updateState() {
+       if job.Results == nil {
+               o.Assert("job.Results nil for jobid %d", job.Id)
+               return
+       }
+       was_finished := job.State.Finished()
+       switch job.Scope {
+       case SCOPE_ONEOF:
+               // look for a success (any success) in the responses
+               var success bool = false
+               for host, res := range job.Results {
+                       if res == nil {
+                               o.Debug("nil result for %s?", host)
+                               continue
+                       }
+                       if res.State == RESP_FINISHED {
+                               success = true
+                               break
+                       }
+               }
+               // update the job state based upon these findings
+               if success {
+                       job.State = JOB_SUCCESSFUL
+               } else {
+                       if len(job.Players) < 1 {
+                               job.State = JOB_FAILED
+                       } else {
+                               job.State = JOB_PENDING
+                       }
+               }
+       case SCOPE_ALLOF:
+               var success int = 0
+               var failed  int = 0
+               
+               for pidx := range job.Players {
+                       p := job.Players[pidx]
+                       resp, exists := job.Results[p]
+                       if exists {
+                               if resp.DidFail() {
+                                       failed++
+                               } else if resp.State == RESP_FINISHED {
+                                       success++
+                               }
+                       }
+               }
+               if (success + failed) < len(job.Players) {
+                       job.State = JOB_PENDING
+               } else if success == len(job.Players) {
+                       job.State = JOB_SUCCESSFUL
+               } else if failed == len(job.Players) {
+                       job.State = JOB_FAILED
+               } else {
+                       job.State = JOB_FAILED_PARTIAL
+               }
+       }
+       if !was_finished && job.State.Finished() {
+               o.Debug("job%d: Finished - Setting Expiry Time", job.Id)
+               regInternalMarkJobForExpiry(job)
+       }
+}
diff --git a/src/conductor/resp_state.go b/src/conductor/resp_state.go
new file mode 100644 (file)
index 0000000..146d08b
--- /dev/null
@@ -0,0 +1,113 @@
+// resp_state.go
+
+package main
+
+import (
+       "os"
+       "json"
+)
+
+type ResponseState int
+
+const (
+       // Response states
+       RESP_INVALID                    = ResponseState(iota)
+       RESP_PENDING                    // internal state, not wire.
+       RESP_RUNNING
+       RESP_FINISHED
+       RESP_FAILED
+       RESP_FAILED_UNKNOWN_SCORE
+       RESP_FAILED_HOST_ERROR
+       RESP_FAILED_UNKNOWN
+)
+
+func (rs ResponseState) String() (strout string) {
+       switch rs {
+       case RESP_RUNNING:
+               return "PENDING"
+       case RESP_FINISHED:
+               return "OK"
+       case RESP_FAILED:
+               return "FAIL"
+       case RESP_FAILED_UNKNOWN_SCORE:
+               return "UNK_SCORE"
+       case RESP_FAILED_HOST_ERROR:
+               return "HOST_ERROR"
+       case RESP_FAILED_UNKNOWN:
+               return "UNKNOWN_FAILURE"
+       }
+       return ""
+}
+
+func (rs ResponseState) MarshalJSON() (out []byte, err os.Error) {
+       strout := rs.String()
+       if strout != "" {
+               return json.Marshal(strout)
+       }
+       return nil, InvalidValueError
+}
+
+func (rs *ResponseState) UnmarshalJSON(in []byte) (err os.Error) {
+       var statestr string
+       err = json.Unmarshal(in, &statestr)
+       if err != nil {
+               return err
+       }
+       switch statestr {
+       case "PENDING":
+               *rs = RESP_PENDING
+       case "OK":
+               *rs = RESP_FINISHED
+       case "FAIL":
+               *rs = RESP_FAILED
+       case "UNK_SCORE":
+               *rs = RESP_FAILED_UNKNOWN_SCORE
+       case "HOST_ERROR":
+               *rs = RESP_FAILED_HOST_ERROR
+       case "UNKNOWN_FAILURE":
+               *rs = RESP_FAILED_UNKNOWN
+       default:
+               return InvalidValueError
+       }
+       return nil
+}
+
+func (rs ResponseState) Finished() bool {
+       switch rs {
+       case RESP_FINISHED:
+               fallthrough
+       case RESP_FAILED:
+               fallthrough
+       case RESP_FAILED_UNKNOWN_SCORE:
+               fallthrough
+       case RESP_FAILED_HOST_ERROR:
+               fallthrough
+       case RESP_FAILED_UNKNOWN:
+               return true
+       }
+       return false
+}
+
+// true if the response says the task failed.  false otherwise.
+func (rs ResponseState) Failed() bool {
+       switch rs {
+       case RESP_RUNNING:
+               fallthrough
+       case RESP_FINISHED:
+               return false
+       }
+       return true
+}
+
+// true if the task can be tried.
+// precond:  DidFail is true, job is a ONE_OF job.
+// must return false otherwise.
+func (rs ResponseState) CanRetry() bool {
+       switch rs {
+       case RESP_FAILED_UNKNOWN_SCORE:
+               fallthrough
+       case RESP_FAILED_HOST_ERROR:
+               return true
+       }
+       return false
+}
\ No newline at end of file
diff --git a/src/conductor/server.go b/src/conductor/server.go
new file mode 100644 (file)
index 0000000..1d81dfa
--- /dev/null
@@ -0,0 +1,105 @@
+/* server.go
+ *
+ * TLS and Connection Hell.
+*/
+
+package main
+
+import (
+       "net"
+       "crypto/tls"
+       "fmt"
+       "os"
+       "crypto/x509"
+       o       "orchestra"
+)
+
+var (
+       CACertPool      *x509.CertPool = nil
+)
+
+
+func ServiceRequests() {
+       var sockConfig tls.Config
+
+       // resolve the bind address
+       bindAddressStr := GetStringOpt("bind address")
+       var bindAddr *net.IPAddr = nil
+       if (bindAddressStr != "") {
+               var err os.Error
+               bindAddr, err = net.ResolveIPAddr("ip", bindAddressStr)
+               if (err != nil) {
+                       o.Warn("Ignoring bind address.  Couldn't resolve \"%s\": %s", bindAddressStr, err)
+               } else {
+                       bindAddr = nil
+               }
+       }
+       // load the x509 certificate and key, then attach it to the tls config.
+       x509CertFilename := GetStringOpt("x509 certificate")
+       x509PrivateKeyFilename := GetStringOpt("x509 private key")
+       serverCert, err := tls.LoadX509KeyPair(x509CertFilename, x509PrivateKeyFilename)
+       o.MightFail(err, "Couldn't load certificates")
+       sockConfig.Certificates = append(sockConfig.Certificates, serverCert)
+
+       // load the CA certs
+       CACertPool = x509.NewCertPool()
+       caCertNames := GetCACertList()
+       if caCertNames != nil {
+               for _, filename := range caCertNames {
+                       fh, err := os.Open(filename)
+                       if err != nil {
+                               o.Warn("Whilst parsing CA certs, couldn't open %s: %s", filename, err)
+                               continue
+                       }
+                       defer fh.Close()
+                       fi, err := fh.Stat()
+                       o.MightFail(err, "Couldn't stat CA certificate file: %s", filename)
+                       data := make([]byte, fi.Size)
+                       fh.Read(data)
+                       CACertPool.AppendCertsFromPEM(data)
+               }
+       }
+       sockConfig.RootCAs = CACertPool
+
+       // determine the server hostname.
+       servername := GetStringOpt("server name")
+       if servername != "" {
+               o.Info("Using %s as the server name", servername)
+               sockConfig.ServerName = servername
+       } else {
+               if bindAddr != nil {
+                       o.Warn("Probing for fqdn for bind address as none was provided.")
+                       hostnames, err := net.LookupAddr(bindAddr.String())
+                       o.MightFail(err, "Failed to get full hostname for bind address")
+                       sockConfig.ServerName = hostnames[0]
+               } else {
+                       o.Warn("Probing for fqdn as no server name was provided")
+                       sockConfig.ServerName = o.ProbeHostname()
+               }
+       }
+
+       // ask the client to authenticate
+       sockConfig.AuthenticateClient = true
+
+       /* convert the bindAddress to a string suitable for the Listen call */
+       var laddr string
+       if (bindAddr == nil) {
+               laddr = fmt.Sprintf(":%d", o.DefaultMasterPort)
+       } else {
+               laddr = fmt.Sprintf("%s:%d", bindAddr.String(), o.DefaultMasterPort)
+       }
+       o.Info("Binding to %s", laddr)
+       listener, err := tls.Listen("tcp", laddr, &sockConfig)
+       o.MightFail(err, "Couldn't bind TLS listener")
+
+       for {
+               o.Warn("Waiting for Connection...")
+               c, err := listener.Accept()
+               o.MightFail(err, "Couldn't accept TLS connection")
+               o.Warn("Connection received from %s", c.RemoteAddr().String())
+               HandleConnection(c)
+       }
+}
+
+
+
diff --git a/src/conductor/signal.go b/src/conductor/signal.go
new file mode 100644 (file)
index 0000000..509c187
--- /dev/null
@@ -0,0 +1,50 @@
+/* signal.go
+ *
+ * Signal Handlers
+ */
+
+package main
+
+import (
+       "os/signal"
+       "os"
+       "syscall"
+       "fmt"
+       o "orchestra"
+)
+
+
+// handle the signals.  By default, we ignore everything, but the
+// three terminal signals, HUP, INT, TERM, we want to explicitly
+// handle.
+func signalHandler() {
+       for {
+               sig := <-signal.Incoming
+
+               ux, ok := sig.(os.UnixSignal)
+               if !ok {
+                       o.Warn("Couldn't handle signal %s, Coercion failed", sig)
+                       continue
+               }
+
+               switch int(ux) {
+               case syscall.SIGHUP:
+                       o.Warn("Reloading Configuration")
+                       ConfigLoad()
+               case syscall.SIGINT:
+                       fmt.Fprintln(os.Stderr, "Interrupt Received - Terminating")
+                       //FIXME: Gentle Shutdown
+                       SaveState()
+                       os.Exit(1)
+               case syscall.SIGTERM:
+                       fmt.Fprintln(os.Stderr, "Terminate Received - Terminating")
+                       //FIXME: Gentle Shutdown
+                       os.Exit(2)
+               }
+       }
+
+}
+
+func init() {
+       go signalHandler()
+}
diff --git a/src/conductor/task_request.go b/src/conductor/task_request.go
new file mode 100644 (file)
index 0000000..f6f3fc0
--- /dev/null
@@ -0,0 +1,47 @@
+// task_request.go
+//
+package main
+
+import (
+       "sort"
+       o "orchestra"
+)
+
+type TaskRequest struct {
+       job             *JobRequest
+       Player          string                          `json:"player"`
+       State           TaskState                       `json:"state"`
+       RetryTime       int64                           `json:"retrytime"`
+}
+
+func NewTaskRequest() (tr *TaskRequest) {
+       tr = new(TaskRequest)
+       tr.State = TASK_QUEUED
+
+       return tr
+}
+
+func (task *TaskRequest) Encode() (ptr *o.ProtoTaskRequest) {
+       ptr = new(o.ProtoTaskRequest)
+       ptr.Jobname = &task.job.Score
+       ptr.Id = new(uint64)
+       *ptr.Id = task.job.Id
+       ptr.Parameters = o.ProtoJobParametersFromMap(task.job.Params)
+
+       return ptr
+}
+
+func (task *TaskRequest) IsTarget(player string) (valid bool) {
+       valid = false
+       if task.Player == "" {
+               n := sort.SearchStrings(task.job.Players, player)
+               if task.job.Players[n] == player {
+                       valid = true
+               }
+       } else {
+               if task.Player == player {
+                       valid = true
+               }
+       }
+       return valid
+}
diff --git a/src/conductor/task_response.go b/src/conductor/task_response.go
new file mode 100644 (file)
index 0000000..c76819a
--- /dev/null
@@ -0,0 +1,61 @@
+// task_response.go
+//
+package main
+
+import (
+       o "orchestra"
+)
+
+type TaskResponse struct {
+       id              uint64                          
+       State           ResponseState                   `json:"state"`
+       Response        map[string]string               `json:"response"`       
+}
+
+// Response related magic
+
+func NewTaskResponse() (resp *TaskResponse) {
+       resp = new(TaskResponse)
+       resp.Response = make(map[string]string)
+
+       return resp
+}
+
+func (resp *TaskResponse) IsFinished() bool {
+       return resp.State.Finished()
+}
+
+func (resp *TaskResponse) DidFail() bool {
+       return resp.State.Failed()
+}
+
+func (resp *TaskResponse) CanRetry() bool {
+       return resp.State.CanRetry()
+}
+
+
+func ResponseFromProto(ptr *o.ProtoTaskResponse) (r *TaskResponse) {
+       r = new(TaskResponse)
+
+       switch (*(ptr.Status)) {
+       case o.ProtoTaskResponse_JOB_INPROGRESS:
+               r.State = RESP_RUNNING
+       case o.ProtoTaskResponse_JOB_SUCCESS:
+               r.State = RESP_FINISHED
+       case o.ProtoTaskResponse_JOB_FAILED:
+               r.State = RESP_FAILED
+       case o.ProtoTaskResponse_JOB_HOST_FAILURE:
+               r.State = RESP_FAILED_HOST_ERROR
+       case o.ProtoTaskResponse_JOB_UNKNOWN:
+               r.State = RESP_FAILED_UNKNOWN_SCORE
+       case o.ProtoTaskResponse_JOB_UNKNOWN_FAILURE:
+               fallthrough
+       default:
+               r.State = RESP_FAILED_UNKNOWN
+       }
+
+       r.id = *(ptr.Id)
+       r.Response = o.MapFromProtoJobParameters(ptr.Response)
+
+       return r
+}
diff --git a/src/conductor/task_state.go b/src/conductor/task_state.go
new file mode 100644 (file)
index 0000000..43160fb
--- /dev/null
@@ -0,0 +1,68 @@
+// task_state.go
+
+package main
+
+import (
+       "os"
+       "json"
+)
+
+type TaskState int
+
+const (
+       TASK_INVALID            = TaskState(iota)
+       // Task is fresh and has never been sent to the client.  It can be rescheduled still.
+       TASK_QUEUED
+       // Task has been transmitted at least once
+       TASK_PENDINGRESULT
+       // Task has finished and we have received a result.
+       TASK_FINISHED
+)
+
+func (ts TaskState) String() (strout string) {
+       switch ts {
+       case TASK_QUEUED:
+               strout = "QUEUED"
+       case TASK_PENDINGRESULT:
+               strout = "PENDING"
+       case TASK_FINISHED:
+               strout = "FINISHED"
+       default:
+               strout = ""
+       }
+       return strout
+}
+
+func (ts TaskState) MarshalJSON() (out []byte, err os.Error) {
+       strout := ts.String()
+       if strout != "" {
+               return json.Marshal(strout)
+       }
+       return nil, InvalidValueError
+}
+
+func (ts *TaskState) UnmarshalJSON(in []byte) (err os.Error) {
+       var statestr string
+       err = json.Unmarshal(in, &statestr)
+       if err != nil {
+               return err
+       }
+       switch statestr {
+       case "QUEUED":
+               *ts = TASK_QUEUED
+       case "PENDING":
+               *ts = TASK_PENDINGRESULT
+       case "FINISHED":
+               *ts = TASK_FINISHED
+       default:
+               return InvalidValueError
+       }
+       return nil
+}
+
+func (ts TaskState) Finished() bool {
+       if ts == TASK_FINISHED {
+               return true
+       }
+       return false
+}
\ No newline at end of file
diff --git a/src/getstatus/Makefile b/src/getstatus/Makefile
new file mode 100644 (file)
index 0000000..778c9c5
--- /dev/null
@@ -0,0 +1,8 @@
+include $(GOROOT)/src/Make.inc
+
+TARG=getstatus
+
+GOFILES=\
+       getstatus.go\
+
+include $(GOROOT)/src/Make.cmd
diff --git a/src/getstatus/getstatus.go b/src/getstatus/getstatus.go
new file mode 100644 (file)
index 0000000..34618ce
--- /dev/null
@@ -0,0 +1,137 @@
+// getstatus.go
+//
+// A sample Orchestra status polling client.
+
+package main
+
+import (
+       "io"
+       "net"
+       "json"
+       "flag"
+       "fmt"
+       "os"
+       "strconv"
+)
+
+type StatusRequest struct {
+       Op      string  `json:"op"`
+       Id      uint64  `json:"id"`
+}
+
+type PlayerStatus struct {
+       Status          *string         `json:"status"`
+       Response        map[string]*string      `json:"response"`
+}
+
+type StatusResponse struct {
+       Status          *string         `json:"status"`
+       Players         map[string]*PlayerStatus        `json:"players"`
+}
+
+var (
+       AudienceSock = flag.String("audience-sock", "/var/spool/orchestra/conductor.sock", "Path for the audience submission socket")
+)
+
+func NewStatusRequest() (sr *StatusRequest) {
+       sr = new(StatusRequest)
+       sr.Op = "status"
+       return sr
+}
+
+func Usage() {
+       fmt.Fprintf(os.Stderr, "Usage:\n")
+       fmt.Fprintf(os.Stderr, "  %s [<options>] <jobid>\n", os.Args[0])
+       flag.PrintDefaults()
+}
+
+func main() {
+       flag.Usage = Usage
+       flag.Parse()
+
+       if flag.NArg() != 1 {
+               flag.Usage()
+               os.Exit(1)
+       }
+
+       sr := NewStatusRequest()
+       var err os.Error
+       sr.Id, err = strconv.Atoui64(flag.Arg(0))
+       if nil != err {
+               fmt.Fprintf(os.Stderr, "Failed to parse JobID: %s\n", err)
+               os.Exit(1)
+       }
+       
+       raddr, err := net.ResolveUnixAddr("unix", *AudienceSock)
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "Failed to resolve sockaddr: %s\n", err)
+               os.Exit(1)
+       }
+       conn, err := net.DialUnix("unix", nil, raddr)
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "Failed to connect to sockaddr: %s\n", err)
+               os.Exit(1)
+       }
+
+       defer conn.Close()
+
+       conn.SetTimeout(0)
+
+       nc := net.Conn(conn)
+
+       r, _ := nc.(io.Reader)
+       w, _ := nc.(io.Writer)
+
+       dec := json.NewDecoder(r)
+       enc := json.NewEncoder(w)
+
+       // send the message
+       err = enc.Encode(sr)
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "Failed to marshal & send: %s\n", err)
+               os.Exit(1)
+       }
+
+       response := new([2]interface{})
+       sresp := new(StatusResponse)
+       response[1] = sresp
+       err = dec.Decode(response)
+       if err != nil {
+               // OK, the problem here is that an in the request will throw a null in the second field.
+               // This will cause Decode to softfault.  We'll ignore these softfaults.
+               utye, ok := err.(*json.UnmarshalTypeError)
+               if ok {
+                       fmt.Fprintf(os.Stderr, "Unmarshalling error: %s of Type %s\n", utye.Value, utye.Type)
+               } else {
+                       ufe, ok := err.(*json.UnmarshalFieldError)
+                       if ok {
+                               fmt.Fprintf(os.Stderr, "Error decoding response: UFE %s of Type %s\n", ufe.Key, ufe.Type)
+                               os.Exit(1)
+                       }
+                       ute, ok := err.(*json.UnsupportedTypeError)
+                       if ok {
+                               fmt.Fprintf(os.Stderr, "Error decoding response: UTE of Type %s\n", ute.Type)
+                               os.Exit(1)
+                       }
+
+                       fmt.Fprintf(os.Stderr, "Error decoding response: %s\n", err)
+                       os.Exit(1)
+               }
+       }
+
+       // coerce field 0 back into a string.
+       rerr,ok := response[0].(string)
+       if ok {
+               if rerr == "OK" {
+                       // all OK, process the sresp.
+                       fmt.Printf("Aggregate: %s\n", *sresp.Status)
+                       os.Exit(0)
+               } else {
+                       fmt.Fprintf(os.Stderr, "Server Error: %s\n", rerr)
+                       os.Exit(1)
+               }
+       } else {
+               fmt.Fprintf(os.Stderr, "Couldn't unmarshal response correctly.\n");
+               os.Exit(1)
+       }
+}
diff --git a/src/orchestra/Makefile b/src/orchestra/Makefile
new file mode 100644 (file)
index 0000000..72c2a95
--- /dev/null
@@ -0,0 +1,17 @@
+include $(GOROOT)/src/Make.inc
+
+#TARGDIR=../build/pkg/$(GOOS)_$(GOARCH)
+
+TARG=orchestra/orchestra
+GOFILES=\
+       orchestra.pb.go\
+       wire.go\
+       marshal.go\
+       shared.go\
+       registry.go\
+
+include $(GOROOT)/src/Make.pkg
+
+ifdef HAVE_PROTOBUF
+include $(GOROOT)/src/pkg/goprotobuf.googlecode.com/hg/Make.protobuf
+endif
diff --git a/src/orchestra/marshal.go b/src/orchestra/marshal.go
new file mode 100644 (file)
index 0000000..0418a84
--- /dev/null
@@ -0,0 +1,173 @@
+/* marshal.go
+ *
+ * Common wire marshalling code.
+*/
+
+package orchestra;
+
+import (
+       "os"
+       "goprotobuf.googlecode.com/hg/proto"
+)
+
+var (
+       ErrUnknownType = os.NewError("Unknown Type in Encode request")
+       ErrObjectTooLarge = os.NewError("Encoded Object exceeds maximum encoding size")
+)
+
+/* ugh ugh ugh.  As much as I love protocol buffers, not having maps
+ * as a native type is a PAIN IN THE ASS.
+ *
+ * Here's some common code to convert my K/V format in protocol
+ * buffers to and from native Go structures.
+*/
+func MapFromProtoJobParameters(parray []*ProtoJobParameter) (mapparam map[string]string) {
+       mapparam = make(map[string]string)
+
+       for p := range parray {
+               mapparam[*(parray[p].Key)] = *(parray[p].Value)
+       }
+
+       return mapparam
+}
+
+func ProtoJobParametersFromMap(mapparam map[string]string) (parray []*ProtoJobParameter) {
+       parray = make([]*ProtoJobParameter, len(mapparam))
+       i := 0
+       for k,v := range mapparam {
+               arg := new(ProtoJobParameter)
+               arg.Key = proto.String(k)
+               arg.Value = proto.String(v)
+               parray[i] = arg
+               i++
+       }
+
+       return parray
+}
+
+
+
+func (p *WirePkt) Decode() (obj interface{}, err os.Error) {
+       switch (p.Type) {
+       case TypeNop:
+               if (p.Length != 0) {
+                       /* throw error later... */
+                       return nil, ErrMalformedMessage;
+               }
+               return nil, nil
+       case TypeIdentifyClient:
+               ic := new(IdentifyClient)
+               err := proto.Unmarshal(p.Payload[0:p.Length], ic)
+               if err != nil {
+                       return nil, err
+               }
+               return ic, nil
+       case TypeReadyForTask:
+               if (p.Length != 0) {
+                       /* throw error later... */
+                       return nil, ErrMalformedMessage;
+               }
+               return nil, nil
+       case TypeTaskRequest:
+               tr := new(ProtoTaskRequest)
+               err := proto.Unmarshal(p.Payload[0:p.Length], tr)
+               if err != nil {
+                       return nil, err
+               }
+               return tr, nil
+       case TypeTaskResponse:
+               tr := new(ProtoTaskResponse)
+               err := proto.Unmarshal(p.Payload[0:p.Length], tr)
+               if err != nil {
+                       return nil, err
+               }
+               return tr, nil
+       case TypeAcknowledgement:
+               tr := new(ProtoAcknowledgement)
+               err := proto.Unmarshal(p.Payload[0:p.Length], tr)
+               if err != nil {
+                       return nil, err
+               }
+               return tr, nil
+       }
+       return nil, ErrUnknownMessage
+}
+
+func Encode(obj interface{}) (p *WirePkt, err os.Error) {
+       p = new(WirePkt)
+       switch obj.(type) {
+       case *IdentifyClient:
+               p.Type = TypeIdentifyClient
+       case *ProtoTaskRequest:
+               p.Type = TypeTaskRequest
+       case *ProtoTaskResponse:
+               p.Type = TypeTaskResponse
+       case *ProtoAcknowledgement:
+               p.Type = TypeAcknowledgement
+       default:
+               Warn("Encoding unknown type!")
+               return nil, ErrUnknownType
+       }
+       p.Payload, err = proto.Marshal(obj)
+       if err != nil {
+               return nil, err
+       }
+       if len(p.Payload) >= 0x10000 {
+               return nil, ErrObjectTooLarge
+       }
+       p.Length = uint16(len(p.Payload))
+
+       return p, nil   
+}
+
+
+func MakeNop() (p *WirePkt) {
+       p = new(WirePkt)
+       p.Length = 0
+       p.Type = TypeNop
+       p.Payload = nil
+
+       return p
+}
+
+func MakeIdentifyClient(hostname string) (p *WirePkt) {
+       s := new(IdentifyClient)
+       s.Hostname = proto.String(hostname)
+
+       p, _ = Encode(s)
+       
+       return p
+}
+
+func MakeReadyForTask() (p *WirePkt){
+       p = new(WirePkt)
+       p.Type = TypeReadyForTask
+       p.Length = 0
+       p.Payload = nil
+
+       return p
+}
+
+/* We use the failure code for negative acknowledgements */
+func MakeNack(id uint64) (p *WirePkt) {
+       a := new(ProtoAcknowledgement)
+       a.Id = proto.Uint64(id)
+       a.Response = new(ProtoAcknowledgement_AckType)
+       *(a.Response) = ProtoAcknowledgement_ACK_ERROR
+
+       p, _ = Encode(a)
+
+       return p
+}
+
+// Construct a positive ACK for transmission
+func MakeAck(id uint64) (p *WirePkt) {
+       a := new(ProtoAcknowledgement)
+       a.Id = proto.Uint64(id)
+       a.Response = new(ProtoAcknowledgement_AckType)
+       *(a.Response) = ProtoAcknowledgement_ACK_OK
+
+       p, _ = Encode(a)
+
+       return p
+}
diff --git a/src/orchestra/orchestra.pb.go b/src/orchestra/orchestra.pb.go
new file mode 100644 (file)
index 0000000..504912d
--- /dev/null
@@ -0,0 +1,127 @@
+// Code generated by protoc-gen-go from "orchestra.proto"
+// DO NOT EDIT!
+
+package orchestra
+
+import proto "goprotobuf.googlecode.com/hg/proto"
+import "math"
+import "os"
+
+// Reference proto, math & os imports to suppress error if they are not otherwise used.
+var _ = proto.GetString
+var _ = math.Inf
+var _ os.Error
+
+
+type ProtoAcknowledgement_AckType int32
+
+const (
+       ProtoAcknowledgement_ACK_OK     = 1
+       ProtoAcknowledgement_ACK_ERROR  = 3
+)
+
+var ProtoAcknowledgement_AckType_name = map[int32]string{
+       1:      "ACK_OK",
+       3:      "ACK_ERROR",
+}
+var ProtoAcknowledgement_AckType_value = map[string]int32{
+       "ACK_OK":       1,
+       "ACK_ERROR":    3,
+}
+
+func NewProtoAcknowledgement_AckType(x int32) *ProtoAcknowledgement_AckType {
+       e := ProtoAcknowledgement_AckType(x)
+       return &e
+}
+func (x ProtoAcknowledgement_AckType) String() string {
+       return proto.EnumName(ProtoAcknowledgement_AckType_name, int32(x))
+}
+
+type ProtoTaskResponse_TaskStatus int32
+
+const (
+       ProtoTaskResponse_JOB_INPROGRESS        = 2
+       ProtoTaskResponse_JOB_SUCCESS           = 3
+       ProtoTaskResponse_JOB_FAILED            = 4
+       ProtoTaskResponse_JOB_HOST_FAILURE      = 5
+       ProtoTaskResponse_JOB_UNKNOWN           = 6
+       ProtoTaskResponse_JOB_UNKNOWN_FAILURE   = 7
+)
+
+var ProtoTaskResponse_TaskStatus_name = map[int32]string{
+       2:      "JOB_INPROGRESS",
+       3:      "JOB_SUCCESS",
+       4:      "JOB_FAILED",
+       5:      "JOB_HOST_FAILURE",
+       6:      "JOB_UNKNOWN",
+       7:      "JOB_UNKNOWN_FAILURE",
+}
+var ProtoTaskResponse_TaskStatus_value = map[string]int32{
+       "JOB_INPROGRESS":       2,
+       "JOB_SUCCESS":          3,
+       "JOB_FAILED":           4,
+       "JOB_HOST_FAILURE":     5,
+       "JOB_UNKNOWN":          6,
+       "JOB_UNKNOWN_FAILURE":  7,
+}
+
+func NewProtoTaskResponse_TaskStatus(x int32) *ProtoTaskResponse_TaskStatus {
+       e := ProtoTaskResponse_TaskStatus(x)
+       return &e
+}
+func (x ProtoTaskResponse_TaskStatus) String() string {
+       return proto.EnumName(ProtoTaskResponse_TaskStatus_name, int32(x))
+}
+
+type IdentifyClient struct {
+       Hostname                *string `protobuf:"bytes,1,req,name=hostname"`
+       XXX_unrecognized        []byte
+}
+
+func (this *IdentifyClient) Reset()            { *this = IdentifyClient{} }
+func (this *IdentifyClient) String() string    { return proto.CompactTextString(this) }
+
+type ProtoJobParameter struct {
+       Key                     *string `protobuf:"bytes,1,req,name=key"`
+       Value                   *string `protobuf:"bytes,2,req,name=value"`
+       XXX_unrecognized        []byte
+}
+
+func (this *ProtoJobParameter) Reset()         { *this = ProtoJobParameter{} }
+func (this *ProtoJobParameter) String() string { return proto.CompactTextString(this) }
+
+type ProtoTaskRequest struct {
+       Jobname                 *string                 `protobuf:"bytes,1,req,name=jobname"`
+       Id                      *uint64                 `protobuf:"varint,2,req,name=id"`
+       Parameters              []*ProtoJobParameter    `protobuf:"bytes,3,rep,name=parameters"`
+       XXX_unrecognized        []byte
+}
+
+func (this *ProtoTaskRequest) Reset()          { *this = ProtoTaskRequest{} }
+func (this *ProtoTaskRequest) String() string  { return proto.CompactTextString(this) }
+
+type ProtoAcknowledgement struct {
+       Id                      *uint64                         `protobuf:"varint,1,req,name=id"`
+       Response                *ProtoAcknowledgement_AckType   `protobuf:"varint,2,req,name=response,enum=orchestra.ProtoAcknowledgement_AckType,def=1"`
+       XXX_unrecognized        []byte
+}
+
+func (this *ProtoAcknowledgement) Reset()              { *this = ProtoAcknowledgement{} }
+func (this *ProtoAcknowledgement) String() string      { return proto.CompactTextString(this) }
+
+const Default_ProtoAcknowledgement_Response ProtoAcknowledgement_AckType = ProtoAcknowledgement_ACK_OK
+
+type ProtoTaskResponse struct {
+       Id                      *uint64                         `protobuf:"varint,1,req,name=id"`
+       Status                  *ProtoTaskResponse_TaskStatus   `protobuf:"varint,3,req,name=status,enum=orchestra.ProtoTaskResponse_TaskStatus"`
+       Response                []*ProtoJobParameter            `protobuf:"bytes,4,rep,name=response"`
+       XXX_unrecognized        []byte
+}
+
+func (this *ProtoTaskResponse) Reset()         { *this = ProtoTaskResponse{} }
+func (this *ProtoTaskResponse) String() string { return proto.CompactTextString(this) }
+
+func init() {
+       proto.RegisterEnum("orchestra.ProtoAcknowledgement_AckType", ProtoAcknowledgement_AckType_name, ProtoAcknowledgement_AckType_value)
+       proto.RegisterEnum("orchestra.ProtoTaskResponse_TaskStatus", ProtoTaskResponse_TaskStatus_name, ProtoTaskResponse_TaskStatus_value)
+}
diff --git a/src/orchestra/orchestra.proto b/src/orchestra/orchestra.proto
new file mode 100644 (file)
index 0000000..f129531
--- /dev/null
@@ -0,0 +1,43 @@
+package orchestra;
+
+/* P->C : Provide Client Identity and negotiate other initial parameters */
+message IdentifyClient {
+       required string         hostname = 1;
+}
+
+message ProtoJobParameter {
+       required string         key = 1;
+       required string         value = 2;
+}
+
+/* C->P : Do Shit kthxbye */
+message ProtoTaskRequest {
+       required string         jobname = 1;
+       required uint64         id = 2;
+       repeated ProtoJobParameter      parameters = 3;
+}
+
+/* C->P, P->C : Acknowledge Message */
+message ProtoAcknowledgement {
+       required uint64         id = 1;
+       enum AckType {
+               ACK_OK = 1;
+               ACK_ERROR = 3; /* Other Error */
+       }
+       required AckType        response = 2 [default=ACK_OK];
+}
+
+/* P->C : Results from Task */
+message ProtoTaskResponse {
+       required uint64 id = 1;
+       enum TaskStatus {
+               JOB_INPROGRESS = 2;     // Client has the job.
+               JOB_SUCCESS = 3;        // everything was OK, we don't care.
+               JOB_FAILED = 4;         // the job ran ok, but told us it blew up.
+               JOB_HOST_FAILURE = 5;   // something internally blew up.
+               JOB_UNKNOWN = 6;        // What Job?
+               JOB_UNKNOWN_FAILURE = 7;// somethign went wrong, but we don't know what.
+       }
+       required TaskStatus status = 3;
+       repeated ProtoJobParameter response = 4;
+}
diff --git a/src/orchestra/shared.go b/src/orchestra/shared.go
new file mode 100644 (file)
index 0000000..ab78461
--- /dev/null
@@ -0,0 +1,82 @@
+/* various important shared defaults. */
+package orchestra
+
+import (
+       "os"
+       "net"
+       "syslog"
+       "fmt"
+       "runtime/debug"
+)
+
+const (
+       DefaultMasterPort = 2258
+       DefaultHTTPPort = 2259
+)
+
+var    logWriter  *syslog.Writer = nil
+
+func SetLogName(name string) {
+       if nil != logWriter {
+               logWriter.Close()
+               logWriter = nil
+       }
+       var err os.Error
+       logWriter, err = syslog.New(syslog.LOG_DEBUG, name)
+       MightFail(err, "Couldn't reopen syslog")
+}
+
+
+func Debug(format string, args ...interface{}) {
+       if nil != logWriter {
+               logWriter.Debug(fmt.Sprintf(format, args...))
+       }
+}
+
+func Info(format string, args ...interface{}) {
+       if nil != logWriter {
+               logWriter.Info(fmt.Sprintf(format, args...))
+       }
+}
+
+func Warn(format string, args ...interface{}) {
+       if nil != logWriter {
+               logWriter.Warning(fmt.Sprintf(format, args...))
+       }
+}
+
+func Fail(mesg string, args ...interface {}) {
+       if nil != logWriter {
+               logWriter.Err(fmt.Sprintf(mesg, args...))
+       }
+       fmt.Fprintf(os.Stderr, "ERR: "+mesg+"\n", args...);
+       os.Exit(1)
+}      
+
+func MightFail(err os.Error, mesg string, args ...interface {}) {
+       if (nil != err) {
+               imesg := fmt.Sprintf(mesg, args...)
+               Fail("%s: %s", imesg, err.String())
+       }
+}
+
+// Throws a generic assertion error, stacktraces, dies.
+// only really to be used where the runtime-time configuration
+// fucks up internally, not for user induced errors.
+func Assert(mesg string, args ...interface{}) {
+       fmt.Fprintf(os.Stderr, mesg, args...)
+       debug.PrintStack()
+       os.Exit(1)
+}
+
+func ProbeHostname() (fqdn string) {
+       var shortHostname string
+
+       shortHostname, err := os.Hostname()
+       addr, err := net.LookupHost(shortHostname)
+       MightFail(err, "Failed to get address for hostname")
+       hostnames, err := net.LookupAddr(addr[0])
+       MightFail(err, "Failed to get full hostname for address")
+
+       return hostnames[0]
+}
diff --git a/src/orchestra/wire.go b/src/orchestra/wire.go
new file mode 100644 (file)
index 0000000..fe28f61
--- /dev/null
@@ -0,0 +1,107 @@
+/* wire.go
+ *
+ * Wire Level Encapsulation
+*/
+
+package orchestra;
+
+import (
+       "os"
+       "net"
+       "fmt"
+)
+
+type WirePkt struct {
+       Type    byte
+       Length  uint16
+       Payload []byte
+}
+
+const (
+       TypeNop                 = 0
+       TypeIdentifyClient      = 1
+       TypeReadyForTask        = 2
+       TypeTaskRequest         = 3
+       TypeTaskResponse        = 4
+       TypeAcknowledgement     = 5
+)
+
+var (
+       ErrMalformedMessage = os.NewError("Malformed Message")
+       ErrUnknownMessage   = os.NewError("Unknown Message")
+)
+
+func (p *WirePkt) ValidUnidentified() bool {
+       if p.Type == TypeNop {
+               return true
+       }
+       if p.Type == TypeIdentifyClient {
+               return true
+       }
+
+       return false
+}
+
+func (p *WirePkt) Send(c net.Conn) (n int, err os.Error) {
+       n = 0
+       preamble := make([]byte, 3)
+       preamble[0] = p.Type
+       preamble[1] = byte((p.Length >> 8) & 0xFF)
+       preamble[2] = byte(p.Length & 0xFF)
+       ninc, err := c.Write(preamble)
+       n += ninc
+       if (err != nil) {
+               return n, err
+       }       
+       ninc, err = c.Write(p.Payload[0:p.Length])
+       n += ninc
+       if (err != nil) {
+               return n, err
+       }
+       return n, nil
+}
+
+func (p *WirePkt) Dump() {
+       fmt.Printf("Packet Dump: Type %d, Len %d\n", p.Type, p.Length)
+       for i := 0; i < int(p.Length); i++ {
+               if i%16 == 0 {
+                       fmt.Printf("%04x: ", i)
+               }
+               fmt.Printf("%02x ", p.Payload[i])
+               if i%16 == 15 {
+                       fmt.Println()
+               }
+       }
+       fmt.Println()
+}
+
+func Receive(c net.Conn) (msg *WirePkt, err os.Error) {
+       msg = new(WirePkt)
+       preamble := make([]byte, 3)
+
+       n, err := c.Read(preamble)
+       if err != nil {
+               return nil, err
+       }
+       if n < 3 {
+               /* short read!  wtf! err? */
+               return nil, ErrMalformedMessage
+       }
+       msg.Type = preamble[0]
+       msg.Length = (uint16(preamble[1]) << 8) | uint16(preamble[2])
+       if msg.Length > 0 {
+               msg.Payload = make([]byte, msg.Length)
+               n, err = c.Read(msg.Payload)
+               if err != nil {
+                       return nil, err
+               }
+               if n < int(msg.Length) {
+                       /* short read!  wtf! err? */
+                       return nil, ErrMalformedMessage
+               }
+       }
+
+       /* Decode! */
+       return msg, nil
+}
+
diff --git a/src/player/Makefile b/src/player/Makefile
new file mode 100644 (file)
index 0000000..a5cef90
--- /dev/null
@@ -0,0 +1,15 @@
+include $(GOROOT)/src/Make.inc
+
+TARG=player
+
+GOFILES=\
+       player.go\
+       scores.go\
+       execution.go\
+       interface.go\
+       signal.go\
+       config.go\
+       if_env.go\
+       if_pipe.go\
+
+include $(GOROOT)/src/Make.cmd
diff --git a/src/player/config.go b/src/player/config.go
new file mode 100644 (file)
index 0000000..72f13ae
--- /dev/null
@@ -0,0 +1,85 @@
+// config.go
+//
+// configuration file handling for orchestra.
+
+package main
+
+import (
+       o "orchestra"
+       "strings"
+       "github.com/kuroneko/configureit"
+       "crypto/tls"
+       "crypto/x509"
+       "os"
+)
+
+var configFile = configureit.New()
+
+func init() {
+       configFile.Add("x509 certificate", configureit.NewStringOption("/etc/orchestra/player_crt.pem"))
+       configFile.Add("x509 private key", configureit.NewStringOption("/etc/orchestra/player_key.pem"))
+       configFile.Add("ca certificates", configureit.NewPathListOption(nil))
+       configFile.Add("master", configureit.NewStringOption("conductor"))
+       configFile.Add("score directory", configureit.NewStringOption("/usr/lib/orchestra/scores"))
+       configFile.Add("player name", configureit.NewStringOption(""))
+}
+
+func GetStringOpt(key string) string {
+       cnode := configFile.Get(key)
+       if cnode == nil {
+               o.Assert("tried to get a configuration option that doesn't exist.")
+       }
+       sopt, ok := cnode.(*configureit.StringOption)
+       if !ok {
+               o.Assert("tried to get a non-string configuration option with GetStringOpt")
+       }
+       return strings.TrimSpace(sopt.Value)
+}
+
+func GetCACertList() []string {
+       cnode := configFile.Get("ca certificates")
+       if cnode == nil {
+               o.Assert("tried to get a configuration option that doesn't exist.")
+       }
+       plopt, _ := cnode.(*configureit.PathListOption)
+       return plopt.Values
+}
+
+func ConfigLoad() {
+       // attempt to open the configuration file.
+       fh, err := os.Open(*ConfigFile)
+       if nil == err {
+               defer fh.Close()
+               // reset the config File data, then reload it.
+               configFile.Reset()
+               ierr := configFile.Read(fh, 1)
+               o.MightFail(ierr, "Couldn't parse configuration")
+       } else {
+               o.Warn("Couldn't open configuration file: %s.  Proceeding anyway.", err)
+       }
+
+       // load the x509 certificates
+       x509CertFilename := GetStringOpt("x509 certificate")
+       x509PrivateKeyFilename := GetStringOpt("x509 private key")
+       CertPair, err = tls.LoadX509KeyPair(x509CertFilename, x509PrivateKeyFilename)
+       o.MightFail(err, "Couldn't load certificates")
+
+       // load the CA Certs
+       CACertPool = x509.NewCertPool()
+       caCertNames := GetCACertList()
+       if caCertNames != nil {
+               for _, filename := range caCertNames {
+                       fh, err := os.Open(filename)
+                       if err != nil {
+                               o.Warn("Whilst parsing CA certs, couldn't open %s: %s", filename, err)
+                               continue
+                       }
+                       defer fh.Close()
+                       fi, err := fh.Stat()
+                       o.MightFail(err, "Couldn't stat CA certificate file: %s", filename)
+                       data := make([]byte, fi.Size)
+                       fh.Read(data)
+                       CACertPool.AppendCertsFromPEM(data)
+               }
+       }
+}
\ No newline at end of file
diff --git a/src/player/execution.go b/src/player/execution.go
new file mode 100644 (file)
index 0000000..a1c5713
--- /dev/null
@@ -0,0 +1,159 @@
+// execution.go
+
+package main
+
+import (
+       "os"
+       "bufio"
+       "strings"
+       o "orchestra"
+)
+
+func ExecuteTask(task *TaskRequest) <-chan *TaskResponse {
+       complete  := make(chan *TaskResponse, 1)
+       go doExecution(task, complete)
+
+       return complete
+}
+
+func batchLogger(jobid uint64, errpipe *os.File) {
+       defer errpipe.Close()
+
+       r := bufio.NewReader(errpipe)
+       for {
+               lb, _, err := r.ReadLine()
+               if err == os.EOF {
+                       return
+               }
+               if err != nil {
+                       o.Warn("executionLogger failed: %s", err)
+                       return
+               }
+               o.Info("JOB %d:STDERR:%s", jobid, string(lb))
+       }
+}
+
+func peSetEnv(env []string, key string, value string) []string {
+       mkey := key+"="
+       found := false
+       for i, v := range env {
+               if strings.HasPrefix(v, mkey) {
+                       env[i] = key+"="+value
+                       found = true
+                       break
+               }
+       }
+       if !found {
+               env = append(env, key+"="+value)
+       }
+       return env
+}
+
+func doExecution(task *TaskRequest, completionChannel chan<- *TaskResponse) {
+       // we must notify the parent when we exit.
+       defer func(c chan<- *TaskResponse, task *TaskRequest) { c <- task.MyResponse }(completionChannel,task)
+
+       // first of all, verify that the score exists at all.
+       score, exists := Scores[task.Score]
+       if !exists {
+               o.Warn("job%d: Request for unknown score \"%s\"", task.Id, task.Score)
+               task.MyResponse.State = RESP_FAILED_UNKNOWN_SCORE
+               return
+       }
+       si := NewScoreInterface(task)
+       if si == nil {
+               o.Warn("job%d: Couldn't initialise Score Interface", task.Id)
+               task.MyResponse.State = RESP_FAILED_HOST_ERROR
+               return
+       }
+       if !si.Prepare() {
+               o.Warn("job%d: Couldn't Prepare Score Interface", task.Id)
+               task.MyResponse.State = RESP_FAILED_HOST_ERROR
+               return
+       }
+       defer si.Cleanup()
+
+       eenv := si.SetupProcess()
+       task.MyResponse.State = RESP_RUNNING
+
+       procenv := new(os.ProcAttr)
+       // Build the default environment.
+       procenv.Env = peSetEnv(procenv.Env, "PATH", "/usr/bin:/usr/sbin:/bin:/sbin")
+       procenv.Env = peSetEnv(procenv.Env, "IFS", " \t\n")
+       pwd, err := os.Getwd()
+       if err != nil {
+               task.MyResponse.State = RESP_FAILED_HOST_ERROR
+               o.Warn("job%d: Couldn't resolve PWD: %s", task.Id, err)
+               return
+       }
+       procenv.Env = peSetEnv(procenv.Env, "PWD", pwd)
+       // copy in the environment overrides
+       for k, v := range eenv.Environment {
+               procenv.Env = peSetEnv(procenv.Env, k, v)
+       }
+
+       // attach FDs to procenv.
+       procenv.Files = make([]*os.File, 3)
+
+       // first off, attach /dev/null to stdin and stdout
+       devNull, err := os.OpenFile(os.DevNull, os.O_RDWR | os.O_APPEND, 0666)
+       o.MightFail(err, "couldn't open DevNull")
+       defer devNull.Close()
+       for i := 0; i < 2; i++ {
+               procenv.Files[i] = devNull
+       }
+       // attach STDERR to to our logger via pipe.
+       lr, lw, err := os.Pipe()
+       o.MightFail(err, "Couldn't create pipe")
+       defer lw.Close()
+       // lr will be closed by the logger.
+       procenv.Files[2] = lw
+       // check the environment's configuration and allow it to override stdin, stdout, and FDs 3+
+       if nil != eenv.Files {
+               for i := range eenv.Files {
+                       if i < 2 {
+                               procenv.Files[i] = eenv.Files[i]
+                       } else {
+                               procenv.Files = append(procenv.Files, eenv.Files[i])
+                       }
+               }
+       }
+       var args []string = nil
+       args = append(args, eenv.Arguments...)
+
+       o.Info("job%d: Executing %s", task.Id, score.Executable)
+       go batchLogger(task.Id, lr)
+       proc, err := os.StartProcess(score.Executable, args, procenv)
+       if err != nil {
+               o.Warn("job%d: Failed to start processs", task.Id)
+               task.MyResponse.State = RESP_FAILED_HOST_ERROR
+               return
+       }
+       wm, err := proc.Wait(0)
+       if err != nil {
+               o.Warn("job%d: Error waiting for process", task.Id)
+               task.MyResponse.State = RESP_FAILED_UNKNOWN
+               // Worse of all, we don't even know if we succeeded.
+               return
+       }
+       if !(wm.WaitStatus.Signaled() || wm.WaitStatus.Exited()) {
+               o.Assert("Non Terminal notification received when not expected.")
+               return
+       }
+       if wm.WaitStatus.Signaled() {
+               o.Warn("job%d: Process got signalled", task.Id)
+               task.MyResponse.State = RESP_FAILED_UNKNOWN
+               return
+       }
+       if wm.WaitStatus.Exited() {
+               if 0 == wm.WaitStatus.ExitStatus() {
+                       o.Warn("job%d: Process exited OK", task.Id)
+                       task.MyResponse.State = RESP_FINISHED
+               } else {
+                       o.Warn("job%d: Process exited with failure", task.Id)
+                       task.MyResponse.State = RESP_FAILED
+               }
+               return
+       }
+       o.Assert("Should never get here.")
+}
\ No newline at end of file
diff --git a/src/player/if_env.go b/src/player/if_env.go
new file mode 100644 (file)
index 0000000..260a6e5
--- /dev/null
@@ -0,0 +1,42 @@
+// if_env
+//
+// 'env' score interface
+
+package main
+
+const (
+       envEnvironmentPrefix = "ORC_"
+)
+
+func init() {
+       RegisterInterface("env", newEnvInterface)
+}
+
+type EnvInterface struct {
+       task    *TaskRequest
+}
+
+func newEnvInterface(task *TaskRequest) (iface ScoreInterface) {
+       ei := new(EnvInterface)
+       ei.task = task
+
+       return ei
+}
+
+func (ei *EnvInterface) Prepare() bool {
+       // does nothing!
+       return true
+}
+
+func (ei *EnvInterface) SetupProcess() (ee *ExecutionEnvironment) {
+       ee = NewExecutionEnvironment()
+       for k,v := range ei.task.Params {
+               ee.Environment[envEnvironmentPrefix+k] = v
+       }
+
+       return ee
+}
+
+func (ei *EnvInterface) Cleanup() {
+       // does nothing!
+}
\ No newline at end of file
diff --git a/src/player/if_pipe.go b/src/player/if_pipe.go
new file mode 100644 (file)
index 0000000..eab72f4
--- /dev/null
@@ -0,0 +1,92 @@
+// if_pipe
+//
+// 'pipe' score interface
+//
+// The PIPE score interface works much like the ENV interace, but also attaches
+// a pipe to STDOUT which captures <x>=<y> repsonse values.
+
+package main
+
+import (
+       "strings"
+       "bufio"
+       "os"
+       o "orchestra"
+)
+
+const (
+       pipeEnvironmentPrefix = "ORC_"
+)
+
+func init() {
+       RegisterInterface("pipe", newPipeInterface)
+}
+
+type PipeInterface struct {
+       task    *TaskRequest
+       pipew   *os.File
+}
+
+func newPipeInterface(task *TaskRequest) (iface ScoreInterface) {
+       ei := new(PipeInterface)
+       ei.task = task
+
+       return ei
+}
+
+
+// pipeListener is the goroutine that sits on the stdout pipe and
+// processes what it sees.
+func pipeListener(task *TaskRequest, outpipe *os.File) {
+       defer outpipe.Close()
+
+       r := bufio.NewReader(outpipe)
+       for {
+               lb, _, err := r.ReadLine()
+               if err == os.EOF {
+                       return
+               }
+               if err != nil {
+                       o.Warn("pipeListener failed: %s", err)
+                       return
+               }
+               linein := string(lb)
+               if strings.Index(linein, "=") >= 0 {
+                       bits := strings.SplitN(linein, "=", 2)
+                       task.MyResponse.Response[bits[0]] = bits[1]
+               }
+       }
+}
+
+
+func (ei *PipeInterface) Prepare() bool {
+       lr, lw, err := os.Pipe()
+       if (err != nil) {
+               return false
+       }
+       // save the writing end of the pipe so we can close our local end of it during cleanup.
+       ei.pipew = lw
+
+       // start the pipe listener
+       go pipeListener(ei.task, lr)
+
+       return true
+}
+
+func (ei *PipeInterface) SetupProcess() (ee *ExecutionEnvironment) {
+       ee = NewExecutionEnvironment()
+       for k,v := range ei.task.Params {
+               ee.Environment[pipeEnvironmentPrefix+k] = v
+       }
+       ee.Files = make([]*os.File, 2)
+       ee.Files[1] = ei.pipew
+       return ee
+}
+
+func (ei *PipeInterface) Cleanup() {
+       // close the local copy of the pipe.
+       //
+       // if the child didn't start, this will also EOF the
+       // pipeListener which will clean up that goroutine.
+       ei.pipew.Close()
+}
\ No newline at end of file
diff --git a/src/player/interface.go b/src/player/interface.go
new file mode 100644 (file)
index 0000000..580bb4e
--- /dev/null
@@ -0,0 +1,77 @@
+// interface.go
+//
+// Score Interface
+//
+// This provides the interface that the score interfaces need to conform to.
+
+package main
+
+import (
+       o "orchestra"
+       "os"
+)
+
+var (
+       interfaces      = make(map[string]func(*TaskRequest)(ScoreInterface))
+)
+
+type ExecutionEnvironment struct {
+       Environment     map[string]string
+       Arguments       []string
+       Files           []*os.File
+}
+
+func NewExecutionEnvironment() (ee *ExecutionEnvironment) {
+       ee = new(ExecutionEnvironment)
+       ee.Environment = make(map[string]string)
+
+       return ee
+}
+
+type ScoreInterface interface {
+       // prepare gets called up front before the fork.  It should do
+       // any/all lifting required.
+       //
+       // returns false if there are any problems.
+       Prepare() bool
+
+       // SetupEnvironment gets called prior to starting the child
+       // process.  It should return an ExecutionEnvironment with the
+       // bits filled in the way it wants.
+       SetupProcess() *ExecutionEnvironment
+
+       // Cleanup is responsible for mopping up the mess, filing any
+       // results that need to be stored, etc.  This will be called
+       // only from the main thread to ensure that results can be updated
+       // safely.
+       Cleanup()
+}
+
+func HasInterface(ifname string) bool {
+       _, exists := interfaces[ifname]
+
+       return exists
+}
+
+func RegisterInterface(ifname string, initfunc func(*TaskRequest)(ScoreInterface)) {
+       _, exists := interfaces[ifname]
+       if exists {
+               o.Assert("Multiple attempts to register %s interface", ifname)
+       }
+       interfaces[ifname] = initfunc
+}
+
+func NewScoreInterface(task *TaskRequest) (iface ScoreInterface) {
+       score, exists := Scores[task.Score]
+       if !exists {
+               return nil
+       }
+       if !HasInterface(score.Interface) {
+               return nil
+       }
+       ifinit, _ := interfaces[score.Interface]
+       
+       iface = ifinit(task)
+
+       return iface
+}
\ No newline at end of file
diff --git a/src/player/player.go b/src/player/player.go
new file mode 100644 (file)
index 0000000..0bb9b54
--- /dev/null
@@ -0,0 +1,373 @@
+/* player.go
+*/
+
+package main
+
+import (
+       "os"
+       "fmt"
+       "flag"
+       o       "orchestra"
+       "crypto/tls"
+       "crypto/x509"
+       "net"
+       "time"
+       "container/list"
+)
+
+const (
+       InitialReconnectDelay           = 5e9
+       MaximumReconnectDelay           = 300e9
+       ReconnectDelayScale             = 2
+       KeepaliveDelay                  = 200e9
+       RetryDelay                      = 5e9
+)
+
+type NewConnectionInfo struct {
+       conn net.Conn
+       timeout int64
+}
+
+var (
+       ConfigFile              = flag.String("config-file", "/etc/orchestra/player.conf", "Path to the configuration file")    
+       DontVerifyPeer          = flag.Bool("dont-verify-peer", false, "Ignore TLS verification for the peer")
+       CertPair tls.Certificate
+       CACertPool *x509.CertPool
+       LocalHostname string    = ""
+       
+       receivedMessage         = make(chan *o.WirePkt)
+       lostConnection          = make(chan int)
+       reloadScores            = make(chan int, 2)
+       pendingQueue            = list.New()
+       unacknowledgedQueue     = list.New()
+       newConnection           = make(chan *NewConnectionInfo)
+       pendingTaskRequest      = false
+       InvalidValueError       = os.NewError("Invalid value")
+)
+
+func getNextPendingTask() (task *TaskRequest) {
+       e := pendingQueue.Front()
+       if e != nil {
+               task, _ = e.Value.(*TaskRequest)
+               pendingQueue.Remove(e)
+       }
+       return task
+}
+
+func appendPendingTask(task *TaskRequest) {
+       pendingTaskRequest = false
+       pendingQueue.PushBack(task)
+}
+
+func getNextUnacknowledgedResponse() (resp *TaskResponse) {
+       e := unacknowledgedQueue.Front()
+       if e != nil {
+               resp, _ = e.Value.(*TaskResponse)
+               unacknowledgedQueue.Remove(e)
+       }
+       return resp
+}
+
+func appendUnacknowledgedResponse(resp *TaskResponse) {
+       resp.RetryTime = time.Nanoseconds() + RetryDelay
+       unacknowledgedQueue.PushBack(resp)
+}
+
+func acknowledgeResponse(jobid uint64) {
+       for e := unacknowledgedQueue.Front(); e != nil; e = e.Next() {
+               resp := e.Value.(*TaskResponse)
+               if resp.id == jobid {
+                       unacknowledgedQueue.Remove(e)
+               }
+       }
+}
+
+func sendResponse(c net.Conn, resp *TaskResponse) {
+       //FIXME: update retry time on Response
+       o.Debug("Sending Response!")
+       ptr := resp.Encode()
+       p, err := o.Encode(ptr)
+       o.MightFail(err, "Failed to encode response")
+       _, err = p.Send(c)
+       if err != nil {
+               o.Warn("Transmission error: %s", err)
+               c.Close()
+               prequeueResponse(resp)
+               lostConnection <- 1
+       } else {
+               appendUnacknowledgedResponse(resp)
+       }
+}
+
+func prequeueResponse(resp *TaskResponse) {
+       unacknowledgedQueue.PushFront(resp)
+}
+
+func Reader(conn net.Conn) {
+       defer func(l chan int) {
+               l <- 1
+       }(lostConnection)
+
+       for {
+               pkt, err := o.Receive(conn)
+               if (err != nil) {
+                       o.Warn("Error receiving message: %s", err)
+                       break;
+               }
+               receivedMessage <- pkt
+       }       
+}
+
+func handleNop(c net.Conn, message interface{}) {
+       o.Debug("NOP Received")
+}
+
+func handleIllegal(c net.Conn, message interface{}) {
+       o.Fail("Got Illegal Message")
+}
+
+func handleRequest(c net.Conn, message interface{}) {
+       o.Debug("Request Recieved.  Decoding!")
+       ptr, ok := message.(*o.ProtoTaskRequest)
+       if !ok {
+               o.Assert("CC stuffed up - handleRequest got something that wasn't a ProtoTaskRequest.")
+       }
+       task := TaskFromProto(ptr)
+       /* search the registry for the task */
+       o.Debug("Request for Job.ID %d", task.Id)
+       existing := TaskGet(task.Id)
+       if nil != existing {
+               if (existing.MyResponse.IsFinished()) {
+                       o.Debug("job%d: Resending Response", task.Id)
+                       sendResponse(c, existing.MyResponse)
+               }
+       } else {
+               // check to see if we have the score
+               // add the Job to our Registry
+               task.MyResponse = NewTaskResponse()
+               task.MyResponse.id = task.Id
+               task.MyResponse.State = RESP_PENDING            
+               TaskAdd(task)
+               o.Info("Added New Task (Job ID %d) to our local registry", task.Id)
+               // and then push it onto the pending job list so we know it needs actioning.
+               appendPendingTask(task)
+       }
+}
+
+func handleAck(c net.Conn, message interface{}) {
+       o.Debug("Ack Received")
+       ack, ok := message.(*o.ProtoAcknowledgement)
+       if !ok {
+               o.Assert("CC stuffed up - handleAck got something that wasn't a ProtoAcknowledgement.")
+       }
+       if ack.Id != nil {
+               acknowledgeResponse(*ack.Id)
+       }
+}
+
+
+var dispatcher = map[uint8] func(net.Conn, interface{}) {
+       o.TypeNop:              handleNop,
+       o.TypeTaskRequest:      handleRequest,
+       o.TypeAcknowledgement:  handleAck,
+
+       /* P->C only messages, should never appear on the wire to us. */
+       o.TypeIdentifyClient:   handleIllegal,
+       o.TypeReadyForTask:     handleIllegal,
+       o.TypeTaskResponse:     handleIllegal,
+}
+
+func connectMe(initialDelay int64) {
+       var backOff int64 = initialDelay
+       for {
+               // Sleep first.
+               if backOff > 0 {
+                       o.Info("Sleeping for %d seconds", backOff/1e9)
+                       err := time.Sleep(backOff)
+                       o.MightFail(err, "Couldn't Sleep")
+                       backOff *= ReconnectDelayScale
+                       if backOff > MaximumReconnectDelay {
+                               backOff = MaximumReconnectDelay
+                       }
+               } else {
+                       backOff = InitialReconnectDelay
+               }
+
+               tconf := &tls.Config{
+               RootCAs: CACertPool,
+               }
+               tconf.Certificates = append(tconf.Certificates, CertPair)
+
+               // update our local hostname.
+               LocalHostname = GetStringOpt("player name")
+               if (LocalHostname == "") {
+                       LocalHostname = o.ProbeHostname()
+                       o.Warn("No hostname provided - probed hostname: %s", LocalHostname)
+               }
+
+               masterHostname := GetStringOpt("master")
+
+               raddr := fmt.Sprintf("%s:%d", masterHostname, 2258)
+               o.Info("Connecting to %s", raddr)
+               conn, err := tls.Dial("tcp", raddr, tconf)              
+               if err == nil && !*DontVerifyPeer {
+                       conn.Handshake()
+                       err = conn.VerifyHostname(masterHostname)
+               }
+               if err == nil {
+                       nc := new(NewConnectionInfo)
+                       nc.conn = conn
+                       nc.timeout = backOff
+                       newConnection <- nc
+                       return
+               }
+               o.Warn("Couldn't connect to master: %s", err)
+       }
+}
+
+func ProcessingLoop() {
+       var     conn                    net.Conn                = nil
+       var     nextRetryResp           *TaskResponse           = nil
+       var     taskCompletionChan      <-chan *TaskResponse    = nil
+       var     connectDelay            int64                   = 0
+       var     doScoreReload           bool                    = false
+       // kick off a new connection attempt.
+       go connectMe(connectDelay)
+
+       // and this is where we spin!
+       for {   
+               var retryDelay int64 = 0
+               var retryChan  <-chan int64 = nil
+
+               if conn != nil {
+                       for nextRetryResp == nil {
+                               nextRetryResp = getNextUnacknowledgedResponse()
+                               if nil == nextRetryResp {
+                                       break
+                               }
+                               retryDelay = nextRetryResp.RetryTime - time.Nanoseconds()
+                               if retryDelay < 0 {
+                                       sendResponse(conn, nextRetryResp)
+                                       nextRetryResp = nil
+                               }
+                       }
+                       if nextRetryResp != nil {
+                               retryChan = time.After(retryDelay)
+                       }
+               }
+               if taskCompletionChan == nil {
+                       nextTask := getNextPendingTask()
+                       if nextTask != nil {
+                               taskCompletionChan = ExecuteTask(nextTask)
+                       } else {
+                               if conn != nil && !pendingTaskRequest {
+                                       o.Debug("Asking for trouble")
+                                       p := o.MakeReadyForTask()
+                                       p.Send(conn)
+                                       o.Debug("Sent Request for trouble")
+                                       pendingTaskRequest = true
+                               }
+                       }
+               }
+               select {
+               // Currently executing job finishes.
+               case newresp := <- taskCompletionChan:
+                       o.Debug("job%d: Completed with State %s\n", newresp.id, newresp.State)
+                       // preemptively set a retrytime.
+                       newresp.RetryTime = time.Nanoseconds()
+                       // ENOCONN - sub it in as our next retryresponse, and prepend the old one onto the queue.
+                       if nil == conn {
+                               if nil != nextRetryResp {
+                                       prequeueResponse(nextRetryResp)
+                               }
+                               o.Debug("job%d: Queuing Initial Response", newresp.id)
+                               nextRetryResp = newresp
+                       } else {
+                               o.Debug("job%d: Sending Initial Response", newresp.id)
+                               sendResponse(conn, newresp)
+                       }
+                       if doScoreReload {
+                               o.Info("Performing Deferred score reload")
+                               LoadScores()
+                               doScoreReload = false
+                       }
+                       taskCompletionChan = nil
+               // If the current unacknowledged response needs a retry, send it.
+               case <-retryChan:
+                       sendResponse(conn, nextRetryResp)
+                       nextRetryResp = nil
+               // New connection.  Set up the receiver thread and Introduce ourselves.
+               case nci := <-newConnection:
+                       if conn != nil {
+                               conn.Close()
+                       }
+                       conn = nci.conn
+                       connectDelay = nci.timeout
+                       pendingTaskRequest = false
+
+                       // start the reader
+                       go Reader(conn)
+               
+                       /* Introduce ourself */
+                       p := o.MakeIdentifyClient(LocalHostname)
+                       p.Send(conn)
+               // Lost connection.  Shut downt he connection.
+               case <-lostConnection:
+                       o.Warn("Lost Connection to Master")
+                       conn.Close()
+                       conn = nil
+                       // restart the connection attempts
+                       go connectMe(connectDelay)
+               // Message received from master.  Decode and action.
+               case p := <-receivedMessage:
+                       // because the message could possibly be an ACK, push the next retry response back into the queue so acknowledge can find it.
+                       if nil != nextRetryResp {
+                               prequeueResponse(nextRetryResp)
+                               nextRetryResp = nil
+                       }
+                       var upkt interface{} = nil
+                       if p.Length > 0 {
+                               var err os.Error
+                               upkt, err = p.Decode()
+                               o.MightFail(err, "Couldn't decode packet from master")
+                       }
+                       handler, exists := dispatcher[p.Type]
+                       if (exists) {
+                               connectDelay = 0
+                               handler(conn, upkt)
+                       } else {
+                               o.Fail("Unhandled Pkt Type %d", p.Type)
+                       }
+               // Reload scores
+               case <-reloadScores:
+                       // fortunately this is actually completely safe as 
+                       // long as nobody's currently executing.
+                       // who'd have thunk it?
+                       if taskCompletionChan == nil {
+                               o.Info("Reloading scores")
+                               LoadScores()
+                       } else {
+                               o.Info("Deferring score reload (execution in progress)")
+                               doScoreReload = true
+                       }
+               // Keepalive delay expired.  Send Nop.
+               case <-time.After(KeepaliveDelay):
+                       if conn == nil {
+                               break
+                       }
+                       o.Debug("Sending Nop")
+                       p := o.MakeNop()
+                       p.Send(conn)
+               }
+       }
+}
+
+func main() {
+       o.SetLogName("player")
+
+       flag.Parse()
+
+       ConfigLoad()
+       LoadScores()
+       ProcessingLoop()
+}
diff --git a/src/player/registry.go b/src/player/registry.go
new file mode 100644 (file)
index 0000000..2466bb7
--- /dev/null
@@ -0,0 +1,100 @@
+// registry.go
+//
+// Job Registry.
+//
+// The Registry provides a 'threadsafe' interface to various global
+// information stores.
+//
+// The registry dispatch thread is forbidden from performing any work
+// that is likely to block.  Result channels must be buffered with
+// enough space for the full set of results.
+
+package main
+
+const (
+       requestAddTask                  = iota
+       requestGetTask
+
+       requestQueueSize                = 10
+)
+
+type registryRequest struct {
+       operation               int
+       id                      uint64
+       task                    *TaskRequest
+       responseChannel         chan *registryResponse
+}
+
+type registryResponse struct {
+       success                 bool
+       task                    *TaskRequest
+}
+       
+var chanRequest = make(chan *registryRequest, requestQueueSize)
+
+// bake a minimal request structure together.
+func newRequest(wants_response bool) (r *registryRequest) {
+       r = new(registryRequest)
+       if wants_response {
+               r.responseChannel = make(chan *registryResponse, 1)
+       }
+
+       return r
+}
+
+// Add a Task to the registry.  Return true if successful, returns
+// false if the task is lacking critical information (such as a Job Id)
+// and can't be registered.
+func TaskAdd(task *TaskRequest) bool {
+       rr := newRequest(true)
+       rr.operation = requestAddTask
+       rr.task = task
+
+       chanRequest <- rr
+       resp := <- rr.responseChannel 
+       return resp.success
+}
+
+// Get a Task from the registry.  Returns the task if successful,
+// returns nil if the task couldn't be found.
+func TaskGet(id uint64) *TaskRequest {
+       rr := newRequest(true)
+       rr.operation = requestGetTask
+       rr.id = id
+
+       chanRequest <- rr
+       resp := <- rr.responseChannel
+       return resp.task
+}
+
+func manageRegistry() {
+       taskRegister := make(map[uint64]*TaskRequest)
+
+       for {
+               req := <- chanRequest
+               resp := new (registryResponse)
+               switch (req.operation) {
+               case requestAddTask:
+                       if nil != req.task {
+                               // and register the job
+                               taskRegister[req.task.Id] = req.task
+                               resp.success = true
+                       } else {
+                               resp.success = false
+                       }
+               case requestGetTask:
+                       task, exists := taskRegister[req.id]
+                       resp.success = exists
+                       if exists {
+                               resp.task = task
+                       }
+               }
+               if req.responseChannel != nil {
+                       req.responseChannel <- resp
+               }
+       }
+}
+
+func init() {
+       go manageRegistry()
+}
diff --git a/src/player/resp_state.go b/src/player/resp_state.go
new file mode 100644 (file)
index 0000000..32e11ac
--- /dev/null
@@ -0,0 +1,112 @@
+// resp_state.go
+
+package main
+
+import (
+       "os"
+       "json"
+)
+
+type ResponseState int
+
+const (
+       // Response states
+       RESP_PENDING                    = ResponseState(iota)   // internal state, not wire.
+       RESP_RUNNING
+       RESP_FINISHED
+       RESP_FAILED
+       RESP_FAILED_UNKNOWN_SCORE
+       RESP_FAILED_HOST_ERROR
+       RESP_FAILED_UNKNOWN
+)
+
+func (rs ResponseState) String() (strout string) {
+       switch rs {
+       case RESP_RUNNING:
+               return "PENDING"
+       case RESP_FINISHED:
+               return "OK"
+       case RESP_FAILED:
+               return "FAIL"
+       case RESP_FAILED_UNKNOWN_SCORE:
+               return "UNK_SCORE"
+       case RESP_FAILED_HOST_ERROR:
+               return "HOST_ERROR"
+       case RESP_FAILED_UNKNOWN:
+               return "UNKNOWN_FAILURE"
+       }
+       return ""
+}
+
+func (rs ResponseState) MarshalJSON() (out []byte, err os.Error) {
+       strout := rs.String()
+       if strout != "" {
+               return json.Marshal(strout)
+       }
+       return nil, InvalidValueError
+}
+
+func (rs ResponseState) UnmarshalJSON(in []byte) (err os.Error) {
+       var statestr string
+       err = json.Unmarshal(in, &statestr)
+       if err != nil {
+               return err
+       }
+       switch statestr {
+       case "PENDING":
+               rs = RESP_PENDING
+       case "OK":
+               rs = RESP_FINISHED
+       case "FAIL":
+               rs = RESP_FAILED
+       case "UNK_SCORE":
+               rs = RESP_FAILED_UNKNOWN_SCORE
+       case "HOST_ERROR":
+               rs = RESP_FAILED_HOST_ERROR
+       case "UNKNOWN_FAILURE":
+               rs = RESP_FAILED_UNKNOWN
+       default:
+               return InvalidValueError
+       }
+       return nil
+}
+
+func (rs ResponseState) Finished() bool {
+       switch rs {
+       case RESP_FINISHED:
+               fallthrough
+       case RESP_FAILED:
+               fallthrough
+       case RESP_FAILED_UNKNOWN_SCORE:
+               fallthrough
+       case RESP_FAILED_HOST_ERROR:
+               fallthrough
+       case RESP_FAILED_UNKNOWN:
+               return true
+       }
+       return false
+}
+
+// true if the response says the task failed.  false otherwise.
+func (rs ResponseState) Failed() bool {
+       switch rs {
+       case RESP_RUNNING:
+               fallthrough
+       case RESP_FINISHED:
+               return false
+       }
+       return true
+}
+
+// true if the task can be tried.
+// precond:  DidFail is true, job is a ONE_OF job.
+// must return false otherwise.
+func (rs ResponseState) CanRetry() bool {
+       switch rs {
+       case RESP_FAILED_UNKNOWN_SCORE:
+               fallthrough
+       case RESP_FAILED_HOST_ERROR:
+               return true
+       }
+       return false
+}
\ No newline at end of file
diff --git a/src/player/scores.go b/src/player/scores.go
new file mode 100644 (file)
index 0000000..6f727e3
--- /dev/null
@@ -0,0 +1,134 @@
+// scores.go
+//
+// Score handling
+//
+// In here, we have the probing code that learns about scores, reads
+// their configuration files, and does the heavy lifting for launching
+// them, doing the privilege drop, etc.
+
+package main
+
+import (
+       "os"
+       "io"
+       "strings"
+       o "orchestra"
+       "path"
+       "github.com/kuroneko/configureit"
+)
+
+type ScoreInfo struct {
+       Name            string
+       Executable      string
+       InitialPwd      string
+       InitialEnv      map[string]string
+
+       Interface       string
+
+       Config          *configureit.Config
+}
+
+type ScoreExecution struct {
+       Score   *ScoreInfo
+       Task    *TaskRequest
+}
+       
+
+func NewScoreInfo() (si *ScoreInfo) {
+       si = new (ScoreInfo)
+       si.InitialEnv = make(map[string]string)
+
+       config := NewScoreInfoConfig()
+       si.updateFromConfig(config)
+
+       return si
+}
+
+func NewScoreInfoConfig() (config *configureit.Config) {
+       config = configureit.New()
+
+       config.Add("interface", configureit.NewStringOption("env"))
+       config.Add("dir", configureit.NewStringOption(""))
+       config.Add("path", configureit.NewStringOption("/usr/bin:/bin"))
+       config.Add("user", configureit.NewUserOption(""))
+
+       return config
+}
+
+func (si *ScoreInfo) updateFromConfig(config *configureit.Config) {
+       // propogate PATH overrides.
+       opt := config.Get("dir")
+       sopt, _ := opt.(*configureit.StringOption)
+       si.InitialEnv["PATH"] = sopt.Value
+
+       // set the interface type.
+       opt = config.Get("interface")
+       sopt, _ = opt.(*configureit.StringOption)
+       si.Interface = sopt.Value
+
+       // propogate initial Pwd
+       opt = config.Get("dir")
+       sopt, _ = opt.(*configureit.StringOption)
+       si.InitialPwd = sopt.Value      
+}
+
+var (
+       Scores          map[string]*ScoreInfo
+)
+
+func ScoreConfigure(si *ScoreInfo, r io.Reader) {
+       config := NewScoreInfoConfig()
+       err := config.Read(r, 1)
+       o.MightFail(err, "Error Parsing Score Configuration for %s", si.Name)
+       si.updateFromConfig(config)
+}
+
+func LoadScores() {
+       scoreDirectory := GetStringOpt("score directory")
+
+       dir, err := os.Open(scoreDirectory)
+       o.MightFail(err, "Couldn't open Score directory")
+       defer dir.Close()
+
+       Scores = make(map[string]*ScoreInfo)
+       
+       files, err := dir.Readdir(-1)
+       for i := range files {
+               // skip ., .. and other dotfiles.
+               if strings.HasPrefix(files[i].Name, ".") {
+                       continue
+               }
+               // emacs backup files.  ignore these.
+               if strings.HasSuffix(files[i].Name, "~") || strings.HasPrefix(files[i].Name, "#") {
+                       continue
+               }
+               // .conf is reserved for score configurations.
+               if strings.HasSuffix(files[i].Name, ".conf") {
+                       continue
+               }
+               if !files[i].IsRegular() && !files[i].IsSymlink() {
+                       continue
+               }
+
+               // check for the executionable bit
+               if (files[i].Permission() & 0111) != 0 {
+                       fullpath := path.Join(scoreDirectory, files[i].Name)
+                       conffile := fullpath+".conf"
+                       o.Warn("Considering %s as score", files[i].Name)
+
+                       si := NewScoreInfo()
+                       si.Name = files[i].Name
+                       si.Executable = fullpath
+               
+                       conf, err := os.Open(conffile)
+                       if err == nil {
+                               o.Warn("Parsing configuration for %s", fullpath)
+                               ScoreConfigure(si, conf)
+                               conf.Close()
+                       } else {
+                               o.Warn("Couldn't open config file for %s, assuming defaults: %s", files[i].Name, err)
+                       }
+                       Scores[files[i].Name] = si
+               }
+       }
+}
\ No newline at end of file
diff --git a/src/player/signal.go b/src/player/signal.go
new file mode 100644 (file)
index 0000000..626fb73
--- /dev/null
@@ -0,0 +1,49 @@
+/* signal.go
+ *
+ * Signal Handlers
+ */
+
+package main
+
+import (
+       "os/signal"
+       "os"
+       "syscall"
+       "fmt"
+       o "orchestra"
+)
+
+
+// handle the signals.  By default, we ignore everything, but the
+// three terminal signals, HUP, INT, TERM, we want to explicitly
+// handle.
+func signalHandler() {
+       for {
+               sig := <-signal.Incoming
+
+               ux, ok := sig.(os.UnixSignal)
+               if !ok {
+                       o.Warn("Couldn't handle signal %s, Coercion failed", sig)
+                       continue
+               }
+
+               switch int(ux) {
+               case syscall.SIGHUP:
+                       o.Warn("Reloading Configuration")
+                       reloadScores <- 1
+               case syscall.SIGINT:
+                       fmt.Fprintln(os.Stderr, "Interrupt Received - Terminating")
+                       //FIXME: Gentle Shutdown
+                       os.Exit(1)
+               case syscall.SIGTERM:
+                       fmt.Fprintln(os.Stderr, "Terminate Received - Terminating")
+                       //FIXME: Gentle Shutdown
+                       os.Exit(2)
+               }
+       }
+
+}
+
+func init() {
+       go signalHandler()
+}
diff --git a/src/player/task_request.go b/src/player/task_request.go
new file mode 100644 (file)
index 0000000..b571c27
--- /dev/null
@@ -0,0 +1,33 @@
+// task_request.go
+//
+
+package main
+
+import (
+       o "orchestra"
+)
+
+type TaskRequest struct {
+       Id              uint64                          `json:"id"`
+       Score           string                          `json:"score"`
+       Params          map[string]string               `json:"params"`
+       MyResponse      *TaskResponse                   `json:"response"`
+}
+
+func NewTaskRequest() (req *TaskRequest) {
+       req = new(TaskRequest)
+       return req
+}
+
+/* Map a wire task to an internal Task Request.
+*/
+func TaskFromProto(ptr *o.ProtoTaskRequest) (t *TaskRequest) {
+       t = NewTaskRequest()
+       
+       t.Score = *(ptr.Jobname)
+       t.Id = *(ptr.Id)
+       t.Params = o.MapFromProtoJobParameters(ptr.Parameters)
+
+       return t
+}
+
diff --git a/src/player/task_response.go b/src/player/task_response.go
new file mode 100644 (file)
index 0000000..f284b5c
--- /dev/null
@@ -0,0 +1,88 @@
+// task_response.go
+//
+package main
+
+import (
+       o "orchestra"
+)
+
+type TaskResponse struct {
+       id              uint64                          
+       State           ResponseState                   `json:"state"`
+       Response        map[string]string               `json:"response"`
+       // player only fields
+       RetryTime       int64                           `json:"retrytime"`
+}
+
+// Response related magic
+
+func NewTaskResponse() (resp *TaskResponse) {
+       resp = new(TaskResponse)
+       resp.Response = make(map[string]string)
+
+       return resp
+}
+
+func (resp *TaskResponse) IsFinished() bool {
+       return resp.State.Finished()
+}
+
+func (resp *TaskResponse) DidFail() bool {
+       return resp.State.Failed()
+}
+
+func (resp *TaskResponse) CanRetry() bool {
+       return resp.State.CanRetry()
+}
+
+
+func ResponseFromProto(ptr *o.ProtoTaskResponse) (r *TaskResponse) {
+       r = new(TaskResponse)
+
+       switch (*(ptr.Status)) {
+       case o.ProtoTaskResponse_JOB_INPROGRESS:
+               r.State = RESP_RUNNING
+       case o.ProtoTaskResponse_JOB_SUCCESS:
+               r.State = RESP_FINISHED
+       case o.ProtoTaskResponse_JOB_FAILED:
+               r.State = RESP_FAILED
+       case o.ProtoTaskResponse_JOB_HOST_FAILURE:
+               r.State = RESP_FAILED_HOST_ERROR
+       case o.ProtoTaskResponse_JOB_UNKNOWN:
+               r.State = RESP_FAILED_UNKNOWN_SCORE
+       case o.ProtoTaskResponse_JOB_UNKNOWN_FAILURE:
+               fallthrough
+       default:
+               r.State = RESP_FAILED_UNKNOWN
+       }
+
+       r.id = *(ptr.Id)
+       r.Response = o.MapFromProtoJobParameters(ptr.Response)
+
+       return r
+}
+
+func (resp *TaskResponse) Encode() (ptr *o.ProtoTaskResponse) {
+       ptr = new(o.ProtoTaskResponse)
+       
+       switch resp.State {
+       case RESP_RUNNING:
+               ptr.Status = o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_INPROGRESS)
+       case RESP_FINISHED:
+               ptr.Status = o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_SUCCESS)
+       case RESP_FAILED:
+               ptr.Status = o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_FAILED)
+       case RESP_FAILED_UNKNOWN_SCORE:
+               ptr.Status = o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_UNKNOWN)
+       case RESP_FAILED_HOST_ERROR:
+               ptr.Status = o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_HOST_FAILURE)
+       case RESP_FAILED_UNKNOWN:
+               ptr.Status = o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_UNKNOWN_FAILURE)
+       }
+       ptr.Id = new(uint64)
+       *ptr.Id = resp.id
+       ptr.Response = o.ProtoJobParametersFromMap(resp.Response)
+
+       return ptr
+}
+
diff --git a/src/submitjob/Makefile b/src/submitjob/Makefile
new file mode 100644 (file)
index 0000000..f5f972e
--- /dev/null
@@ -0,0 +1,8 @@
+include $(GOROOT)/src/Make.inc
+
+TARG=submitjob
+
+GOFILES=\
+       submitjob.go\
+
+include $(GOROOT)/src/Make.cmd
diff --git a/src/submitjob/submitjob.go b/src/submitjob/submitjob.go
new file mode 100644 (file)
index 0000000..2142e5a
--- /dev/null
@@ -0,0 +1,142 @@
+// submitjob.go
+//
+// A sample Orchestra submission client.
+
+package main
+
+import (
+       "io"
+       "net"
+       "json"
+       "flag"
+       "fmt"
+       "os"
+)
+
+type JobRequest struct {
+       Op      string          `json:"op"`
+       Score   string          `json:"score"`
+       Players []string        `json:"players"`
+       Scope   string          `json:"scope"`
+       Params  map[string]string       `json:"params"`
+}
+
+var (
+       AllOf        = flag.Bool("all-of", false, "Send request to all named players")
+       AudienceSock = flag.String("audience-sock", "/var/spool/orchestra/conductor.sock", "Path for the audience submission socket")
+)
+
+func NewJobRequest() (jr *JobRequest) {
+       jr = new(JobRequest)
+       jr.Params = make(map[string]string)
+
+       return jr
+}
+
+func Usage() {
+       fmt.Fprintf(os.Stderr, "Usage:\n")
+       fmt.Fprintf(os.Stderr, "  %s [<options>] <score> <player1> [<player2>...] [! [<key1> <value1>]...]\n", os.Args[0])
+       flag.PrintDefaults()
+}
+
+func main() {
+       flag.Usage = Usage
+       flag.Parse()
+
+       args := flag.Args()
+
+       if len(args) < 2 {
+               flag.Usage()
+               os.Exit(1)
+       }
+
+       jr := NewJobRequest()
+       jr.Op = "queue"
+       jr.Score = args[0]
+       if *AllOf {
+               jr.Scope = "all"
+       } else {
+               jr.Scope = "one"
+       }
+
+       var k int
+       for k = 1; k < len(args); k++ {
+               if args[k] == "!" {
+                       break;
+               }
+               insertionpoint := 0
+               if nil == jr.Players {
+                       jr.Players = make([]string, 1)
+               } else {
+                       insertionpoint = len(jr.Players)
+                       newplayers := make([]string, insertionpoint+1)
+                       copy(newplayers, jr.Players)
+                       jr.Players = newplayers
+               }
+               jr.Players[insertionpoint] = args[k]
+       }
+       if (k < len(args)) {
+               // skip the !
+               k++
+               if (len(args) - (k))%2 != 0 {
+                       fmt.Fprintf(os.Stderr, "Error: Odd number of param arguments.\n")
+                       os.Exit(1)
+               }
+               for ; k < len(args); k+=2 {
+                       jr.Params[args[k]] = args[k+1]
+               }
+       }
+       
+       raddr, err := net.ResolveUnixAddr("unix", *AudienceSock)
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "Failed to resolve sockaddr: %s\n", err)
+               os.Exit(1)
+       }
+       conn, err := net.DialUnix("unix", nil, raddr)
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "Failed to connect to sockaddr: %s\n", err)
+               os.Exit(1)
+       }
+
+       defer conn.Close()
+
+       conn.SetTimeout(0)
+
+       nc := net.Conn(conn)
+
+       r, _ := nc.(io.Reader)
+       w, _ := nc.(io.Writer)
+
+       dec := json.NewDecoder(r)
+       enc := json.NewEncoder(w)
+
+       // send the message
+       err = enc.Encode(jr)
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "Failed to marshal & send: %s\n", err)
+               os.Exit(1)
+       }
+
+       response := new([2]interface{})
+       err = dec.Decode(response)
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "Error decoding response: %s\n", err)
+               os.Exit(1)
+       }
+       // coerce field 0 back into a string.
+       rerr,ok := response[0].(string)
+       if ok {
+               if rerr == "OK" {
+                       // all OK!  get the JobID
+                       jobid, _ := response[1].(float64)
+                       fmt.Printf("%d\n", uint64(jobid))
+                       os.Exit(0)
+               } else {
+                       fmt.Fprintf(os.Stderr, "Server Error: %s\n", rerr)
+                       os.Exit(1)
+               }
+       } else {
+               fmt.Fprintf(os.Stderr, "Couldn't unmarshal response correctly.\n");
+               os.Exit(1)
+       }
+}