]> git.lizzy.rs Git - plan9front.git/blob - sys/lib/python/dummy_thread.py
igfx: fix sandybridge fdi link training bits and ordering
[plan9front.git] / sys / lib / python / dummy_thread.py
1 """Drop-in replacement for the thread module.
2
3 Meant to be used as a brain-dead substitute so that threaded code does
4 not need to be rewritten for when the thread module is not present.
5
6 Suggested usage is::
7
8     try:
9         import thread
10     except ImportError:
11         import dummy_thread as thread
12
13 """
14 __author__ = "Brett Cannon"
15 __email__ = "brett@python.org"
16
17 # Exports only things specified by thread documentation
18 # (skipping obsolete synonyms allocate(), start_new(), exit_thread())
19 __all__ = ['error', 'start_new_thread', 'exit', 'get_ident', 'allocate_lock',
20            'interrupt_main', 'LockType']
21
22 import traceback as _traceback
23 import warnings
24
25 class error(Exception):
26     """Dummy implementation of thread.error."""
27
28     def __init__(self, *args):
29         self.args = args
30
31 def start_new_thread(function, args, kwargs={}):
32     """Dummy implementation of thread.start_new_thread().
33
34     Compatibility is maintained by making sure that ``args`` is a
35     tuple and ``kwargs`` is a dictionary.  If an exception is raised
36     and it is SystemExit (which can be done by thread.exit()) it is
37     caught and nothing is done; all other exceptions are printed out
38     by using traceback.print_exc().
39
40     If the executed function calls interrupt_main the KeyboardInterrupt will be
41     raised when the function returns.
42
43     """
44     if type(args) != type(tuple()):
45         raise TypeError("2nd arg must be a tuple")
46     if type(kwargs) != type(dict()):
47         raise TypeError("3rd arg must be a dict")
48     global _main
49     _main = False
50     try:
51         function(*args, **kwargs)
52     except SystemExit:
53         pass
54     except:
55         _traceback.print_exc()
56     _main = True
57     global _interrupt
58     if _interrupt:
59         _interrupt = False
60         raise KeyboardInterrupt
61
62 def exit():
63     """Dummy implementation of thread.exit()."""
64     raise SystemExit
65
66 def get_ident():
67     """Dummy implementation of thread.get_ident().
68
69     Since this module should only be used when threadmodule is not
70     available, it is safe to assume that the current process is the
71     only thread.  Thus a constant can be safely returned.
72     """
73     return -1
74
75 def allocate_lock():
76     """Dummy implementation of thread.allocate_lock()."""
77     return LockType()
78
79 def stack_size(size=None):
80     """Dummy implementation of thread.stack_size()."""
81     if size is not None:
82         raise error("setting thread stack size not supported")
83     return 0
84
85 class LockType(object):
86     """Class implementing dummy implementation of thread.LockType.
87
88     Compatibility is maintained by maintaining self.locked_status
89     which is a boolean that stores the state of the lock.  Pickling of
90     the lock, though, should not be done since if the thread module is
91     then used with an unpickled ``lock()`` from here problems could
92     occur from this class not having atomic methods.
93
94     """
95
96     def __init__(self):
97         self.locked_status = False
98
99     def acquire(self, waitflag=None):
100         """Dummy implementation of acquire().
101
102         For blocking calls, self.locked_status is automatically set to
103         True and returned appropriately based on value of
104         ``waitflag``.  If it is non-blocking, then the value is
105         actually checked and not set if it is already acquired.  This
106         is all done so that threading.Condition's assert statements
107         aren't triggered and throw a little fit.
108
109         """
110         if waitflag is None:
111             self.locked_status = True
112             return None
113         elif not waitflag:
114             if not self.locked_status:
115                 self.locked_status = True
116                 return True
117             else:
118                 return False
119         else:
120             self.locked_status = True
121             return True
122
123     __enter__ = acquire
124
125     def __exit__(self, typ, val, tb):
126         self.release()
127
128     def release(self):
129         """Release the dummy lock."""
130         # XXX Perhaps shouldn't actually bother to test?  Could lead
131         #     to problems for complex, threaded code.
132         if not self.locked_status:
133             raise error
134         self.locked_status = False
135         return True
136
137     def locked(self):
138         return self.locked_status
139
140 # Used to signal that interrupt_main was called in a "thread"
141 _interrupt = False
142 # True when not executing in a "thread"
143 _main = True
144
145 def interrupt_main():
146     """Set _interrupt flag to True to have start_new_thread raise
147     KeyboardInterrupt upon exiting."""
148     if _main:
149         raise KeyboardInterrupt
150     else:
151         global _interrupt
152         _interrupt = True