import random from django.conf import settings from django.http import HttpResponse import json from threading import Thread import threading import copy import datetime from monitorT import const as mon_t_cont from tools.parser import Parser import importlib import time import shutil import ctypes libc = ctypes.cdll.LoadLibrary('libc.so.6') class KillingThread(Thread): def cancel(self): exec = ctypes.py_object(SystemExit) res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self.ident), exec) if res == 0: print("thread not found!") elif res > 1: ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self.ident), None) class TaskList(object): _ID_LENGTH = 10 def __init__(self): """ Clase que gestiona la creación y control de tareas (Tasks). Solo se crea un objeto de esta clase, MASTER_TASK_LIST que se guarda en la configuración de django (django.conf.settings). Cada vez que el servidor de django se reinicia se borra MASTER_TASK_LIST y con ello cualquier información sobre las tareas en ejecución. """ self.task_list = [] self.task_dict = {} self.task_concat_list = [] @staticmethod def _url_to_method(url): func_address = url.split('/') if not func_address[0]: func_address = func_address[1:] func_address.insert(-1, 'calls') func_address = '.'.join(func_address) module = importlib.import_module(func_address) try: task_custom = module.TaskCustom except AttributeError: task_custom = None return module.post, task_custom def run_into_task(self, user, url, parameters, task_progress=None, scale_progress=1, task_description=None): """ Running a server method by an url into a task and monitor it. Parameters ---------- user: str User ID url: str Url to get the function parameters: dict Dictionary with the parameters of the method. Keys as argument names and values as argument values. task_progress scale_progress task_description Returns ------- The result of the task executed """ func, task_cls = self._url_to_method(url) task_obj = self.add_task(user, func, parameters, task_cls, parser='None') t = KillingThread(target=task_obj.run, name=task_obj.task_id) t.start() status = Task.KEY_INPROGRESS while t.is_alive(): if task_description is not None: task_description = task_obj.status_description if task_progress is not None: task_progress(task_obj.progress/scale_progress) time.sleep(1) else: if status == Task.KEY_ERROR: raise Exception(task_obj.status_description) result = task_obj.result return result def create_id(self): """ Crea una string aleatoria de longitud `Task.ID_LENGTH` a partir de `possible`. Existen len(`possible`)*`Task.ID_LENGTH` (3.40506e^16) combinaciones. Returns ------- string: str String id aleatoria """ possible = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"#$¿&/=¡-_+<>' string = ''.join(random.sample(possible, self._ID_LENGTH)) return string def add_task(self, user, func, func_args, task_custom=None, parser='default'): """ Añade una variable task a la lista MASTER Parameters ---------- user: str Id de usuario func: function Función a ejecutar en la tarea func_args: dict Diccionario que contiene los parámetros de la función ´func´ task_custom: class Task, optional Subclase de la clase Task con comportamiento personalizado parser: class Parser, optional Subclase de la clase Parser. Returns ------- task: Task Created task """ while True: task_id = self.create_id() if task_id not in self.task_list: break if task_custom: task = task_custom(user, task_id, func, func_args, parser) else: task = Task(user, task_id, func, func_args, parser) self.task_list.append(task_id) self.task_dict[task_id] = task return task def search_by_user(self, user): """ Buscar todas las tareas por id de usuario Parameters ---------- user: str Id de usuario Returns ------- tasks: list of Task Todas las tareas para el usuario `user` """ tasks = [] for task in self.task_dict.copy(): if self.task_dict[task].user == user: tasks.append(self.task_dict[task]) return tasks def search_by_id(self, user, task_id): """ Buscar tareas por `task_id` y id de usuario. Parameters ---------- user: str Id de usuario task_id: str Id de tarea Returns ------- task: Task or False Tarea para el usuario `user` y con la id `task_id` """ if self.task_dict[task_id].user == user: return self.task_dict[task_id] else: return False def remove_task(self, task_id): """ Eliminar tarea Parameters ---------- task_id: str Id de task """ self.task_list.remove(task_id) del self.task_dict[task_id] def get_result(self, user, task_id): """ Método para obtener el resultado de una tarea. Si el resultado se pide cuando la variable STATUS de la tarea tiene el valor INPROGRESS o PAUSED se retornará el propio STATUS y un False. Si el resultado se pide cuando la variable STATUS de la tarea es DONE se retornará el resultado. Si el estatus es ERROR o CANCEL se retornará un diccionario vacío. Parameters ---------- user: str Id de usuario task_id: str Id de tarea Returns ------- result: dict or list Diccionario o lista de resultado o errores """ task = self.search_by_id(user, task_id) if task: if task.STATUS == task.KEY_DONE or task.STATUS == task.KEY_ERROR or task.STATUS == task.KEY_CANCEL: result = copy.deepcopy(task.result) self.remove_task(task_id) else: result = ["Status {}".format(task.STATUS), False] else: result = ["Wrong id or user", False] return result def as_decorator(self, func_and_class): """ Función para utlizar la utilidad de tareas como un decorador sobre una view de Django. Parameters ---------- func_and_class: list of function and class Task Función a ejecutar y clase de Task Returns ------- decor_func: function Función decorada """ func, class_task = func_and_class if not class_task: class_task = Task def decor_func(*args, **kwargs): data = args[0].body.decode('utf8') received_json_data = json.loads(u'{}'.format(data)) user = 'Anonymous' if 'user' in received_json_data.keys(): user = received_json_data['user'] task_obj = self.add_task(user, func, {'request': args[0]}, class_task) t = KillingThread(target=task_obj.run, name=task_obj.task_id) t.start() if task_obj.CONTROL: result = {'task_id': task_obj.task_id} else: t.join() result = task_obj.result settings.MASTER_TASK_LIST.remove_task(task_obj.task_id) json_string = json.dumps(result) return HttpResponse(json_string) return decor_func class Task(object): PROGRESS = 0 CONTROL = False STATUS = 'STOPPED' KEY_ERROR = mon_t_cont.ERROR_KEY KEY_INPROGRESS = mon_t_cont.INPROGRESS_KEY KEY_DONE = mon_t_cont.DONE_KEY KEY_PAUSE = mon_t_cont.PAUSE_KEY KEY_CANCEL = mon_t_cont.CANCEL_KEY TYPE = '' """ Attributes ---------- PROGESS: float Progreso en tanto por ciento CONTROL: bool Parámetro para controlar si el método devuelve una id de tarea o espera al resultado. STATUS: str Estado de la tarea. KEY_ERROR: str Clave para indicar el error en la tarea KEY_INPROGRESS: str Clave para indicar que la tarea se encuentra en progreso. KEY_DONE: str Clave para indicar que la tarea ha acabado. KEY_PAUSE: str Clave para indicar que la tarea está pausada. KEY_CANCEL: str Clave para indicar que la tarea ha sido cancelada. TYPE: str Tipo de tarea. """ def __init__(self, user, task_id, func, func_args, parser): """ Clase para controlar el estado de una tarea. Parameters ---------- user: str Id de usuario task_id: str Id de tarea func: function Función a ejecutar func_args: dict Parámetros que pasar a la función. """ self.user = user self.task_id = task_id self.func = func self.func_args = func_args self.progress = 0 # % self.description = "" self.status_description = "" self.result = {} self.create_at = "" self.finished_at = "" self.parser = Parser.get_parser(parser)(self, self.user, self.func, self.func_args) self.run_thread = KillingThread() self.residuals = [] @classmethod def as_decorator(cls, func): """ Función para poder usar la clases Task personalizdas como un decorador Parameters ---------- func: function Función a decorar Returns ------- result: list of function and class Task """ def custom_func(task, request): return func(task, request) return [custom_func, cls] def set_status(self, status_key): """ Cambiar estado de tarea Parameters ---------- status_key: KEY_ERROR or KEY_INPROGRESS or KEY_DONE or KEY_PAUSE or KEY_PAUSE or KEY_CANCEL """ self.STATUS = status_key def set_progress(self, progress): """ Método para cambiar el progreso de la tarea Parameters ---------- progress: float Progreso en tanto por ciento Returns ------- """ self.progress = progress return self.progress def get_info(self): """ Método para obtener información sobre la tarea Returns ------- result: dict Diccionario con los parámetros de la tarea """ result = {"status": self.STATUS, "progress": self.progress, "description": self.status_description} return result def _run(self): """ Método privado para ejecutar la tarea """ self.run_thread = threading.current_thread() try: self.create_at = datetime.datetime.now().isoformat() self.set_status(self.KEY_INPROGRESS) self.set_up() self.result = super(Task, self).__getattribute__('run')() self.finished_at = datetime.datetime.now().isoformat() if self.STATUS not in [self.KEY_CANCEL, self.KEY_ERROR]: self.set_status(self.KEY_DONE) self.set_progress(100) self.status_description = 'Finished' except Exception as e: self.result = e.__str__() self.status_description = "Error: {}".format(e) self.set_status(self.KEY_ERROR) self.finished_at = datetime.datetime.now().isoformat() if settings.DEBUG: raise e for residual in self.residuals: shutil.rmtree(residual, ignore_errors=True) def _start(self): """ Método privado para empezar la tarea """ try: super(Task, self).__getattribute__('start')() self.set_status(self.KEY_INPROGRESS) except NotImplementedError as not_implemented: if settings.DEBUG: raise not_implemented else: return ['Error', False] except Exception as e: self.status_description = "Error: {}".format(e) self.set_status(self.KEY_ERROR) if settings.DEBUG: raise e def _pause(self): """ Método privado para pausar la tarea """ try: self.set_status(self.KEY_PAUSE) super(Task, self).__getattribute__('pause')() except NotImplementedError as not_implemented: if settings.DEBUG: raise not_implemented else: return ['Error', False] except Exception as e: self.status_description = "{}".format(e) self.set_status(self.KEY_ERROR) if settings.DEBUG: raise e def _cancel(self): """ Método privado para cancelar la tarea """ try: super(Task, self).__getattribute__('cancel')() self.set_status(self.KEY_CANCEL) self.run_thread.cancel() for residual in self.residuals: shutil.rmtree(residual, ignore_errors=True) except NotImplementedError as not_implemented: if settings.DEBUG: raise not_implemented else: return ['Error', False] except Exception as e: self.status_description = "Error: {}".format(e) self.set_status(self.KEY_ERROR) if settings.DEBUG: raise e def run(self): """ Método para ejecutar la tarea de forma personalizada """ self.parser.parse_in() try: result = self.func(self, **self.func_args) except: result = self.func(**self.func_args) parsed_result = self.parser.parse_out(result) return parsed_result def set_up(self): """ Método para inicializar variables adicionales necesarias. Se ejecuta justo antes de la función run. """ pass def start(self): """ Método para empezar la tarea. Si no se implementa en una subclase no se dispondrá de la utilidad de empezar la tarea. """ raise NotImplementedError('Function to start not implemented') def pause(self): """ Método para pausar la tarea. Si no se implementa en una subclase no se dispondrá de la utlidad de pausar la tarea. """ raise NotImplementedError('Function to pause not implemented') def cancel(self): """ Método para cancelar la tarea. Si no se implementa en una subclase no se dispondrá de la utlidad de cancelar la tarea. """ raise NotImplementedError('Function to cancel not implemented') def __getattribute__(self, item): if item in ['start', 'pause', 'run', 'cancel']: return super(Task, self).__getattribute__('_{}'.format(item)) else: return super(Task, self).__getattribute__(item)