from monitorT.task import Task import time from ..iGraph.nodes import Node import datetime import networkx as nx from tools.layer import Layer, TmpLayer, OutLayer, NonLocalLayer, NonLocalOutLayer from tools.protocols.protocol import Protocol import importlib from django.conf import settings from monitorT.task import KillingThread import string import numpy as np import random class ITaskerNode(Node): INFO_REFRESH = 0.5 # seconds def __init__(self, node_id, parent_graph, **kwargs): self.user = parent_graph.user super().__init__(node_id, parent_graph, **kwargs) def report_finish(self, info_dict): info_dict[self.id]['finishedAt'] = datetime.datetime.now().isoformat() info_dict[self.id]['description'] = '"{}" node finished without errors'.format(self.id) info_dict[self.id]['progress'] = 100 info_dict[self.id]['status'] = Task.KEY_DONE def report_error(self, info_dict, error_str=""): info_dict[self.id]['finishedAt'] = datetime.datetime.now().isoformat() info_dict[self.id]['description'] = 'Node {}: ERROR => {}'.format(self.id, error_str) info_dict[self.id]['status'] = Task.KEY_ERROR def run_into_task(self, info_dict, func, parameters, task_cls=None): task_obj = settings.MASTER_TASK_LIST.add_task(self.user, func, parameters, task_cls, parser='None') t = KillingThread(target=task_obj.run, name=task_obj.task_id) t.start() status = Task.KEY_INPROGRESS while t.is_alive(): time.sleep(self.INFO_REFRESH) info_dict[self.id]['progress'] = task_obj.progress info_dict[self.id]['status'] = task_obj.STATUS info_dict[self.id]['description'] = task_obj.status_description status = task_obj.STATUS else: if status == Task.KEY_ERROR: info_dict[self.id]['description'] = task_obj.status_description raise Exception(task_obj.status_description) result = task_obj.result return result class StemNode(ITaskerNode): @classmethod def parse(cls, json, parent_graph): json_copy = json.copy() node_id = json_copy['id'] del json_copy['id'] node = cls(node_id, parent_graph, **json_copy) return node class ConnectionNode(ITaskerNode): ARG_NAME_KEY = 'argName' ARG_NAME_KEY_VALUE_SEP = ':' ARG_NAME_SUBTASK_SEP = ';' def __init__(self, node_id, parent_graph, node_json): super().__init__(node_id, parent_graph, **node_json) self.id = node_id self.param_id = { key_value.split(self.ARG_NAME_KEY_VALUE_SEP)[0]: key_value.split(self.ARG_NAME_KEY_VALUE_SEP)[1] for key_value in self.params_dict[self.ARG_NAME_KEY].replace(' ', '').split(self.ARG_NAME_SUBTASK_SEP) } self.parent_graph = parent_graph self.result = None self.backward = {} self.forward = {} if 'layer' in self.params_dict.keys() and self.params_dict['layer']: self.layer = self.create_layer(self.user, self.create_connection_params()) self.container = {} def run(self, task_list, info_dict, **kwargs): info_dict[self.id]['startedAt'] = datetime.datetime.now().isoformat() # Default behaviour, returns itself self.report_finish(info_dict) return self.layer def collect_parameters(self): super().collect_parameters() ancestors = nx.ancestors(self.parent_graph.nx_graph, self.id) result_list = [] for ancestor in ancestors: try: length = nx.shortest_path_length(self.parent_graph.nx_graph, ancestor, self.id) if length > 1: continue except nx.NetworkXNoPath: continue result_list.append(self.parent_graph.nodes[ancestor].result) result_list = result_list[0] if len(result_list) == 1 else result_list self.backward.update({'layer_result': result_list}) def create_connection_params(self): return None def create_layer(self, user, parameters): return None @staticmethod def random_string(length=10): return ''.join(np.random.choice(list(string.ascii_letters), size=length)) class ConnectionOnlyOneNode(ConnectionNode): @staticmethod def _add_map_server_element(source_id, name, mse_type, url): mse_element = {'sourceId': source_id, 'name': name, 'type': mse_type, 'url': url} return mse_element def create_connection_params(self): required_values = {'autorefresh': 'autorefresh', 'mapServerElementType': 'options', 'refreshLatency': 'refreshLatency', 'createMapServerElement': 'is_map_server_element'} protocol = Protocol.get_from_dict(self.params_dict) parsed_parameters = protocol.parse_parameters(self.params_dict, **required_values) return parsed_parameters def create_layer(self, user, parameters): layer = Layer(user, parameters) return layer def run(self, task_list, info_dict, **kwargs): info_dict[self.id]['startedAt'] = datetime.datetime.now().isoformat() # TODO: Change to run function inside tasks, avoiding try-except try: self.layer.to_local() self.layer.attach_sources([self.layer['table_view_name']]) result = self.layer.publish(self.layer['autorefresh'], self.layer['refreshLatency']) except Exception as e: self.report_error(info_dict, e.__str__()) raise e mse_list = [] for url, name in zip(result['url'], result['layer']): mse_element = self._add_map_server_element(self.id, name, self.layer['options'], url) mse_list.append(mse_element) self.container['mse'] = mse_list self.report_finish(info_dict) return self.layer class ConnectionNodeRoot(ConnectionNode): def create_connection_params(self): required_values = {} protocol = Protocol.get_from_dict(self.params_dict) parsed_parameters = protocol.parse_parameters(self.params_dict, **required_values) return parsed_parameters def create_layer(self, user, parameters): layer = Layer(user, parameters) return layer def run(self, task_list, info_dict, **kwargs): info_dict[self.id]['startedAt'] = datetime.datetime.now().isoformat() # TODO: Change to run function inside tasks, avoiding try-except try: self.layer.to_local() except Exception as e: self.report_error(info_dict, e.__str__()) raise e self.report_finish(info_dict) return self.layer class ConnectionNodeNonLocalRoot(ConnectionNode): def create_connection_params(self): required_values = {} protocol = Protocol.get_from_dict(self.params_dict) parsed_parameters = protocol.parse_parameters(self.params_dict, **required_values) return parsed_parameters def create_layer(self, user, parameters): layer = NonLocalLayer(user, parameters) return layer def run(self, task_list, info_dict, **kwargs): info_dict[self.id]['startedAt'] = datetime.datetime.now().isoformat() # TODO: Change to run function inside tasks, avoiding try-except try: self.layer.to_local() except Exception as e: self.report_error(info_dict, e.__str__()) raise e self.report_finish(info_dict) return self.layer class ConnectionNodeTemp(ConnectionNode): def create_connection_params(self): return {} def create_layer(self, user, parameters): layer = TmpLayer(user, parameters) return layer class ConnectionNodeLeave(ConnectionNode): @staticmethod def _add_map_server_element(source_id, name, mse_type, url): mse_element = {'sourceId': source_id, 'name': name, 'type': mse_type, 'url': url} return mse_element def create_connection_params(self): # With the new version of connection and source the outputs nodes has not layer name # so the name of the source is used required_values = {'autorefresh': 'autorefresh', 'mapServerElementType': 'options', 'refreshLatency': 'refreshLatency', 'createMapServerElement': 'is_map_server_element', 'name': 'name'} protocol = Protocol.get_from_dict(self.params_dict) parsed_parameters = protocol.parse_parameters(self.params_dict, **required_values) return parsed_parameters def create_layer(self, user, parameters): name = parameters["name"] if not name: name = self.random_string() parameters['table_view_name'] = name # For PostgreSQL protocol parameters['layer_name'] = name # For other protocols layer = OutLayer(user, parameters) return layer def run(self, task_list, info_dict, **kwargs): info_dict[self.id]['startedAt'] = datetime.datetime.now().isoformat() # Only load default local parameters # self.layer.to_local() # TODO: Change to run function inside tasks, avoiding try-except try: # Upload files self.layer.to_cloud() if self.layer['is_map_server_element']: result = self.layer.publish(self.layer['autorefresh'], self.layer['refreshLatency']) mse_list = [] for url, name in zip(result['url'], result['layer']): mse_element = self._add_map_server_element(self.id, name, self.layer['options'], url) mse_list.append(mse_element) self.container['mse'] = mse_list result = self.layer else: result = self.layer except Exception as e: self.report_error(info_dict, e.__str__()) raise e self.report_finish(info_dict) return result class ConnectionNodeNonLocalLeave(ConnectionNode): @staticmethod def _add_map_server_element(source_id, name, mse_type, url): mse_element = {'sourceId': source_id, 'name': name, 'type': mse_type, 'url': url} return mse_element def create_connection_params(self): # With the new version of connection and source the outputs nodes has not layer name # so the name of the source is used required_values = {'autorefresh': 'autorefresh', 'mapServerElementType': 'options', 'refreshLatency': 'refreshLatency', 'createMapServerElement': 'is_map_server_element', 'name': 'name'} protocol = Protocol.get_from_dict(self.params_dict) parsed_parameters = protocol.parse_parameters(self.params_dict, **required_values) return parsed_parameters def create_layer(self, user, parameters): name = parameters["name"] if not name: name = self.random_string() parameters['table_view_name'] = name # For PostgreSQL protocol parameters['layer_name'] = name # For other protocols layer = NonLocalOutLayer(user, parameters) return layer def run(self, task_list, info_dict, **kwargs): info_dict[self.id]['startedAt'] = datetime.datetime.now().isoformat() # Only load default local parameters # self.layer.to_local() # TODO: Change to run function inside tasks, avoiding try-except try: # Upload files self.layer.to_cloud() if self.layer['is_map_server_element']: result = self.layer.publish(self.layer['autorefresh'], self.layer['refreshLatency']) mse_list = [] for url, name in zip(result['url'], result['layer']): mse_element = self._add_map_server_element(self.id, name, self.layer['options'], url) mse_list.append(mse_element) self.container['mse'] = mse_list result = self.layer else: result = self.layer except Exception as e: self.report_error(info_dict, e.__str__()) raise e self.report_finish(info_dict) return result class ConnectionNodeLinker(ConnectionNode): @staticmethod def _add_map_server_element(source_id, name, mse_type, url): mse_element = {'sourceId': source_id, 'name': name, 'type': mse_type, 'url': url} return mse_element def create_connection_params(self): required_values = {'autorefresh': 'autorefresh', 'mapServerElementType': 'options', 'refreshLatency': 'refreshLatency', 'createMapServerElement': 'is_map_server_element'} protocol = Protocol.get_from_dict(self.params_dict) parsed_parameters = protocol.parse_parameters(self.params_dict, **required_values) return parsed_parameters def create_layer(self, user, parameters): layer = OutLayer(user, parameters) return layer def run(self, task_list, info_dict, **kwargs): info_dict[self.id]['startedAt'] = datetime.datetime.now().isoformat() if 'layer_result' in self.params_dict: layer_result = self.params_dict['layer_result'] else: layer_result = self.layer # TODO: Change to run function inside tasks, avoiding try-except try: layer_result.to_cloud() if layer_result['is_map_server_element']: result = layer_result.publish(layer_result['autorefresh'], layer_result['refreshLatency']) mse_list = [] for url, name in zip(result['url'], result['layer']): mse_element = self._add_map_server_element(self.id, name, layer_result['options'], url) mse_list.append(mse_element) self.container['mse'] = mse_list result = layer_result else: result = layer_result except Exception as e: self.report_error(info_dict, e.__str__()) raise e self.report_finish(info_dict) return result class SubTaskNode(ITaskerNode): METHOD_URL_KEY = 'methodUrl' def __init__(self, node_id, param_id, parent_graph, url, params_dict): super().__init__(node_id, parent_graph, **params_dict) self.url = url self.params_dict = params_dict.copy() self.result = None self.backward = {} self.forward = {} self.param_id = param_id self.func, self.func_task_cls = self._url_to_method(self.url) @staticmethod def _url_to_method(url): func_address = url.split('/') if not func_address[0]: func_address = func_address[1:] func_address.insert(-1, 'calls') func_address = '.'.join(func_address) module = importlib.import_module(func_address) try: task_custom = module.TaskCustom except AttributeError: task_custom = None return module.post, task_custom @classmethod def parse(cls, node_json, parent_graph): url = node_json[cls.METHOD_URL_KEY] node_id = node_json["id"] params_dict = {} for param in node_json['paramList']: try: # Pass to the function the default value of the parameters if param['value'] is None: continue ########################################################### params_dict[param['id']] = param['value'] except KeyError: # Pass to the function the default value of the parameters continue ########################################################### node_obj = cls(node_id=node_id, param_id={"layer_result": ""}, parent_graph=parent_graph, url=url, params_dict=params_dict) return node_obj def collect_parameters(self): super().collect_parameters() successors = self.parent_graph.nx_graph.successors(self.id) for predecessor in successors: if self.id in self.parent_graph.nodes[predecessor].param_id.keys(): self.forward[self.parent_graph.nodes[predecessor].param_id[self.id]] = \ self.parent_graph.nodes[predecessor].layer self.params_dict.update(self.forward) def run(self, task_list, info_dict, multi_thread=False): info_dict[self.id]['startedAt'] = datetime.datetime.now().isoformat() parameters = self.params_dict.copy() parameters.update({'user': self.user}) result = self.run_into_task(info_dict, self.func, parameters, task_cls=self.func_task_cls) self.report_finish(info_dict) return result