Notification to clients

This commit is contained in:
grossmj 2014-03-19 18:48:42 -06:00
parent 99cc7345b8
commit c4d9e8371f
5 changed files with 91 additions and 23 deletions

View File

@ -183,14 +183,15 @@ class IModule(multiprocessing.Process):
self._current_call_id)) self._current_call_id))
self._stream.send_json(response) self._stream.send_json(response)
def send_notification(self, results): def send_notification(self, destination, results):
""" """
Sends a notification Sends a notification
:param results: JSON results to the ZeroMQ server :param destination: destination (or method)
:param results: JSON results to the ZeroMQ router
""" """
jsonrpc_response = jsonrpc.JSONRPCNotification(results)() jsonrpc_response = jsonrpc.JSONRPCNotification(destination, results)()
# add session to the response # add session to the response
response = [self._current_session, jsonrpc_response] response = [self._current_session, jsonrpc_response]

View File

@ -115,8 +115,8 @@ class Dynamips(IModule):
self._working_dir = self._projects_dir self._working_dir = self._projects_dir
self._dynamips = "" self._dynamips = ""
#self._callback = self.add_periodic_callback(self.test, 1000) self._callback = self.add_periodic_callback(self._check_hypervisors, 5000)
#self._callback.start() self._callback.start()
def stop(self): def stop(self):
""" """
@ -127,6 +127,27 @@ class Dynamips(IModule):
self._hypervisor_manager.stop_all_hypervisors() self._hypervisor_manager.stop_all_hypervisors()
IModule.stop(self) # this will stop the I/O loop IModule.stop(self) # this will stop the I/O loop
def _check_hypervisors(self):
"""
Periodic callback to check if Dynamips hypervisors are running.
Sends a notification to the client if not.
"""
if self._hypervisor_manager:
for hypervisor in self._hypervisor_manager.hypervisors:
if hypervisor.started and not hypervisor.is_running():
notification = {"module": self.name}
stdout = hypervisor.read_stdout()
device_names = []
for device in hypervisor.devices:
device_names.append(device.name)
notification["message"] = "Dynamips has stopped running"
notification["details"] = stdout
notification["devices"] = device_names
self.send_notification("{}.dynamips_stopped".format(self.name), notification)
hypervisor.stop()
@IModule.route("dynamips.reset") @IModule.route("dynamips.reset")
def reset(self, request): def reset(self, request):
""" """

View File

@ -54,6 +54,7 @@ class Hypervisor(DynamipsHypervisor):
self._command = [] self._command = []
self._process = None self._process = None
self._stdout_file = "" self._stdout_file = ""
self._started = False
# settings used the load-balance hypervisors # settings used the load-balance hypervisors
# (for the hypervisor manager) # (for the hypervisor manager)
@ -70,6 +71,16 @@ class Hypervisor(DynamipsHypervisor):
return(self._id) return(self._id)
@property
def started(self):
"""
Returns either this hypervisor has been started or not.
:returns: boolean
"""
return self._started
@property @property
def path(self): def path(self):
""" """
@ -199,6 +210,7 @@ class Hypervisor(DynamipsHypervisor):
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
cwd=self._working_dir) cwd=self._working_dir)
log.info("Dynamips started PID={}".format(self._process.pid)) log.info("Dynamips started PID={}".format(self._process.pid))
self._started = True
except EnvironmentError as e: except EnvironmentError as e:
log.error("could not start Dynamips: {}".format(e)) log.error("could not start Dynamips: {}".format(e))
raise DynamipsError("could not start Dynamips: {}".format(e)) raise DynamipsError("could not start Dynamips: {}".format(e))
@ -222,6 +234,8 @@ class Hypervisor(DynamipsHypervisor):
if self._process.poll() == None: if self._process.poll() == None:
log.warn("Dynamips process {} is still running".format(self._process.pid)) log.warn("Dynamips process {} is still running".format(self._process.pid))
self._started = False
def read_stdout(self): def read_stdout(self):
""" """
Reads the standard output of the Dynamips process. Reads the standard output of the Dynamips process.

View File

@ -82,7 +82,8 @@ class IOU(IModule):
self._working_dir = self._projects_dir self._working_dir = self._projects_dir
self._iourc = "" self._iourc = ""
self._iou_callback = self.add_periodic_callback(self._check_iou, 5000) # check every 5 seconds
self._iou_callback = self.add_periodic_callback(self._check_iou_is_alive, 5000)
self._iou_callback.start() self._iou_callback.start()
def stop(self): def stop(self):
@ -97,15 +98,30 @@ class IOU(IModule):
IModule.stop(self) # this will stop the I/O loop IModule.stop(self) # this will stop the I/O loop
def _check_iou(self): def _check_iou_is_alive(self):
"""
Periodic callback to check if IOU and iouyap are alive
for each IOU instance.
Sends a notification to the client if not.
"""
for iou_id in self._iou_instances: for iou_id in self._iou_instances:
iou_instance = self._iou_instances[iou_id] iou_instance = self._iou_instances[iou_id]
if iou_instance.started and not iou_instance.is_running(): if iou_instance.started and (not iou_instance.is_running() or not iou_instance.is_iouyap_running()):
self.send_notification({"module": self.name, notification = {"module": self.name,
"id": iou_id, "id": iou_id,
"name": iou_instance.name, "name": iou_instance.name}
"message": "IOU is not running"}) if not iou_instance.is_running():
stdout = iou_instance.read_iou_stdout()
notification["message"] = "IOU has stopped running"
notification["details"] = stdout
self.send_notification("{}.iou_stopped".format(self.name), notification)
elif not iou_instance.is_iouyap_running():
stdout = iou_instance.read_iouyap_stdout()
notification["message"] = "iouyap has stopped running"
notification["details"] = stdout
self.send_notification("{}.iouyap_stopped".format(self.name), notification)
iou_instance.stop() iou_instance.stop()
@IModule.route("iou.reset") @IModule.route("iou.reset")

View File

@ -77,7 +77,8 @@ class IOUDevice(object):
self._command = [] self._command = []
self._process = None self._process = None
self._iouyap_process = None self._iouyap_process = None
self._stdout_file = "" self._iou_stdout_file = ""
self._iouyap_stdout_file = ""
self._ioucon_thead = None self._ioucon_thead = None
self._ioucon_thread_stop_event = None self._ioucon_thread_stop_event = None
self._host = host self._host = host
@ -391,9 +392,9 @@ class IOUDevice(object):
self._update_iouyap_config() self._update_iouyap_config()
command = [self._iouyap, str(self._id + 512)] # iouyap has always IOU ID + 512 command = [self._iouyap, str(self._id + 512)] # iouyap has always IOU ID + 512
log.info("starting iouyap: {}".format(command)) log.info("starting iouyap: {}".format(command))
self._stdout_file = os.path.join(self._working_dir, "iouyap.log") self._iouyap_stdout_file = os.path.join(self._working_dir, "iouyap.log")
log.info("logging to {}".format(self._stdout_file)) log.info("logging to {}".format(self._iouyap_stdout_file))
with open(self._stdout_file, "w") as fd: with open(self._iouyap_stdout_file, "w") as fd:
self._iouyap_process = subprocess.Popen(command, self._iouyap_process = subprocess.Popen(command,
stdout=fd, stdout=fd,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
@ -423,9 +424,9 @@ class IOUDevice(object):
self._command = self._build_command() self._command = self._build_command()
try: try:
log.info("starting IOU: {}".format(self._command)) log.info("starting IOU: {}".format(self._command))
self._stdout_file = os.path.join(self._working_dir, "iou.log") self._iou_stdout_file = os.path.join(self._working_dir, "iou.log")
log.info("logging to {}".format(self._stdout_file)) log.info("logging to {}".format(self._iou_stdout_file))
with open(self._stdout_file, "w") as fd: with open(self._iou_stdout_file, "w") as fd:
self._process = subprocess.Popen(self._command, self._process = subprocess.Popen(self._command,
stdout=fd, stdout=fd,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
@ -481,19 +482,34 @@ class IOUDevice(object):
self._id)) self._id))
self._iouyap_process = None self._iouyap_process = None
def read_stdout(self): def read_iou_stdout(self):
""" """
Reads the standard output of the IOU process. Reads the standard output of the IOU process.
Only use when the process has been stopped or has crashed. Only use when the process has been stopped or has crashed.
""" """
output = "" output = ""
if self._stdout_file: if self._iou_stdout_file:
try: try:
with open(self._stdout_file) as file: with open(self._iou_stdout_file) as file:
output = file.read() output = file.read()
except EnvironmentError as e: except EnvironmentError as e:
log.warn("could not read {}: {}".format(self._stdout_file, e)) log.warn("could not read {}: {}".format(self._iou_stdout_file, e))
return output
def read_iouyap_stdout(self):
"""
Reads the standard output of the iouyap process.
Only use when the process has been stopped or has crashed.
"""
output = ""
if self._iouyap_stdout_file:
try:
with open(self._iouyap_stdout_file) as file:
output = file.read()
except EnvironmentError as e:
log.warn("could not read {}: {}".format(self._iouyap_stdout_file, e))
return output return output
def is_running(self): def is_running(self):