From 70e5f6e13441fe2a7912d90467b40b8389e81f68 Mon Sep 17 00:00:00 2001 From: Steven McDonald Date: Sun, 25 Sep 2011 23:55:24 +1000 Subject: [PATCH 1/1] initialise repo --- .gitignore | 10 + LICENSE | 24 + Makefile | 70 +++ README | 124 +++++ clientlibs/python/README.md | 16 + clientlibs/python/audience.py | 58 +++ debian/changelog | 59 +++ debian/compat | 1 + debian/control | 30 ++ debian/orchestra-conductor.conffile | 2 + debian/orchestra-conductor.default | 8 + debian/orchestra-conductor.dirs | 4 + debian/orchestra-conductor.init | 66 +++ debian/orchestra-conductor.install | 5 + debian/orchestra-player-go.conffile | 1 + debian/orchestra-player-go.default | 8 + debian/orchestra-player-go.dirs | 3 + debian/orchestra-player-go.init | 67 +++ debian/orchestra-player-go.install | 2 + debian/rules | 78 +++ debian/source/format | 1 + debian/source/local-options | 2 + doc/audience_api.txt | 50 ++ doc/orchestra.tex | 239 +++++++++ doc/score_pipe_interface.txt | 22 + go-patches/json-unmarshal-immediate.diff | 21 + go-patches/syslog-auto-reconnect.diff | 385 +++++++++++++++ samples/conductor.conf | 29 ++ samples/player.conf | 31 ++ samples/players | 6 + src/conductor/Makefile | 36 ++ src/conductor/audience.go | 212 ++++++++ src/conductor/client.go | 393 +++++++++++++++ src/conductor/conductor.go | 51 ++ src/conductor/config.go | 106 ++++ src/conductor/dispatch.go | 140 ++++++ src/conductor/http.go | 39 ++ src/conductor/job_request.go | 134 ++++++ src/conductor/job_scope.go | 53 ++ src/conductor/job_state.go | 76 +++ src/conductor/persistence.go | 315 ++++++++++++ src/conductor/registry.go | 586 +++++++++++++++++++++++ src/conductor/resp_state.go | 113 +++++ src/conductor/server.go | 105 ++++ src/conductor/signal.go | 50 ++ src/conductor/task_request.go | 47 ++ src/conductor/task_response.go | 61 +++ src/conductor/task_state.go | 68 +++ src/getstatus/Makefile | 8 + src/getstatus/getstatus.go | 137 ++++++ src/orchestra/Makefile | 17 + src/orchestra/marshal.go | 173 +++++++ src/orchestra/orchestra.pb.go | 127 +++++ src/orchestra/orchestra.proto | 43 ++ src/orchestra/shared.go | 82 ++++ src/orchestra/wire.go | 107 +++++ src/player/Makefile | 15 + src/player/config.go | 85 ++++ src/player/execution.go | 159 ++++++ src/player/if_env.go | 42 ++ src/player/if_pipe.go | 92 ++++ src/player/interface.go | 77 +++ src/player/player.go | 373 +++++++++++++++ src/player/registry.go | 100 ++++ src/player/resp_state.go | 112 +++++ src/player/scores.go | 134 ++++++ src/player/signal.go | 49 ++ src/player/task_request.go | 33 ++ src/player/task_response.go | 88 ++++ src/submitjob/Makefile | 8 + src/submitjob/submitjob.go | 142 ++++++ 71 files changed, 6210 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 README create mode 100644 clientlibs/python/README.md create mode 100644 clientlibs/python/audience.py create mode 100644 debian/changelog create mode 100644 debian/compat create mode 100644 debian/control create mode 100644 debian/orchestra-conductor.conffile create mode 100644 debian/orchestra-conductor.default create mode 100644 debian/orchestra-conductor.dirs create mode 100755 debian/orchestra-conductor.init create mode 100644 debian/orchestra-conductor.install create mode 100644 debian/orchestra-player-go.conffile create mode 100644 debian/orchestra-player-go.default create mode 100644 debian/orchestra-player-go.dirs create mode 100755 debian/orchestra-player-go.init create mode 100644 debian/orchestra-player-go.install create mode 100755 
debian/rules create mode 100644 debian/source/format create mode 100644 debian/source/local-options create mode 100644 doc/audience_api.txt create mode 100644 doc/orchestra.tex create mode 100644 doc/score_pipe_interface.txt create mode 100644 go-patches/json-unmarshal-immediate.diff create mode 100644 go-patches/syslog-auto-reconnect.diff create mode 100644 samples/conductor.conf create mode 100644 samples/player.conf create mode 100644 samples/players create mode 100644 src/conductor/Makefile create mode 100644 src/conductor/audience.go create mode 100644 src/conductor/client.go create mode 100644 src/conductor/conductor.go create mode 100644 src/conductor/config.go create mode 100644 src/conductor/dispatch.go create mode 100644 src/conductor/http.go create mode 100644 src/conductor/job_request.go create mode 100644 src/conductor/job_scope.go create mode 100644 src/conductor/job_state.go create mode 100644 src/conductor/persistence.go create mode 100644 src/conductor/registry.go create mode 100644 src/conductor/resp_state.go create mode 100644 src/conductor/server.go create mode 100644 src/conductor/signal.go create mode 100644 src/conductor/task_request.go create mode 100644 src/conductor/task_response.go create mode 100644 src/conductor/task_state.go create mode 100644 src/getstatus/Makefile create mode 100644 src/getstatus/getstatus.go create mode 100644 src/orchestra/Makefile create mode 100644 src/orchestra/marshal.go create mode 100644 src/orchestra/orchestra.pb.go create mode 100644 src/orchestra/orchestra.proto create mode 100644 src/orchestra/shared.go create mode 100644 src/orchestra/wire.go create mode 100644 src/player/Makefile create mode 100644 src/player/config.go create mode 100644 src/player/execution.go create mode 100644 src/player/if_env.go create mode 100644 src/player/if_pipe.go create mode 100644 src/player/interface.go create mode 100644 src/player/player.go create mode 100644 src/player/registry.go create mode 100644 src/player/resp_state.go create mode 100644 src/player/scores.go create mode 100644 src/player/signal.go create mode 100644 src/player/task_request.go create mode 100644 src/player/task_response.go create mode 100644 src/submitjob/Makefile create mode 100644 src/submitjob/submitjob.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..98ca431 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +.\#* +*~ +\#* +*.6 +*.8 + +**/build.out +**/_obj +bin/** +pkg/** diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7eca0f5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,24 @@ +Copyright (c) 2011, Anchor Systems Pty Ltd +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Anchor Systems Pty Ltd nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. 
+ +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL ANCHOR SYSTEMS PTY LTD BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6f2fd53 --- /dev/null +++ b/Makefile @@ -0,0 +1,70 @@ +# +# version of Orchestra +# +VERSION=0.3.0 + +# +# packaging revision. +# +REVISION=1 + +# remove at your own peril. +# +# This tells goinstall to work against the local directory as the +# build/source path, and not use the system directories. +# +GOPATH=$(PWD)/build-tree:$(PWD) +GOINSTALL_FLAGS=-dashboard=false -clean=true -u=false -make=false + +export GOPATH + +all: build + +build: build-tree + goinstall $(GOINSTALL_FLAGS) conductor + goinstall $(GOINSTALL_FLAGS) player + goinstall $(GOINSTALL_FLAGS) submitjob + goinstall $(GOINSTALL_FLAGS) getstatus + +build-tree: + mkdir -p build-tree/src + mkdir -p build-tree/bin + mkdir -p build-tree/pkg + +clean: + -$(RM) -r build-tree/pkg + -$(RM) -r build-tree/bin + +distclean: + -$(RM) -r build-tree + +### NOTE: Make sure the checkouts have the correct tags in the lines below! +deps: distclean build-tree + mkdir -p build-tree/src/github.com/kuroneko && cd build-tree/src/github.com/kuroneko && git clone http://github.com/kuroneko/configureit.git && cd configureit && git checkout v0.1 + mkdir -p build-tree/src/goprotobuf.googlecode.com && cd build-tree/src/goprotobuf.googlecode.com && hg clone -r go.r60 http://goprotobuf.googlecode.com/hg + +archive.deps: deps + tar czf ../orchestra-$(VERSION).build-tree.tar.gz -C build-tree --exclude .git --exclude .hg . + +archive.release: archive.deps + git archive --format=tar --prefix=orchestra-$(VERSION)/ v$(VERSION) | gzip -9c > ../orchestra-$(VERSION).tgz + +.PHONY : debian debian.orig debian.debian debian.build-tree archive.deps archive.release archive.head + +archive.head: + git archive --format=tar --prefix=orchestra/ HEAD | gzip -9c > ../orchestra-HEAD.tgz + +DEBIAN_VERSION=$(shell dpkg-parsechangelog | grep -e 'Version:' | awk '{ print $$2 }') +DEBIAN_SRC_VERSION=$(shell echo $(DEBIAN_VERSION) | cut -d- -f 1) + +debian: debian.orig debian.debian debian.build-tree clean + cd .. && dpkg-source -b $(PWD) + +debian.orig: + git archive --format=tar --prefix=orchestra-$(DEBIAN_SRC_VERSION)/ v$(DEBIAN_SRC_VERSION) | gzip -9c > ../orchestra_$(DEBIAN_SRC_VERSION).orig.tar.gz + +debian.debian: + tar zcf ../orchestra_$(DEBIAN_VERSION).debian.tar.gz -C debian . + +debian.build-tree: deps + tar zcf ../orchestra_$(DEBIAN_SRC_VERSION).orig-build-tree.tar.gz -C build-tree --exclude .git --exclude .hg . diff --git a/README b/README new file mode 100644 index 0000000..229f25a --- /dev/null +++ b/README @@ -0,0 +1,124 @@ +A 5 Minute Guide to Orcehstra +----------------------------- + +What is it? +=========== + +Orchestra is a series of tools for Getting Shit Run. 
+ +It consists of a Conductor, which is the coordinating process, and +Players, which are the actual daemons running on nodes to do the work. + +To prevent arbitrary execution of code, Players can only execute +predefined scores which have to be installed on them separately. You +can use puppet, cfengine or another configuration management system to +do this. + +Canonically, entities requesting work to be done are known as the +Audience. + +Please read the Orchestra paper (in doc/) for more information. + +License +======= + +Copyright (c) 2011, Anchor Systems Pty Ltd +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Anchor Systems Pty Ltd nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL ANCHOR SYSTEMS PTY LTD BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +Building +======== + +Install the Go Release 60 (r60) compiler. Prior to compilation, apply the +patches in go-patches/ in order to fix a few package bugs. + +From the top level directory, run make. It will use goinstall to +build the binaries in bin/. + +Source Layout +============= + +src/ -- All the Go sources for the conductor, player, and the + submitjob and getstatus sample implementations. + +doc/ -- Documentation about Orchestra and its implementation. + +samples/ -- Sample configuration files. + +clientlibs/ -- Sample client libraries for communicating with the + Conductor as the Audience. + +go-patches/ -- Patches against the Release version of Go to fix + critical bugs or missing features encountered when + implementing Orchestra. + +Patches Included +================ + + * json-unmarshal-immediate.diff : Fix for Go Bug #2170 + * syslog-auto-reconnect.diff : Fix for Go Bug #2264 + +New In This Release +=================== + +v0.3.0: + * BUGFIX: Fixed conductor ignoring the last_id checkpoint file after + clean shutdowns. + * FEATUREFIX: Fix the exported fieldnames in the audience interface + so they no longer contain capitals. Refactor slightly to reuse + state types defined for persistence. + * Separation of some of the more esoteric shared code from the common + library. + * Conductor Queue Persistence. + * Patches against the Go standard packages. :( + +v0.2.0: + * First public release. + +Known Issues +============ + + * There is no clean up of job data, or persistence of results at this + time.
This is intended to be implemented as soon as time permits. + + * getstatus gets back a lot more information than it displays. + + * No efficient 'wait for job' interface yet. You have to poll the + audience interface for completion results for now. (The polls are, + however, stupidly cheap) + + * Disconnect/Reconnect behaviour for players is not particualrly well + tested. Annecdotal evidence suggests that this is relatively + robust however. + + * Jobs will be left dangling if a player is removed from the + conductor's configuration and the conductor HUP'd whilst there is + still work pending for that player. + + * Some of the more advanced score scheduling ideas that we've had + remain unimplemented, resulting in Orchestra looking a lot blander + than it really is meant to be. + + * There is no support for CRLs yet. diff --git a/clientlibs/python/README.md b/clientlibs/python/README.md new file mode 100644 index 0000000..6af781d --- /dev/null +++ b/clientlibs/python/README.md @@ -0,0 +1,16 @@ +# pyaudience + +This is a very simple interface stub for python to talk to a running +conductor. + +submit_job submits a job. It does no argument checking (it probably +should). + +get_status gets data for a job. If successful, it returns the unmarshall'd +json result. + +Both methods throw exceptions if anything goes wrong. There is a +ServerError exception type which will be raised if the server +complains about anything. + +For more information about this API, please see doc/audience_api.txt diff --git a/clientlibs/python/audience.py b/clientlibs/python/audience.py new file mode 100644 index 0000000..27106d0 --- /dev/null +++ b/clientlibs/python/audience.py @@ -0,0 +1,58 @@ +# audience.py + +import socket +import json + +DEFAULT_SOCKET_PATH="/var/spool/orchestra/conductor.sock" + +class ServerError(Exception): + pass + +def submit_job(score, scope, target, args=None, sockname=DEFAULT_SOCKET_PATH): + reqObj = { + 'op': 'queue', + 'score': score, + 'scope': scope, + 'players': None, + 'params': {} + } + + reqObj['players'] = list(target) + + if args is not None: + reqObj['params'] = args + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(sockname) + f = sock.makefile() + try: + f.write(json.dumps(reqObj)) + f.flush() + resp = json.load(f) + + if resp[0] == 'OK': + return resp[1] + else: + raise ServerError(resp[1]) + finally: + sock.close() + +def get_status(jobid, sockname=DEFAULT_SOCKET_PATH): + reqObj = { + 'op': 'status', + 'id': jobid + } + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(sockname) + f = sock.makefile() + try: + f.write(json.dumps(reqObj)) + f.flush() + resp = json.load(f) + + if resp[0] == 'OK': + return resp[1] + else: + raise ServerError(resp[1]) + finally: + sock.close() diff --git a/debian/changelog b/debian/changelog new file mode 100644 index 0000000..83f9462 --- /dev/null +++ b/debian/changelog @@ -0,0 +1,59 @@ +orchestra (0.3.0-0anchor1) unstable; urgency=low + + * Update to golang r60. + * Update to upstream 0.3.0 + + -- Chris Collins Tue, 20 Sep 2011 14:27:37 +1000 + +orchestra (0.2.0-0anchor1) unstable; urgency=low + + * Update to golang r59. + * New build chain. + + -- Chris Collins Thu, 04 Aug 2011 15:53:40 +1000 + +orchestra (0.1.4-1) unstable; urgency=low + + * Fix bug that crept in with fixes for #218. + + -- Chris Collins Tue, 26 Jul 2011 14:17:58 +1000 + +orchestra (0.1.3-1) unstable; urgency=low + + * Update job state when disqualifying players. (fixes #208 & #218) + + * Validate requests better. 
(fixes #218) + + * Fix handling of last_id (fixes #216) + + * Initialise Response Maps correctly (fixes #217) + + * Filter non-executable files in the scores directory from consideration + + -- Chris Collins Tue, 26 Jul 2011 12:01:48 +1000 + +orchestra (0.1.2-2) unstable; urgency=low + + * Add missing /var/spool/orchestra directory to package management. + + -- Chris Collins Tue, 19 Jul 2011 14:58:13 +1000 + +orchestra (0.1.2-1) unstable; urgency=low + + * Update for 0.1.2 bugfixes and 'pipe' interface. + + -- Chris Collins Tue, 19 Jul 2011 14:35:52 +1000 + +orchestra (0.1.1-1) unstable; urgency=low + + * Update for 0.1.1 bugfixes. + + * Default to disabled. + + -- Chris Collins Thu, 07 Jul 2011 16:03:56 +1000 + +orchestra (0.1-1) unstable; urgency=low + + * Initial Version + + -- Chris Collins Mon, 04 Jul 2011 11:53:26 +1000 diff --git a/debian/compat b/debian/compat new file mode 100644 index 0000000..7f8f011 --- /dev/null +++ b/debian/compat @@ -0,0 +1 @@ +7 diff --git a/debian/control b/debian/control new file mode 100644 index 0000000..62319a5 --- /dev/null +++ b/debian/control @@ -0,0 +1,30 @@ +Source: orchestra +Maintainer: Chris Collins +Section: admin +Priority: extra +Build-Depends: debhelper (>= 7.0.50~), golang (>= 1:60), golang (<< 1:61) +Standards-Version: 3.9.1 + +Package: orchestra-conductor +Architecture: i386 amd64 +Depends: ssl-cert, ${misc:Depends} +Description: The Orchestra management server + Orchestra is a system for managing distributed job execution over a + group of servers. + . + This package contains the Conductor which manages the request queues + and dispatches work to the servers. It also contains the job + submission tools. + +Package: orchestra-player-go +Architecture: i386 amd64 +Description: The Orchestra execution agent + Orchestra is a system for managing distributed job execution over a + group of servers. + . + This package contains the go implementation of the Player which + retrieves the work queue from the Conductor and performs the actual + task execution. + . + As this particular implemention is written in Go, it only works on + x86 architecture systems. diff --git a/debian/orchestra-conductor.conffile b/debian/orchestra-conductor.conffile new file mode 100644 index 0000000..949d739 --- /dev/null +++ b/debian/orchestra-conductor.conffile @@ -0,0 +1,2 @@ +/etc/orchestra/player +/etc/orchestra/conductor.conf diff --git a/debian/orchestra-conductor.default b/debian/orchestra-conductor.default new file mode 100644 index 0000000..fcde44a --- /dev/null +++ b/debian/orchestra-conductor.default @@ -0,0 +1,8 @@ +# +# Defaults for orchestra-conductor +# + +# +# set ENABLED=yes to enable the conductor. 
+# +ENABLED=no diff --git a/debian/orchestra-conductor.dirs b/debian/orchestra-conductor.dirs new file mode 100644 index 0000000..b3bb146 --- /dev/null +++ b/debian/orchestra-conductor.dirs @@ -0,0 +1,4 @@ +/usr/sbin +/usr/bin +/etc/orchestra +/var/spool/orchestra diff --git a/debian/orchestra-conductor.init b/debian/orchestra-conductor.init new file mode 100755 index 0000000..192bbf7 --- /dev/null +++ b/debian/orchestra-conductor.init @@ -0,0 +1,66 @@ +#!/bin/sh + +### BEGIN INIT INFO +# Provides: orchestra-conductor +# Required-Start: networking +# Required-Stop: networking +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: Conductor queue manager for Orchestra +### END INIT INFO + +set -e + +export PATH="/usr/bin:/bin:/usr/sbin:/sbin" + +CONDUCTOR=/usr/sbin/conductor + +test -x "${CONDUCTOR}" || exit 0 + +CONDUCTOR_ARGS="" +ENABLED=no +PIDFILE=/var/run/orchestra-conductor.pid + +if test -f /etc/default/orchestra-conductor; then + . /etc/default/orchestra-conductor +fi + +. /lib/lsb/init-functions + +if [ "${ENABLED}" != "yes" ]; then + exit 0 +fi + +case "$1" in + start) + log_daemon_msg "Starting the Conductor" + if start-stop-daemon --start --quiet --oknodo --background --pidfile "${PIDFILE}" --make-pidfile --exec "${CONDUCTOR}" -- ${CONDUCTOR_ARGS}; then + log_end_msg 0 + else + log_end_msg 1 + fi + ;; + stop) + log_daemon_msg "Stopping the Conductor" + if start-stop-daemon --stop --quiet --oknodo --pidfile "${PIDFILE}" -x "${CONDUCTOR}"; then + log_end_msg 0 + else + log_end_msg 1 + fi + ;; + reload) + log_daemon_msg "Asking the Conductor to Reload Configuration" + if start-stop-daemon --stop --quiet --signal HUP --pidfile "${PIDFILE}" -x "${CONDUCTOR}"; then + log_end_msg 0 + else + log_end_msg 1 + fi + ;; + status) + status_of_proc -p "${PIDFILE}" "${CONDUCTOR}" conductor && exit 0 || exit $? + ;; + *) + log_action_msg "Usage: /etc/init.d/orchestra-conductor {start|stop|reload|status}" + exit 1 + ;; +esac diff --git a/debian/orchestra-conductor.install b/debian/orchestra-conductor.install new file mode 100644 index 0000000..c267556 --- /dev/null +++ b/debian/orchestra-conductor.install @@ -0,0 +1,5 @@ +bin/conductor /usr/sbin +bin/getstatus /usr/bin +bin/submitjob /usr/bin +samples/players /etc/conductor +samples/conductor.conf /etc/conductor \ No newline at end of file diff --git a/debian/orchestra-player-go.conffile b/debian/orchestra-player-go.conffile new file mode 100644 index 0000000..e9ea019 --- /dev/null +++ b/debian/orchestra-player-go.conffile @@ -0,0 +1 @@ +/etc/orchestra/player.conf diff --git a/debian/orchestra-player-go.default b/debian/orchestra-player-go.default new file mode 100644 index 0000000..d4c5c35 --- /dev/null +++ b/debian/orchestra-player-go.default @@ -0,0 +1,8 @@ +# +# Defaults for orchestra-player-go +# + +# +# set ENABLED=yes to enable the player. 
+# +ENABLED=no diff --git a/debian/orchestra-player-go.dirs b/debian/orchestra-player-go.dirs new file mode 100644 index 0000000..d175ca6 --- /dev/null +++ b/debian/orchestra-player-go.dirs @@ -0,0 +1,3 @@ +/usr/sbin +/usr/lib/orchestra/scores +/etc/orchestra diff --git a/debian/orchestra-player-go.init b/debian/orchestra-player-go.init new file mode 100755 index 0000000..933912a --- /dev/null +++ b/debian/orchestra-player-go.init @@ -0,0 +1,67 @@ +#!/bin/sh + +### BEGIN INIT INFO +# Provides: orchestra-player-go +# Required-Start: networking +# Required-Stop: networking +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: Player execuation agent for Orchestra +### END INIT INFO + +set -e + +export PATH="/usr/bin:/bin:/usr/sbin:/sbin" + +PLAYER=/usr/sbin/player + +test -x "${PLAYER}" || exit 0 + +PLAYER_ARGS="" +ENABLED=no + +PIDFILE=/var/run/orchestra-player-go.pid + +if test -f /etc/default/orchestra-player-go; then + . /etc/default/orchestra-player-go +fi + +. /lib/lsb/init-functions + +if [ "${ENABLED}" != "yes" ]; then + exit 0 +fi + +case "$1" in + start) + log_daemon_msg "Starting the Player" + if start-stop-daemon --start --quiet --oknodo --background --pidfile "${PIDFILE}" --make-pidfile --exec "${PLAYER}" -- ${PLAYER_ARGS}; then + log_end_msg 0 + else + log_end_msg 1 + fi + ;; + stop) + log_daemon_msg "Stopping the Player" + if start-stop-daemon --stop --quiet --oknodo --pidfile "${PIDFILE}" -x "${PLAYER}"; then + log_end_msg 0 + else + log_end_msg 1 + fi + ;; + reload) + log_daemon_msg "Asking the Player to Reload Configuration" + if start-stop-daemon --stop --quiet --signal HUP --pidfile "${PIDFILE}" -x "${PLAYER}"; then + log_end_msg 0 + else + log_end_msg 1 + fi + ;; + status) + status_of_proc -p "${PIDFILE}" "${PLAYER}" player && exit 0 || exit $? + ;; + *) + log_action_msg "Usage: /etc/init.d/orchestra-player-go {start|stop|status}" + exit 1 + ;; +esac diff --git a/debian/orchestra-player-go.install b/debian/orchestra-player-go.install new file mode 100644 index 0000000..52ba624 --- /dev/null +++ b/debian/orchestra-player-go.install @@ -0,0 +1,2 @@ +bin/player /usr/sbin +samples/player.conf /etc/conductor diff --git a/debian/rules b/debian/rules new file mode 100755 index 0000000..33b82a2 --- /dev/null +++ b/debian/rules @@ -0,0 +1,78 @@ +#!/usr/bin/make -f +# Sample debian/rules that uses debhelper. +# This file is public domain software, originally written by Joey Hess. +# +# This version is for packages that are architecture dependent. + +# Uncomment this to turn on verbose mode. +#export DH_VERBOSE=1 + +build: build-stamp +build-stamp: + dh_testdir + + make build + + touch build-stamp + +clean: + dh_testdir + dh_testroot + rm -f build-stamp + + make clean + + dh_clean + +install: build + dh_testdir + dh_testroot + dh_prep + dh_installdirs + + # Add here commands to install the package into debian/ + #$(MAKE) prefix=`pwd`/debian/`dh_listpackages`/usr install + +# Build architecture-independent files here. +binary-indep: build install +# We have nothing to do by default. + +# Build architecture-dependent files here. 
+binary-arch: build install + dh_testdir + dh_testroot + dh_installchangelogs + dh_installdocs + dh_installexamples + dh_install +# dh_installmenu +# dh_installdebconf +# dh_installlogrotate +# dh_installemacsen +# dh_installcatalogs +# dh_installpam +# dh_installmime + dh_installinit +# dh_installcron +# dh_installinfo +# dh_installwm +# dh_installudev +# dh_lintian +# dh_bugfiles +# dh_undocumented + dh_installman + dh_link +# Strip baaaad when go programs are involved. +# dh_strip + dh_compress + dh_fixperms +# dh_perl +# dh_makeshlibs + dh_installdeb + dh_shlibdeps + dh_gencontrol + dh_md5sums + dh_builddeb + +binary: binary-indep binary-arch +.PHONY: build clean binary-indep binary-arch binary install diff --git a/debian/source/format b/debian/source/format new file mode 100644 index 0000000..163aaf8 --- /dev/null +++ b/debian/source/format @@ -0,0 +1 @@ +3.0 (quilt) diff --git a/debian/source/local-options b/debian/source/local-options new file mode 100644 index 0000000..2340d6e --- /dev/null +++ b/debian/source/local-options @@ -0,0 +1,2 @@ +tar-ignore = .git .hg + diff --git a/doc/audience_api.txt b/doc/audience_api.txt new file mode 100644 index 0000000..e6c6824 --- /dev/null +++ b/doc/audience_api.txt @@ -0,0 +1,50 @@ +Audience API: + +So: Our Audience needs a way to do stuff. + +All key names (quoted below) are case sensitive. + +Overview: + Connect to the Unix Socket. + Send JSON. + Read JSON response back. + Close socket. + +QUEUE JOB: + +Request: +- dict: + - 'op': 'queue' + - 'score': Score Name + - 'players': Array + - playername + - 'scope': either 'all' or 'one' + - 'params': dict + - k/v's passed through to job. + +Response: +- array: +[error, jobid] + +GET STATUS: +Request: +- dict: + - 'op': 'status' + - 'id': jobid + +Response: +- array: +[error, dict] + +dict is: +- 'status': aggregated result "OK/Failure" +- 'players': dict - individual results + - hostname: dict + - 'status': individual OK/Failure + - 'response': dict + +error is 'OK' if successful. jobid is the JobID if successful. + +Error is otherwise an error message. + + diff --git a/doc/orchestra.tex b/doc/orchestra.tex new file mode 100644 index 0000000..63c6c71 --- /dev/null +++ b/doc/orchestra.tex @@ -0,0 +1,239 @@ +\documentclass[a4paper]{article} +\usepackage[T1]{fontenc} +\usepackage{textcomp} +\usepackage{mathptmx} +\begin{document} +\title{Orchestra} +\author{Christopher Collins} +\maketitle + +\section{Introduction} + +Orchestra is a suite for managing the reliable execution of tasks over +a number of hosts. It is intended to be simple and secure, leaving +more complicated coordination and tasks to tools better suited for +those jobs. + +To accommodate the needs of web interfaces and other transactional +interface methods, the main mode of operation for clients using +Orchestra is asynchronous with respect to the execution of the jobs. + +\section{Operating Principles} + +The fundamental idea behind Orchestra is that a client (the +``Audience'') requests that a job (the ``Score'') is executed by one or +more agents (the ``Players''). This process is managed by the +``Conductor'' which mediates between the Audience and the Players. + +A request consists of the name of the Score to execute, and a scope of +execution which defines if the score is to be executed on a single +player or multiple players, and which players are allowed to service +the request.
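To make the request format concrete, the sketch below queues a job by talking to the audience socket directly, following the JSON exchange specified in doc/audience_api.txt and mirroring what clientlibs/python/audience.py does. The socket path is the default from the sample configuration; the score and player names are placeholders and would need to match a score installed on an authorised player.

----
# queue_example.py -- minimal sketch of the audience 'queue' operation.
import json
import socket

SOCKET_PATH = "/var/spool/orchestra/conductor.sock"   # default from samples/conductor.conf

request = {
    "op": "queue",
    "score": "hello",                      # placeholder: a score installed on the players
    "players": ["player1.example.com"],    # placeholder: must appear in the players file
    "scope": "one",                        # 'one' or 'all'
    "params": {"greeting": "moo"},         # string key/values passed through to the score
}

sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(SOCKET_PATH)
f = sock.makefile()
try:
    f.write(json.dumps(request))
    f.flush()
    resp = json.load(f)    # response is a two element array: [error, jobid-or-reason]
    if resp[0] == "OK":
        print("queued as job %s" % resp[1])
    else:
        print("conductor rejected the request: %s" % resp[1])
finally:
    sock.close()
----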
+ +Once a request has been received by a conductor, it is broken down +into one or more tasks which represent a single execution of the +request's score on a single machine. These tasks are communicated to +the players, which perform the task, and report back outcomes, which +are then collated by the conductor back into a response. + +\section{Scores} + +A Score is an executable object, be it a script or binary executable, +which is optionally accompanied by a configuration file. + +In order for a Score to be considered for execution by a Player, it +must have the executable bit set, and must be in the Score Directory +({\tt /usr/lib/orchestra/scores} by default). The filename of the +Score itself is used to identify it in Job requests. + +The optional configuration file contains directives which may tell the +Player how to behave when executing the score. + +Currently, the score configuration file allows you to set which +communication interface to use when exchanging information with the +Score, the initial working directory and the initial path for the +score. + +Intended future features include the ability to have the Player +change the effective User ID prior to execution of the score. + +Because we allow bidirectional communication between Scores and +Players, we need to establish a mechanism to do this. The interface +option controls this, and currently can be set to either ``{\tt env}'' +or ``{\tt pipe}''. + +The Player only loads score information at start up, or when prompted +to by a {\tt SIGHUP}. Adding or removing files from this directory +will not change what the Player considers valid, or the parameters for +executing a score, until a {\tt SIGHUP} has been issued. + +\subsection{env Interface} + +The ``{\tt env}'' interface is a one-way communication interface which +allows the Player to communicate parameters to the Score, but not +vice-versa. + +It does this by prepending ``{\tt ORC\_}'' to the key name, and +setting it in the environment appropriately. + +The ``{\tt env}'' interface connects the Score's {\tt STDIN} and {\tt + STDOUT} to {\tt /dev/null}. + +If the process exits with an exit status of 0, it is treated as a +success. All other outcomes are treated as failures. + +\subsection{pipe Interface} + +The ``{\tt pipe}'' interface is a two-way communication interface that +operates in a very similar manner to the {\tt env} interface. + +Rather than connecting {\tt STDOUT} to {\tt /dev/null}, it's connected +to a pipe which it listens on for lines of the format ``{\tt + key=value}''. When it receives such a line, it is set into the +Response set, replacing any existing values for {\tt key}. + +Just as with {\tt env}, if the process exits with an exit status of 0, +it is treated as a success. All other outcomes are treated as +failures. + +\section{Audience Requests} + +At present, there are only two operations available to the audience: + +\begin{itemize} +\item Submit Job +\item Query Status of Job +\end{itemize} + +\subsection{Submit Job} + +The Submit Job operation allows an audience member to submit a request +for a score to be executed. + +The request contains the Score's name, a scope (``One of'' or ``All +of''), the valid players list, and a series of key/value parameters to +be passed to the score. + +The Conductor responds to this by either rejecting the request with an +error, or by returning the ID for this request. + +Once accepted, the job is then completed as per the considerations +listed under ``Job Execution''.
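Tying this back to the Scores section above: the parameters supplied in a Submit Job request are what eventually reach the score through its configured interface. Below is a sketch of a trivial Score using the pipe interface. The file name, the companion configuration file, and the assumption that the pipe interface exposes parameters through ORC_-prefixed environment variables (as the env interface does, and as is implied by the two interfaces being described as operating in a very similar manner) are illustrative rather than taken verbatim from the Orchestra sources.

----
#!/usr/bin/env python
# hello -- sketch of a Score using the 'pipe' interface.  It would be
# installed executable in the score directory (/usr/lib/orchestra/scores
# by default) next to a configuration file containing "interface=pipe".
import os
import sys

# Assumption: parameters arrive as ORC_-prefixed environment variables,
# as they do for the env interface.
greeting = os.environ.get("ORC_greeting", "hello")

# Any stdout line containing '=' is split at the first '=' and stored in
# the task's response set; all other output is ignored.
sys.stdout.write("result=%s world\n" % greeting)

# An exit status of 0 is reported as success; anything else is a failure.
sys.exit(0)
----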
+ +\subsection{Query Status of Job} + +The Query Status of Job operation allows an audience member to get the +status of a previous job submission. + +The request contains the ID for the job to check. + +The Conductor responds to this by either rejecting the request with an +error, or by returning an aggregate status for the Job, and detailed +individual response data. + +The Aggregate status is either a Success, Partial Success or Failure +status. + +With a ``One Of'' job, the aggregate status is simply the outcome from +the execution attempt. + +With an ``All Of'' job, the aggregate status only indicates success or +failure if the result is unanimous. Partial Success is returned for +any mixed outcome jobs. + +\section{Job Execution} + +When a Job is accepted, it is refactored as a series of single-machine +tasks which are then appended to a FIFO queue. + +Players, when idle, send requests for tasks to the Conductor, which +then drains the first task from the queue which the Player can +service, and sends the data to that Player, committing the Task to +that specific player if it has ``One of'' scope. + +The Player then attempts to execute a score according to the details +in the task. This returns a result upon completion or error, and if +successful, may also contain a key/value pair set response. This is +then reported back to the Conductor. + +Score results are either Hard outcomes (such as Success, Failure, +Internal Error, Host Error During Execution) or Soft outcomes (Error +starting score, Score unknown). When a soft failure outcome is +reported back to the Conductor for a ``One Of'' job, the conductor +records the result, removes the player from the valid destinations +list, and reschedules the task for execution at the head of the queue. + +In the interest of security, all network communication performed +between the Conductor and Player is encrypted using TLS. + +\section{The Conductor} + +The Conductor is the single most important process in an Orchestra +deployment. It manages the work queue, and services requests from the +Audience. + +Access to the Conductor by Players is controlled by a combination of +the player list, a simple text file which lists the full hostname of +all authorised players, and verification of the Player's TLS +certificate. + +\subsection{Audience Interface} + +The Conductor listens for audience requests on a Unix Domain Socket. +The path can be configured via the {\tt conductor.conf} configuration +file, but defaults to {\tt /var/spool/orchestra/conductor.sock}. + +The audience socket accepts a connection from any local user, and +expects the client to send a single JSON encoded object. It will then +either send back a JSON encoded response or close the socket +immediately if it couldn't understand the request. + +It is expected that the service time for such requests is fast enough +that it will not interfere with web applications polling for job +execution status. + +Further documentation about this interface can be found in {\tt + doc/audience\_api.txt}. + +\subsection{Status Interface} + +The Conductor implements a very basic human-readable status interface +accessible via HTTP on port 2259. This interface tells you how many +tasks are currently pending dispatch, and which hosts are currently +idle, pending Tasks. + +\subsection{Player Interface} + +The Player interface is a TCP TLS listener on port 2258 which +implements the Orchestra Player/Conductor protocol.
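Since job execution is asynchronous and there is, as yet, no ``wait for job'' call, an audience application checks on a job by repeating the Query Status of Job operation described above. The sketch below uses the bundled Python client library (clientlibs/python/audience.py); the job id would come from an earlier submit_job() call, and the exact aggregate status strings are whatever the conductor reports, so they are printed rather than interpreted here.

----
# status_example.py -- sketch of polling the audience interface for a job.
from audience import get_status, ServerError

def print_status(jobid):
    try:
        status = get_status(jobid)
    except ServerError as e:
        print("conductor returned an error: %s" % e)
        return
    # 'status' is the aggregate outcome; 'players' maps each hostname to
    # its individual status and any key=value response set from the score.
    print("job %s aggregate status: %s" % (jobid, status["status"]))
    for player, result in status["players"].items():
        print("  %s: %s %r" % (player, result["status"], result["response"]))
----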
+ +The Protocol is a binary protocol that uses a well defined 3 byte +header combined with message bodies encoded using the Google Protocol +Buffers toolkit. + +The protocol was constructed this way in order to make it easy to +implement a new Player in another language. This was to allow for the +implementation of Orchestra Players on systems that do not have a Go +compiler. + +\section{Security Considerations} + +To ensure security, the Players and Conductor mutually authenticate +each other using X.509 certificates - the players verify that the +conductor's name matches it's certificate, and that it has a trust +chain to a trusted Certificate Authority, and the conductor verifies +that each connecting player's certificate matches who it claims to be +and that the certificate has a trust chain back to it's trusted +Certificate Authorities. This allows both parties to be assured that +they're communicating with the correct entity. + +As the whole concept of Orchestra is to allow the remote execution of +work, it was an intentional design decision to restrict execution to +existing scripts or executables on the system which have been +explicitly configured to be invocable. The distribution or +installation of these materials is left for either manual deployment, +or for other automated management systems to handle (such as Chef, or +Puppet). + +\end{document} + diff --git a/doc/score_pipe_interface.txt b/doc/score_pipe_interface.txt new file mode 100644 index 0000000..12a5020 --- /dev/null +++ b/doc/score_pipe_interface.txt @@ -0,0 +1,22 @@ +The Score 'pipe' Interface +========================== + +So, there was a 'env' interface which provided K/V data to the score via +the environment. + +That works for sending information to the score, but not for getting +information back. + +To make that step, the 'pipe' interface has been built. + +pipe reads line output in stdout from the executing process. If the +line contains a '=', it's split at the first '=' for the Key and +Value, and they are stored in the responses. All other output is +ignored. + +To use this interface, the score's .conf file should contain: +---- +interface=pipe +---- + + diff --git a/go-patches/json-unmarshal-immediate.diff b/go-patches/json-unmarshal-immediate.diff new file mode 100644 index 0000000..e734ca0 --- /dev/null +++ b/go-patches/json-unmarshal-immediate.diff @@ -0,0 +1,21 @@ +diff -r bacb4ed5791c src/pkg/json/decode.go +--- a/src/pkg/json/decode.go Thu Aug 04 16:38:18 2011 +1000 ++++ b/src/pkg/json/decode.go Mon Aug 22 16:50:36 2011 +1000 +@@ -549,6 +549,17 @@ + item := d.data[start:d.off] + + // Check for unmarshaler. ++ // first, check if we can do an immediate unmarshalling. ++ if v.CanAddr() { ++ unmarshaler, ok := v.Addr().Interface().(Unmarshaler) ++ if ok { ++ err := unmarshaler.UnmarshalJSON(item) ++ if err != nil { ++ d.error(err) ++ } ++ return ++ } ++ } + wantptr := item[0] == 'n' // null + unmarshaler, pv := d.indirect(v, wantptr) + if unmarshaler != nil { diff --git a/go-patches/syslog-auto-reconnect.diff b/go-patches/syslog-auto-reconnect.diff new file mode 100644 index 0000000..a6ce1b6 --- /dev/null +++ b/go-patches/syslog-auto-reconnect.diff @@ -0,0 +1,385 @@ +diff -r b0819469a6df src/pkg/syslog/syslog.go +--- a/src/pkg/syslog/syslog.go Thu Sep 08 12:16:42 2011 +1000 ++++ b/src/pkg/syslog/syslog.go Tue Sep 20 11:29:08 2011 +1000 +@@ -17,23 +17,66 @@ + type Priority int + + const ( +- // From /usr/include/sys/syslog.h. +- // These are the same on Linux, BSD, and OS X. 
+- LOG_EMERG Priority = iota +- LOG_ALERT +- LOG_CRIT +- LOG_ERR +- LOG_WARNING +- LOG_NOTICE +- LOG_INFO +- LOG_DEBUG ++ // these are internal and are used to detect when overrides ++ // are in place. ++ SeverityPresent = Priority(0x100) ++ SeverityMask = Priority(0x7) ++ FacilityPresent = Priority(0x200) ++ FacilityMask = Priority(0x1f << 3) ++ ++ // From the RFCs. Names lifted from C. ++ LOG_EMERG = Priority(0) | SeverityPresent ++ LOG_ALERT = Priority(1) | SeverityPresent ++ LOG_CRIT = Priority(2) | SeverityPresent ++ LOG_ERR = Priority(3) | SeverityPresent ++ LOG_WARNING = Priority(4) | SeverityPresent ++ LOG_NOTICE = Priority(5) | SeverityPresent ++ LOG_INFO = Priority(6) | SeverityPresent ++ LOG_DEBUG = Priority(7) | SeverityPresent ++ ++ LOG_KERN = Priority(0<<3) | FacilityPresent ++ LOG_USER = Priority(1<<3) | FacilityPresent ++ LOG_MAIL = Priority(2<<3) | FacilityPresent ++ LOG_DAEMON = Priority(3<<3) | FacilityPresent ++ LOG_AUTH = Priority(4<<3) | FacilityPresent ++ LOG_SYSLOG = Priority(5<<3) | FacilityPresent ++ LOG_LPR = Priority(6<<3) | FacilityPresent ++ LOG_NEWS = Priority(7<<3) | FacilityPresent ++ LOG_UUCP = Priority(8<<3) | FacilityPresent ++ LOG_CRON = Priority(9<<3) | FacilityPresent ++ LOG_AUTHPRIV = Priority(10<<3) | FacilityPresent ++ LOG_FTP = Priority(11<<3) | FacilityPresent ++ LOG_LOCAL0 = Priority(16<<3) | FacilityPresent ++ LOG_LOCAL1 = Priority(17<<3) | FacilityPresent ++ LOG_LOCAL2 = Priority(18<<3) | FacilityPresent ++ LOG_LOCAL3 = Priority(19<<3) | FacilityPresent ++ LOG_LOCAL4 = Priority(20<<3) | FacilityPresent ++ LOG_LOCAL5 = Priority(21<<3) | FacilityPresent ++ LOG_LOCAL6 = Priority(22<<3) | FacilityPresent ++ LOG_LOCAL7 = Priority(23<<3) | FacilityPresent + ) + ++// splicePriority takes origPri, and mixes in a severity or facility ++// if present in mixPri. ++func splicePriority(origPri, mixPri Priority) (newPri Priority) { ++ newPri = origPri ++ if (mixPri & SeverityPresent) == SeverityPresent { ++ newPri = (newPri & ^SeverityMask) | (mixPri & SeverityMask) ++ } ++ if (mixPri & FacilityPresent) == FacilityPresent { ++ newPri = (newPri & ^FacilityMask) | (mixPri & FacilityMask) ++ } ++ return newPri ++} ++ + // A Writer is a connection to a syslog server. + type Writer struct { + priority Priority + prefix string + conn serverConn ++ // persist the configured peer so we can reconnect when things really catch on fire. ++ network string ++ raddr string + } + + type serverConn interface { +@@ -53,38 +96,104 @@ + return Dial("", "", priority, prefix) + } + +-// Dial establishes a connection to a log daemon by connecting +-// to address raddr on the network net. ++// Dial sets up a connection to a log daemon. The connection attempt ++// will be deferred until the first log message is sent. + // Each write to the returned writer sends a log message with + // the given priority and prefix. ++// If the prefix is empty, the binary's name - will be used in it's place. + func Dial(network, raddr string, priority Priority, prefix string) (w *Writer, err os.Error) { + if prefix == "" { + prefix = os.Args[0] + } +- var conn serverConn +- if network == "" { +- conn, err = unixSyslog() ++ return &Writer{priority & 0xFF, prefix, nil, network, raddr}, err ++} ++ ++// Actually perform a real reconnect, closing any connections that may be open. 
++func (w *Writer) Reconnect() (err os.Error) { ++ if w.conn != nil { ++ log.Printf("writer.Reconnect() on old connection.\n") ++ w.conn.close() ++ w.conn = nil ++ } ++ if w.network == "" { ++ w.conn, err = unixSyslog() + } else { + var c net.Conn +- c, err = net.Dial(network, raddr) +- conn = netConn{c} ++ c, err = net.Dial(w.network, w.raddr) ++ w.conn = &netConn{c} + } +- return &Writer{priority, prefix, conn}, err ++ return err ++} ++ ++func canRetry(err os.Error) bool { ++ // not an error? can't retry. ++ if err == nil { ++ return false ++ } ++ oe, ok := err.(*net.OpError) ++ if ok { ++ if oe.Error == os.ECONNREFUSED { ++ return true ++ } ++ } ++ return false + } + + // Write sends a log message to the syslog daemon. +-func (w *Writer) Write(b []byte) (int, os.Error) { +- if w.priority > LOG_DEBUG || w.priority < LOG_EMERG { +- return 0, os.EINVAL ++func (w *Writer) Write(b []byte) (bout int, err os.Error) { ++ if w.conn == nil { ++ err = w.Reconnect() ++ if err != nil { ++ return 0, err ++ } + } +- return w.conn.writeBytes(w.priority, w.prefix, b) ++ retried := false ++retry: ++ bout, err = w.conn.writeBytes(w.priority, w.prefix, b) ++ ++ if canRetry(err) && !retried { ++ retried = true ++ err = w.Reconnect() ++ if err != nil { ++ return 0, err ++ } ++ goto retry ++ } ++ return bout, err + } + +-func (w *Writer) writeString(p Priority, s string) (int, os.Error) { +- return w.conn.writeString(p, w.prefix, s) ++func (w *Writer) writeString(p Priority, s string) (bout int, err os.Error) { ++ msgpriority := splicePriority(w.priority, p) ++ if w.conn == nil { ++ err := w.Reconnect() ++ if err != nil { ++ return 0, err ++ } ++ } ++ retried := false ++retry: ++ bout, err = w.conn.writeString(msgpriority, w.prefix, s) ++ if canRetry(err) && !retried { ++ log.Printf("Retrying: %s", err) ++ if err == os.ECONNREFUSED { ++ log.Printf("Hit!") ++ } ++ retried = true ++ err = w.Reconnect() ++ if err != nil { ++ return 0, err ++ } ++ goto retry ++ } ++ return bout, err + } + +-func (w *Writer) Close() os.Error { return w.conn.close() } ++func (w *Writer) Close() os.Error { ++ if w.conn != nil { ++ return w.conn.close() ++ } ++ return nil ++} + + // Emerg logs a message using the LOG_EMERG priority. 
+ func (w *Writer) Emerg(m string) (err os.Error) { +@@ -124,15 +233,15 @@ + return err + } + +-func (n netConn) writeBytes(p Priority, prefix string, b []byte) (int, os.Error) { +- return fmt.Fprintf(n.conn, "<%d>%s: %s\n", p, prefix, b) ++func (n *netConn) writeBytes(p Priority, prefix string, b []byte) (int, os.Error) { ++ return fmt.Fprintf(n.conn, "<%d>%s: %s\n", p&0xFF, prefix, b) + } + +-func (n netConn) writeString(p Priority, prefix string, s string) (int, os.Error) { +- return fmt.Fprintf(n.conn, "<%d>%s: %s\n", p, prefix, s) ++func (n *netConn) writeString(p Priority, prefix string, s string) (int, os.Error) { ++ return fmt.Fprintf(n.conn, "<%d>%s: %s\n", p&0xFF, prefix, s) + } + +-func (n netConn) close() os.Error { ++func (n *netConn) close() os.Error { + return n.conn.Close() + } + +diff -r b0819469a6df src/pkg/syslog/syslog_test.go +--- a/src/pkg/syslog/syslog_test.go Thu Sep 08 12:16:42 2011 +1000 ++++ b/src/pkg/syslog/syslog_test.go Tue Sep 20 11:29:08 2011 +1000 +@@ -7,11 +7,14 @@ + "io" + "log" + "net" ++ "os" + "testing" + ) + + var serverAddr string + ++const testSocketAddress = "/tmp/fakelog" ++ + func runSyslog(c net.PacketConn, done chan<- string) { + var buf [4096]byte + var rcvd string = "" +@@ -22,10 +25,12 @@ + } + rcvd += string(buf[0:n]) + } ++ ++ c.Close() + done <- rcvd + } + +-func startServer(done chan<- string) { ++func startUDPServer(done chan<- string) { + c, e := net.ListenPacket("udp", "127.0.0.1:0") + if e != nil { + log.Fatalf("net.ListenPacket failed udp :0 %v", e) +@@ -35,6 +40,39 @@ + go runSyslog(c, done) + } + ++func runUDPSyslog(c *net.UDPConn, done chan<- string) { ++ var buf [4096]byte ++ var rcvd string = "" ++ for { ++ n, err := c.Read(buf[0:]) ++ if err != nil || n == 0 { ++ break ++ } ++ rcvd += string(buf[0:n]) ++ } ++ ++ c.Close() ++ done <- rcvd ++} ++ ++func startUnixServer(done chan<- string) (ready <-chan int) { ++ syslogready := make(chan int, 1) ++ os.Remove(testSocketAddress) ++ uaddr, e := net.ResolveUnixAddr("unixgram", testSocketAddress) ++ if e != nil { ++ log.Fatalf("net.ResolveUnixAddr failed: %v", e) ++ } ++ c, e := net.ListenUnixgram("unixgram", uaddr) ++ if e != nil { ++ log.Fatalf("net.ListenPacket failed unixgram /tmp/fakelog %v", e) ++ } ++ ++ c.SetReadTimeout(1000e6) // 100ms ++ go runUDPSyslog(c, done) ++ syslogready <- 1 ++ return syslogready ++} ++ + func skipNetTest(t *testing.T) bool { + if testing.Short() { + // Depends on syslog daemon running, and sometimes it's not. +@@ -44,6 +82,57 @@ + return false + } + ++func TestFakeUnixSyslog(t *testing.T) { ++ done := make(chan string) ++ startUnixServer(done) ++ s, err := Dial("unixgram", "/tmp/fakelog", LOG_INFO|LOG_LOCAL0, "syslog_test") ++ if err != nil { ++ t.Fatalf("Dial() failed: %s", err) ++ } ++ err = s.Debug("Moo") ++ if err != nil { ++ t.Fatalf("s.Debug() failed: %s", err) ++ } ++ expected := "<135>syslog_test: Moo\n" ++ rcvd := <-done ++ if rcvd != expected { ++ t.Fatalf("s.Debug() = '%q', but wanted '%q'", rcvd, expected) ++ } ++ s.Close() ++} ++ ++func TestFlap(t *testing.T) { ++ done := make(chan string) ++ startUnixServer(done) ++ s, err := Dial("unixgram", "/tmp/fakelog", LOG_INFO|LOG_LOCAL0, "syslog_test") ++ if err != nil { ++ t.Fatalf("Dial() failed: %s", err) ++ } ++ err = s.Debug("Moo") ++ if err != nil { ++ t.Fatalf("s.Debug() failed: %s", err) ++ } ++ expected := "<135>syslog_test: Moo\n" ++ rcvd := <-done ++ if rcvd != expected { ++ t.Fatalf("s.Debug() = '%q', but wanted '%q'", rcvd, expected) ++ } ++ // restart the server. 
++ <-startUnixServer(done) ++ ++ // and try retransmitting. ++ err = s.Debug("Re-Moo") ++ if err != nil { ++ t.Fatalf("s.Debug() failed: %s", err) ++ } ++ expected = "<135>syslog_test: Re-Moo\n" ++ rcvd = <-done ++ if rcvd != expected { ++ t.Fatalf("s.Info() = '%q', but wanted '%q'", rcvd, expected) ++ } ++ s.Close() ++} ++ + func TestNew(t *testing.T) { + if skipNetTest(t) { + return +@@ -79,8 +168,10 @@ + + func TestUDPDial(t *testing.T) { + done := make(chan string) +- startServer(done) +- l, err := Dial("udp", serverAddr, LOG_INFO, "syslog_test") ++ startUDPServer(done) ++ // it's important that the Dial priority is different to the ++ // actual message sent - this tests priority splicing. ++ l, err := Dial("udp", serverAddr, LOG_DEBUG, "syslog_test") + if err != nil { + t.Fatalf("syslog.Dial() failed: %s", err) + } +@@ -95,7 +186,7 @@ + + func TestWrite(t *testing.T) { + done := make(chan string) +- startServer(done) ++ startUDPServer(done) + l, err := Dial("udp", serverAddr, LOG_ERR, "syslog_test") + if err != nil { + t.Fatalf("syslog.Dial() failed: %s", err) +diff -r b0819469a6df src/pkg/syslog/syslog_unix.go +--- a/src/pkg/syslog/syslog_unix.go Thu Sep 08 12:16:42 2011 +1000 ++++ b/src/pkg/syslog/syslog_unix.go Tue Sep 20 11:29:08 2011 +1000 +@@ -23,7 +23,7 @@ + if err != nil { + continue + } else { +- return netConn{conn}, nil ++ return &netConn{conn}, nil + } + } + } diff --git a/samples/conductor.conf b/samples/conductor.conf new file mode 100644 index 0000000..255deb7 --- /dev/null +++ b/samples/conductor.conf @@ -0,0 +1,29 @@ +### +### conductor.conf +### +### System-wide configuration for the conductor +### + +### Specify the TLS private key and certificate +# x509 private key = /etc/orchestra/conductor_key.pem +# x509 certificate = /etc/orchestra/conductor_crt.pem + +### Specify the trusted CAs for clients. +### +### this is a ':' delimited list of pem files. +# ca certificates = + +### Manually specify the conductor's host name. +# server name = + +### Override the server's bind address +# bind address = + +### Set the path for the audience socket. +# audience socket path = /var/spool/orchestra/conductor.sock + +### Set the directory to store the conductor's state in +# conductor state path = /var/spool/orchestra + +### Set the path of the authorised players file. +# player file path = /etc/orchestra/players diff --git a/samples/player.conf b/samples/player.conf new file mode 100644 index 0000000..83bc9fa --- /dev/null +++ b/samples/player.conf @@ -0,0 +1,31 @@ +### +### player.conf +### +### System-wide configuration for the player +### + +### Specify the TLS private key and certificate +# x509 private key = /etc/orchestra/player_key.pem +# x509 certificate = /etc/orchestra/player_crt.pem + +### Specify the trusted CAs of your conductor/s +### +### this is a ':' delimited list of pem files. +# ca certificates = + +### Manually specify the player's name (typically host name). +### +### This must match the CN in the certificates or else the conductor +### will reject the connection +# player name = + +### The host name of the conductor to connect to. This should +### match the conductor's certificate CN. +# master = conductor + +### Path to the scores directory +### +### The specified directory will be searched for executables and +### matching configuration files. These will be made available as +### scores. 
+# score directory = /usr/lib/orchestra/scores diff --git a/samples/players b/samples/players new file mode 100644 index 0000000..25d1a09 --- /dev/null +++ b/samples/players @@ -0,0 +1,6 @@ +# /etc/conductor/players +# +# In this file you list the name of all servers you list the name of +# all servers you're willing to arbitrate jobs for. +# +# empty lines and lines starting with '#' are ignored. \ No newline at end of file diff --git a/src/conductor/Makefile b/src/conductor/Makefile new file mode 100644 index 0000000..55f9b6a --- /dev/null +++ b/src/conductor/Makefile @@ -0,0 +1,36 @@ +include $(GOROOT)/src/Make.inc + +DEPS=../orchestra + +TARG=conductor +#GCIMPORTS=-I../pkg/_obj +#LDIMPORTS=-L../pkg/_obj + +GOFILES=\ + conductor.go\ + dispatch.go\ + server.go\ + http.go\ + registry.go\ + client.go\ + signal.go\ + config.go\ + audience.go\ + job_scope.go\ + job_state.go\ + resp_state.go\ + task_state.go\ + job_request.go\ + task_response.go\ + + +include $(GOROOT)/src/Make.cmd + +testkey: conductor_crt.pem conductor_key.pem + +conductor_key.pem: + openssl genrsa -out conductor_key.pem + +HOSTNAME=$(shell hostname --fqdn) +conductor_crt.pem: conductor_key.pem + openssl req -new -x509 -key $< -outform pem -days 365 -out $@ -subj "/C=AU/ST=New South Wales/L=Sydney/CN=$(HOSTNAME)/" diff --git a/src/conductor/audience.go b/src/conductor/audience.go new file mode 100644 index 0000000..5aaf95f --- /dev/null +++ b/src/conductor/audience.go @@ -0,0 +1,212 @@ +/* audience.go +*/ + +package main + +import ( + "io" + "json" + "net" + "os" + o "orchestra" + "strings" +) + +type GenericJsonRequest struct { + Op *string `json:"op"` + Score *string `json:"score"` + Players []string `json:"players"` + Scope *JobScope `json:"scope"` + Params map[string]string `json:"params"` + Id *uint64 `json:"id"` +} + +type JsonPlayerStatus struct { + Status ResponseState `json:"status"` + Response map[string]string `json:"response"` +} + +type JsonStatusResponse struct { + Status JobState `json:"status"` + Players map[string]*JsonPlayerStatus `json:"players"` +} + +func NewJsonStatusResponse() (jsr *JsonStatusResponse) { + jsr = new(JsonStatusResponse) + jsr.Players = make(map[string]*JsonPlayerStatus) + + return jsr +} + +func NewJsonPlayerStatus() (jps *JsonPlayerStatus) { + jps = new(JsonPlayerStatus) + jps.Response = make(map[string]string) + + return jps +} + +func handleAudienceRequest(c net.Conn) { + defer c.Close() + + c.SetTimeout(0) + r, _ := c.(io.Reader) + w, _ := c.(io.Writer) + dec := json.NewDecoder(r) + enc := json.NewEncoder(w) + + outobj := new(GenericJsonRequest) + err := dec.Decode(outobj) + if err != nil { + o.Warn("Error decoding JSON talking to audience: %s", err) + return + } + + if nil == outobj.Op { + o.Warn("Malformed JSON message talking to audience. Missing Op") + return + } + switch *(outobj.Op) { + case "status": + if nil == outobj.Id { + o.Warn("Malformed Status message talking to audience. 
Missing Job ID") + return + } + job := JobGet(*outobj.Id) + jresp := new([2]interface{}) + if nil != job { + jresp[0] = "OK" + iresp := NewJsonStatusResponse() + iresp.Status = job.State + resnames := JobGetResultNames(*outobj.Id) + for i := range resnames { + tr := JobGetResult(*outobj.Id, resnames[i]) + if nil != tr { + presp := NewJsonPlayerStatus() + presp.Status = tr.State + for k,v:=range(tr.Response) { + presp.Response[k] = v + } + iresp.Players[resnames[i]] = presp + } + + } + jresp[1] = iresp + } else { + jresp[0] = "Error" + jresp[1] = nil + } + enc.Encode(jresp) + o.Debug("Status...") + case "queue": + if nil == outobj.Score { + o.Warn("Malformed Queue message talking to audience. Missing Score") + sendQueueFailureResponse("Missing Score", enc) + return + } + if nil == outobj.Scope { + o.Warn("Malformed Queue message talking to audience. Missing Scope") + sendQueueFailureResponse("Missing Scope", enc) + return + } + if nil == outobj.Players || len(outobj.Players) < 1 { + o.Warn("Malformed Queue message talking to audience. Missing Players") + sendQueueFailureResponse("Missing Players", enc) + return + } + for _, player := range outobj.Players { + if !HostAuthorised(player) { + o.Warn("Malformed Queue message - unknown player %s specified.", player) + sendQueueFailureResponse("Invalid Player", enc) + return + } + } + job := NewRequest() + job.Score = *outobj.Score + job.Scope = *outobj.Scope + job.Players = outobj.Players + job.Params = outobj.Params + + QueueJob(job) + sendQueueSuccessResponse(job, enc) + default: + o.Warn("Unknown operation talking to audience: \"%s\"", *(outobj.Op)) + return + } + + _ = enc +} + +func sendQueueSuccessResponse(job *JobRequest, enc *json.Encoder) { + resp := make([]interface{},2) + resperr := new(string) + *resperr = "OK" + resp[0] = resperr + + // this probably looks odd, but all numbers cross through float64 when being json encoded. d'oh! + jobid := new(uint64) + *jobid = uint64(job.Id) + resp[1] = jobid + + err := enc.Encode(resp) + if nil != err { + o.Warn("Couldn't encode response to audience: %s", err) + } +} + +func sendQueueFailureResponse(reason string, enc *json.Encoder) { + resp := make([]interface{},2) + resperr := new(string) + *resperr = "Error" + resp[0] = resperr + if reason != "" { + resp[1] = &reason + } + err := enc.Encode(resp) + if nil != err { + o.Warn("Couldn't encode response to audience: %s", err) + } +} + +func AudienceListener(l net.Listener) { + for { + c, err := l.Accept() + if err != nil { + o.Warn("Accept() failed on Audience Listenter.") + break + } + go handleAudienceRequest(c) + } +} + +func UnixAudienceListener(sockaddr string) { + fi, err := os.Stat(sockaddr) + if err == nil { + if fi.IsSocket() { + o.Warn("Removing stale socket at %s", sockaddr) + os.Remove(sockaddr) + } else { + o.Fail("%s exists and is not a socket", sockaddr) + } + } + laddr, err := net.ResolveUnixAddr("unix", sockaddr) + o.MightFail(err, "Couldn't resolve audience socket address") + l, err := net.ListenUnix("unix", laddr) + o.MightFail(err, "Couldn't start audience unixsock listener") + // Fudge the permissions on the unixsock! + fi, err = os.Stat(sockaddr) + if err == nil { + os.Chmod(sockaddr, fi.Mode | 0777) + } else { + o.Warn("Couldn't fudge permission on audience socket: %s", err) + } + + // make sure we clean up the unix socket when we die. 
+ defer l.Close() + defer os.Remove(sockaddr) + AudienceListener(l) +} + +func StartAudienceSock() { + audienceSockPath := strings.TrimSpace(GetStringOpt("audience socket path")) + go UnixAudienceListener(audienceSockPath) +} \ No newline at end of file diff --git a/src/conductor/client.go b/src/conductor/client.go new file mode 100644 index 0000000..9dcf2e8 --- /dev/null +++ b/src/conductor/client.go @@ -0,0 +1,393 @@ +/* client.go + * + * Client Handling +*/ + +package main +import ( + o "orchestra" + "net" + "time" + "os" + "crypto/tls" + "crypto/x509" +) + +const ( + KeepaliveDelay = 200e9 // once every 200 seconds. + RetryDelay = 10e9 // retry every 10 seconds. Must be smaller than the keepalive to avoid channel race. + OutputQueueDepth = 10 // This needs to be large enough that we don't deadlock on ourself. +) + + +type ClientInfo struct { + Player string + PktOutQ chan *o.WirePkt + PktInQ chan *o.WirePkt + abortQ chan int + TaskQ chan *TaskRequest + connection net.Conn + pendingTasks map[uint64]*TaskRequest +} + +func NewClientInfo() (client *ClientInfo) { + client = new(ClientInfo) + client.abortQ = make(chan int, 2) + client.PktOutQ = make(chan *o.WirePkt, OutputQueueDepth) + client.PktInQ = make(chan *o.WirePkt) + client.TaskQ = make(chan *TaskRequest) + + return client +} + +func (client *ClientInfo) Abort() { + PlayerDied(client) + reg := ClientGet(client.Player) + if reg != nil { + reg.Disassociate() + } + client.abortQ <- 1; +} + +func (client *ClientInfo) Name() (name string) { + if client.Player == "" { + return "UNK:" + client.connection.RemoteAddr().String() + } + return client.Player +} + +// Must not be used from inside of handlers. +func (client *ClientInfo) Send(p *o.WirePkt) { + client.PktOutQ <- p +} + +// Can only be used form inside of handlers and the main client loop. +func (client *ClientInfo) sendNow(p *o.WirePkt) { + _, err := p.Send(client.connection) + if err != nil { + o.Warn("Error sending pkt to %s: %s. Terminating connection.", client.Name(), err) + client.Abort() + } +} + +func (client *ClientInfo) SendTask(task *TaskRequest) { + tr := task.Encode() + p, err := o.Encode(tr) + o.MightFail(err, "Couldn't encode task for client.") + client.Send(p) + task.RetryTime = time.Nanoseconds() + RetryDelay +} + +func (client *ClientInfo) GotTask(task *TaskRequest) { + /* first up, look at the task state */ + switch (task.State) { + case TASK_QUEUED: + fallthrough + case TASK_PENDINGRESULT: + /* this is a new task. We should send it straight */ + task.Player = client.Player + task.State = TASK_PENDINGRESULT + client.pendingTasks[task.job.Id] = task + client.SendTask(task) + // request a update to the spool so the PENDING flag is stored. + JobWriteUpdate(task.job.Id) + case TASK_FINISHED: + /* discard. We don't care about tasks that are done. */ + } +} + +// reset the task state so it can be requeued. +func CleanTask(task *TaskRequest) { + task.State = TASK_QUEUED + task.Player = "" +} + +// this merges the state from the registry record into the client it's called against. +// it also copies back the active communication channels to the registry record. 
+func (client *ClientInfo) MergeState(regrecord *ClientInfo) { + client.Player = regrecord.Player + client.pendingTasks = regrecord.pendingTasks + + regrecord.TaskQ = client.TaskQ + regrecord.abortQ = client.abortQ + regrecord.PktOutQ = client.PktOutQ + regrecord.PktInQ = client.PktInQ + regrecord.connection = client.connection +} + +// Sever the connection state from the client (used against registry records only) +func (client *ClientInfo) Disassociate() { + client.TaskQ = nil + client.abortQ = nil + client.PktInQ = nil + client.PktOutQ = nil + client.connection = nil +} + +func handleNop(client *ClientInfo, message interface{}) { + o.Debug("Client %s: NOP Received", client.Name()) +} + +func handleIdentify(client *ClientInfo, message interface{}) { + if client.Player != "" { + o.Warn("Client %s: Tried to reintroduce itself. Terminating Connection.", client.Name()) + client.Abort() + return + } + ic, _ := message.(*o.IdentifyClient) + o.Info("Client %s: Identified Itself As \"%s\"", client.Name(), *ic.Hostname) + client.Player = *ic.Hostname + if (!HostAuthorised(client.Player)) { + o.Warn("Client %s: Not Authorised. Terminating Connection.", client.Name()) + client.Abort() + return + } + + /* if we're TLS, verify the client's certificate given the name it used */ + tlsc, ok := client.connection.(*tls.Conn) + if ok && !*DontVerifyPeer { + intermediates := x509.NewCertPool() + + o.Debug("Connection is TLS.") + o.Debug("Checking Connection State") + cs := tlsc.ConnectionState() + vo := x509.VerifyOptions{ + Roots: CACertPool, + Intermediates: intermediates, + DNSName: client.Player, + } + if cs.PeerCertificates == nil || cs.PeerCertificates[0] == nil { + o.Warn("Peer didn't provide a certificate. Aborting Connection.") + client.Abort() + return + } + // load any intermediate certificates from the chain + // into the intermediates pool so we can verify that + // the chain can be rebuilt. + // + // All we care is that we can reach an authorised CA. + // + //FIXME: Need CRL handling. + if len(cs.PeerCertificates) > 1 { + for i := 1; i < len(cs.PeerCertificates); i++ { + intermediates.AddCert(cs.PeerCertificates[i]) + } + } + _, err := cs.PeerCertificates[0].Verify(vo) + if err != nil { + o.Warn("couldn't verify client certificate: %s", err) + client.Abort() + return + } + } + reg := ClientGet(client.Player) + if nil == reg { + o.Warn("Couldn't register client %s. aborting connection.", client.Name()) + client.Abort() + return + } + client.MergeState(reg) +} + +func handleReadyForTask(client *ClientInfo, message interface{}) { + o.Debug("Client %s: Asked for Job", client.Name()) + PlayerWaitingForJob(client) +} + +func handleIllegal(client *ClientInfo, message interface{}) { + o.Warn("Client %s: Sent Illegal Message") + client.Abort() +} + +func handleResult(client *ClientInfo, message interface{}){ + jr, _ := message.(*o.ProtoTaskResponse) + r := ResponseFromProto(jr) + // at this point in time, we only care about terminal + // condition codes. a Job that isn't finished is just + // prodding us back to let us know it lives. + if r.IsFinished() { + job := JobGet(r.id) + if nil == job { + o.Warn("Client %s: NAcking for Job %d - couldn't find job data.", client.Name(), r.id) + nack := o.MakeNack(r.id) + client.sendNow(nack) + } else { + job := JobGet(r.id) + if job != nil { + o.Debug("Got Response. Acking.") + /* if the job exists, Ack it. 
*/ + ack := o.MakeAck(r.id) + client.sendNow(ack) + } + // now, we only accept the results if we were + // expecting the results (ie: it was pending) + // and expunge the task information from the + // pending list so we stop bugging the client for it. + task, exists := client.pendingTasks[r.id] + if exists { + o.Debug("Storing results for Job %d", r.id) + // store the result. + if !JobAddResult(client.Player, r) { + o.Assert("Couldn't add result for pending task") + } + + // next, work out if the job is a retryable failure or not + var didretry bool = false + + if r.DidFail() { + o.Info("Client %s reports failure for Job %d", client.Name(), r.id) + if r.CanRetry() { + job := JobGet(r.id) + if job.Scope == SCOPE_ONEOF { + // right, we're finally deep enough to work out what's going on! + JobDisqualifyPlayer(r.id, client.Player) + if len(job.Players) >= 1 { + // still players left we can try? then go for it! + CleanTask(task) + DispatchTask(task) + didretry = true + } + } + } + } + if !didretry { + // if we didn't retry, the task needs to be marked as finished. + task.State = TASK_FINISHED + } + // update the job state. + JobReviewState(r.id) + + client.pendingTasks[r.id] = nil, false + } + } + } +} + + +var dispatcher = map[uint8] func(*ClientInfo,interface{}) { + o.TypeNop: handleNop, + o.TypeIdentifyClient: handleIdentify, + o.TypeReadyForTask: handleReadyForTask, + o.TypeTaskResponse: handleResult, + /* C->P only messages, should never appear on the wire. */ + o.TypeTaskRequest: handleIllegal, + +} + +var loopFudge int64 = 10e6; /* 10 ms should be enough fudgefactor */ +func clientLogic(client *ClientInfo) { + loop := true + for loop { + var retryWait <-chan int64 = nil + var retryTask *TaskRequest = nil + if (client.Player != "") { + var waitTime int64 = 0 + var now int64 = 0 + cleanPass := false + attempts := 0 + for !cleanPass && attempts < 10 { + /* reset our state for the pass */ + waitTime = 0 + retryTask = nil + attempts++ + cleanPass = true + now = time.Nanoseconds() + loopFudge + // if the client is correctly associated, + // evaluate all jobs for outstanding retries, + // and work out when our next retry is due. + for _,v := range client.pendingTasks { + if v.RetryTime < now { + client.SendTask(v) + cleanPass = false + } else { + if waitTime == 0 || v.RetryTime < waitTime { + retryTask = v + waitTime = v.RetryTime + } + } + } + } + if (attempts > 10) { + o.Fail("Couldn't find next timeout without restarting excessively.") + } + if (retryTask != nil) { + retryWait = time.After(waitTime-time.Nanoseconds()) + } + } + select { + case <-retryWait: + client.SendTask(retryTask) + case p := <-client.PktInQ: + /* we've received a packet. do something with it. */ + if client.Player == "" && p.Type != o.TypeIdentifyClient { + o.Warn("Client %s didn't Identify self - got type %d instead! Terminating Connection.", client.Name(), p.Type) + client.Abort() + break + } + var upkt interface {} = nil + if p.Length > 0 { + var err os.Error + + upkt, err = p.Decode() + if err != nil { + o.Warn("Error unmarshalling message from Client %s: %s. 
Terminating Connection.", client.Name(), err) + client.Abort() + break + } + } + handler, exists := dispatcher[p.Type] + if (exists) { + handler(client, upkt) + } else { + o.Warn("Unhandled Pkt Type %d", p.Type) + } + case p := <-client.PktOutQ: + if p != nil { + client.sendNow(p) + } + case t := <-client.TaskQ: + client.GotTask(t) + case <-client.abortQ: + o.Debug("Client %s connection has been told to abort!", client.Name()) + loop = false + case <-time.After(KeepaliveDelay): + p := o.MakeNop() + o.Debug("Sending Keepalive to %s", client.Name()) + _, err := p.Send(client.connection) + if err != nil { + o.Warn("Error sending pkt to %s: %s. Terminating Connection.", client.Name(), err) + client.Abort() + } + } + } + client.connection.Close() +} + +func clientReceiver(client *ClientInfo) { + conn := client.connection + + + loop := true + for loop { + pkt, err := o.Receive(conn) + if nil != err { + o.Warn("Error receiving pkt from %s: %s", conn.RemoteAddr().String(), err) + client.Abort() + client.connection.Close() + loop = false + } else { + client.PktInQ <- pkt + } + } + o.Debug("Client %s connection reader has exited it's loop!", conn.RemoteAddr().String()) +} + +/* The Main Server loop calls this method to hand off connections to us */ +func HandleConnection(conn net.Conn) { + /* this is a temporary client info, we substitute it for the real + * one once we ID the connection correctly */ + c := NewClientInfo() + c.connection = conn + go clientReceiver(c) + go clientLogic(c) +} diff --git a/src/conductor/conductor.go b/src/conductor/conductor.go new file mode 100644 index 0000000..61e6245 --- /dev/null +++ b/src/conductor/conductor.go @@ -0,0 +1,51 @@ +/* conductor.go +*/ + +package main + +import ( + "flag" + "os" + o "orchestra" +) + +var ( + ConfigFile = flag.String("config-file", "/etc/orchestra/conductor.conf", "File containing the conductor configuration") + DontVerifyPeer = flag.Bool("dont-verify-peer", false, "Ignore TLS verification for the peer") + // this is used a lot for marshalling. I just can't stuff it + // anywhere else. + InvalidValueError = os.NewError("Invalid value") +) + + +func main() { + o.SetLogName("conductor") + + // parse command line options. + flag.Parse() + + // Start the client registry - configuration parsing will block indefinately + // if the registry listener isn't working + StartRegistry() + + // do an initial configuration load - must happen before + // MakeSpoolDir and InitDispatch() + ConfigLoad() + + // Build the Spool Tree if necessary + MakeSpoolDir() + + // Load any old state we had. + LoadState() + defer SaveState() + + // start the master dispatch system + InitDispatch() + + // start the status listener + StartHTTP() + // start the audience listener + StartAudienceSock() + // service TLS requests. 
+ ServiceRequests() +} diff --git a/src/conductor/config.go b/src/conductor/config.go new file mode 100644 index 0000000..1a32da1 --- /dev/null +++ b/src/conductor/config.go @@ -0,0 +1,106 @@ +package main + +import ( + "os" + "bufio" + o "orchestra" + "strings" + "github.com/kuroneko/configureit" +) + +var configFile *configureit.Config = configureit.New() + +func init() { + configFile.Add("x509 certificate", configureit.NewStringOption("/etc/orchestra/conductor_crt.pem")) + configFile.Add("x509 private key", configureit.NewStringOption("/etc/orchestra/conductor_key.pem")) + configFile.Add("ca certificates", configureit.NewPathListOption(nil)) + configFile.Add("bind address", configureit.NewStringOption("")) + configFile.Add("server name", configureit.NewStringOption("")) + configFile.Add("audience socket path", configureit.NewStringOption("/var/spool/orchestra/conductor.sock")) + configFile.Add("conductor state path", configureit.NewStringOption("/var/spool/orchestra")) + configFile.Add("player file path", configureit.NewStringOption("/etc/orchestra/players")) +} + +func GetStringOpt(key string) string { + cnode := configFile.Get(key) + if cnode == nil { + o.Assert("tried to get a configuration option that doesn't exist.") + } + sopt, ok := cnode.(*configureit.StringOption) + if !ok { + o.Assert("tried to get a non-string configuration option with GetStringOpt") + } + return strings.TrimSpace(sopt.Value) +} + + +func GetCACertList() []string { + cnode := configFile.Get("ca certificates") + if cnode == nil { + o.Assert("tried to get a configuration option that doesn't exist.") + } + plopt, _ := cnode.(*configureit.PathListOption) + return plopt.Values +} + +func ConfigLoad() { + // attempt to open the configuration file. + fh, err := os.Open(*ConfigFile) + if nil == err { + defer fh.Close() + // reset the config File data, then reload it. + configFile.Reset() + ierr := configFile.Read(fh, 1) + o.MightFail(ierr, "Couldn't parse configuration") + } else { + o.Warn("Couldn't open configuration file: %s. 
Proceeding anyway.", err) + } + + playerpath := strings.TrimSpace(GetStringOpt("player file path")) + pfh, err := os.Open(playerpath) + o.MightFail(err, "Couldn't open \"%s\"", playerpath) + + pbr := bufio.NewReader(pfh) + + ahmap := make(map[string]bool) + for err = nil; err == nil; { + var lb []byte + var prefix bool + + lb, prefix, err = pbr.ReadLine() + + if nil == lb { + break; + } + if prefix { + o.Fail("ConfigLoad: Short Read (prefix only)!") + } + + line := strings.TrimSpace(string(lb)) + if line == "" { + continue; + } + if line[0] == '#' { + continue; + } + ahmap[line] = true + } + // convert newAuthorisedHosts to a slice + authorisedHosts := make([]string, len(ahmap)) + idx := 0 + for k,_ := range ahmap { + authorisedHosts[idx] = k + idx++ + } + ClientUpdateKnown(authorisedHosts) + + // set the spool directory + SetSpoolDirectory(GetStringOpt("conductor state path")) +} + + +func HostAuthorised(hostname string) (r bool) { + /* if we haven't loaded the configuration, nobody is authorised */ + ci := ClientGet(hostname) + return ci != nil +} diff --git a/src/conductor/dispatch.go b/src/conductor/dispatch.go new file mode 100644 index 0000000..945c618 --- /dev/null +++ b/src/conductor/dispatch.go @@ -0,0 +1,140 @@ +/* dispatch.go +*/ + +package main + +import ( + "container/list" + o "orchestra" +) + +func NewRequest() (req *JobRequest) { + req = NewJobRequest() + + return req +} + +const messageBuffer = 10 + +var newJob = make(chan *JobRequest, messageBuffer) +var rqTask = make(chan *TaskRequest, messageBuffer) +var playerIdle = make(chan *ClientInfo, messageBuffer) +var playerDead = make(chan *ClientInfo, messageBuffer) +var statusRequest = make(chan(chan *QueueInformation)) + +func PlayerWaitingForJob(player *ClientInfo) { + playerIdle <- player +} + +func PlayerDied(player *ClientInfo) { + playerDead <- player +} + +func DispatchTask(task *TaskRequest) { + rqTask <- task +} + +type QueueInformation struct { + idlePlayers []string + waitingTasks int +} + +func DispatchStatus() (waitingTasks int, waitingPlayers []string) { + r := make(chan *QueueInformation) + + statusRequest <- r + s := <- r + + return s.waitingTasks, s.idlePlayers +} + +func InitDispatch() { + go masterDispatch(); // go! +} + +func QueueJob(job *JobRequest) { + /* first, allocate the Job it's ID */ + job.Id = NextRequestId() + /* first up, split the job up into it's tasks. */ + job.Tasks = job.MakeTasks() + /* add it to the registry */ + JobAdd(job) + /* an enqueue all of the tasks */ + for i := range job.Tasks { + DispatchTask(job.Tasks[i]) + } +} + +func masterDispatch() { + pq := list.New() + tq := list.New() + + for { + select { + case player := <-playerIdle: + o.Debug("Dispatch: Player") + /* first, scan to see if we have anything for this player */ + i := tq.Front() + for { + if (nil == i) { + /* Out of items! */ + /* Append this player to the waiting players queue */ + pq.PushBack(player) + break; + } + t,_ := i.Value.(*TaskRequest) + if t.IsTarget(player.Player) { + /* Found a valid job. 
Send it to the player, and remove it from our pending + * list */ + tq.Remove(i) + player.TaskQ <- t + break; + } + i = i.Next() + } + case player := <-playerDead: + o.Debug("Dispatch: Dead Player") + for i := pq.Front(); i != nil; i = i.Next() { + p, _ := i.Value.(*ClientInfo) + if player.Player == p.Player { + pq.Remove(i) + break; + } + } + case task := <-rqTask: + o.Debug("Dispatch: Task") + /* first, scan to see if we have valid pending player for this task */ + i := pq.Front() + for { + if (nil == i) { + /* Out of players! */ + /* Append this task to the waiting tasks queue */ + tq.PushBack(task) + break; + } + p,_ := i.Value.(*ClientInfo) + if task.IsTarget(p.Player) { + /* Found it. */ + pq.Remove(i) + p.TaskQ <- task + break; + } + i = i.Next(); + } + case respChan := <-statusRequest: + o.Debug("Status!") + response := new(QueueInformation) + response.waitingTasks = tq.Len() + pqLen := pq.Len() + response.idlePlayers = make([]string, pqLen) + + idx := 0 + for i := pq.Front(); i != nil; i = i.Next() { + player,_ := i.Value.(*ClientInfo) + response.idlePlayers[idx] = player.Player + idx++ + } + respChan <- response + } + } +} diff --git a/src/conductor/http.go b/src/conductor/http.go new file mode 100644 index 0000000..30f4109 --- /dev/null +++ b/src/conductor/http.go @@ -0,0 +1,39 @@ +/* http.go + * + * HTTP status server. +*/ + +package main + +import ( + "fmt" + "http" + "orchestra" +) + +/* default ports are all in server.go */ + +func StartHTTP() { + go httpServer() +} + +func returnStatus(w http.ResponseWriter, r *http.Request) { + tasks, players := DispatchStatus() + fmt.Fprintf(w, "

<p>Tasks Waiting: %d</p>\n", tasks)
+ fmt.Fprintf(w, "<p>Players Idle:</p>\n<ul>\n")
+ var i int
+ for i = 0; i < len(players); i++ {
+ fmt.Fprintf(w, "<li>%s</li>\n", players[i])
+ }
+ if (i == 0) {
+ fmt.Fprintf(w, "<li>none</li>")
+ }
+ fmt.Fprintf(w, "</ul>
") +} + +func httpServer() { + laddr := fmt.Sprintf(":%d", orchestra.DefaultHTTPPort) + http.HandleFunc("/", returnStatus) + http.ListenAndServe(laddr, nil) +} + diff --git a/src/conductor/job_request.go b/src/conductor/job_request.go new file mode 100644 index 0000000..971b648 --- /dev/null +++ b/src/conductor/job_request.go @@ -0,0 +1,134 @@ +// job_request.go +// + +package main + +import ( + "sort" + "json" + "path" + "os" + "io" + o "orchestra" +) + +type JobRequest struct { + Score string `json:"score"` + Scope JobScope `json:"scope"` + Players []string `json:"players"` + Id uint64 `json:"id"` + State JobState `json:"state"` + Params map[string]string `json:"params"` + Tasks []*TaskRequest `json:"tasks"` + // you need to use the registry to access these - only public for + // marshalling use. + Results map[string]*TaskResponse `json:"results"` + // private: + + // Timeout for autoexpiry. Only valid if State if + // job.State.Finished() is true. + expirytime int64 +} + +func NewJobRequest() (req *JobRequest) { + req = new(JobRequest) + req.Results = make(map[string]*TaskResponse) + return req +} + +func JobRequestFromReader(src io.Reader) (req *JobRequest, err os.Error) { + req = NewJobRequest() + jdec := json.NewDecoder(src) + + err = jdec.Decode(req) + if err == nil { + if req.Results == nil { + req.Results = make(map[string]*TaskResponse) + } + } + + return req, err +} + +func (req *JobRequest) normalise() { + if (len(req.Players) > 1) { + /* sort targets so search works */ + sort.Strings(req.Players) + } else { + if (req.Scope == SCOPE_ONEOF) { + req.Scope = SCOPE_ALLOF + } + } +} + +func (req *JobRequest) MakeTasks() (tasks []*TaskRequest) { + req.normalise() + + var numtasks int + + switch (req.Scope) { + case SCOPE_ONEOF: + numtasks = 1 + case SCOPE_ALLOF: + numtasks = len(req.Players) + } + tasks = make([]*TaskRequest, numtasks) + + for c := 0; c < numtasks; c++ { + t := NewTaskRequest() + t.job = req + if (req.Scope == SCOPE_ALLOF) { + t.Player = req.Players[c] + } + tasks[c] = t + } + return tasks +} + +func (req *JobRequest) Valid() bool { + if (len(req.Players) <= 0) { + return false + } + return true +} + +func (req *JobRequest) FilenameForSpool() string { + if (req.State == JOB_PENDING) { + return path.Join(GetSpoolDirectory(), "active", FilenameForJobId(req.Id)) + } + return path.Join(GetSpoolDirectory(), "finished", FilenameForJobId(req.Id)) +} + +// dump the bytestream in buf into the serialisation file for req. +func (req *JobRequest) doSerialisation(buf []byte) { + // first up, clean up old state. + UnlinkNodesForJobId(req.Id) + outpath := req.FilenameForSpool() + fh, err := os.OpenFile(outpath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600) + if err != nil { + o.Warn("Could not create persistence file %s: %s", outpath, err) + return + } + defer fh.Close() + fh.Write(buf) +} + +func (req *JobRequest) UpdateInSpool() { + buf, err := json.MarshalIndent(req, "", " ") + o.MightFail(err, "Failed to marshal job %d", req.Id) + //FIXME: should try to do this out of the registry's thread. 
+ req.doSerialisation(buf) +} + +// deserialise the job record from the finished spool +func LoadFromFinished(jobid uint64) (req *JobRequest, err os.Error) { + fpath := path.Join(GetSpoolDirectory(), "finished", FilenameForJobId(jobid)) + fh, err := os.Open(fpath) + if err != nil { + return nil, err + } + defer fh.Close() + + req, err = JobRequestFromReader(fh) + return req, err +} \ No newline at end of file diff --git a/src/conductor/job_scope.go b/src/conductor/job_scope.go new file mode 100644 index 0000000..45270a8 --- /dev/null +++ b/src/conductor/job_scope.go @@ -0,0 +1,53 @@ +// job_scope.go + +package main + +import ( + "os" + "json" +) + +const ( + SCOPE_INVALID = JobScope(iota) + SCOPE_ONEOF + SCOPE_ALLOF +) + +type JobScope int + +func (js JobScope) String() (strout string) { + switch js { + case SCOPE_ONEOF: + strout = "one" + case SCOPE_ALLOF: + strout = "all" + default: + strout = "" + } + return strout +} + +func (js JobScope) MarshalJSON() (out []byte, err os.Error) { + strout := js.String() + if strout != "" { + return json.Marshal(strout) + } + return nil, InvalidValueError +} + +func (js *JobScope) UnmarshalJSON(in []byte) (err os.Error) { + var scopestr string + err = json.Unmarshal(in, &scopestr) + if err != nil { + return err + } + switch scopestr { + case "one": + *js = SCOPE_ONEOF + case "all": + *js = SCOPE_ALLOF + default: + return InvalidValueError + } + return nil +} diff --git a/src/conductor/job_state.go b/src/conductor/job_state.go new file mode 100644 index 0000000..171ef47 --- /dev/null +++ b/src/conductor/job_state.go @@ -0,0 +1,76 @@ +// job_state.go + +package main + +import ( + "os" + "json" +) + +type JobState int + +const ( + JOB_STATE_INVALID = JobState(iota) + // Job is pending resolution + JOB_PENDING + // Job has completed and has no failures. + JOB_SUCCESSFUL + // Job has completed and has mixed results. + JOB_FAILED_PARTIAL + // Job has completed and has completely failed. + JOB_FAILED +) + +func (js JobState) Finished() bool { + if js == JOB_PENDING { + return false + } + return true +} + +func (js JobState) String() (strout string) { + switch js { + case JOB_PENDING: + strout = "PENDING" + case JOB_SUCCESSFUL: + strout = "OK" + case JOB_FAILED: + strout = "FAIL" + case JOB_FAILED_PARTIAL: + strout = "PARTIAL_FAIL" + default: + strout = "" + } + return strout + +} + +func (js JobState) MarshalJSON() (out []byte, err os.Error) { + strout := js.String() + if strout != "" { + return json.Marshal(strout) + } + return nil, InvalidValueError +} + +func (js *JobState) UnmarshalJSON(in []byte) (err os.Error) { + var statestr string + err = json.Unmarshal(in, &statestr) + if err != nil { + return err + } + switch statestr { + case "PENDING": + *js = JOB_PENDING + case "OK": + *js = JOB_SUCCESSFUL + case "FAIL": + *js = JOB_FAILED + case "PARTIAL_FAIL": + *js = JOB_FAILED_PARTIAL + default: + return InvalidValueError + } + return nil +} + diff --git a/src/conductor/persistence.go b/src/conductor/persistence.go new file mode 100644 index 0000000..7db4d04 --- /dev/null +++ b/src/conductor/persistence.go @@ -0,0 +1,315 @@ +// persistence.go +// +package main + +import ( + "fmt" + "path" + "os" + "unicode" + "strconv" + "sync/atomic" + "bufio" + "strings" + o "orchestra" +) + +// changing this will result in fire. you have been warned. 
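// Illustration only, not part of this patch: with bucketDepth = 2,
// FilenameForJobId() below buckets a job by its two lowest hex nibbles
// (least significant nibble first) and then uses the full zero-padded id as
// the filename. For example, job id 0x1A2B (6699 decimal) becomes
//
//	B/2/0000000000001A2B
//
// so a pending copy sits at <spooldir>/active/B/2/0000000000001A2B and is
// rewritten under .../finished/... once the job completes. MakeSpoolDir()
// below pre-creates the 16x16 bucket directories under both trees.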
+const bucketDepth = 2 + +var spoolDirectory = "" + +func SetSpoolDirectory(spooldir string) { + if spoolDirectory == "" { + spoolDirectory = spooldir + } else { + if spooldir != spoolDirectory { + o.Warn("Spool Directory Not Changed.") + } + } +} + +func GetSpoolDirectory() string { + if spoolDirectory == "" { + o.Assert("GetSpoolDirectory() called before set") + } + return spoolDirectory +} + + +const ( + IdCheckpointSafetySkip = 10e4 // Skip 10e4 entries if orchestra didn't shutdown cleanly for safety. +) + +var lastId uint64 = 0 + +func checkpointPath() string { + return path.Join(spoolDirectory, "last_id.checkpoint") +} + +func savePath() string { + return path.Join(spoolDirectory, "last_id") +} + +func loadLastId() { + fh, err := os.Open(checkpointPath()) + if err == nil { + defer fh.Close() + + // we have a checkpoint file. blah. + cbio := bufio.NewReader(fh) + l, err := cbio.ReadString('\n') + lastId, err = strconv.Atoui64(strings.TrimSpace(l)) + if err != nil { + o.Fail("Couldn't read Last ID from checkpoint file. Aborting for safety.") + } + lastId += IdCheckpointSafetySkip + } else { + pe, ok := err.(*os.PathError) + if !ok || pe.Error != os.ENOENT { + o.Fail("Found checkpoint file, but couldn't open it: %s", err) + } + fh,err := os.Open(savePath()) + if err != nil { + pe, ok = err.(*os.PathError) + if !ok || pe.Error == os.ENOENT { + lastId = 0; + return; + } + o.MightFail(err, "Couldn't open last_id file") + } + defer fh.Close() + cbio := bufio.NewReader(fh) + l, err := cbio.ReadString('\n') + lastId, err = strconv.Atoui64(strings.TrimSpace(l)) + if err != nil { + o.Fail("Couldn't read Last ID from last_id. Aborting for safety.") + } + } + writeIdCheckpoint() +} + +func writeIdCheckpoint() { + fh, err := os.OpenFile(checkpointPath(), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) + if err != nil { + o.Warn("Failed to create checkpoint file: %s", err) + return + } + defer fh.Close() + fmt.Fprintf(fh, "%d\n", lastId) +} + +func saveLastId() { + fh, err := os.OpenFile(savePath(), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) + if err != nil { + o.Warn("Failed to create last ID save file: %s", err) + return + } + defer fh.Close() + fmt.Fprintf(fh, "%d\n", lastId) + os.Remove(checkpointPath()) +} + +func NextRequestId() uint64 { + //FIXME: we should do this periodically, not on every new job. + defer writeIdCheckpoint() + return atomic.AddUint64(&lastId, 1) +} + +func FilenameForJobId(jobid uint64) (fullpath string) { + fnbits := make([]string, bucketDepth+1) + for i := 0; i < bucketDepth; i++ { + fnbits[i] = fmt.Sprintf("%01X", (jobid >> uint(i*4)) & 0xF) + } + fnbits[bucketDepth] = fmt.Sprintf("%016X", jobid) + + return path.Join(fnbits...) 
+} + +func makeSpoolDirInner(prefix string, depth int) { + for i := 0; i < 16; i++ { + dirname := path.Join(prefix, fmt.Sprintf("%01X", i)) + if (depth == 1) { + err := os.MkdirAll(dirname, 0700) + o.MightFail(err, "Couldn't make directory building spool tree") + } else { + makeSpoolDirInner(dirname, depth-1) + } + } +} + +func MakeSpoolDir() { + makeSpoolDirInner(path.Join(spoolDirectory, "active"), bucketDepth) + makeSpoolDirInner(path.Join(spoolDirectory, "finished"), bucketDepth) + os.MkdirAll(path.Join(spoolDirectory, "corrupt"), 0700) +} + +func UnlinkNodesForJobId(jobid uint64) { + suffix := FilenameForJobId(jobid) + + os.Remove(path.Join(spoolDirectory, "active", suffix)) + os.Remove(path.Join(spoolDirectory, "finished", suffix)) +} + +func shuffleToCorrupted(abspath, reason string) { + basename := path.Base(abspath) + targetname := path.Join(spoolDirectory, "corrupt", basename) + // make sure there's nothing in the target name. + os.Remove(targetname) + err := os.Rename(abspath, targetname) + o.MightFail(err, "Couldn't bin corrupt spoolfile %s", abspath) + o.Warn("Moved \"%s\" to corrupted spool: %s", abspath, reason) +} + +func loadSpoolFiles(dirname string, depth int) { + dh, err := os.Open(dirname) + o.MightFail(err, "Couldn't open %s", dirname) + nodes, err := dh.Readdir(-1) + o.MightFail(err, "Couldn't readdir on %s", dirname) + if depth > 0 { + for _, n := range nodes { + abspath := path.Join(dirname, n.Name) + if n.IsDirectory() { + // if not a single character, it's not a spool node. + if len(n.Name) != 1 { + continue; + } + if n.Name == "." { + // we're not interested in . + continue; + } + nrunes := []int(n.Name) + if unicode.Is(unicode.ASCII_Hex_Digit, nrunes[0]) { + loadSpoolFiles(abspath, depth-1) + } else { + o.Warn("Foreign dirent %s found in spool tree", abspath) + } + } + } + } else { + // depth == 0 - only interested in files. + for _, n := range nodes { + abspath := path.Join(dirname, n.Name) + if n.IsRegular() { + if len(n.Name) != 16 { + shuffleToCorrupted(abspath, "Filename incorrect length") + continue + } + id, err := strconv.Btoui64(n.Name, 16) + if err != nil { + shuffleToCorrupted(abspath, "Invalid Filename") + continue + } + fh, err := os.Open(abspath) + if err != nil { + shuffleToCorrupted(abspath, "Couldn't open") + continue + } + defer fh.Close() + jr, err := JobRequestFromReader(fh) + if err != nil || jr.Id != id { + o.Warn("Couldn't parse?! %s", err) + shuffleToCorrupted(abspath, "Parse Failure") + continue + } + // Add the request to the registry directly. + if !RestoreJobState(jr) { + shuffleToCorrupted(abspath, "Job State Invalid") + } + } + } + } +} + +// This takes an unmarshall'd job and stuffs it back into the job state. +func RestoreJobState(job *JobRequest) bool { + // check the valid players list. + var playersout []string = nil + resultsout := make(map[string]*TaskResponse) + for _, p := range job.Players { + if HostAuthorised(p) { + playersout = append(playersout, p) + // fix the result too. + resout, exists := job.Results[p] + if exists && resout != nil { + resout.id = job.Id + resultsout[p] = resout + } + // remove it so we can sweep it in pass2 for + // results from old hosts that matter. + job.Results[p] = nil, false + } + } + job.Players = playersout + if len(job.Players) == 0 { + // If there are no players left at this point, discard + // the job as corrupt. + return false + } + // now, do pass 2 over the remaining results. 
+ for k, v := range job.Results { + if v != nil { + // if the results indicate completion, we + // always retain them. + if v.State.Finished() { + resultsout[k] = v + resultsout[k].id = job.Id + } + } + } + job.Results = resultsout + + // now, check the task data. ONEOF jobs are allowed to + // reset tasks that have never been sent. + var tasksout []*TaskRequest = nil + for _, t := range job.Tasks { + // rebuild the return link + t.job = job + // finished tasks we don't care about. + if t.State.Finished() { + tasksout = append(tasksout, t) + continue + } + if job.Scope == SCOPE_ONEOF { + if t.Player != "" && (t.State == TASK_QUEUED || !HostAuthorised(t.Player)) { + t.State = TASK_QUEUED + t.Player = "" + } + tasksout = append(tasksout, t) + continue + } else { + if HostAuthorised(t.Player) { + tasksout = append(tasksout, t) + } + } + } + job.Tasks = tasksout + if len(job.Tasks) == 0 { + o.Debug("Empty tasks in deserialised job") + // Tasks should never be empty. + return false + } + // put the job back into the system. + JobAdd(job) + JobReviewState(job.Id) + if (!job.State.Finished()) { + // now, redispatch anything that's not actually finished. + for _, t := range job.Tasks { + if !t.State.Finished() { + DispatchTask(t) + } + } + } + return true +} + +func LoadState() { + loadLastId() + dirname := path.Join(spoolDirectory, "active") + loadSpoolFiles(dirname, bucketDepth) +} + +func SaveState() { + JobWriteAll() + saveLastId() +} diff --git a/src/conductor/registry.go b/src/conductor/registry.go new file mode 100644 index 0000000..184b816 --- /dev/null +++ b/src/conductor/registry.go @@ -0,0 +1,586 @@ +// registry.go +// +// The Registry provides a 'threadsafe' interface to various global information stores. +// +// The registry dispatch thread is forbidden from performing any work that is likely to block. +// Result channels must be buffered with enough space for the full set of results. 
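// Illustration only, not part of this patch: stripped to its bones, the
// registry below is a single-goroutine "actor". Every public call builds a
// request carrying its own reply channel (buffered, per the note above, so
// the dispatch goroutine never blocks on a slow caller) and waits for the
// answer. A minimal sketch of the same pattern in post-Go 1 syntax, with all
// names hypothetical:
//
//	type request struct {
//		op    int
//		reply chan response // nil for fire-and-forget requests
//	}
//
//	type response struct{ ok bool }
//
//	var requests = make(chan request, 10)
//
//	func manage(state map[uint64]string) {
//		for req := range requests {
//			resp := response{ok: true}
//			// ... read or mutate 'state' here; nothing else touches it ...
//			if req.reply != nil {
//				req.reply <- resp
//			}
//		}
//	}
//
//	func do(op int) bool {
//		r := request{op: op, reply: make(chan response, 1)}
//		requests <- r
//		return (<-r.reply).ok
//	}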
+ +package main + +import ( + o "orchestra" + "sort" + "time" + "container/list" +) + +// Request Types +const ( + requestAddClient = iota + requestGetClient + requestDeleteClient + requestSyncClients + + requestAddJob + requestGetJob + requestAddJobResult + requestGetJobResult + requestGetJobResultNames + requestDisqualifyPlayer + requestReviewJobStatus + + requestWriteJobUpdate + requestWriteJobAll + + requestQueueSize = 10 + + jobLingerTime = int64(30e9) +) + +type registryRequest struct { + operation int + id uint64 + hostname string + hostlist []string + job *JobRequest + tresp *TaskResponse + responseChannel chan *registryResponse +} + +type registryResponse struct { + success bool + info *ClientInfo + tresp *TaskResponse + names []string + jobs []*JobRequest +} + +var ( + chanRegistryRequest = make(chan *registryRequest, requestQueueSize) + clientList = make(map[string]*ClientInfo) + jobRegister = make(map[uint64]*JobRequest) + expiryChan <-chan int64 + expiryJobid uint64 + expiryList *list.List + + expiryLoopFudge int64 = 10e6; /* 10 ms should be enough fudgefactor */ +) + +func init() { + expiryList = list.New() +} + +func regInternalAdd(hostname string) { + o.Warn("Registry: New Host \"%s\"", hostname) + clientList[hostname] = NewClientInfo() + // do this initialisation here since it'll help unmask sequencing errors + clientList[hostname].pendingTasks = make(map[uint64]*TaskRequest) + clientList[hostname].Player = hostname +} + +func regInternalDel(hostname string) { + o.Warn("Registry: Deleting Host \"%s\"", hostname) + /* remove it from the registry */ + clientList[hostname] = nil, false +} + +func regInternalExpireJob(jobid uint64) { + job, exists := jobRegister[jobid] + if exists { + if job.State.Finished() { + jobRegister[jobid] = nil, false + } else { + o.Assert("Tried to expire incomplete job.") + } + } +} + +func regInternalFindNextExpiry() { + if expiryChan != nil { + o.Assert("Attempted to Find Next Expiry avenue with expiry timer active.") + } + // if there's nothing to expire, do nothing. + if expiryList.Len() == 0 { + return + } + + for expiryChan == nil && expiryList.Len() > 0 { + jobif := expiryList.Remove(expiryList.Front()) + req, ok := jobif.(*JobRequest) + if !ok { + o.Assert("item in expiryList not a *JobRequest") + } + if (time.Nanoseconds() + expiryLoopFudge) > req.expirytime { + regInternalExpireJob(req.Id) + } else { + expiryChan = time.After(req.expirytime - time.Nanoseconds()) + expiryJobid = req.Id + } + } +} + +func regInternalMarkJobForExpiry(job *JobRequest) { + job.expirytime = time.Nanoseconds() + jobLingerTime + expiryList.PushBack(job) + // if there is no job pending expiry, feed it into the delay loop + if expiryChan == nil { + regInternalFindNextExpiry() + } +} + +var registryHandlers = map[int] func(*registryRequest, *registryResponse) { +requestAddClient: regintAddClient, +requestGetClient: regintGetClient, +requestDeleteClient: regintDeleteClient, +requestSyncClients: regintSyncClients, +requestAddJob: regintAddJob, +requestGetJob: regintGetJob, +requestAddJobResult: regintAddJobResult, +requestGetJobResult: regintGetJobResult, +requestGetJobResultNames: regintGetJobResultNames, +requestDisqualifyPlayer: regintDisqualifyPlayer, +requestReviewJobStatus: regintReviewJobStatus, +requestWriteJobUpdate: regintWriteJobUpdate, +requestWriteJobAll: regintWriteJobAll, +} + +func manageRegistry() { + for { + select { + case req := <-chanRegistryRequest: + resp := new(registryResponse) + // by default, we failed. 
+ resp.success = false + // find the operation + handler, exists := registryHandlers[req.operation] + if exists { + handler(req, resp) + } + if req.responseChannel != nil { + req.responseChannel <- resp + } + case <-expiryChan: + o.Debug("job%d: Expiring Job Record", expiryJobid) + regInternalExpireJob(expiryJobid) + expiryChan = nil + regInternalFindNextExpiry() + } + } +} + +func StartRegistry() { + go manageRegistry() +} + +func newRequest(wants_response bool) (req *registryRequest) { + req = new(registryRequest) + if wants_response { + req.responseChannel = make(chan *registryResponse, 1) + } + + return req +} + +func ClientAdd(hostname string) (success bool) { + r := newRequest(true) + r.operation = requestAddClient + r.hostname = hostname + chanRegistryRequest <- r + resp := <- r.responseChannel + + return resp.success +} + +func regintAddClient(req *registryRequest, resp *registryResponse) { + _, exists := clientList[req.hostname] + if exists { + resp.success = false + } else { + regInternalAdd(req.hostname) + resp.success = true + } +} + +func ClientDelete(hostname string) (success bool) { + r := newRequest(true) + r.operation = requestDeleteClient + r.hostname = hostname + chanRegistryRequest <- r + resp := <- r.responseChannel + + return resp.success +} + +func regintDeleteClient(req *registryRequest, resp *registryResponse) { + _, exists := clientList[req.hostname] + if exists { + resp.success = true + regInternalDel(req.hostname) + } else { + resp.success = false + } +} + +func ClientGet(hostname string) (info *ClientInfo) { + r := newRequest(true) + r.operation = requestGetClient + r.hostname = hostname + chanRegistryRequest <- r + resp := <- r.responseChannel + if resp.success { + return resp.info + } + return nil +} + +func regintGetClient(req *registryRequest, resp *registryResponse) { + clinfo, exists := clientList[req.hostname] + if exists { + resp.success = true + resp.info = clinfo + } else { + resp.success = false + } +} + +func ClientUpdateKnown(hostnames []string) { + /* this is an asynchronous, we feed it into the registry + * and it'll look after itself. + */ + r := newRequest(false) + r.operation = requestSyncClients + r.hostlist = hostnames + chanRegistryRequest <- r +} + +func regintSyncClients(req *registryRequest, resp *registryResponse) { + // we need to make sure the registered clients matches the + // hostlist we're given. + // + // First, we transform the array into a map + newhosts := make(map[string]bool) + for k,_ := range req.hostlist { + newhosts[req.hostlist[k]] = true + } + // now, scan the current list, checking to see if they exist. + // Remove them from the newhosts map if they do exist. + for k,_ := range clientList { + _, exists := newhosts[k] + if exists { + // remove it from the newhosts map + newhosts[k] = false, false + } else { + regInternalDel(k) + } + } + // now that we're finished, we should only have new clients in + // the newhosts list left. + for k,_ := range newhosts { + regInternalAdd(k) + } + // and we're done. +} + +// Add a Job to the registry. Return true if successful, returns +// false if the job is lacking critical information (such as a JobId) +// and can't be registered. +func JobAdd(job *JobRequest) bool { + rr := newRequest(true) + rr.operation = requestAddJob + rr.job = job + + chanRegistryRequest <- rr + resp := <- rr.responseChannel + return resp.success +} + +func regintAddJob(req *registryRequest, resp *registryResponse) { + if nil == req.job { + return + } + // ensure that the players are sorted! 
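// Illustration only, not part of this patch: the Players slice is kept
// sorted so that lookups elsewhere (TaskRequest.IsTarget and
// regintDisqualifyPlayer below) can binary-search it. The usual
// sort.SearchStrings idiom looks like this:
//
//	n := sort.SearchStrings(job.Players, name)
//	found := n < len(job.Players) && job.Players[n] == name
//
// The n < len(...) guard matters because SearchStrings returns len(slice)
// when the name sorts after every entry, so indexing the result without
// that check will panic on a miss.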
+ sort.Strings(req.job.Players) + // update the state + req.job.updateState() + // and register the job + _, overwrite := jobRegister[req.job.Id] + if !overwrite { + jobRegister[req.job.Id] = req.job + // force a queue update. + req.job.UpdateInSpool() + if req.job.State.Finished() { + regInternalMarkJobForExpiry(req.job) + } + resp.success = true + } +} + +// Get a Job from the registry. Returns the job if successful, +// returns nil if the job couldn't be found. +func JobGet(id uint64) *JobRequest { + rr := newRequest(true) + rr.operation = requestGetJob + rr.id = id + + chanRegistryRequest <- rr + resp := <- rr.responseChannel + if resp.jobs == nil { + return nil + } + return resp.jobs[0] +} + +func regintGetJob(req *registryRequest, resp *registryResponse) { + job, exists := jobRegister[req.id] + resp.success = exists + if exists { + resp.jobs = make([]*JobRequest, 1) + resp.jobs[0] = job + } else { + o.Warn("Received Request for job%d which is not in memory", req.id) + go regintGetJobDeferred(req.id, req.responseChannel) + // mask out the responseChannel so the deferred handler can use it. + req.responseChannel = nil + } +} + +func regintGetJobDeferred(jobid uint64, responseChannel chan<- *registryResponse) { + resp := new(registryResponse) + resp.success = false + defer func (resp *registryResponse, rChan chan<- *registryResponse) { + rChan <- resp; + }(resp, responseChannel) + + req, err := LoadFromFinished(jobid) + if err != nil { + o.Warn("Couldn't load job%d from disk. Doesn't exist?", jobid) + return + } + // fix up the state, and stuff it back into the system + RestoreJobState(req) + resp.jobs = make([]*JobRequest, 1) + resp.jobs[0] = req + resp.success = true +} + +// Attach a result to a Job in the Registry +// +// This exists in order to prevent nasty concurrency problems +// when trying to put results back onto the job. Reading a job is far +// less of a problem than writing to it. +func JobAddResult(playername string, task *TaskResponse) bool { + rr := newRequest(true) + rr.operation = requestAddJobResult + rr.tresp = task + rr.hostname = playername + chanRegistryRequest <- rr + resp := <- rr.responseChannel + return resp.success +} + +func regintAddJobResult(req *registryRequest, resp *registryResponse) { + job, exists := jobRegister[req.tresp.id] + resp.success = exists + if exists { + job.Results[req.hostname] = req.tresp + // force a queue update. + job.UpdateInSpool() + } +} + +// Get a result from the registry +func JobGetResult(id uint64, playername string) (tresp *TaskResponse) { + rr := newRequest(true) + rr.operation = requestGetJobResult + rr.id = id + rr.hostname = playername + chanRegistryRequest <- rr + resp := <- rr.responseChannel + return resp.tresp +} + +func regintGetJobResult(req *registryRequest, resp *registryResponse) { + job, exists := jobRegister[req.id] + if exists { + result, exists := job.Results[req.hostname] + resp.success = exists + if exists { + resp.tresp = result + } + } else { + resp.success = false + } +} + +// Get a list of names we have results for against a given job. 
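// Illustration only, not part of this patch: a typical read-side pairing of
// JobGetResultNames() with JobGetResult(), mirroring what the audience
// "status" handler does (jobid is assumed to already be in hand):
//
//	for _, player := range JobGetResultNames(jobid) {
//		if tr := JobGetResult(jobid, player); tr != nil {
//			o.Debug("%s finished=%v state=%s", player, tr.IsFinished(), tr.State)
//		}
//	}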
+func JobGetResultNames(id uint64) (names []string) { + rr := newRequest(true) + rr.operation = requestGetJobResultNames + rr.id = id + + chanRegistryRequest <- rr + resp := <- rr.responseChannel + return resp.names +} + +func regintGetJobResultNames(req *registryRequest, resp *registryResponse) { + job, exists := jobRegister[req.id] + resp.success = exists + if exists { + resp.names = make([]string, len(job.Results)) + idx := 0 + for k, _ := range job.Results { + resp.names[idx] = k + idx++ + } + } +} + +// Disqualify a player from servicing a job +func JobDisqualifyPlayer(id uint64, playername string) bool { + rr := newRequest(true) + rr.operation = requestDisqualifyPlayer + rr.id = id + rr.hostname = playername + + chanRegistryRequest <- rr + resp := <- rr.responseChannel + + return resp.success +} + +func regintDisqualifyPlayer(req *registryRequest, resp *registryResponse) { + job, exists := jobRegister[req.id] + if exists { + idx := sort.Search(len(job.Players), func(idx int) bool { return job.Players[idx] >= req.hostname }) + if (job.Players[idx] == req.hostname) { + resp.success = true + newplayers := make([]string, len(job.Players)-1) + copy(newplayers[0:idx], job.Players[0:idx]) + copy(newplayers[idx:len(job.Players)-1], job.Players[idx+1:len(job.Players)]) + job.Players = newplayers + job.updateState() + // force a queue update. + job.UpdateInSpool() + } else { + resp.success = false + } + } else { + resp.success = false + } +} + +func JobReviewState(id uint64) bool { + rr := newRequest(true) + rr.operation = requestReviewJobStatus + rr.id = id + + chanRegistryRequest <- rr + resp := <- rr.responseChannel + + return resp.success +} + +func regintReviewJobStatus(req *registryRequest, resp *registryResponse) { + job, exists := jobRegister[req.id] + resp.success = exists + if exists { + job.updateState() + // force a queue update. + job.UpdateInSpool() + } +} + +func JobWriteUpdate(id uint64) { + rr := newRequest(false) + rr.operation = requestWriteJobUpdate + rr.id = id + chanRegistryRequest <- rr +} + +func regintWriteJobUpdate(req *registryRequest, resp *registryResponse) { + job, exists := jobRegister[req.id] + resp.success = exists + if exists { + job.UpdateInSpool() + } +} + +func JobWriteAll() bool { + rr := newRequest(true) + rr.operation = requestWriteJobAll + + chanRegistryRequest <- rr + resp := <-rr.responseChannel + + return resp.success +} + +func regintWriteJobAll(req *registryRequest, resp *registryResponse) { + for _, job := range jobRegister { + job.UpdateInSpool() + } + resp.success = true +} + +// Ugh. 
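// Illustration only, not part of this patch: how the state aggregation below
// plays out for a hypothetical three-player job.
//
//	SCOPE_ONEOF: any player reports OK            -> JOB_SUCCESSFUL
//	             no OK yet, players still left    -> JOB_PENDING
//	             no OK and no players remaining   -> JOB_FAILED
//
//	SCOPE_ALLOF: 3 OK, 0 failed                   -> JOB_SUCCESSFUL
//	             0 OK, 3 failed                   -> JOB_FAILED
//	             1 OK, 2 failed                   -> JOB_FAILED_PARTIAL
//	             only 2 results in so far         -> JOB_PENDING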
+func (job *JobRequest) updateState() { + if job.Results == nil { + o.Assert("job.Results nil for jobid %d", job.Id) + return + } + was_finished := job.State.Finished() + switch job.Scope { + case SCOPE_ONEOF: + // look for a success (any success) in the responses + var success bool = false + for host, res := range job.Results { + if res == nil { + o.Debug("nil result for %s?", host) + continue + } + if res.State == RESP_FINISHED { + success = true + break + } + } + // update the job state based upon these findings + if success { + job.State = JOB_SUCCESSFUL + } else { + if len(job.Players) < 1 { + job.State = JOB_FAILED + } else { + job.State = JOB_PENDING + } + } + case SCOPE_ALLOF: + var success int = 0 + var failed int = 0 + + for pidx := range job.Players { + p := job.Players[pidx] + resp, exists := job.Results[p] + if exists { + if resp.DidFail() { + failed++ + } else if resp.State == RESP_FINISHED { + success++ + } + } + } + if (success + failed) < len(job.Players) { + job.State = JOB_PENDING + } else if success == len(job.Players) { + job.State = JOB_SUCCESSFUL + } else if failed == len(job.Players) { + job.State = JOB_FAILED + } else { + job.State = JOB_FAILED_PARTIAL + } + } + if !was_finished && job.State.Finished() { + o.Debug("job%d: Finished - Setting Expiry Time", job.Id) + regInternalMarkJobForExpiry(job) + } +} diff --git a/src/conductor/resp_state.go b/src/conductor/resp_state.go new file mode 100644 index 0000000..146d08b --- /dev/null +++ b/src/conductor/resp_state.go @@ -0,0 +1,113 @@ +// resp_state.go + +package main + +import ( + "os" + "json" +) + +type ResponseState int + +const ( + // Response states + RESP_INVALID = ResponseState(iota) + RESP_PENDING // internal state, not wire. + RESP_RUNNING + RESP_FINISHED + RESP_FAILED + RESP_FAILED_UNKNOWN_SCORE + RESP_FAILED_HOST_ERROR + RESP_FAILED_UNKNOWN +) + +func (rs ResponseState) String() (strout string) { + switch rs { + case RESP_RUNNING: + return "PENDING" + case RESP_FINISHED: + return "OK" + case RESP_FAILED: + return "FAIL" + case RESP_FAILED_UNKNOWN_SCORE: + return "UNK_SCORE" + case RESP_FAILED_HOST_ERROR: + return "HOST_ERROR" + case RESP_FAILED_UNKNOWN: + return "UNKNOWN_FAILURE" + } + return "" +} + +func (rs ResponseState) MarshalJSON() (out []byte, err os.Error) { + strout := rs.String() + if strout != "" { + return json.Marshal(strout) + } + return nil, InvalidValueError +} + +func (rs *ResponseState) UnmarshalJSON(in []byte) (err os.Error) { + var statestr string + err = json.Unmarshal(in, &statestr) + if err != nil { + return err + } + switch statestr { + case "PENDING": + *rs = RESP_PENDING + case "OK": + *rs = RESP_FINISHED + case "FAIL": + *rs = RESP_FAILED + case "UNK_SCORE": + *rs = RESP_FAILED_UNKNOWN_SCORE + case "HOST_ERROR": + *rs = RESP_FAILED_HOST_ERROR + case "UNKNOWN_FAILURE": + *rs = RESP_FAILED_UNKNOWN + default: + return InvalidValueError + } + return nil +} + +func (rs ResponseState) Finished() bool { + switch rs { + case RESP_FINISHED: + fallthrough + case RESP_FAILED: + fallthrough + case RESP_FAILED_UNKNOWN_SCORE: + fallthrough + case RESP_FAILED_HOST_ERROR: + fallthrough + case RESP_FAILED_UNKNOWN: + return true + } + return false +} + +// true if the response says the task failed. false otherwise. +func (rs ResponseState) Failed() bool { + switch rs { + case RESP_RUNNING: + fallthrough + case RESP_FINISHED: + return false + } + return true +} + +// true if the task can be tried. +// precond: DidFail is true, job is a ONE_OF job. +// must return false otherwise. 
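// Illustration only, not part of this patch: how the three predicates in
// this file line up for each wire state.
//
//	state (wire name)   Finished()  Failed()  CanRetry()
//	PENDING (running)   false       false     false
//	OK                  true        false     false
//	FAIL                true        true      false
//	UNK_SCORE           true        true      true   (another player may know the score)
//	HOST_ERROR          true        true      true   (another player may be healthy)
//	UNKNOWN_FAILURE     true        true      false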
+func (rs ResponseState) CanRetry() bool { + switch rs { + case RESP_FAILED_UNKNOWN_SCORE: + fallthrough + case RESP_FAILED_HOST_ERROR: + return true + } + return false +} \ No newline at end of file diff --git a/src/conductor/server.go b/src/conductor/server.go new file mode 100644 index 0000000..1d81dfa --- /dev/null +++ b/src/conductor/server.go @@ -0,0 +1,105 @@ +/* server.go + * + * TLS and Connection Hell. +*/ + +package main + +import ( + "net" + "crypto/tls" + "fmt" + "os" + "crypto/x509" + o "orchestra" +) + +var ( + CACertPool *x509.CertPool = nil +) + + +func ServiceRequests() { + var sockConfig tls.Config + + // resolve the bind address + bindAddressStr := GetStringOpt("bind address") + var bindAddr *net.IPAddr = nil + if (bindAddressStr != "") { + var err os.Error + bindAddr, err = net.ResolveIPAddr("ip", bindAddressStr) + if (err != nil) { + o.Warn("Ignoring bind address. Couldn't resolve \"%s\": %s", bindAddressStr, err) + } else { + bindAddr = nil + } + } + // load the x509 certificate and key, then attach it to the tls config. + x509CertFilename := GetStringOpt("x509 certificate") + x509PrivateKeyFilename := GetStringOpt("x509 private key") + serverCert, err := tls.LoadX509KeyPair(x509CertFilename, x509PrivateKeyFilename) + o.MightFail(err, "Couldn't load certificates") + sockConfig.Certificates = append(sockConfig.Certificates, serverCert) + + // load the CA certs + CACertPool = x509.NewCertPool() + caCertNames := GetCACertList() + if caCertNames != nil { + for _, filename := range caCertNames { + fh, err := os.Open(filename) + if err != nil { + o.Warn("Whilst parsing CA certs, couldn't open %s: %s", filename, err) + continue + } + defer fh.Close() + fi, err := fh.Stat() + o.MightFail(err, "Couldn't stat CA certificate file: %s", filename) + data := make([]byte, fi.Size) + fh.Read(data) + CACertPool.AppendCertsFromPEM(data) + } + } + sockConfig.RootCAs = CACertPool + + // determine the server hostname. + servername := GetStringOpt("server name") + if servername != "" { + o.Info("Using %s as the server name", servername) + sockConfig.ServerName = servername + } else { + if bindAddr != nil { + o.Warn("Probing for fqdn for bind address as none was provided.") + hostnames, err := net.LookupAddr(bindAddr.String()) + o.MightFail(err, "Failed to get full hostname for bind address") + sockConfig.ServerName = hostnames[0] + } else { + o.Warn("Probing for fqdn as no server name was provided") + sockConfig.ServerName = o.ProbeHostname() + } + } + + // ask the client to authenticate + sockConfig.AuthenticateClient = true + + /* convert the bindAddress to a string suitable for the Listen call */ + var laddr string + if (bindAddr == nil) { + laddr = fmt.Sprintf(":%d", o.DefaultMasterPort) + } else { + laddr = fmt.Sprintf("%s:%d", bindAddr.String(), o.DefaultMasterPort) + } + o.Info("Binding to %s", laddr) + listener, err := tls.Listen("tcp", laddr, &sockConfig) + o.MightFail(err, "Couldn't bind TLS listener") + + for { + o.Warn("Waiting for Connection...") + c, err := listener.Accept() + o.MightFail(err, "Couldn't accept TLS connection") + o.Warn("Connection received from %s", c.RemoteAddr().String()) + HandleConnection(c) + } +} + + + diff --git a/src/conductor/signal.go b/src/conductor/signal.go new file mode 100644 index 0000000..509c187 --- /dev/null +++ b/src/conductor/signal.go @@ -0,0 +1,50 @@ +/* signal.go + * + * Signal Handlers + */ + +package main + +import ( + "os/signal" + "os" + "syscall" + "fmt" + o "orchestra" +) + + +// handle the signals. 
By default, we ignore everything, but the +// three terminal signals, HUP, INT, TERM, we want to explicitly +// handle. +func signalHandler() { + for { + sig := <-signal.Incoming + + ux, ok := sig.(os.UnixSignal) + if !ok { + o.Warn("Couldn't handle signal %s, Coercion failed", sig) + continue + } + + switch int(ux) { + case syscall.SIGHUP: + o.Warn("Reloading Configuration") + ConfigLoad() + case syscall.SIGINT: + fmt.Fprintln(os.Stderr, "Interrupt Received - Terminating") + //FIXME: Gentle Shutdown + SaveState() + os.Exit(1) + case syscall.SIGTERM: + fmt.Fprintln(os.Stderr, "Terminate Received - Terminating") + //FIXME: Gentle Shutdown + os.Exit(2) + } + } + +} + +func init() { + go signalHandler() +} diff --git a/src/conductor/task_request.go b/src/conductor/task_request.go new file mode 100644 index 0000000..f6f3fc0 --- /dev/null +++ b/src/conductor/task_request.go @@ -0,0 +1,47 @@ +// task_request.go +// +package main + +import ( + "sort" + o "orchestra" +) + +type TaskRequest struct { + job *JobRequest + Player string `json:"player"` + State TaskState `json:"state"` + RetryTime int64 `json:"retrytime"` +} + +func NewTaskRequest() (tr *TaskRequest) { + tr = new(TaskRequest) + tr.State = TASK_QUEUED + + return tr +} + +func (task *TaskRequest) Encode() (ptr *o.ProtoTaskRequest) { + ptr = new(o.ProtoTaskRequest) + ptr.Jobname = &task.job.Score + ptr.Id = new(uint64) + *ptr.Id = task.job.Id + ptr.Parameters = o.ProtoJobParametersFromMap(task.job.Params) + + return ptr +} + +func (task *TaskRequest) IsTarget(player string) (valid bool) { + valid = false + if task.Player == "" { + n := sort.SearchStrings(task.job.Players, player) + if task.job.Players[n] == player { + valid = true + } + } else { + if task.Player == player { + valid = true + } + } + return valid +} diff --git a/src/conductor/task_response.go b/src/conductor/task_response.go new file mode 100644 index 0000000..c76819a --- /dev/null +++ b/src/conductor/task_response.go @@ -0,0 +1,61 @@ +// task_response.go +// +package main + +import ( + o "orchestra" +) + +type TaskResponse struct { + id uint64 + State ResponseState `json:"state"` + Response map[string]string `json:"response"` +} + +// Response related magic + +func NewTaskResponse() (resp *TaskResponse) { + resp = new(TaskResponse) + resp.Response = make(map[string]string) + + return resp +} + +func (resp *TaskResponse) IsFinished() bool { + return resp.State.Finished() +} + +func (resp *TaskResponse) DidFail() bool { + return resp.State.Failed() +} + +func (resp *TaskResponse) CanRetry() bool { + return resp.State.CanRetry() +} + + +func ResponseFromProto(ptr *o.ProtoTaskResponse) (r *TaskResponse) { + r = new(TaskResponse) + + switch (*(ptr.Status)) { + case o.ProtoTaskResponse_JOB_INPROGRESS: + r.State = RESP_RUNNING + case o.ProtoTaskResponse_JOB_SUCCESS: + r.State = RESP_FINISHED + case o.ProtoTaskResponse_JOB_FAILED: + r.State = RESP_FAILED + case o.ProtoTaskResponse_JOB_HOST_FAILURE: + r.State = RESP_FAILED_HOST_ERROR + case o.ProtoTaskResponse_JOB_UNKNOWN: + r.State = RESP_FAILED_UNKNOWN_SCORE + case o.ProtoTaskResponse_JOB_UNKNOWN_FAILURE: + fallthrough + default: + r.State = RESP_FAILED_UNKNOWN + } + + r.id = *(ptr.Id) + r.Response = o.MapFromProtoJobParameters(ptr.Response) + + return r +} diff --git a/src/conductor/task_state.go b/src/conductor/task_state.go new file mode 100644 index 0000000..43160fb --- /dev/null +++ b/src/conductor/task_state.go @@ -0,0 +1,68 @@ +// task_state.go + +package main + +import ( + "os" + "json" +) + +type TaskState int + +const ( 
+ TASK_INVALID = TaskState(iota) + // Task is fresh and has never been sent to the client. It can be rescheduled still. + TASK_QUEUED + // Task has been transmitted at least once + TASK_PENDINGRESULT + // Task has finished and we have received a result. + TASK_FINISHED +) + +func (ts TaskState) String() (strout string) { + switch ts { + case TASK_QUEUED: + strout = "QUEUED" + case TASK_PENDINGRESULT: + strout = "PENDING" + case TASK_FINISHED: + strout = "FINISHED" + default: + strout = "" + } + return strout +} + +func (ts TaskState) MarshalJSON() (out []byte, err os.Error) { + strout := ts.String() + if strout != "" { + return json.Marshal(strout) + } + return nil, InvalidValueError +} + +func (ts *TaskState) UnmarshalJSON(in []byte) (err os.Error) { + var statestr string + err = json.Unmarshal(in, &statestr) + if err != nil { + return err + } + switch statestr { + case "QUEUED": + *ts = TASK_QUEUED + case "PENDING": + *ts = TASK_PENDINGRESULT + case "FINISHED": + *ts = TASK_FINISHED + default: + return InvalidValueError + } + return nil +} + +func (ts TaskState) Finished() bool { + if ts == TASK_FINISHED { + return true + } + return false +} \ No newline at end of file diff --git a/src/getstatus/Makefile b/src/getstatus/Makefile new file mode 100644 index 0000000..778c9c5 --- /dev/null +++ b/src/getstatus/Makefile @@ -0,0 +1,8 @@ +include $(GOROOT)/src/Make.inc + +TARG=getstatus + +GOFILES=\ + getstatus.go\ + +include $(GOROOT)/src/Make.cmd diff --git a/src/getstatus/getstatus.go b/src/getstatus/getstatus.go new file mode 100644 index 0000000..34618ce --- /dev/null +++ b/src/getstatus/getstatus.go @@ -0,0 +1,137 @@ +// getstatus.go +// +// A sample Orchestra status polling client. + +package main + +import ( + "io" + "net" + "json" + "flag" + "fmt" + "os" + "strconv" +) + +type StatusRequest struct { + Op string `json:"op"` + Id uint64 `json:"id"` +} + +type PlayerStatus struct { + Status *string `json:"status"` + Response map[string]*string `json:"response"` +} + +type StatusResponse struct { + Status *string `json:"status"` + Players map[string]*PlayerStatus `json:"players"` +} + +var ( + AudienceSock = flag.String("audience-sock", "/var/spool/orchestra/conductor.sock", "Path for the audience submission socket") +) + +func NewStatusRequest() (sr *StatusRequest) { + sr = new(StatusRequest) + sr.Op = "status" + return sr +} + +func Usage() { + fmt.Fprintf(os.Stderr, "Usage:\n") + fmt.Fprintf(os.Stderr, " %s [] \n", os.Args[0]) + flag.PrintDefaults() +} + +func main() { + flag.Usage = Usage + flag.Parse() + + if flag.NArg() != 1 { + flag.Usage() + os.Exit(1) + } + + sr := NewStatusRequest() + var err os.Error + sr.Id, err = strconv.Atoui64(flag.Arg(0)) + if nil != err { + fmt.Fprintf(os.Stderr, "Failed to parse JobID: %s\n", err) + os.Exit(1) + } + + raddr, err := net.ResolveUnixAddr("unix", *AudienceSock) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to resolve sockaddr: %s\n", err) + os.Exit(1) + } + conn, err := net.DialUnix("unix", nil, raddr) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to connect to sockaddr: %s\n", err) + os.Exit(1) + } + + defer conn.Close() + + conn.SetTimeout(0) + + nc := net.Conn(conn) + + r, _ := nc.(io.Reader) + w, _ := nc.(io.Writer) + + dec := json.NewDecoder(r) + enc := json.NewEncoder(w) + + // send the message + err = enc.Encode(sr) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to marshal & send: %s\n", err) + os.Exit(1) + } + + response := new([2]interface{}) + sresp := new(StatusResponse) + response[1] = sresp + err = 
dec.Decode(response) + if err != nil { + // OK, the problem here is that an in the request will throw a null in the second field. + // This will cause Decode to softfault. We'll ignore these softfaults. + utye, ok := err.(*json.UnmarshalTypeError) + if ok { + fmt.Fprintf(os.Stderr, "Unmarshalling error: %s of Type %s\n", utye.Value, utye.Type) + } else { + ufe, ok := err.(*json.UnmarshalFieldError) + if ok { + fmt.Fprintf(os.Stderr, "Error decoding response: UFE %s of Type %s\n", ufe.Key, ufe.Type) + os.Exit(1) + } + ute, ok := err.(*json.UnsupportedTypeError) + if ok { + fmt.Fprintf(os.Stderr, "Error decoding response: UTE of Type %s\n", ute.Type) + os.Exit(1) + } + + fmt.Fprintf(os.Stderr, "Error decoding response: %s\n", err) + os.Exit(1) + } + } + + // coerce field 0 back into a string. + rerr,ok := response[0].(string) + if ok { + if rerr == "OK" { + // all OK, process the sresp. + fmt.Printf("Aggregate: %s\n", *sresp.Status) + os.Exit(0) + } else { + fmt.Fprintf(os.Stderr, "Server Error: %s\n", rerr) + os.Exit(1) + } + } else { + fmt.Fprintf(os.Stderr, "Couldn't unmarshal response correctly.\n"); + os.Exit(1) + } +} diff --git a/src/orchestra/Makefile b/src/orchestra/Makefile new file mode 100644 index 0000000..72c2a95 --- /dev/null +++ b/src/orchestra/Makefile @@ -0,0 +1,17 @@ +include $(GOROOT)/src/Make.inc + +#TARGDIR=../build/pkg/$(GOOS)_$(GOARCH) + +TARG=orchestra/orchestra +GOFILES=\ + orchestra.pb.go\ + wire.go\ + marshal.go\ + shared.go\ + registry.go\ + +include $(GOROOT)/src/Make.pkg + +ifdef HAVE_PROTOBUF +include $(GOROOT)/src/pkg/goprotobuf.googlecode.com/hg/Make.protobuf +endif diff --git a/src/orchestra/marshal.go b/src/orchestra/marshal.go new file mode 100644 index 0000000..0418a84 --- /dev/null +++ b/src/orchestra/marshal.go @@ -0,0 +1,173 @@ +/* marshal.go + * + * Common wire marshalling code. +*/ + +package orchestra; + +import ( + "os" + "goprotobuf.googlecode.com/hg/proto" +) + +var ( + ErrUnknownType = os.NewError("Unknown Type in Encode request") + ErrObjectTooLarge = os.NewError("Encoded Object exceeds maximum encoding size") +) + +/* ugh ugh ugh. As much as I love protocol buffers, not having maps + * as a native type is a PAIN IN THE ASS. + * + * Here's some common code to convert my K/V format in protocol + * buffers to and from native Go structures. +*/ +func MapFromProtoJobParameters(parray []*ProtoJobParameter) (mapparam map[string]string) { + mapparam = make(map[string]string) + + for p := range parray { + mapparam[*(parray[p].Key)] = *(parray[p].Value) + } + + return mapparam +} + +func ProtoJobParametersFromMap(mapparam map[string]string) (parray []*ProtoJobParameter) { + parray = make([]*ProtoJobParameter, len(mapparam)) + i := 0 + for k,v := range mapparam { + arg := new(ProtoJobParameter) + arg.Key = proto.String(k) + arg.Value = proto.String(v) + parray[i] = arg + i++ + } + + return parray +} + + + +func (p *WirePkt) Decode() (obj interface{}, err os.Error) { + switch (p.Type) { + case TypeNop: + if (p.Length != 0) { + /* throw error later... */ + return nil, ErrMalformedMessage; + } + return nil, nil + case TypeIdentifyClient: + ic := new(IdentifyClient) + err := proto.Unmarshal(p.Payload[0:p.Length], ic) + if err != nil { + return nil, err + } + return ic, nil + case TypeReadyForTask: + if (p.Length != 0) { + /* throw error later... 
*/ + return nil, ErrMalformedMessage; + } + return nil, nil + case TypeTaskRequest: + tr := new(ProtoTaskRequest) + err := proto.Unmarshal(p.Payload[0:p.Length], tr) + if err != nil { + return nil, err + } + return tr, nil + case TypeTaskResponse: + tr := new(ProtoTaskResponse) + err := proto.Unmarshal(p.Payload[0:p.Length], tr) + if err != nil { + return nil, err + } + return tr, nil + case TypeAcknowledgement: + tr := new(ProtoAcknowledgement) + err := proto.Unmarshal(p.Payload[0:p.Length], tr) + if err != nil { + return nil, err + } + return tr, nil + } + return nil, ErrUnknownMessage +} + +func Encode(obj interface{}) (p *WirePkt, err os.Error) { + p = new(WirePkt) + switch obj.(type) { + case *IdentifyClient: + p.Type = TypeIdentifyClient + case *ProtoTaskRequest: + p.Type = TypeTaskRequest + case *ProtoTaskResponse: + p.Type = TypeTaskResponse + case *ProtoAcknowledgement: + p.Type = TypeAcknowledgement + default: + Warn("Encoding unknown type!") + return nil, ErrUnknownType + } + p.Payload, err = proto.Marshal(obj) + if err != nil { + return nil, err + } + if len(p.Payload) >= 0x10000 { + return nil, ErrObjectTooLarge + } + p.Length = uint16(len(p.Payload)) + + return p, nil +} + + +func MakeNop() (p *WirePkt) { + p = new(WirePkt) + p.Length = 0 + p.Type = TypeNop + p.Payload = nil + + return p +} + +func MakeIdentifyClient(hostname string) (p *WirePkt) { + s := new(IdentifyClient) + s.Hostname = proto.String(hostname) + + p, _ = Encode(s) + + return p +} + +func MakeReadyForTask() (p *WirePkt){ + p = new(WirePkt) + p.Type = TypeReadyForTask + p.Length = 0 + p.Payload = nil + + return p +} + +/* We use the failure code for negative acknowledgements */ +func MakeNack(id uint64) (p *WirePkt) { + a := new(ProtoAcknowledgement) + a.Id = proto.Uint64(id) + a.Response = new(ProtoAcknowledgement_AckType) + *(a.Response) = ProtoAcknowledgement_ACK_ERROR + + p, _ = Encode(a) + + return p +} + +// Construct a positive ACK for transmission +func MakeAck(id uint64) (p *WirePkt) { + a := new(ProtoAcknowledgement) + a.Id = proto.Uint64(id) + a.Response = new(ProtoAcknowledgement_AckType) + *(a.Response) = ProtoAcknowledgement_ACK_OK + + p, _ = Encode(a) + + return p +} diff --git a/src/orchestra/orchestra.pb.go b/src/orchestra/orchestra.pb.go new file mode 100644 index 0000000..504912d --- /dev/null +++ b/src/orchestra/orchestra.pb.go @@ -0,0 +1,127 @@ +// Code generated by protoc-gen-go from "orchestra.proto" +// DO NOT EDIT! + +package orchestra + +import proto "goprotobuf.googlecode.com/hg/proto" +import "math" +import "os" + +// Reference proto, math & os imports to suppress error if they are not otherwise used. 
+var _ = proto.GetString +var _ = math.Inf +var _ os.Error + + +type ProtoAcknowledgement_AckType int32 + +const ( + ProtoAcknowledgement_ACK_OK = 1 + ProtoAcknowledgement_ACK_ERROR = 3 +) + +var ProtoAcknowledgement_AckType_name = map[int32]string{ + 1: "ACK_OK", + 3: "ACK_ERROR", +} +var ProtoAcknowledgement_AckType_value = map[string]int32{ + "ACK_OK": 1, + "ACK_ERROR": 3, +} + +func NewProtoAcknowledgement_AckType(x int32) *ProtoAcknowledgement_AckType { + e := ProtoAcknowledgement_AckType(x) + return &e +} +func (x ProtoAcknowledgement_AckType) String() string { + return proto.EnumName(ProtoAcknowledgement_AckType_name, int32(x)) +} + +type ProtoTaskResponse_TaskStatus int32 + +const ( + ProtoTaskResponse_JOB_INPROGRESS = 2 + ProtoTaskResponse_JOB_SUCCESS = 3 + ProtoTaskResponse_JOB_FAILED = 4 + ProtoTaskResponse_JOB_HOST_FAILURE = 5 + ProtoTaskResponse_JOB_UNKNOWN = 6 + ProtoTaskResponse_JOB_UNKNOWN_FAILURE = 7 +) + +var ProtoTaskResponse_TaskStatus_name = map[int32]string{ + 2: "JOB_INPROGRESS", + 3: "JOB_SUCCESS", + 4: "JOB_FAILED", + 5: "JOB_HOST_FAILURE", + 6: "JOB_UNKNOWN", + 7: "JOB_UNKNOWN_FAILURE", +} +var ProtoTaskResponse_TaskStatus_value = map[string]int32{ + "JOB_INPROGRESS": 2, + "JOB_SUCCESS": 3, + "JOB_FAILED": 4, + "JOB_HOST_FAILURE": 5, + "JOB_UNKNOWN": 6, + "JOB_UNKNOWN_FAILURE": 7, +} + +func NewProtoTaskResponse_TaskStatus(x int32) *ProtoTaskResponse_TaskStatus { + e := ProtoTaskResponse_TaskStatus(x) + return &e +} +func (x ProtoTaskResponse_TaskStatus) String() string { + return proto.EnumName(ProtoTaskResponse_TaskStatus_name, int32(x)) +} + +type IdentifyClient struct { + Hostname *string `protobuf:"bytes,1,req,name=hostname"` + XXX_unrecognized []byte +} + +func (this *IdentifyClient) Reset() { *this = IdentifyClient{} } +func (this *IdentifyClient) String() string { return proto.CompactTextString(this) } + +type ProtoJobParameter struct { + Key *string `protobuf:"bytes,1,req,name=key"` + Value *string `protobuf:"bytes,2,req,name=value"` + XXX_unrecognized []byte +} + +func (this *ProtoJobParameter) Reset() { *this = ProtoJobParameter{} } +func (this *ProtoJobParameter) String() string { return proto.CompactTextString(this) } + +type ProtoTaskRequest struct { + Jobname *string `protobuf:"bytes,1,req,name=jobname"` + Id *uint64 `protobuf:"varint,2,req,name=id"` + Parameters []*ProtoJobParameter `protobuf:"bytes,3,rep,name=parameters"` + XXX_unrecognized []byte +} + +func (this *ProtoTaskRequest) Reset() { *this = ProtoTaskRequest{} } +func (this *ProtoTaskRequest) String() string { return proto.CompactTextString(this) } + +type ProtoAcknowledgement struct { + Id *uint64 `protobuf:"varint,1,req,name=id"` + Response *ProtoAcknowledgement_AckType `protobuf:"varint,2,req,name=response,enum=orchestra.ProtoAcknowledgement_AckType,def=1"` + XXX_unrecognized []byte +} + +func (this *ProtoAcknowledgement) Reset() { *this = ProtoAcknowledgement{} } +func (this *ProtoAcknowledgement) String() string { return proto.CompactTextString(this) } + +const Default_ProtoAcknowledgement_Response ProtoAcknowledgement_AckType = ProtoAcknowledgement_ACK_OK + +type ProtoTaskResponse struct { + Id *uint64 `protobuf:"varint,1,req,name=id"` + Status *ProtoTaskResponse_TaskStatus `protobuf:"varint,3,req,name=status,enum=orchestra.ProtoTaskResponse_TaskStatus"` + Response []*ProtoJobParameter `protobuf:"bytes,4,rep,name=response"` + XXX_unrecognized []byte +} + +func (this *ProtoTaskResponse) Reset() { *this = ProtoTaskResponse{} } +func (this *ProtoTaskResponse) String() string { return 
proto.CompactTextString(this) } + +func init() { + proto.RegisterEnum("orchestra.ProtoAcknowledgement_AckType", ProtoAcknowledgement_AckType_name, ProtoAcknowledgement_AckType_value) + proto.RegisterEnum("orchestra.ProtoTaskResponse_TaskStatus", ProtoTaskResponse_TaskStatus_name, ProtoTaskResponse_TaskStatus_value) +} diff --git a/src/orchestra/orchestra.proto b/src/orchestra/orchestra.proto new file mode 100644 index 0000000..f129531 --- /dev/null +++ b/src/orchestra/orchestra.proto @@ -0,0 +1,43 @@ +package orchestra; + +/* P->C : Provide Client Identity and negotiate other initial parameters */ +message IdentifyClient { + required string hostname = 1; +} + +message ProtoJobParameter { + required string key = 1; + required string value = 2; +} + +/* C->P : Do Shit kthxbye */ +message ProtoTaskRequest { + required string jobname = 1; + required uint64 id = 2; + repeated ProtoJobParameter parameters = 3; +} + +/* C->P, P->C : Acknowledge Message */ +message ProtoAcknowledgement { + required uint64 id = 1; + enum AckType { + ACK_OK = 1; + ACK_ERROR = 3; /* Other Error */ + } + required AckType response = 2 [default=ACK_OK]; +} + +/* P->C : Results from Task */ +message ProtoTaskResponse { + required uint64 id = 1; + enum TaskStatus { + JOB_INPROGRESS = 2; // Client has the job. + JOB_SUCCESS = 3; // everything was OK, we don't care. + JOB_FAILED = 4; // the job ran ok, but told us it blew up. + JOB_HOST_FAILURE = 5; // something internally blew up. + JOB_UNKNOWN = 6; // What Job? + JOB_UNKNOWN_FAILURE = 7;// somethign went wrong, but we don't know what. + } + required TaskStatus status = 3; + repeated ProtoJobParameter response = 4; +} diff --git a/src/orchestra/shared.go b/src/orchestra/shared.go new file mode 100644 index 0000000..ab78461 --- /dev/null +++ b/src/orchestra/shared.go @@ -0,0 +1,82 @@ +/* various important shared defaults. */ +package orchestra + +import ( + "os" + "net" + "syslog" + "fmt" + "runtime/debug" +) + +const ( + DefaultMasterPort = 2258 + DefaultHTTPPort = 2259 +) + +var logWriter *syslog.Writer = nil + +func SetLogName(name string) { + if nil != logWriter { + logWriter.Close() + logWriter = nil + } + var err os.Error + logWriter, err = syslog.New(syslog.LOG_DEBUG, name) + MightFail(err, "Couldn't reopen syslog") +} + + +func Debug(format string, args ...interface{}) { + if nil != logWriter { + logWriter.Debug(fmt.Sprintf(format, args...)) + } +} + +func Info(format string, args ...interface{}) { + if nil != logWriter { + logWriter.Info(fmt.Sprintf(format, args...)) + } +} + +func Warn(format string, args ...interface{}) { + if nil != logWriter { + logWriter.Warning(fmt.Sprintf(format, args...)) + } +} + +func Fail(mesg string, args ...interface {}) { + if nil != logWriter { + logWriter.Err(fmt.Sprintf(mesg, args...)) + } + fmt.Fprintf(os.Stderr, "ERR: "+mesg+"\n", args...); + os.Exit(1) +} + +func MightFail(err os.Error, mesg string, args ...interface {}) { + if (nil != err) { + imesg := fmt.Sprintf(mesg, args...) + Fail("%s: %s", imesg, err.String()) + } +} + +// Throws a generic assertion error, stacktraces, dies. +// only really to be used where the runtime-time configuration +// fucks up internally, not for user induced errors. +func Assert(mesg string, args ...interface{}) { + fmt.Fprintf(os.Stderr, mesg, args...) 
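+	// dump a stack trace to stderr so the assertion site can be located, then abort.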
+ debug.PrintStack() + os.Exit(1) +} + +func ProbeHostname() (fqdn string) { + var shortHostname string + + shortHostname, err := os.Hostname() + addr, err := net.LookupHost(shortHostname) + MightFail(err, "Failed to get address for hostname") + hostnames, err := net.LookupAddr(addr[0]) + MightFail(err, "Failed to get full hostname for address") + + return hostnames[0] +} diff --git a/src/orchestra/wire.go b/src/orchestra/wire.go new file mode 100644 index 0000000..fe28f61 --- /dev/null +++ b/src/orchestra/wire.go @@ -0,0 +1,107 @@ +/* wire.go + * + * Wire Level Encapsulation +*/ + +package orchestra; + +import ( + "os" + "net" + "fmt" +) + +type WirePkt struct { + Type byte + Length uint16 + Payload []byte +} + +const ( + TypeNop = 0 + TypeIdentifyClient = 1 + TypeReadyForTask = 2 + TypeTaskRequest = 3 + TypeTaskResponse = 4 + TypeAcknowledgement = 5 +) + +var ( + ErrMalformedMessage = os.NewError("Malformed Message") + ErrUnknownMessage = os.NewError("Unknown Message") +) + +func (p *WirePkt) ValidUnidentified() bool { + if p.Type == TypeNop { + return true + } + if p.Type == TypeIdentifyClient { + return true + } + + return false +} + +func (p *WirePkt) Send(c net.Conn) (n int, err os.Error) { + n = 0 + preamble := make([]byte, 3) + preamble[0] = p.Type + preamble[1] = byte((p.Length >> 8) & 0xFF) + preamble[2] = byte(p.Length & 0xFF) + ninc, err := c.Write(preamble) + n += ninc + if (err != nil) { + return n, err + } + ninc, err = c.Write(p.Payload[0:p.Length]) + n += ninc + if (err != nil) { + return n, err + } + return n, nil +} + +func (p *WirePkt) Dump() { + fmt.Printf("Packet Dump: Type %d, Len %d\n", p.Type, p.Length) + for i := 0; i < int(p.Length); i++ { + if i%16 == 0 { + fmt.Printf("%04x: ", i) + } + fmt.Printf("%02x ", p.Payload[i]) + if i%16 == 15 { + fmt.Println() + } + } + fmt.Println() +} + +func Receive(c net.Conn) (msg *WirePkt, err os.Error) { + msg = new(WirePkt) + preamble := make([]byte, 3) + + n, err := c.Read(preamble) + if err != nil { + return nil, err + } + if n < 3 { + /* short read! wtf! err? */ + return nil, ErrMalformedMessage + } + msg.Type = preamble[0] + msg.Length = (uint16(preamble[1]) << 8) | uint16(preamble[2]) + if msg.Length > 0 { + msg.Payload = make([]byte, msg.Length) + n, err = c.Read(msg.Payload) + if err != nil { + return nil, err + } + if n < int(msg.Length) { + /* short read! wtf! err? */ + return nil, ErrMalformedMessage + } + } + + /* Decode! */ + return msg, nil +} + diff --git a/src/player/Makefile b/src/player/Makefile new file mode 100644 index 0000000..a5cef90 --- /dev/null +++ b/src/player/Makefile @@ -0,0 +1,15 @@ +include $(GOROOT)/src/Make.inc + +TARG=player + +GOFILES=\ + player.go\ + scores.go\ + execution.go\ + interface.go\ + signal.go\ + config.go\ + if_env.go\ + if_pipe.go\ + +include $(GOROOT)/src/Make.cmd diff --git a/src/player/config.go b/src/player/config.go new file mode 100644 index 0000000..72f13ae --- /dev/null +++ b/src/player/config.go @@ -0,0 +1,85 @@ +// config.go +// +// configuration file handling for orchestra. 
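+//
+// Options and their defaults are registered in init() below; if the
+// configuration file cannot be opened, ConfigLoad() logs a warning and
+// carries on with those defaults.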
+ +package main + +import ( + o "orchestra" + "strings" + "github.com/kuroneko/configureit" + "crypto/tls" + "crypto/x509" + "os" +) + +var configFile = configureit.New() + +func init() { + configFile.Add("x509 certificate", configureit.NewStringOption("/etc/orchestra/player_crt.pem")) + configFile.Add("x509 private key", configureit.NewStringOption("/etc/orchestra/player_key.pem")) + configFile.Add("ca certificates", configureit.NewPathListOption(nil)) + configFile.Add("master", configureit.NewStringOption("conductor")) + configFile.Add("score directory", configureit.NewStringOption("/usr/lib/orchestra/scores")) + configFile.Add("player name", configureit.NewStringOption("")) +} + +func GetStringOpt(key string) string { + cnode := configFile.Get(key) + if cnode == nil { + o.Assert("tried to get a configuration option that doesn't exist.") + } + sopt, ok := cnode.(*configureit.StringOption) + if !ok { + o.Assert("tried to get a non-string configuration option with GetStringOpt") + } + return strings.TrimSpace(sopt.Value) +} + +func GetCACertList() []string { + cnode := configFile.Get("ca certificates") + if cnode == nil { + o.Assert("tried to get a configuration option that doesn't exist.") + } + plopt, _ := cnode.(*configureit.PathListOption) + return plopt.Values +} + +func ConfigLoad() { + // attempt to open the configuration file. + fh, err := os.Open(*ConfigFile) + if nil == err { + defer fh.Close() + // reset the config File data, then reload it. + configFile.Reset() + ierr := configFile.Read(fh, 1) + o.MightFail(ierr, "Couldn't parse configuration") + } else { + o.Warn("Couldn't open configuration file: %s. Proceeding anyway.", err) + } + + // load the x509 certificates + x509CertFilename := GetStringOpt("x509 certificate") + x509PrivateKeyFilename := GetStringOpt("x509 private key") + CertPair, err = tls.LoadX509KeyPair(x509CertFilename, x509PrivateKeyFilename) + o.MightFail(err, "Couldn't load certificates") + + // load the CA Certs + CACertPool = x509.NewCertPool() + caCertNames := GetCACertList() + if caCertNames != nil { + for _, filename := range caCertNames { + fh, err := os.Open(filename) + if err != nil { + o.Warn("Whilst parsing CA certs, couldn't open %s: %s", filename, err) + continue + } + defer fh.Close() + fi, err := fh.Stat() + o.MightFail(err, "Couldn't stat CA certificate file: %s", filename) + data := make([]byte, fi.Size) + fh.Read(data) + CACertPool.AppendCertsFromPEM(data) + } + } +} \ No newline at end of file diff --git a/src/player/execution.go b/src/player/execution.go new file mode 100644 index 0000000..a1c5713 --- /dev/null +++ b/src/player/execution.go @@ -0,0 +1,159 @@ +// execution.go + +package main + +import ( + "os" + "bufio" + "strings" + o "orchestra" +) + +func ExecuteTask(task *TaskRequest) <-chan *TaskResponse { + complete := make(chan *TaskResponse, 1) + go doExecution(task, complete) + + return complete +} + +func batchLogger(jobid uint64, errpipe *os.File) { + defer errpipe.Close() + + r := bufio.NewReader(errpipe) + for { + lb, _, err := r.ReadLine() + if err == os.EOF { + return + } + if err != nil { + o.Warn("executionLogger failed: %s", err) + return + } + o.Info("JOB %d:STDERR:%s", jobid, string(lb)) + } +} + +func peSetEnv(env []string, key string, value string) []string { + mkey := key+"=" + found := false + for i, v := range env { + if strings.HasPrefix(v, mkey) { + env[i] = key+"="+value + found = true + break + } + } + if !found { + env = append(env, key+"="+value) + } + return env +} + +func doExecution(task *TaskRequest, 
completionChannel chan<- *TaskResponse) { + // we must notify the parent when we exit. + defer func(c chan<- *TaskResponse, task *TaskRequest) { c <- task.MyResponse }(completionChannel,task) + + // first of all, verify that the score exists at all. + score, exists := Scores[task.Score] + if !exists { + o.Warn("job%d: Request for unknown score \"%s\"", task.Id, task.Score) + task.MyResponse.State = RESP_FAILED_UNKNOWN_SCORE + return + } + si := NewScoreInterface(task) + if si == nil { + o.Warn("job%d: Couldn't initialise Score Interface", task.Id) + task.MyResponse.State = RESP_FAILED_HOST_ERROR + return + } + if !si.Prepare() { + o.Warn("job%d: Couldn't Prepare Score Interface", task.Id) + task.MyResponse.State = RESP_FAILED_HOST_ERROR + return + } + defer si.Cleanup() + + eenv := si.SetupProcess() + task.MyResponse.State = RESP_RUNNING + + procenv := new(os.ProcAttr) + // Build the default environment. + procenv.Env = peSetEnv(procenv.Env, "PATH", "/usr/bin:/usr/sbin:/bin:/sbin") + procenv.Env = peSetEnv(procenv.Env, "IFS", " \t\n") + pwd, err := os.Getwd() + if err != nil { + task.MyResponse.State = RESP_FAILED_HOST_ERROR + o.Warn("job%d: Couldn't resolve PWD: %s", task.Id, err) + return + } + procenv.Env = peSetEnv(procenv.Env, "PWD", pwd) + // copy in the environment overrides + for k, v := range eenv.Environment { + procenv.Env = peSetEnv(procenv.Env, k, v) + } + + // attach FDs to procenv. + procenv.Files = make([]*os.File, 3) + + // first off, attach /dev/null to stdin and stdout + devNull, err := os.OpenFile(os.DevNull, os.O_RDWR | os.O_APPEND, 0666) + o.MightFail(err, "couldn't open DevNull") + defer devNull.Close() + for i := 0; i < 2; i++ { + procenv.Files[i] = devNull + } + // attach STDERR to to our logger via pipe. + lr, lw, err := os.Pipe() + o.MightFail(err, "Couldn't create pipe") + defer lw.Close() + // lr will be closed by the logger. + procenv.Files[2] = lw + // check the environment's configuration and allow it to override stdin, stdout, and FDs 3+ + if nil != eenv.Files { + for i := range eenv.Files { + if i < 2 { + procenv.Files[i] = eenv.Files[i] + } else { + procenv.Files = append(procenv.Files, eenv.Files[i]) + } + } + } + var args []string = nil + args = append(args, eenv.Arguments...) + + o.Info("job%d: Executing %s", task.Id, score.Executable) + go batchLogger(task.Id, lr) + proc, err := os.StartProcess(score.Executable, args, procenv) + if err != nil { + o.Warn("job%d: Failed to start processs", task.Id) + task.MyResponse.State = RESP_FAILED_HOST_ERROR + return + } + wm, err := proc.Wait(0) + if err != nil { + o.Warn("job%d: Error waiting for process", task.Id) + task.MyResponse.State = RESP_FAILED_UNKNOWN + // Worse of all, we don't even know if we succeeded. 
+ return + } + if !(wm.WaitStatus.Signaled() || wm.WaitStatus.Exited()) { + o.Assert("Non Terminal notification received when not expected.") + return + } + if wm.WaitStatus.Signaled() { + o.Warn("job%d: Process got signalled", task.Id) + task.MyResponse.State = RESP_FAILED_UNKNOWN + return + } + if wm.WaitStatus.Exited() { + if 0 == wm.WaitStatus.ExitStatus() { + o.Warn("job%d: Process exited OK", task.Id) + task.MyResponse.State = RESP_FINISHED + } else { + o.Warn("job%d: Process exited with failure", task.Id) + task.MyResponse.State = RESP_FAILED + } + return + } + o.Assert("Should never get here.") +} \ No newline at end of file diff --git a/src/player/if_env.go b/src/player/if_env.go new file mode 100644 index 0000000..260a6e5 --- /dev/null +++ b/src/player/if_env.go @@ -0,0 +1,42 @@ +// if_env +// +// 'env' score interface + +package main + +const ( + envEnvironmentPrefix = "ORC_" +) + +func init() { + RegisterInterface("env", newEnvInterface) +} + +type EnvInterface struct { + task *TaskRequest +} + +func newEnvInterface(task *TaskRequest) (iface ScoreInterface) { + ei := new(EnvInterface) + ei.task = task + + return ei +} + +func (ei *EnvInterface) Prepare() bool { + // does nothing! + return true +} + +func (ei *EnvInterface) SetupProcess() (ee *ExecutionEnvironment) { + ee = NewExecutionEnvironment() + for k,v := range ei.task.Params { + ee.Environment[envEnvironmentPrefix+k] = v + } + + return ee +} + +func (ei *EnvInterface) Cleanup() { + // does nothing! +} \ No newline at end of file diff --git a/src/player/if_pipe.go b/src/player/if_pipe.go new file mode 100644 index 0000000..eab72f4 --- /dev/null +++ b/src/player/if_pipe.go @@ -0,0 +1,92 @@ +// if_pipe +// +// 'pipe' score interface +// +// The PIPE score interface works much like the ENV interace, but also attaches +// a pipe to STDOUT which captures = repsonse values. + +package main + +import ( + "strings" + "bufio" + "os" + o "orchestra" +) + +const ( + pipeEnvironmentPrefix = "ORC_" +) + +func init() { + RegisterInterface("pipe", newPipeInterface) +} + +type PipeInterface struct { + task *TaskRequest + pipew *os.File +} + +func newPipeInterface(task *TaskRequest) (iface ScoreInterface) { + ei := new(PipeInterface) + ei.task = task + + return ei +} + + +// pipeListener is the goroutine that sits on the stdout pipe and +// processes what it sees. +func pipeListener(task *TaskRequest, outpipe *os.File) { + defer outpipe.Close() + + r := bufio.NewReader(outpipe) + for { + lb, _, err := r.ReadLine() + if err == os.EOF { + return + } + if err != nil { + o.Warn("pipeListener failed: %s", err) + return + } + linein := string(lb) + if strings.Index(linein, "=") >= 0 { + bits := strings.SplitN(linein, "=", 2) + task.MyResponse.Response[bits[0]] = bits[1] + } + } +} + + +func (ei *PipeInterface) Prepare() bool { + lr, lw, err := os.Pipe() + if (err != nil) { + return false + } + // save the writing end of the pipe so we can close our local end of it during cleanup. + ei.pipew = lw + + // start the pipe listener + go pipeListener(ei.task, lr) + + return true +} + +func (ei *PipeInterface) SetupProcess() (ee *ExecutionEnvironment) { + ee = NewExecutionEnvironment() + for k,v := range ei.task.Params { + ee.Environment[pipeEnvironmentPrefix+k] = v + } + ee.Files = make([]*os.File, 2) + ee.Files[1] = ei.pipew + return ee +} + +func (ei *PipeInterface) Cleanup() { + // close the local copy of the pipe. + // + // if the child didn't start, this will also EOF the + // pipeListener which will clean up that goroutine. 
+ ei.pipew.Close() +} \ No newline at end of file diff --git a/src/player/interface.go b/src/player/interface.go new file mode 100644 index 0000000..580bb4e --- /dev/null +++ b/src/player/interface.go @@ -0,0 +1,77 @@ +// interface.go +// +// Score Interface +// +// This provides the interface that the score interfaces need to conform to. + +package main + +import ( + o "orchestra" + "os" +) + +var ( + interfaces = make(map[string]func(*TaskRequest)(ScoreInterface)) +) + +type ExecutionEnvironment struct { + Environment map[string]string + Arguments []string + Files []*os.File +} + +func NewExecutionEnvironment() (ee *ExecutionEnvironment) { + ee = new(ExecutionEnvironment) + ee.Environment = make(map[string]string) + + return ee +} + +type ScoreInterface interface { + // prepare gets called up front before the fork. It should do + // any/all lifting required. + // + // returns false if there are any problems. + Prepare() bool + + // SetupEnvironment gets called prior to starting the child + // process. It should return an ExecutionEnvironment with the + // bits filled in the way it wants. + SetupProcess() *ExecutionEnvironment + + // Cleanup is responsible for mopping up the mess, filing any + // results that need to be stored, etc. This will be called + // only from the main thread to ensure that results can be updated + // safely. + Cleanup() +} + +func HasInterface(ifname string) bool { + _, exists := interfaces[ifname] + + return exists +} + +func RegisterInterface(ifname string, initfunc func(*TaskRequest)(ScoreInterface)) { + _, exists := interfaces[ifname] + if exists { + o.Assert("Multiple attempts to register %s interface", ifname) + } + interfaces[ifname] = initfunc +} + +func NewScoreInterface(task *TaskRequest) (iface ScoreInterface) { + score, exists := Scores[task.Score] + if !exists { + return nil + } + if !HasInterface(score.Interface) { + return nil + } + ifinit, _ := interfaces[score.Interface] + + iface = ifinit(task) + + return iface +} \ No newline at end of file diff --git a/src/player/player.go b/src/player/player.go new file mode 100644 index 0000000..0bb9b54 --- /dev/null +++ b/src/player/player.go @@ -0,0 +1,373 @@ +/* player.go +*/ + +package main + +import ( + "os" + "fmt" + "flag" + o "orchestra" + "crypto/tls" + "crypto/x509" + "net" + "time" + "container/list" +) + +const ( + InitialReconnectDelay = 5e9 + MaximumReconnectDelay = 300e9 + ReconnectDelayScale = 2 + KeepaliveDelay = 200e9 + RetryDelay = 5e9 +) + +type NewConnectionInfo struct { + conn net.Conn + timeout int64 +} + +var ( + ConfigFile = flag.String("config-file", "/etc/orchestra/player.conf", "Path to the configuration file") + DontVerifyPeer = flag.Bool("dont-verify-peer", false, "Ignore TLS verification for the peer") + CertPair tls.Certificate + CACertPool *x509.CertPool + LocalHostname string = "" + + receivedMessage = make(chan *o.WirePkt) + lostConnection = make(chan int) + reloadScores = make(chan int, 2) + pendingQueue = list.New() + unacknowledgedQueue = list.New() + newConnection = make(chan *NewConnectionInfo) + pendingTaskRequest = false + InvalidValueError = os.NewError("Invalid value") +) + +func getNextPendingTask() (task *TaskRequest) { + e := pendingQueue.Front() + if e != nil { + task, _ = e.Value.(*TaskRequest) + pendingQueue.Remove(e) + } + return task +} + +func appendPendingTask(task *TaskRequest) { + pendingTaskRequest = false + pendingQueue.PushBack(task) +} + +func getNextUnacknowledgedResponse() (resp *TaskResponse) { + e := unacknowledgedQueue.Front() + if e != nil { + 
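+		// pop the oldest outstanding response; ProcessingLoop pushes it back
+		// onto the front of the queue if it cannot be sent yet.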
resp, _ = e.Value.(*TaskResponse) + unacknowledgedQueue.Remove(e) + } + return resp +} + +func appendUnacknowledgedResponse(resp *TaskResponse) { + resp.RetryTime = time.Nanoseconds() + RetryDelay + unacknowledgedQueue.PushBack(resp) +} + +func acknowledgeResponse(jobid uint64) { + for e := unacknowledgedQueue.Front(); e != nil; e = e.Next() { + resp := e.Value.(*TaskResponse) + if resp.id == jobid { + unacknowledgedQueue.Remove(e) + } + } +} + +func sendResponse(c net.Conn, resp *TaskResponse) { + //FIXME: update retry time on Response + o.Debug("Sending Response!") + ptr := resp.Encode() + p, err := o.Encode(ptr) + o.MightFail(err, "Failed to encode response") + _, err = p.Send(c) + if err != nil { + o.Warn("Transmission error: %s", err) + c.Close() + prequeueResponse(resp) + lostConnection <- 1 + } else { + appendUnacknowledgedResponse(resp) + } +} + +func prequeueResponse(resp *TaskResponse) { + unacknowledgedQueue.PushFront(resp) +} + +func Reader(conn net.Conn) { + defer func(l chan int) { + l <- 1 + }(lostConnection) + + for { + pkt, err := o.Receive(conn) + if (err != nil) { + o.Warn("Error receiving message: %s", err) + break; + } + receivedMessage <- pkt + } +} + +func handleNop(c net.Conn, message interface{}) { + o.Debug("NOP Received") +} + +func handleIllegal(c net.Conn, message interface{}) { + o.Fail("Got Illegal Message") +} + +func handleRequest(c net.Conn, message interface{}) { + o.Debug("Request Recieved. Decoding!") + ptr, ok := message.(*o.ProtoTaskRequest) + if !ok { + o.Assert("CC stuffed up - handleRequest got something that wasn't a ProtoTaskRequest.") + } + task := TaskFromProto(ptr) + /* search the registry for the task */ + o.Debug("Request for Job.ID %d", task.Id) + existing := TaskGet(task.Id) + if nil != existing { + if (existing.MyResponse.IsFinished()) { + o.Debug("job%d: Resending Response", task.Id) + sendResponse(c, existing.MyResponse) + } + } else { + // check to see if we have the score + // add the Job to our Registry + task.MyResponse = NewTaskResponse() + task.MyResponse.id = task.Id + task.MyResponse.State = RESP_PENDING + TaskAdd(task) + o.Info("Added New Task (Job ID %d) to our local registry", task.Id) + // and then push it onto the pending job list so we know it needs actioning. + appendPendingTask(task) + } +} + +func handleAck(c net.Conn, message interface{}) { + o.Debug("Ack Received") + ack, ok := message.(*o.ProtoAcknowledgement) + if !ok { + o.Assert("CC stuffed up - handleAck got something that wasn't a ProtoAcknowledgement.") + } + if ack.Id != nil { + acknowledgeResponse(*ack.Id) + } +} + + +var dispatcher = map[uint8] func(net.Conn, interface{}) { + o.TypeNop: handleNop, + o.TypeTaskRequest: handleRequest, + o.TypeAcknowledgement: handleAck, + + /* P->C only messages, should never appear on the wire to us. */ + o.TypeIdentifyClient: handleIllegal, + o.TypeReadyForTask: handleIllegal, + o.TypeTaskResponse: handleIllegal, +} + +func connectMe(initialDelay int64) { + var backOff int64 = initialDelay + for { + // Sleep first. + if backOff > 0 { + o.Info("Sleeping for %d seconds", backOff/1e9) + err := time.Sleep(backOff) + o.MightFail(err, "Couldn't Sleep") + backOff *= ReconnectDelayScale + if backOff > MaximumReconnectDelay { + backOff = MaximumReconnectDelay + } + } else { + backOff = InitialReconnectDelay + } + + tconf := &tls.Config{ + RootCAs: CACertPool, + } + tconf.Certificates = append(tconf.Certificates, CertPair) + + // update our local hostname. 
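+		// (ProbeHostname falls back to a forward-then-reverse DNS lookup
+		// when no "player name" is configured)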
+ LocalHostname = GetStringOpt("player name") + if (LocalHostname == "") { + LocalHostname = o.ProbeHostname() + o.Warn("No hostname provided - probed hostname: %s", LocalHostname) + } + + masterHostname := GetStringOpt("master") + + raddr := fmt.Sprintf("%s:%d", masterHostname, 2258) + o.Info("Connecting to %s", raddr) + conn, err := tls.Dial("tcp", raddr, tconf) + if err == nil && !*DontVerifyPeer { + conn.Handshake() + err = conn.VerifyHostname(masterHostname) + } + if err == nil { + nc := new(NewConnectionInfo) + nc.conn = conn + nc.timeout = backOff + newConnection <- nc + return + } + o.Warn("Couldn't connect to master: %s", err) + } +} + +func ProcessingLoop() { + var conn net.Conn = nil + var nextRetryResp *TaskResponse = nil + var taskCompletionChan <-chan *TaskResponse = nil + var connectDelay int64 = 0 + var doScoreReload bool = false + // kick off a new connection attempt. + go connectMe(connectDelay) + + // and this is where we spin! + for { + var retryDelay int64 = 0 + var retryChan <-chan int64 = nil + + if conn != nil { + for nextRetryResp == nil { + nextRetryResp = getNextUnacknowledgedResponse() + if nil == nextRetryResp { + break + } + retryDelay = nextRetryResp.RetryTime - time.Nanoseconds() + if retryDelay < 0 { + sendResponse(conn, nextRetryResp) + nextRetryResp = nil + } + } + if nextRetryResp != nil { + retryChan = time.After(retryDelay) + } + } + if taskCompletionChan == nil { + nextTask := getNextPendingTask() + if nextTask != nil { + taskCompletionChan = ExecuteTask(nextTask) + } else { + if conn != nil && !pendingTaskRequest { + o.Debug("Asking for trouble") + p := o.MakeReadyForTask() + p.Send(conn) + o.Debug("Sent Request for trouble") + pendingTaskRequest = true + } + } + } + select { + // Currently executing job finishes. + case newresp := <- taskCompletionChan: + o.Debug("job%d: Completed with State %s\n", newresp.id, newresp.State) + // preemptively set a retrytime. + newresp.RetryTime = time.Nanoseconds() + // ENOCONN - sub it in as our next retryresponse, and prepend the old one onto the queue. + if nil == conn { + if nil != nextRetryResp { + prequeueResponse(nextRetryResp) + } + o.Debug("job%d: Queuing Initial Response", newresp.id) + nextRetryResp = newresp + } else { + o.Debug("job%d: Sending Initial Response", newresp.id) + sendResponse(conn, newresp) + } + if doScoreReload { + o.Info("Performing Deferred score reload") + LoadScores() + doScoreReload = false + } + taskCompletionChan = nil + // If the current unacknowledged response needs a retry, send it. + case <-retryChan: + sendResponse(conn, nextRetryResp) + nextRetryResp = nil + // New connection. Set up the receiver thread and Introduce ourselves. + case nci := <-newConnection: + if conn != nil { + conn.Close() + } + conn = nci.conn + connectDelay = nci.timeout + pendingTaskRequest = false + + // start the reader + go Reader(conn) + + /* Introduce ourself */ + p := o.MakeIdentifyClient(LocalHostname) + p.Send(conn) + // Lost connection. Shut downt he connection. + case <-lostConnection: + o.Warn("Lost Connection to Master") + conn.Close() + conn = nil + // restart the connection attempts + go connectMe(connectDelay) + // Message received from master. Decode and action. + case p := <-receivedMessage: + // because the message could possibly be an ACK, push the next retry response back into the queue so acknowledge can find it. 
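+			// (acknowledgeResponse only scans unacknowledgedQueue, and
+			// nextRetryResp has already been popped off that queue)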
+ if nil != nextRetryResp { + prequeueResponse(nextRetryResp) + nextRetryResp = nil + } + var upkt interface{} = nil + if p.Length > 0 { + var err os.Error + upkt, err = p.Decode() + o.MightFail(err, "Couldn't decode packet from master") + } + handler, exists := dispatcher[p.Type] + if (exists) { + connectDelay = 0 + handler(conn, upkt) + } else { + o.Fail("Unhandled Pkt Type %d", p.Type) + } + // Reload scores + case <-reloadScores: + // fortunately this is actually completely safe as + // long as nobody's currently executing. + // who'd have thunk it? + if taskCompletionChan == nil { + o.Info("Reloading scores") + LoadScores() + } else { + o.Info("Deferring score reload (execution in progress)") + doScoreReload = true + } + // Keepalive delay expired. Send Nop. + case <-time.After(KeepaliveDelay): + if conn == nil { + break + } + o.Debug("Sending Nop") + p := o.MakeNop() + p.Send(conn) + } + } +} + +func main() { + o.SetLogName("player") + + flag.Parse() + + ConfigLoad() + LoadScores() + ProcessingLoop() +} diff --git a/src/player/registry.go b/src/player/registry.go new file mode 100644 index 0000000..2466bb7 --- /dev/null +++ b/src/player/registry.go @@ -0,0 +1,100 @@ +// registry.go +// +// Job Registry. +// +// The Registry provides a 'threadsafe' interface to various global +// information stores. +// +// The registry dispatch thread is forbidden from performing any work +// that is likely to block. Result channels must be buffered with +// enough space for the full set of results. + +package main + +const ( + requestAddTask = iota + requestGetTask + + requestQueueSize = 10 +) + +type registryRequest struct { + operation int + id uint64 + task *TaskRequest + responseChannel chan *registryResponse +} + +type registryResponse struct { + success bool + task *TaskRequest +} + +var chanRequest = make(chan *registryRequest, requestQueueSize) + +// bake a minimal request structure together. +func newRequest(wants_response bool) (r *registryRequest) { + r = new(registryRequest) + if wants_response { + r.responseChannel = make(chan *registryResponse, 1) + } + + return r +} + +// Add a Task to the registry. Return true if successful, returns +// false if the task is lacking critical information (such as a Job Id) +// and can't be registered. +func TaskAdd(task *TaskRequest) bool { + rr := newRequest(true) + rr.operation = requestAddTask + rr.task = task + + chanRequest <- rr + resp := <- rr.responseChannel + return resp.success +} + +// Get a Task from the registry. Returns the task if successful, +// returns nil if the task couldn't be found. 
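+// Lookups go through the same registry goroutine as additions, so no
+// extra locking is needed.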
+func TaskGet(id uint64) *TaskRequest { + rr := newRequest(true) + rr.operation = requestGetTask + rr.id = id + + chanRequest <- rr + resp := <- rr.responseChannel + return resp.task +} + +func manageRegistry() { + taskRegister := make(map[uint64]*TaskRequest) + + for { + req := <- chanRequest + resp := new (registryResponse) + switch (req.operation) { + case requestAddTask: + if nil != req.task { + // and register the job + taskRegister[req.task.Id] = req.task + resp.success = true + } else { + resp.success = false + } + case requestGetTask: + task, exists := taskRegister[req.id] + resp.success = exists + if exists { + resp.task = task + } + } + if req.responseChannel != nil { + req.responseChannel <- resp + } + } +} + +func init() { + go manageRegistry() +} diff --git a/src/player/resp_state.go b/src/player/resp_state.go new file mode 100644 index 0000000..32e11ac --- /dev/null +++ b/src/player/resp_state.go @@ -0,0 +1,112 @@ +// resp_state.go + +package main + +import ( + "os" + "json" +) + +type ResponseState int + +const ( + // Response states + RESP_PENDING = ResponseState(iota) // internal state, not wire. + RESP_RUNNING + RESP_FINISHED + RESP_FAILED + RESP_FAILED_UNKNOWN_SCORE + RESP_FAILED_HOST_ERROR + RESP_FAILED_UNKNOWN +) + +func (rs ResponseState) String() (strout string) { + switch rs { + case RESP_RUNNING: + return "PENDING" + case RESP_FINISHED: + return "OK" + case RESP_FAILED: + return "FAIL" + case RESP_FAILED_UNKNOWN_SCORE: + return "UNK_SCORE" + case RESP_FAILED_HOST_ERROR: + return "HOST_ERROR" + case RESP_FAILED_UNKNOWN: + return "UNKNOWN_FAILURE" + } + return "" +} + +func (rs ResponseState) MarshalJSON() (out []byte, err os.Error) { + strout := rs.String() + if strout != "" { + return json.Marshal(strout) + } + return nil, InvalidValueError +} + +func (rs ResponseState) UnmarshalJSON(in []byte) (err os.Error) { + var statestr string + err = json.Unmarshal(in, &statestr) + if err != nil { + return err + } + switch statestr { + case "PENDING": + rs = RESP_PENDING + case "OK": + rs = RESP_FINISHED + case "FAIL": + rs = RESP_FAILED + case "UNK_SCORE": + rs = RESP_FAILED_UNKNOWN_SCORE + case "HOST_ERROR": + rs = RESP_FAILED_HOST_ERROR + case "UNKNOWN_FAILURE": + rs = RESP_FAILED_UNKNOWN + default: + return InvalidValueError + } + return nil +} + +func (rs ResponseState) Finished() bool { + switch rs { + case RESP_FINISHED: + fallthrough + case RESP_FAILED: + fallthrough + case RESP_FAILED_UNKNOWN_SCORE: + fallthrough + case RESP_FAILED_HOST_ERROR: + fallthrough + case RESP_FAILED_UNKNOWN: + return true + } + return false +} + +// true if the response says the task failed. false otherwise. +func (rs ResponseState) Failed() bool { + switch rs { + case RESP_RUNNING: + fallthrough + case RESP_FINISHED: + return false + } + return true +} + +// true if the task can be tried. +// precond: DidFail is true, job is a ONE_OF job. +// must return false otherwise. +func (rs ResponseState) CanRetry() bool { + switch rs { + case RESP_FAILED_UNKNOWN_SCORE: + fallthrough + case RESP_FAILED_HOST_ERROR: + return true + } + return false +} \ No newline at end of file diff --git a/src/player/scores.go b/src/player/scores.go new file mode 100644 index 0000000..6f727e3 --- /dev/null +++ b/src/player/scores.go @@ -0,0 +1,134 @@ +// scores.go +// +// Score handling +// +// In here, we have the probing code that learns about scores, reads +// their configuration files, and does the heavy lifting for launching +// them, doing the privilege drop, etc. 
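+//
+// A score is any file with the executable bit set in the score
+// directory; an optional <name>.conf beside it is parsed with
+// configureit and may set the keys registered in NewScoreInfoConfig
+// below: interface (env or pipe), dir, path and user.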
+ +package main + +import ( + "os" + "io" + "strings" + o "orchestra" + "path" + "github.com/kuroneko/configureit" +) + +type ScoreInfo struct { + Name string + Executable string + InitialPwd string + InitialEnv map[string]string + + Interface string + + Config *configureit.Config +} + +type ScoreExecution struct { + Score *ScoreInfo + Task *TaskRequest +} + + +func NewScoreInfo() (si *ScoreInfo) { + si = new (ScoreInfo) + si.InitialEnv = make(map[string]string) + + config := NewScoreInfoConfig() + si.updateFromConfig(config) + + return si +} + +func NewScoreInfoConfig() (config *configureit.Config) { + config = configureit.New() + + config.Add("interface", configureit.NewStringOption("env")) + config.Add("dir", configureit.NewStringOption("")) + config.Add("path", configureit.NewStringOption("/usr/bin:/bin")) + config.Add("user", configureit.NewUserOption("")) + + return config +} + +func (si *ScoreInfo) updateFromConfig(config *configureit.Config) { + // propogate PATH overrides. + opt := config.Get("dir") + sopt, _ := opt.(*configureit.StringOption) + si.InitialEnv["PATH"] = sopt.Value + + // set the interface type. + opt = config.Get("interface") + sopt, _ = opt.(*configureit.StringOption) + si.Interface = sopt.Value + + // propogate initial Pwd + opt = config.Get("dir") + sopt, _ = opt.(*configureit.StringOption) + si.InitialPwd = sopt.Value +} + +var ( + Scores map[string]*ScoreInfo +) + +func ScoreConfigure(si *ScoreInfo, r io.Reader) { + config := NewScoreInfoConfig() + err := config.Read(r, 1) + o.MightFail(err, "Error Parsing Score Configuration for %s", si.Name) + si.updateFromConfig(config) +} + +func LoadScores() { + scoreDirectory := GetStringOpt("score directory") + + dir, err := os.Open(scoreDirectory) + o.MightFail(err, "Couldn't open Score directory") + defer dir.Close() + + Scores = make(map[string]*ScoreInfo) + + files, err := dir.Readdir(-1) + for i := range files { + // skip ., .. and other dotfiles. + if strings.HasPrefix(files[i].Name, ".") { + continue + } + // emacs backup files. ignore these. + if strings.HasSuffix(files[i].Name, "~") || strings.HasPrefix(files[i].Name, "#") { + continue + } + // .conf is reserved for score configurations. + if strings.HasSuffix(files[i].Name, ".conf") { + continue + } + if !files[i].IsRegular() && !files[i].IsSymlink() { + continue + } + + // check for the executionable bit + if (files[i].Permission() & 0111) != 0 { + fullpath := path.Join(scoreDirectory, files[i].Name) + conffile := fullpath+".conf" + o.Warn("Considering %s as score", files[i].Name) + + si := NewScoreInfo() + si.Name = files[i].Name + si.Executable = fullpath + + conf, err := os.Open(conffile) + if err == nil { + o.Warn("Parsing configuration for %s", fullpath) + ScoreConfigure(si, conf) + conf.Close() + } else { + o.Warn("Couldn't open config file for %s, assuming defaults: %s", files[i].Name, err) + } + Scores[files[i].Name] = si + } + } +} \ No newline at end of file diff --git a/src/player/signal.go b/src/player/signal.go new file mode 100644 index 0000000..626fb73 --- /dev/null +++ b/src/player/signal.go @@ -0,0 +1,49 @@ +/* signal.go + * + * Signal Handlers + */ + +package main + +import ( + "os/signal" + "os" + "syscall" + "fmt" + o "orchestra" +) + + +// handle the signals. By default, we ignore everything, but the +// three terminal signals, HUP, INT, TERM, we want to explicitly +// handle. 
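+// Note that on the player a HUP only queues a score reload via the
+// reloadScores channel; the configuration file itself is not re-read.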
+func signalHandler() { + for { + sig := <-signal.Incoming + + ux, ok := sig.(os.UnixSignal) + if !ok { + o.Warn("Couldn't handle signal %s, Coercion failed", sig) + continue + } + + switch int(ux) { + case syscall.SIGHUP: + o.Warn("Reloading Configuration") + reloadScores <- 1 + case syscall.SIGINT: + fmt.Fprintln(os.Stderr, "Interrupt Received - Terminating") + //FIXME: Gentle Shutdown + os.Exit(1) + case syscall.SIGTERM: + fmt.Fprintln(os.Stderr, "Terminate Received - Terminating") + //FIXME: Gentle Shutdown + os.Exit(2) + } + } + +} + +func init() { + go signalHandler() +} diff --git a/src/player/task_request.go b/src/player/task_request.go new file mode 100644 index 0000000..b571c27 --- /dev/null +++ b/src/player/task_request.go @@ -0,0 +1,33 @@ +// task_request.go +// + +package main + +import ( + o "orchestra" +) + +type TaskRequest struct { + Id uint64 `json:"id"` + Score string `json:"score"` + Params map[string]string `json:"params"` + MyResponse *TaskResponse `json:"response"` +} + +func NewTaskRequest() (req *TaskRequest) { + req = new(TaskRequest) + return req +} + +/* Map a wire task to an internal Task Request. +*/ +func TaskFromProto(ptr *o.ProtoTaskRequest) (t *TaskRequest) { + t = NewTaskRequest() + + t.Score = *(ptr.Jobname) + t.Id = *(ptr.Id) + t.Params = o.MapFromProtoJobParameters(ptr.Parameters) + + return t +} + diff --git a/src/player/task_response.go b/src/player/task_response.go new file mode 100644 index 0000000..f284b5c --- /dev/null +++ b/src/player/task_response.go @@ -0,0 +1,88 @@ +// task_response.go +// +package main + +import ( + o "orchestra" +) + +type TaskResponse struct { + id uint64 + State ResponseState `json:"state"` + Response map[string]string `json:"response"` + // player only fields + RetryTime int64 `json:"retrytime"` +} + +// Response related magic + +func NewTaskResponse() (resp *TaskResponse) { + resp = new(TaskResponse) + resp.Response = make(map[string]string) + + return resp +} + +func (resp *TaskResponse) IsFinished() bool { + return resp.State.Finished() +} + +func (resp *TaskResponse) DidFail() bool { + return resp.State.Failed() +} + +func (resp *TaskResponse) CanRetry() bool { + return resp.State.CanRetry() +} + + +func ResponseFromProto(ptr *o.ProtoTaskResponse) (r *TaskResponse) { + r = new(TaskResponse) + + switch (*(ptr.Status)) { + case o.ProtoTaskResponse_JOB_INPROGRESS: + r.State = RESP_RUNNING + case o.ProtoTaskResponse_JOB_SUCCESS: + r.State = RESP_FINISHED + case o.ProtoTaskResponse_JOB_FAILED: + r.State = RESP_FAILED + case o.ProtoTaskResponse_JOB_HOST_FAILURE: + r.State = RESP_FAILED_HOST_ERROR + case o.ProtoTaskResponse_JOB_UNKNOWN: + r.State = RESP_FAILED_UNKNOWN_SCORE + case o.ProtoTaskResponse_JOB_UNKNOWN_FAILURE: + fallthrough + default: + r.State = RESP_FAILED_UNKNOWN + } + + r.id = *(ptr.Id) + r.Response = o.MapFromProtoJobParameters(ptr.Response) + + return r +} + +func (resp *TaskResponse) Encode() (ptr *o.ProtoTaskResponse) { + ptr = new(o.ProtoTaskResponse) + + switch resp.State { + case RESP_RUNNING: + ptr.Status = o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_INPROGRESS) + case RESP_FINISHED: + ptr.Status = o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_SUCCESS) + case RESP_FAILED: + ptr.Status = o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_FAILED) + case RESP_FAILED_UNKNOWN_SCORE: + ptr.Status = o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_UNKNOWN) + case RESP_FAILED_HOST_ERROR: + ptr.Status = 
o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_HOST_FAILURE) + case RESP_FAILED_UNKNOWN: + ptr.Status = o.NewProtoTaskResponse_TaskStatus(o.ProtoTaskResponse_JOB_UNKNOWN_FAILURE) + } + ptr.Id = new(uint64) + *ptr.Id = resp.id + ptr.Response = o.ProtoJobParametersFromMap(resp.Response) + + return ptr +} + diff --git a/src/submitjob/Makefile b/src/submitjob/Makefile new file mode 100644 index 0000000..f5f972e --- /dev/null +++ b/src/submitjob/Makefile @@ -0,0 +1,8 @@ +include $(GOROOT)/src/Make.inc + +TARG=submitjob + +GOFILES=\ + submitjob.go\ + +include $(GOROOT)/src/Make.cmd diff --git a/src/submitjob/submitjob.go b/src/submitjob/submitjob.go new file mode 100644 index 0000000..2142e5a --- /dev/null +++ b/src/submitjob/submitjob.go @@ -0,0 +1,142 @@ +// submitjob.go +// +// A sample Orchestra submission client. + +package main + +import ( + "io" + "net" + "json" + "flag" + "fmt" + "os" +) + +type JobRequest struct { + Op string `json:"op"` + Score string `json:"score"` + Players []string `json:"players"` + Scope string `json:"scope"` + Params map[string]string `json:"params"` +} + +var ( + AllOf = flag.Bool("all-of", false, "Send request to all named players") + AudienceSock = flag.String("audience-sock", "/var/spool/orchestra/conductor.sock", "Path for the audience submission socket") +) + +func NewJobRequest() (jr *JobRequest) { + jr = new(JobRequest) + jr.Params = make(map[string]string) + + return jr +} + +func Usage() { + fmt.Fprintf(os.Stderr, "Usage:\n") + fmt.Fprintf(os.Stderr, " %s [] [...] [! [ ]...]\n", os.Args[0]) + flag.PrintDefaults() +} + +func main() { + flag.Usage = Usage + flag.Parse() + + args := flag.Args() + + if len(args) < 2 { + flag.Usage() + os.Exit(1) + } + + jr := NewJobRequest() + jr.Op = "queue" + jr.Score = args[0] + if *AllOf { + jr.Scope = "all" + } else { + jr.Scope = "one" + } + + var k int + for k = 1; k < len(args); k++ { + if args[k] == "!" { + break; + } + insertionpoint := 0 + if nil == jr.Players { + jr.Players = make([]string, 1) + } else { + insertionpoint = len(jr.Players) + newplayers := make([]string, insertionpoint+1) + copy(newplayers, jr.Players) + jr.Players = newplayers + } + jr.Players[insertionpoint] = args[k] + } + if (k < len(args)) { + // skip the ! + k++ + if (len(args) - (k))%2 != 0 { + fmt.Fprintf(os.Stderr, "Error: Odd number of param arguments.\n") + os.Exit(1) + } + for ; k < len(args); k+=2 { + jr.Params[args[k]] = args[k+1] + } + } + + raddr, err := net.ResolveUnixAddr("unix", *AudienceSock) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to resolve sockaddr: %s\n", err) + os.Exit(1) + } + conn, err := net.DialUnix("unix", nil, raddr) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to connect to sockaddr: %s\n", err) + os.Exit(1) + } + + defer conn.Close() + + conn.SetTimeout(0) + + nc := net.Conn(conn) + + r, _ := nc.(io.Reader) + w, _ := nc.(io.Writer) + + dec := json.NewDecoder(r) + enc := json.NewEncoder(w) + + // send the message + err = enc.Encode(jr) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to marshal & send: %s\n", err) + os.Exit(1) + } + + response := new([2]interface{}) + err = dec.Decode(response) + if err != nil { + fmt.Fprintf(os.Stderr, "Error decoding response: %s\n", err) + os.Exit(1) + } + // coerce field 0 back into a string. + rerr,ok := response[0].(string) + if ok { + if rerr == "OK" { + // all OK! 
get the JobID + jobid, _ := response[1].(float64) + fmt.Printf("%d\n", uint64(jobid)) + os.Exit(0) + } else { + fmt.Fprintf(os.Stderr, "Server Error: %s\n", rerr) + os.Exit(1) + } + } else { + fmt.Fprintf(os.Stderr, "Couldn't unmarshal response correctly.\n") + os.Exit(1) + } +} -- 2.30.2