Improve asynchronous error handling

Add the ability to register callbacks for errors that occur on the
asynchronous variable sampling thread. Implement __del__ and call
close in case the user forgot. Join the asynchronous sampling thread
in close before returning.

Refs #365
This commit is contained in:
Derek Bankieris 2017-01-30 09:25:48 -06:00
parent a909c6400a
commit 39af68cc1d

View File

@ -189,6 +189,7 @@ class VariableServer:
"""
self._variables = []
self._callbacks = {}
self._error_callbacks = {}
self._lock = threading.Lock()
port = int(port)
self._synchronous_socket = socket.create_connection((hostname, port))
@ -203,9 +204,10 @@ class VariableServer:
while True:
try:
values = self._read_values(False)
except Exception:
except Exception as exception:
if self._open:
raise
for function, args in self._error_callbacks.items():
function(*args[0], exception=exception, **args[1])
return
# We must lock here to ensure that variables are not
# removed while we are processing an update.
@ -248,10 +250,20 @@ class VariableServer:
# process cannot terminate while non-daemon threads are running,
# so make this tread a deamon in case the user fails to call
# close when finished with this instance.
thread = threading.Thread(
self._thread = threading.Thread(
target=update_variables, name='Asynchronous Variable Sampler')
thread.daemon = True
thread.start()
self._thread.daemon = True
self._thread.start()
def __del__(self):
"""
Call close in case the user forgot. Don't rely on this to clean
up for you. You should really explicitly call close yourself.
"""
try:
self.close()
except:
pass
def __enter__(self):
return self
@ -599,6 +611,39 @@ class VariableServer:
"""
self._callbacks.pop(function, None)
def register_error_callback(self, function, args=(), kwargs={}):
"""
Call function if an error occurs while sampling variable values.
Registering an aleady-registered function replaces its existing
registration. The order in which functions are called is not
specified. Functions are executed on the asynchronous sampling
thread.
Paramaters
----------
function : callable
The function to call. It must accept a keyword argument
named 'exception' which will contain the error.
args : tuple
The positional arguments to be passed to the function.
kwargs : dict
The keyword arguments to be passed to the function.
"""
self._error_callbacks[function] = args, kwargs
def deregister_error_callback(self, function):
"""
Do not call function if an error occurs while sampling variable
values. Deregistering an unregistered function has no effect.
Parameters
----------
function : any
A function previously added via register_error_callback.
"""
self._error_callbacks.pop(function, None)
def pause(self, pause=True, channel=Channel.ASYNC):
"""
Pause or unpause sampling.
@ -918,6 +963,7 @@ class VariableServer:
self._asynchronous_socket.shutdown(socket.SHUT_RDWR)
self._synchronous_socket.close()
self._asynchronous_socket.close()
self._thread.join()
def fromPID(pid, timeout=None):
"""