""" Module that contain all the parsers. To know more about the parser go to service documentation. """ import json import inspect from tools.layer import MasterLayer, Layer, OutLayer, NonLocalOutLayer, \ NonLocalLayer class Parser: """ Master class for all the parsers. All the parser must be inherited from this class. Has common methods like get_parser and check. Attributes ---------- __allowed_parsers: list List of string with the allowed parsers. Parameters ---------- task: monitorT.task.Task Task object which the instance of the parser belongs to. usr: str User who launch the task func: obj Function object to be executed. func_args: dict `func` function parameters ina dict like form. Raises ------ NotAllowedParser: The selected parses could not be found NotAllParameterDefined: There are some required parameters in the executing function that are not defined. """ class Parser(object): _allowed_parsers = ['default', 'itasker', 'None', 'NonBlocking', 'Nonlocal', 'Sources', 'Nonclear'] not_allowed_parser = type('NotAllowedParser', (Exception, ), {}) not_all_parameter_defined = type('NotAllParameterDefined', (Exception, ), {}) def __init__(self, task, usr, func, func_args): self.task = task self.user = usr self.func = func self.func_args = func_args self.data_container = {} @classmethod def get_parser(cls, parser_type): """ Get parser class by name Parameters ---------- parser_type: str Name of the parser Returns ------- Parser: Class inherited from Parse that corresponds with `parser_type` Raises ------ NotAllowedParser: The parser `parser_type` could not be found in __allowed_parsers """ if parser_type not in cls._allowed_parsers: raise cls.not_allowed_parser('Parser "{}" not allowed'.format(parser_type)) parser = [subclass for subclass in cls.__subclasses__() if subclass.__name__ == '{}Parser'.format(parser_type.title())][0] return parser def check(self, *exceptions): """ Check parameters required by `func` except `exceptions` Parameters ---------- exceptions: list List of arguments that wanna be excepted. Raises ------ NotAllParameterDefined: There are some argument in `func` that is required but not exist in `func_args` """ sign = inspect.signature(self.func) sign_parameters = list(sign.parameters.items()) check_attributes = [key_arg[0] in self.func_args for key_arg in sign_parameters if key_arg[0] not in exceptions] if not all(check_attributes): wrong_parameters = [] for arg, check in zip(sign_parameters, check_attributes): if not check: wrong_parameters.append(arg[0]) raise self.not_all_parameter_defined('Parameters "{}" not found' .format('" & "'.join(wrong_parameters))) def parse_in(self): """ This method must be implemented in all subclassed parsers """ raise NotImplementedError def parse_out(self, result): """ This method must be implemented in all subclassed parsers """ raise NotImplementedError class NoneParser(Parser): """ None parser. Not download the sources and check anything. Only performs a type casting between the type introduced and the __annotations__ type. """ def parse_in(self): """ Transforms the type in `func_args` parameters to the type specified in the `func` function annotations. The result is saved to `func_args` """ sign = inspect.signature(self.func) other_parameters = [sign_parameter for sign_parameter in self.func_args.keys() if sign_parameter not in ['task', 'user'] and sign_parameter in sign.parameters.keys() and not(sign.parameters[sign_parameter].annotation is inspect.Parameter.empty) and not issubclass(sign.parameters[sign_parameter].annotation.__class__, MasterLayer) ] for other in other_parameters: if isinstance(self.func_args[other], str) and not \ self.func.__annotations__[other] == str: self.func_args[other] = \ self.func.__annotations__[other](json.loads(self.func_args[other])) def parse_out(self, result): """ Return the result returned by the execution of `func` Parameters ---------- result: Any Result of the execution of `func` with `func_args` arguments """ return result class DefaultParser(Parser): """ This parser would be the default parser in the case that any parser will be specified. Check the parameter without type casting and executes layer.to_local and layer.to_cloud in all arguments inherited from MasterLayer. """ def parse_in(self): """ Change all the layer configuration to local configuration (layer.to_local) """ annotations = self.func.__annotations__ self.check('task') self.task.status_description = 'Downloading root sources' for arg in annotations: if arg in self.func_args.keys() and (annotations[arg] == Layer or annotations[arg] == OutLayer or annotations[arg] == NonLocalOutLayer or annotations[arg] == NonLocalLayer): layer_cls = MasterLayer.set_layer_from_type(annotations[arg].LAYER_TYPE) layer = layer_cls(self.user, self.func_args[arg]) layer.to_local() self.func_args[arg] = layer def parse_out(self, result): """ Execute the method to_cloud to all returned layers. And clean the layers and the process Parameters ---------- result: Any Result of the execution of `func` with `func_args` arguments Returns ------- list: The result after to_cloud operations. """ result = [result] parsed_result = [] self.task.status_description = 'Uploading results' for res in result: json_res = res if issubclass(res.__class__, MasterLayer): json_res = res.to_cloud() parsed_result.append(json_res) # Cleaning process self.task.status_description = 'Cleaning results' for func_arg in self.func_args: if issubclass(self.func_args[func_arg].__class__, MasterLayer): layer = self.func_args[func_arg] layer.clean_residuals() layer.clean_layer() parsed_result = parsed_result[0] if len(parsed_result) == 1 else parsed_result return parsed_result class SourcesParser(Parser): """ Parser used to the new schema of sources and connections. This parser is equal to default but not execute the clean process. """ def parse_in(self): annotations = self.func.__annotations__ self.check('task') self.task.status_description = 'Downloading root sources' for arg in annotations: if arg in self.func_args.keys() and (annotations[arg] == Layer or annotations[arg] == OutLayer or annotations[arg] == NonLocalOutLayer or annotations[arg] == NonLocalLayer): layer_cls = MasterLayer.set_layer_from_type(annotations[arg].LAYER_TYPE) layer = layer_cls(self.user, self.func_args[arg]) layer.to_local() self.func_args[arg] = layer def parse_out(self, result): result = [result] parsed_result = [] self.task.status_description = 'Uploading results' for res in result: json_res = res if issubclass(res.__class__, MasterLayer): json_res = res.to_cloud() parsed_result.append(json_res) parsed_result = parsed_result[0] if len(parsed_result) == 1 else parsed_result return parsed_result class NonlocalParser(Parser): """ This parser is used when is not wanted to execute the change to local parameters of layers. The output process is the same as DefaultParser """ def parse_in(self): annotations = self.func.__annotations__ self.check('task') self.task.status_description = 'Downloading root sources' for arg in annotations: if arg in self.func_args.keys() and (annotations[arg] == Layer or annotations[arg] == OutLayer or annotations[arg] == NonLocalOutLayer or annotations[arg] == NonLocalLayer): layer_cls = MasterLayer.set_layer_from_type(annotations[arg].LAYER_TYPE) layer = layer_cls(self.user, self.func_args[arg]) self.func_args[arg] = layer def parse_out(self, result): result = [result] parsed_result = [] self.task.status_description = 'Uploading results' for res in result: json_res = res if issubclass(res.__class__, MasterLayer): json_res = res.to_cloud() parsed_result.append(json_res) # Cleaning process self.task.status_description = 'Cleaning results' for func_arg in self.func_args: if issubclass(self.func_args[func_arg].__class__, MasterLayer): layer = self.func_args[func_arg] layer.clean_residuals() layer.clean_layer() parsed_result = parsed_result[0] if len(parsed_result) == 1 else parsed_result return parsed_result class NonclearParser(Parser): def parse_in(self): annotations = self.func.__annotations__ self.check('task') self.task.status_description = 'Downloading root sources' for arg in annotations: if arg in self.func_args.keys() and (annotations[arg] == Layer or annotations[arg] == OutLayer or annotations[arg] == NonLocalOutLayer or annotations[arg] == NonLocalLayer): layer_cls = MasterLayer.set_layer_from_type(annotations[arg].LAYER_TYPE) layer = layer_cls(self.user, self.func_args[arg]) self.func_args[arg] = layer def parse_out(self, result): # TEMPORAL fix # if not isinstance(result, (list, dict)): # result = [result] ######################## result = [result] ######################## parsed_result = [] self.task.status_description = 'Uploading results' for res in result: json_res = res if issubclass(res.__class__, MasterLayer): json_res = res.to_cloud() parsed_result.append(json_res) # Cleaning process self.task.status_description = 'Cleaning results' for func_arg in self.func_args: if issubclass(self.func_args[func_arg].__class__, MasterLayer): layer = self.func_args[func_arg] parsed_result = parsed_result[0] if len(parsed_result) == 1 else parsed_result return parsed_result class ItaskerParser(Parser): """ Parser only used for ITasker. It does anything. """ def parse_in(self): self.check('task', 'task_id') @staticmethod def parse_out(result): return result class NonblockingParser(Parser): """ Parser to check the parameter but without blocking the execution. """ def parse_in(self): exceptions = ['task'] sign = inspect.signature(self.func) sign_parameters = list(sign.parameters.items()) for sign_parameter in sign_parameters: if sign_parameter[0] not in exceptions and \ sign_parameter[1].default in \ [inspect.Parameter.empty] and \ sign_parameter[0] not in self.func_args.keys(): raise self.not_all_parameter_defined('Parameter "{}" not found and required' .format(sign_parameter[0])) self.task.status_description = 'Downloading root sources' parameters_layerable = [sign_parameter for sign_parameter in self.func_args.keys() if sign_parameter not in ['task'] and issubclass(sign.parameters[sign_parameter].annotation , MasterLayer) ] for sign_parameter in parameters_layerable: layer_cls = MasterLayer.set_layer_from_type(sign.parameters[sign_parameter] .annotation.LAYER_TYPE) layer = layer_cls(self.user, self.func_args[sign_parameter]) layer.to_local() self.func_args[sign_parameter] = layer def parse_out(self, result): result = [result] parsed_result = [] self.task.status_description = 'Uploading results' for res in result: json_res = res if issubclass(res.__class__, MasterLayer): json_res = res.to_cloud() parsed_result.append(json_res) # Cleaning process self.task.status_description = 'Cleaning results' for func_arg in self.func_args: if issubclass(self.func_args[func_arg].__class__, MasterLayer): layer = self.func_args[func_arg] layer.clean_residuals() layer.clean_layer() parsed_result = parsed_result[0] if len(parsed_result) == 1 else parsed_result return parsed_result