class TimeoutException(Exception):
    '''Raised by poll_until_not_none when its timeout passes without a result.'''
class OrchestraError(Exception):
    '''Base error for failed requests to the Orchestra conductor.'''
class OrchestraJSONError(OrchestraError):
    '''Raised when the conductor's response could not be decoded as JSON.'''
def timecall(func, *args, **argd):
    '''call func(*args, **argd), print how long the call took, and return its result

    the elapsed wall-clock time (in seconds) is printed to stdout even when
    func raises; the exception then propagates to the caller unchanged
    '''
    start = time.time()
    try:
        ret = func(*args, **argd)
    finally:
        # single-argument call form prints identically on Python 2 and 3
        print(time.time() - start)
    return ret
def poll_until_not_none(func, delay=0.1, backoff=1, max_delay=None, timeout=None):
    '''call a function regularly until it returns not None and then return the result

    the function is called every 'delay' seconds
    after every unsuccessful call, the delay is multiplied by 'backoff', and limited to max_delay
    (no limit is applied when max_delay is None)

    if timeout is set, a TimeoutException is raised if that much time has passed without result
    the function will always be called at least once without delay, regardless of the value of timeout
    if a timeout is set the function may be called immediately before timing out, even if the delay was longer

    the time spent inside the function counts towards the delay, so a slow
    call shortens (or removes) the following sleep
    '''
    start = time.time()
    while True:
        beforecall = time.time() - start
        ret = func()
        if ret is not None:
            return ret

        elapsed = time.time() - start
        if timeout is not None and elapsed >= timeout:
            raise TimeoutException()

        # sleep only for what is left of the interval after the call itself
        sleepfor = delay - (elapsed - beforecall)
        if timeout is not None and elapsed + sleepfor > timeout:
            # wake up at the deadline for one last attempt
            sleepfor = timeout - elapsed
        if sleepfor > 0:
            # a call slower than 'delay' leaves nothing to sleep; a negative
            # value must never reach time.sleep
            time.sleep(sleepfor)

        delay *= backoff
        if max_delay is not None:
            delay = min(delay, max_delay)
class OrchestraClient(object):
    '''Simple Orchestra Audience

    Every request opens a fresh connection to the conductor, sends one JSON
    document and reads one JSON response.

    The examples below need a running conductor; the ids passed to
    get_status / wait_for_completion are presumably the ones returned by the
    preceding submit_job calls.

    >>> oc = OrchestraClient()
    >>> oc.submit_job('helloworld','one','orchestra.player.hostname.example')
    >>> oc.get_status(200034)
    {u'Status': u'OK', u'Players': {u'orchestra.player.hostname.example': {u'Status': u'OK', u'Response': {}}}}
    >>> oc.submit_job('madworld','one','orchestra.player.hostname.example')
    >>> oc.get_status(200035)
    {u'Status': u'FAIL', u'Players': {u'orchestra.player.hostname.example': {u'Status': u'HOST_ERROR', u'Response': {}}}}
    >>> oc.submit_job('echo','one','orchestra.player.hostname.example', {'foo': 12345, 'nyan': 54321, 'cat': 'dog'})
    >>> oc.wait_for_completion(200038, timeout=30)
    {u'Status': u'OK', u'Players': {u'orchestra.player.hostname.example': {u'Status': u'OK', u'Response': {u'echo': u'{"IFS": " \\t\\n", "ORC_nyan": "54321", "PWD": "/var/lib/service/player", "ORC_foo": "12345", "PATH": "/usr/bin:/usr/sbin:/bin:/sbin", "ORC_cat": "dog"}'}}}}
    '''

    def _get_conductor_socket(self):
        '''get a socket connected to the conductor
        current implementation is a UNIX socket

        the caller owns the returned socket and must close it
        '''
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(config.orchestra_socket)
        except Exception:
            # do not leak the descriptor when the conductor is unreachable
            sock.close()
            raise
        return sock

    def orchestra_request(self, **argd):
        '''make a request to the Orchestra conductor
        pre: all non-positional arguments passed form a well-formed orchestra request

        returns the second element of the conductor's response when its first
        element is 'OK'
        raises OrchestraError if the conductor closed the connection without
        answering, or answered with a non-OK status
        raises OrchestraJSONError if the answer could not be decoded as JSON
        '''
        # Serialize before connecting so that an unserializable request fails
        # as itself and is never mistaken for a conductor-side error.
        payload = json.dumps(argd).encode('utf-8')

        # NOTE(review): the framing (one JSON document sent, response read
        # until the conductor closes the connection) is reconstructed -
        # confirm against the conductor's Audience protocol.
        sock = self._get_conductor_socket()
        try:
            sock.sendall(payload)
            data = b''.join(iter(functools.partial(sock.recv, 4096), b''))
        finally:
            sock.close()

        if not data:
            raise OrchestraError('Conductor burned socket. Probably invalid call')
        try:
            # UnicodeDecodeError is a ValueError too, so undecodable bytes
            # are reported the same way as malformed JSON
            response = json.loads(data.decode('utf-8'))
        except ValueError:
            raise OrchestraJSONError(("Couldn't decode as JSON", data))

        if response[0] == 'OK':
            # FIXME: This is not demarshalling any json coming back from individual players!
            return response[1]
        raise OrchestraError(response[1])

    def submit_job(self, score, scope, target, args=None):
        '''submit a job to orchestra as per the Audience API
        Any keys and values in args will be coerced into unicode objects

        target is either a single player name or an iterable of player names
        returns whatever the conductor answers to the 'queue' request
        '''
        try:
            string_types, text_type = basestring, unicode
        except NameError:
            # interpreters without basestring/unicode: str is already text
            string_types = text_type = str

        if isinstance(target, string_types):
            # a bare hostname must not be split into characters by list()
            target = [target]
        params = dict((text_type(k), text_type(v)) for k, v in (args or {}).items())
        request = dict(Op='queue', Score=score, Scope=scope, Players=list(target), Params=params)
        logging.debug('Submitting Orchestra job: %s', repr(request))
        return self.orchestra_request(**request)

    def get_status(self, jobid):
        '''ask the conductor for the status of job jobid and return its answer'''
        return self.orchestra_request(Op='status', Id=jobid)

    def completed_status_or_none(self, jobid):
        '''return the job status if completed or failed, or None if still Pending'''
        status = self.get_status(jobid)
        if status['Status'] == 'PENDING':
            return None
        return status

    def wait_for_completion(self, jobid, timeout=None, delay=0.1, backoff=1.41414, max_poll_delay=2):
        '''Block until an orchestra job leaves the Pending state
        returns the status response of the job when it is available
        raises TimeoutException iff timeout is not None and timeout seconds
        have passed without the job leaving the Pending state

        Orchestra calls are async, so we just keep polling until we get
        a status that is no longer Pending; the poll interval starts at
        delay seconds, grows by backoff after every attempt and is capped
        at max_poll_delay seconds
        '''
        pf = functools.partial(self.completed_status_or_none, jobid)
        return poll_until_not_none(pf, delay=delay, backoff=backoff, timeout=timeout, max_delay=max_poll_delay)