]> git.lizzy.rs Git - plan9front.git/blob - sys/lib/python/asyncore.py
dist/mkfile: run binds in subshell
[plan9front.git] / sys / lib / python / asyncore.py
1 # -*- Mode: Python -*-
2 #   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 #   Author: Sam Rushing <rushing@nightmare.com>
4
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
7 #
8 #                         All Rights Reserved
9 #
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
18 #
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
27
28 """Basic infrastructure for asynchronous socket service clients and servers.
29
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time".  Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. it's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
38
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background."  Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
48
49 import select
50 import socket
51 import sys
52 import time
53
54 import os
55 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
56      ENOTCONN, ESHUTDOWN, EINTR, EISCONN, errorcode
57
58 try:
59     socket_map
60 except NameError:
61     socket_map = {}
62
63 class ExitNow(Exception):
64     pass
65
66 def read(obj):
67     try:
68         obj.handle_read_event()
69     except ExitNow:
70         raise
71     except:
72         obj.handle_error()
73
74 def write(obj):
75     try:
76         obj.handle_write_event()
77     except ExitNow:
78         raise
79     except:
80         obj.handle_error()
81
82 def _exception (obj):
83     try:
84         obj.handle_expt_event()
85     except ExitNow:
86         raise
87     except:
88         obj.handle_error()
89
90 def readwrite(obj, flags):
91     try:
92         if flags & (select.POLLIN | select.POLLPRI):
93             obj.handle_read_event()
94         if flags & select.POLLOUT:
95             obj.handle_write_event()
96         if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
97             obj.handle_expt_event()
98     except ExitNow:
99         raise
100     except:
101         obj.handle_error()
102
103 def poll(timeout=0.0, map=None):
104     if map is None:
105         map = socket_map
106     if map:
107         r = []; w = []; e = []
108         for fd, obj in map.items():
109             is_r = obj.readable()
110             is_w = obj.writable()
111             if is_r:
112                 r.append(fd)
113             if is_w:
114                 w.append(fd)
115             if is_r or is_w:
116                 e.append(fd)
117         if [] == r == w == e:
118             time.sleep(timeout)
119         else:
120             try:
121                 r, w, e = select.select(r, w, e, timeout)
122             except select.error, err:
123                 if err[0] != EINTR:
124                     raise
125                 else:
126                     return
127
128         for fd in r:
129             obj = map.get(fd)
130             if obj is None:
131                 continue
132             read(obj)
133
134         for fd in w:
135             obj = map.get(fd)
136             if obj is None:
137                 continue
138             write(obj)
139
140         for fd in e:
141             obj = map.get(fd)
142             if obj is None:
143                 continue
144             _exception(obj)
145
146 def poll2(timeout=0.0, map=None):
147     # Use the poll() support added to the select module in Python 2.0
148     if map is None:
149         map = socket_map
150     if timeout is not None:
151         # timeout is in milliseconds
152         timeout = int(timeout*1000)
153     pollster = select.poll()
154     if map:
155         for fd, obj in map.items():
156             flags = 0
157             if obj.readable():
158                 flags |= select.POLLIN | select.POLLPRI
159             if obj.writable():
160                 flags |= select.POLLOUT
161             if flags:
162                 # Only check for exceptions if object was either readable
163                 # or writable.
164                 flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
165                 pollster.register(fd, flags)
166         try:
167             r = pollster.poll(timeout)
168         except select.error, err:
169             if err[0] != EINTR:
170                 raise
171             r = []
172         for fd, flags in r:
173             obj = map.get(fd)
174             if obj is None:
175                 continue
176             readwrite(obj, flags)
177
178 poll3 = poll2                           # Alias for backward compatibility
179
180 def loop(timeout=30.0, use_poll=False, map=None, count=None):
181     if map is None:
182         map = socket_map
183
184     if use_poll and hasattr(select, 'poll'):
185         poll_fun = poll2
186     else:
187         poll_fun = poll
188
189     if count is None:
190         while map:
191             poll_fun(timeout, map)
192
193     else:
194         while map and count > 0:
195             poll_fun(timeout, map)
196             count = count - 1
197
198 class dispatcher:
199
200     debug = False
201     connected = False
202     accepting = False
203     closing = False
204     addr = None
205
206     def __init__(self, sock=None, map=None):
207         if map is None:
208             self._map = socket_map
209         else:
210             self._map = map
211
212         if sock:
213             self.set_socket(sock, map)
214             # I think it should inherit this anyway
215             self.socket.setblocking(0)
216             self.connected = True
217             # XXX Does the constructor require that the socket passed
218             # be connected?
219             try:
220                 self.addr = sock.getpeername()
221             except socket.error:
222                 # The addr isn't crucial
223                 pass
224         else:
225             self.socket = None
226
227     def __repr__(self):
228         status = [self.__class__.__module__+"."+self.__class__.__name__]
229         if self.accepting and self.addr:
230             status.append('listening')
231         elif self.connected:
232             status.append('connected')
233         if self.addr is not None:
234             try:
235                 status.append('%s:%d' % self.addr)
236             except TypeError:
237                 status.append(repr(self.addr))
238         return '<%s at %#x>' % (' '.join(status), id(self))
239
240     def add_channel(self, map=None):
241         #self.log_info('adding channel %s' % self)
242         if map is None:
243             map = self._map
244         map[self._fileno] = self
245
246     def del_channel(self, map=None):
247         fd = self._fileno
248         if map is None:
249             map = self._map
250         if map.has_key(fd):
251             #self.log_info('closing channel %d:%s' % (fd, self))
252             del map[fd]
253         self._fileno = None
254
255     def create_socket(self, family, type):
256         self.family_and_type = family, type
257         self.socket = socket.socket(family, type)
258         self.socket.setblocking(0)
259         self._fileno = self.socket.fileno()
260         self.add_channel()
261
262     def set_socket(self, sock, map=None):
263         self.socket = sock
264 ##        self.__dict__['socket'] = sock
265         self._fileno = sock.fileno()
266         self.add_channel(map)
267
268     def set_reuse_addr(self):
269         # try to re-use a server port if possible
270         try:
271             self.socket.setsockopt(
272                 socket.SOL_SOCKET, socket.SO_REUSEADDR,
273                 self.socket.getsockopt(socket.SOL_SOCKET,
274                                        socket.SO_REUSEADDR) | 1
275                 )
276         except socket.error:
277             pass
278
279     # ==================================================
280     # predicates for select()
281     # these are used as filters for the lists of sockets
282     # to pass to select().
283     # ==================================================
284
285     def readable(self):
286         return True
287
288     def writable(self):
289         return True
290
291     # ==================================================
292     # socket object methods.
293     # ==================================================
294
295     def listen(self, num):
296         self.accepting = True
297         if os.name == 'nt' and num > 5:
298             num = 1
299         return self.socket.listen(num)
300
301     def bind(self, addr):
302         self.addr = addr
303         return self.socket.bind(addr)
304
305     def connect(self, address):
306         self.connected = False
307         err = self.socket.connect_ex(address)
308         # XXX Should interpret Winsock return values
309         if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
310             return
311         if err in (0, EISCONN):
312             self.addr = address
313             self.connected = True
314             self.handle_connect()
315         else:
316             raise socket.error, (err, errorcode[err])
317
318     def accept(self):
319         # XXX can return either an address pair or None
320         try:
321             conn, addr = self.socket.accept()
322             return conn, addr
323         except socket.error, why:
324             if why[0] == EWOULDBLOCK:
325                 pass
326             else:
327                 raise
328
329     def send(self, data):
330         try:
331             result = self.socket.send(data)
332             return result
333         except socket.error, why:
334             if why[0] == EWOULDBLOCK:
335                 return 0
336             else:
337                 raise
338             return 0
339
340     def recv(self, buffer_size):
341         try:
342             data = self.socket.recv(buffer_size)
343             if not data:
344                 # a closed connection is indicated by signaling
345                 # a read condition, and having recv() return 0.
346                 self.handle_close()
347                 return ''
348             else:
349                 return data
350         except socket.error, why:
351             # winsock sometimes throws ENOTCONN
352             if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
353                 self.handle_close()
354                 return ''
355             else:
356                 raise
357
358     def close(self):
359         self.del_channel()
360         self.socket.close()
361
362     # cheap inheritance, used to pass all other attribute
363     # references to the underlying socket object.
364     def __getattr__(self, attr):
365         return getattr(self.socket, attr)
366
367     # log and log_info may be overridden to provide more sophisticated
368     # logging and warning methods. In general, log is for 'hit' logging
369     # and 'log_info' is for informational, warning and error logging.
370
371     def log(self, message):
372         sys.stderr.write('log: %s\n' % str(message))
373
374     def log_info(self, message, type='info'):
375         if __debug__ or type != 'info':
376             print '%s: %s' % (type, message)
377
378     def handle_read_event(self):
379         if self.accepting:
380             # for an accepting socket, getting a read implies
381             # that we are connected
382             if not self.connected:
383                 self.connected = True
384             self.handle_accept()
385         elif not self.connected:
386             self.handle_connect()
387             self.connected = True
388             self.handle_read()
389         else:
390             self.handle_read()
391
392     def handle_write_event(self):
393         # getting a write implies that we are connected
394         if not self.connected:
395             self.handle_connect()
396             self.connected = True
397         self.handle_write()
398
399     def handle_expt_event(self):
400         self.handle_expt()
401
402     def handle_error(self):
403         nil, t, v, tbinfo = compact_traceback()
404
405         # sometimes a user repr method will crash.
406         try:
407             self_repr = repr(self)
408         except:
409             self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
410
411         self.log_info(
412             'uncaptured python exception, closing channel %s (%s:%s %s)' % (
413                 self_repr,
414                 t,
415                 v,
416                 tbinfo
417                 ),
418             'error'
419             )
420         self.close()
421
422     def handle_expt(self):
423         self.log_info('unhandled exception', 'warning')
424
425     def handle_read(self):
426         self.log_info('unhandled read event', 'warning')
427
428     def handle_write(self):
429         self.log_info('unhandled write event', 'warning')
430
431     def handle_connect(self):
432         self.log_info('unhandled connect event', 'warning')
433
434     def handle_accept(self):
435         self.log_info('unhandled accept event', 'warning')
436
437     def handle_close(self):
438         self.log_info('unhandled close event', 'warning')
439         self.close()
440
441 # ---------------------------------------------------------------------------
442 # adds simple buffered output capability, useful for simple clients.
443 # [for more sophisticated usage use asynchat.async_chat]
444 # ---------------------------------------------------------------------------
445
446 class dispatcher_with_send(dispatcher):
447
448     def __init__(self, sock=None, map=None):
449         dispatcher.__init__(self, sock, map)
450         self.out_buffer = ''
451
452     def initiate_send(self):
453         num_sent = 0
454         num_sent = dispatcher.send(self, self.out_buffer[:512])
455         self.out_buffer = self.out_buffer[num_sent:]
456
457     def handle_write(self):
458         self.initiate_send()
459
460     def writable(self):
461         return (not self.connected) or len(self.out_buffer)
462
463     def send(self, data):
464         if self.debug:
465             self.log_info('sending %s' % repr(data))
466         self.out_buffer = self.out_buffer + data
467         self.initiate_send()
468
469 # ---------------------------------------------------------------------------
470 # used for debugging.
471 # ---------------------------------------------------------------------------
472
473 def compact_traceback():
474     t, v, tb = sys.exc_info()
475     tbinfo = []
476     assert tb # Must have a traceback
477     while tb:
478         tbinfo.append((
479             tb.tb_frame.f_code.co_filename,
480             tb.tb_frame.f_code.co_name,
481             str(tb.tb_lineno)
482             ))
483         tb = tb.tb_next
484
485     # just to be safe
486     del tb
487
488     file, function, line = tbinfo[-1]
489     info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
490     return (file, function, line), t, v, info
491
492 def close_all(map=None):
493     if map is None:
494         map = socket_map
495     for x in map.values():
496         x.socket.close()
497     map.clear()
498
499 # Asynchronous File I/O:
500 #
501 # After a little research (reading man pages on various unixen, and
502 # digging through the linux kernel), I've determined that select()
503 # isn't meant for doing asynchronous file i/o.
504 # Heartening, though - reading linux/mm/filemap.c shows that linux
505 # supports asynchronous read-ahead.  So _MOST_ of the time, the data
506 # will be sitting in memory for us already when we go to read it.
507 #
508 # What other OS's (besides NT) support async file i/o?  [VMS?]
509 #
510 # Regardless, this is useful for pipes, and stdin/stdout...
511
512 if os.name == 'posix':
513     import fcntl
514
515     class file_wrapper:
516         # here we override just enough to make a file
517         # look like a socket for the purposes of asyncore.
518
519         def __init__(self, fd):
520             self.fd = fd
521
522         def recv(self, *args):
523             return os.read(self.fd, *args)
524
525         def send(self, *args):
526             return os.write(self.fd, *args)
527
528         read = recv
529         write = send
530
531         def close(self):
532             os.close(self.fd)
533
534         def fileno(self):
535             return self.fd
536
537     class file_dispatcher(dispatcher):
538
539         def __init__(self, fd, map=None):
540             dispatcher.__init__(self, None, map)
541             self.connected = True
542             self.set_file(fd)
543             # set it to non-blocking mode
544             flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
545             flags = flags | os.O_NONBLOCK
546             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
547
548         def set_file(self, fd):
549             self._fileno = fd
550             self.socket = file_wrapper(fd)
551             self.add_channel()