Imported Upstream version 0.0~git20110829
[debian/mudpuppy.git] / util.py
1 #!/usr/bin/env python
2
3 import config
4 import socket
5 import auto
6 import json
7 import time
8 import functools
9 import logging
10
11 class TimeoutException(Exception): pass
12 class OrchestraError(Exception): pass
13 class OrchestraJSONError(OrchestraError): pass
14
15 def timecall(func, *args, **argd):
16         start = time.time()
17         try: ret = func(*args,**argd)
18         finally: print time.time() - start
19         return ret
20
21 def poll_until_not_none(func, delay=0.1, backoff=1, max_delay=None, timeout=None):
22         '''call a function regularly until it returns not None and then return the result
23
24         the function is called every 'delay' seconds
25         after every unsuccessful call, the delay is multiplied by 'backoff', and limited to max_delay
26
27         if timeout is set, a TimeoutException is raised if that much time has passed without result
28         the function will always be called at least once without delay, regardless of the value of timeout
29         if a timeout is set the function may be called immediately before timing out, even if the delay was longer
30         '''
31         #if timeout is not None:
32         start = time.time()
33         while True:
34                 beforecall = time.time()-start
35                 ret = func()
36                 if ret is not None: return ret
37                 elapsed = time.time()-start
38                 if timeout is not None and elapsed >= timeout:
39                         raise TimeoutException()
40                 sleepfor = delay - (elapsed-beforecall)
41                 if timeout is not None and elapsed+sleepfor > timeout:
42                         sleepfor = timeout-elapsed
43                 if sleepfor > 0:
44                         time.sleep(sleepfor)
45                 delay *= backoff
46                 if max_delay:
47                         delay = min(delay, max_delay)
48
49 class OrchestraClient(object):
50         '''Simple Orchestra Audience
51
52         >>> oc = OrchestraClient()
53         >>> oc.submit_job('helloworld','one','orchestra.player.hostname.example')
54         200034
55         >>> oc.get_status(200034)
56         {u'Status': u'OK', u'Players': {u'orchestra.player.hostname.example': {u'Status': u'OK', u'Response': {}}}}
57         >>> oc.submit_job('madworld','one','orchestra.player.hostname.example')
58         200035
59         >>> oc.get_status(200035)
60         {u'Status': u'FAIL', u'Players': {u'orchestra.player.hostname.example': {u'Status': u'HOST_ERROR', u'Response': {}}}}
61         >>> oc.submit_job('echo','one','orchestra.player.hostname.example', {'foo': 12345, 'nyan': 54321, 'cat': 'dog'})
62         200038
63         >>> oc.wait_for_completion(200038, timeout=30)
64         {u'Status': u'OK', u'Players': {u'orchestra.player.hostname.example': {u'Status': u'OK', u'Response': {u'echo': u'{"IFS": " \\t\\n", "ORC_nyan": "54321", "PWD": "/var/lib/service/player", "ORC_foo": "12345", "PATH": "/usr/bin:/usr/sbin:/bin:/sbin", "ORC_cat": "dog"}'}}}}
65         '''
66         def _get_conductor_socket(self):
67                 '''get a socket connected to the conductor
68                 current implementation is a UNIX socket'''
69                 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
70                 sock.connect(config.orchestra_socket)
71                 return sock
72
73         def orchestra_request(self, **argd):
74                 '''make a request to the Orchestra conductor
75                 pre: all non-positional arguments passed form a well-formed orchestra request
76                 '''
77                 sock = self._get_conductor_socket()
78                 f = sock.makefile()
79                 try:
80                         json.dump(argd, f)
81                         f.flush()
82                         data = f.read()
83                         response = json.loads(data)
84                 except ValueError:
85                         if data == '':
86                                 raise OrchestraError('Conductor burned socket. Probably invalid call')
87                         raise OrchestraJSONError(("Couldn't decode as JSON",data))
88                 finally:
89                         sock.close()
90
91                 if response[0] == 'OK':
92                         # FIXME: This is not demarshalling any json coming back from individual players!
93                         return response[1]      
94                 else:
95                         raise OrchestraError(response[1])
96
97         def submit_job(self, score, scope, target, args={}):
98                 '''submit a job to orchestra as per the Audience API
99                 Any keys and values in args will be coerced into unicode objects
100                 '''
101                 if isinstance(target, basestring):
102                         target = [target]
103                 args = dict((unicode(k),unicode(v)) for k,v in args.items())
104                 request = dict(Op='queue', Score=score, Scope=scope, Players=list(target), Params=args)
105                 logging.debug('Submitting Orchestra job: %s', repr(request))
106                 return self.orchestra_request(**request)
107
108         def get_status(self, jobid):
109                 return self.orchestra_request(Op='status', Id=jobid)
110
111         def completed_status_or_none(self, jobid):
112                 '''return the job status if completed or failed, or None if still Pending'''
113                 status = self.get_status(jobid)
114                 return None if status['Status'] == 'PENDING' else status
115
116         def wait_for_completion(self, jobid, timeout=None, delay=0.1, backoff=1.41414, max_poll_delay=2):
117                 '''Block until an orchestra job leaves the Pending state
118                 returns the status response of the job when it is available
119                 raises TimeoutException iff timeout is not None and timeout seconds 
120                 have passed without the job leaving the Pending state
121
122                 Orchestra calls are async, so we just keep polling until we get
123                 somewhere
124                 '''
125                 pf = functools.partial(self.completed_status_or_none, jobid)
126                 return poll_until_not_none(pf, delay=delay, backoff=backoff, timeout=timeout, max_delay=max_poll_delay)