import datetime as dt import json from django.conf import settings from django.http import HttpResponse, HttpResponseBadRequest, StreamingHttpResponse from django.views.decorators.csrf import csrf_exempt from Tierra_2.core.connect import Connector from monitorT.task import Task class CustomTask(Task): CONTROL = True def cancel(self): pass def pause(self): pass """ CONSTANTES """ _MULTI = "MULTI" FORMATO_FECHA = "%Y-%m-%d %H:%M:%S:" SELECT_DATOS_ORIGEN = """ SELECT {columnasOrigen} from "{esquema}"."{tablaOrigen}" WHERE ST_IsValid({geom}) AND {geom} IS NOT NULL {where} """ INSERT_DATOS_DESTINO = """ INSERT INTO "{esquema}"."{tablaDestino}" ({columnasDestino}) VALUES ({valoresDestino}) """ # MÁS ADELANTE SE COMPROBARÁ SI EN LAS COLUMNAS VA INCORPORADO EL GEOMETRY Y EL GID CREATE_TABLE_GENERICO = """ CREATE TABLE IF NOT EXISTS "{esquema}"."{tablaDestino}" ({columnasDestinoCreate}); "ALTER TABLE "{esquema}"."{tablaDestino}" OWNER TO postgres; """ CREATE_GID_GENERICO = """ CREATE SEQUENCE IF NOT EXISTS "{esquema}"."{tablaDestino}_gid_seq" AS integer START WITH 1 INCREMENT BY 1 NO MINVALUE NO MAXVALUE CACHE 1; "ALTER TABLE "{esquema}"."{tablaDestino}_gid_seq" OWNER TO postgres; "ALTER SEQUENCE "{esquema}"."{tablaDestino}_gid_seq" OWNED BY "{esquema}"."{tablaDestino}".gid; "ALTER TABLE ONLY "{esquema}"."{tablaDestino}" ALTER COLUMN gid SET DEFAULT nextval(' {esquema}.{tablaDestino}_gid_seq'::regclass); "ALTER TABLE ONLY "{esquema}"."{tablaDestino}" ADD CONSTRAINT "{tablaDestino}_pkey" PRIMARY KEY (gid); """ CREATE_IDX_GEOM_GENERICO = """ CREATE INDEX IF NOT EXISTS "{tablaDestino}_geometry_idx" ON "{esquema}"."{tablaDestino}" USING gist (geometry);""" def init_datos(): """DATOS COMUNES """ return { "ini": dt.datetime.now(), "fin": None, "duracion": None, "metodo": "", "esquema": "public", "capa_origen": None, "srid": None, "campos": "", "valores": [], "geom": "geom", "tipo_geom": None, # POINT,LINE, POLYGON, ... "resultado": "OK", # OK, ERROR, AVISO "geom_ignorada": [], "mensajes": [], "columnasOrigen": "", "columnasDestino": "" } def add_msg(datos, msg=""): """ Añade un nuevo mensaje a los datos y lo muestra en pantalla """ datos["mensajes"].append((dt.datetime.now().strftime(FORMATO_FECHA) + "::" + msg)) print(datos["mensajes"][len(datos["mensajes"]) - 1]) @csrf_exempt @settings.MASTER_TASK_LIST.as_decorator @CustomTask.as_decorator def origen2destino(task, request=None): """ origen, columnasOrigen, destino, columnasDestino """ datos = init_datos() config = None if request.method == 'POST': try: data = request.body.decode("utf-8") params = json.loads(data) datos_temp = {} # read file try: with open("Tierra_2/core/config_md/" + params["mapeo_md"] + ".json", "r") as file: config_file = file.read() # parse file config = json.loads(config_file) print("JSON de mapeo: Tierra_2/core/config_md/" + params["mapeo_md"] + ".json") except Exception as error: print("Error al abrir JSON de mapeo: " + error.message) raise Exception("Error al abrir JSON de mapeo: " + error.message) connOrigen = Connector() cursorOrigen = connOrigen.openParams(params["connOrigen"]) connDestino = Connector() cursorDestino = connDestino.openParams(params["connDestino"]) inc = int(100 / len(config["mapeos"])) progreso = 0 for mapeo in config["mapeos"]: # Ponemos las variables incrementales a vacio para esta iteración datos["columnasOrigen"] = "" datos["columnasDestino"] = "" datos["campos"] = "" del datos["valores"][:] columnas = mapeo["campos"] for columna in columnas: if "is_geom" in columna and columna["is_geom"] == "True": if columna["campo_orig"].find(" as ") >= 0: datos["geom"] = columna["campo_orig"].split(" as ")[0] else: datos["geom"] = columna["campo_orig"] datos["columnasOrigen"] += columna["campo_orig"] + ", " datos["columnasDestino"] += columna["campo_dest"] + ", " datos["columnasOrigen"] = datos["columnasOrigen"][:-2] datos["columnasDestino"] = datos["columnasDestino"][:-2] try: if "preproceso" in mapeo and len(mapeo["preproceso"]) > 0: for sql in mapeo["preproceso"]: print("PRE PROCESO:{}".format(sql)) cursorOrigen.execute(sql) if (mapeo["datos_temp"] is None or mapeo["datos_temp"] == "") or (mapeo["datos_temp"] is not None and mapeo["datos_temp"] != "" and mapeo["datos_temp"] not in datos_temp): # add_msg(datos, "Datos origen: " + str(len(datos["datos_origen"]))) if mapeo["tabla_orig"][:6] == "SELECT": cursorOrigen.execute(mapeo["tabla_orig"]) else: sqlOrigen = SELECT_DATOS_ORIGEN.format(tablaOrigen=mapeo["tabla_orig"], columnasOrigen=datos["columnasOrigen"], esquema=mapeo["esq_orig"], geom=datos["geom"], where=mapeo["where"]) print(sqlOrigen) cursorOrigen.execute(sqlOrigen) print("DATOS ORIGEN: {}".format(cursorOrigen.query)) datos_origen = cursorOrigen.fetchall() campos_orig = connOrigen.fields(cursorOrigen) if mapeo["datos_temp"] is not None or mapeo["datos_temp"] != "" and mapeo["datos_temp"] is None: datos_temp[mapeo["datos_temp"]] = [datos_origen, campos_orig] elif mapeo["datos_temp"] is not None or mapeo["datos_temp"] != "" and datos_temp[mapeo["datos_temp"] is not None]: print("DATOS ORIGEN CACHE: {}".format(mapeo["datos_temp"])) datos_origen = datos_temp[mapeo["datos_temp"]][0] campos_orig = datos_temp[mapeo["datos_temp"]][1] if len(datos_origen) == 0: # raise Exception("No se encuentra la tabla o no tiene datos") print(mapeo["tabla_orig"] + ' --> TABLA SIN DATOS') for dato in datos_origen: for columna in columnas: if columna["campo_orig"].find(" as ") >= 0: nombreColumna = columna["campo_orig"].split(" as ")[1] else: nombreColumna = columna["campo_orig"] valor = dato[campos_orig[nombreColumna]] if "correspondencia" in columna: valorC = None correspondencias = columna["correspondencia"] for corresp in correspondencias: if valor in corresp: valorC = corresp[valor] valor = valorC if "transformacion" in columna: datos["campos"] = columna["transformacion"].format(valor="%s") + ", " else: datos["campos"] += "%s, " datos["valores"].append(valor) datos["campos"] = datos["campos"][:-2] # Quitar última ", " sql = INSERT_DATOS_DESTINO.format(tablaDestino=mapeo["tabla_dest"], columnasDestino=datos["columnasDestino"], esquema=mapeo["esq_dest"], valoresDestino=datos["campos"]) print("DATOS DESTINO:", sql, datos["valores"]) cursorDestino.execute(sql, datos["valores"]) datos["campos"] = "" del datos["valores"][:] if "postproceso" in mapeo and len(mapeo["postproceso"]) > 0: for sql in mapeo["postproceso"]: print("POST PROCESO:{}".format(sql)) cursorOrigen.execute(sql) if "postprocesodest" in mapeo and len(mapeo["postprocesodest"]) > 0: for sql in mapeo["postprocesodest"]: print("POST PROCESO:{}".format(sql)) cursorDestino.execute(sql) connDestino.commit() print("{} ==> {}: OK".format(mapeo["tabla_orig"], mapeo["tabla_dest"])) except Exception as error: add_msg(datos, "ERROR:: " + str(error)) if connOrigen is not None: connOrigen.rollback() if task is not None: progreso += inc task.set_progress(progreso) status = {'status': 'ok', 'resultado': 'Los datos han sido copiados con éxito'} task.set_progress(100) return HttpResponse(status) except ValueError as e: print('error {}'.format(e)) return HttpResponseBadRequest(e) finally: if connOrigen is not None: connOrigen.close() if connOrigen is not None: connDestino.close() else: return StreamingHttpResponse('it was GET request dtm_tiff_rm') @csrf_exempt @settings.MASTER_TASK_LIST.as_decorator @CustomTask.as_decorator def origen2destinoGenerico(task, request=None): """ origen, columnasOrigen, destino, columnasDestino """ datos = init_datos() if request.method == 'POST': try: data = request.body.decode("utf-8") params = json.loads(data) datos_temp = {} # CREACIÓN DE LAS CONEXIONES connOrigen = Connector() cursorOrigen = connOrigen.openParams(params["connOrigen"]) connDestino = Connector() cursorDestino = connDestino.openParams(params["connDestino"]) inc = int(100 / len(params["mapeos"])) progreso = 0 for mapeo in params["mapeos"]: datos["columnasOrigen"] = "" datos["columnasDestinoCreate"] = "" datos["columnasDestino"] = "" datos["campos"] = "" datos["columnaGid"] = False datos["columnaGeom"] = False del datos["valores"][:] columnas = mapeo["campos"] # OBTENCIÓN DE LAS COLUMNAS PARA CREAR TABLA, LEER DE ORIGEN E INSERTAR EN DESTINO for columna in columnas: if "is_geom" in columna and columna["is_geom"] == "True": datos["geom"] = columna["campo_orig"] datos["columnaGeom"] = True if columna["campo_orig"] != 'gid': datos["columnasOrigen"] += columna["campo_orig"] + ", " datos["columnasDestino"] += columna["campo_dest"] + ", " else: datos["columnaGid"] = True datos["columnasDestinoCreate"] += columna["campo_dest"] + " " + columna["tipo_campo_dest"] + ", " datos["columnasOrigen"] = datos["columnasOrigen"][:-2] datos["columnasDestino"] = datos["columnasDestino"][:-2] datos["columnasDestinoCreate"] = datos["columnasDestinoCreate"][:-2] sqlCreateTable = CREATE_TABLE_GENERICO.format(esquema=mapeo["esq_dest"], tablaDestino=mapeo["tabla_dest"], columnasDestinoCreate=datos["columnasDestinoCreate"]) cursorDestino.execute(sqlCreateTable) if datos["columnaGid"]: sqlCreateGid = CREATE_GID_GENERICO.format(esquema=mapeo["esq_dest"], tablaDestino=mapeo["tabla_dest"]) cursorDestino.execute(sqlCreateGid) if datos["columnaGeom"]: sqlCreateGeom = CREATE_IDX_GEOM_GENERICO.format(esquema=mapeo["esq_dest"], tablaDestino=mapeo["tabla_dest"]) cursorDestino.execute(sqlCreateGeom) sqlOrigen = SELECT_DATOS_ORIGEN.format(tablaOrigen=mapeo["tabla_orig"], columnasOrigen=datos["columnasOrigen"], esquema=mapeo["esq_orig"], geom=datos["geom"], where=mapeo["where"]) print(sqlOrigen) cursorOrigen.execute(sqlOrigen) print("DATOS ORIGEN: {}".format(cursorOrigen.query)) datos_origen = cursorOrigen.fetchall() campos_orig = connOrigen.fields(cursorOrigen) if mapeo["datos_temp"] is not None or mapeo["datos_temp"] != "" and mapeo["datos_temp"] is None: datos_temp[mapeo["datos_temp"]] = [datos_origen, campos_orig] if len(datos_origen) == 0: # raise Exception("No se encuentra la tabla o no tiene datos") print(mapeo["tabla_orig"] + ' --> TABLA SIN DATOS') for dato in datos_origen: for columna in columnas: nombreColumna = columna["campo_orig"] if nombreColumna != 'gid': valor = dato[campos_orig[nombreColumna]] datos["campos"] += "%s, " datos["valores"].append(valor) datos["campos"] = datos["campos"][:-2] # Quitar última ", " sql = INSERT_DATOS_DESTINO.format(tablaDestino=mapeo["tabla_dest"], columnasDestino=datos["columnasDestino"], esquema=mapeo["esq_dest"], valoresDestino=datos["campos"]) print("DATOS DESTINO:", sql, datos["valores"]) cursorDestino.execute(sql, datos["valores"]) datos["campos"] = "" del datos["valores"][:] connDestino.commit() print("{} ==> {}: OK".format(mapeo["tabla_orig"], mapeo["tabla_dest"])) if task is not None: progreso += inc task.set_progress(progreso) status = {'status': 'ok', 'resultado': 'Los datos han sido copiados con éxito'} task.set_progress(100) return HttpResponse(status) inc = int(100 / len(params["mapeos"])) progreso = 0 except ValueError as e: print('error {}'.format(e)) return HttpResponseBadRequest(e) finally: if connOrigen is not None: connOrigen.close() if connOrigen is not None: connDestino.close() else: return StreamingHttpResponse('it was GET request dtm_tiff_rm') @csrf_exempt @settings.MASTER_TASK_LIST.as_decorator @CustomTask.as_decorator def origen2destinoEsquemas(task, request=None): """ origen, columnasOrigen, destino, columnasDestino """ datos = init_datos() config = None if request.method == 'POST': try: data = request.body.decode("utf-8") params = json.loads(data) datos_temp = {} # read file try: with open("Tierra_2/core/config_md/" + params["mapeo_md"] + ".json", "r") as file: config_file = file.read() # parse file config = json.loads(config_file) print("JSON de mapeo: Tierra_2/core/config_md/" + params["mapeo_md"] + ".json") except Exception as error: print("Error al abrir JSON de mapeo: " + error.message) raise Exception("Error al abrir JSON de mapeo: " + error.message) connOrigen = Connector() cursorOrigen = connOrigen.openParams(params["connOrigen"]) connDestino = Connector() cursorDestino = connDestino.openParams(params["connDestino"]) inc = int(100 / len(config["mapeos"])) progreso = 0 for mapeo in config["mapeos"]: # Ponemos las variables incrementales a vacio para esta iteración datos["columnasOrigen"] = "" datos["columnasDestino"] = "" datos["campos"] = "" del datos["valores"][:] # Cambiar el esquema origen y destino mapeo["esq_orig"]=params["schemaOrigen"] mapeo["esq_dest"]=params["schemaDestiny"] columnas = mapeo["campos"] for columna in columnas: if "is_geom" in columna and columna["is_geom"] == "True": if columna["campo_orig"].find(" as ") >= 0: datos["geom"] = columna["campo_orig"].split(" as ")[0] else: datos["geom"] = columna["campo_orig"] datos["columnasOrigen"] += columna["campo_orig"] + ", " datos["columnasDestino"] += columna["campo_dest"] + ", " datos["columnasOrigen"] = datos["columnasOrigen"][:-2] datos["columnasDestino"] = datos["columnasDestino"][:-2] try: if "preproceso" in mapeo and len(mapeo["preproceso"]) > 0: for sql in mapeo["preproceso"]: print("PRE PROCESO:{}".format(sql)) cursorOrigen.execute(sql) if (mapeo["datos_temp"] is None or mapeo["datos_temp"] == "") or ( mapeo["datos_temp"] is not None and mapeo["datos_temp"] != "" and mapeo[ "datos_temp"] not in datos_temp): # add_msg(datos, "Datos origen: " + str(len(datos["datos_origen"]))) if mapeo["tabla_orig"][:6] == "SELECT": cursorOrigen.execute(mapeo["tabla_orig"]) else: sqlOrigen = SELECT_DATOS_ORIGEN.format(tablaOrigen=mapeo["tabla_orig"], columnasOrigen=datos["columnasOrigen"], esquema=mapeo["esq_orig"], geom=datos["geom"], where=mapeo["where"]) print(sqlOrigen) cursorOrigen.execute(sqlOrigen) print("DATOS ORIGEN: {}".format(cursorOrigen.query)) datos_origen = cursorOrigen.fetchall() campos_orig = connOrigen.fields(cursorOrigen) if mapeo["datos_temp"] is not None or mapeo["datos_temp"] != "" and mapeo["datos_temp"] is None: datos_temp[mapeo["datos_temp"]] = [datos_origen, campos_orig] elif mapeo["datos_temp"] is not None or mapeo["datos_temp"] != "" and datos_temp[ mapeo["datos_temp"] is not None]: print("DATOS ORIGEN CACHE: {}".format(mapeo["datos_temp"])) datos_origen = datos_temp[mapeo["datos_temp"]][0] campos_orig = datos_temp[mapeo["datos_temp"]][1] if len(datos_origen) == 0: # raise Exception("No se encuentra la tabla o no tiene datos") print(mapeo["tabla_orig"] + ' --> TABLA SIN DATOS') for dato in datos_origen: for columna in columnas: if columna["campo_orig"].find(" as ") >= 0: nombreColumna = columna["campo_orig"].split(" as ")[1] else: nombreColumna = columna["campo_orig"] valor = dato[campos_orig[nombreColumna]] if "correspondencia" in columna: valorC = None correspondencias = columna["correspondencia"] for corresp in correspondencias: if valor in corresp: valorC = corresp[valor] valor = valorC if "transformacion" in columna: datos["campos"] = columna["transformacion"].format(valor="%s") + ", " else: datos["campos"] += "%s, " datos["valores"].append(valor) datos["campos"] = datos["campos"][:-2] # Quitar última ", " sql = INSERT_DATOS_DESTINO.format(tablaDestino=mapeo["tabla_dest"], columnasDestino=datos["columnasDestino"], esquema=mapeo["esq_dest"], valoresDestino=datos["campos"]) print("DATOS DESTINO:", sql, datos["valores"]) cursorDestino.execute(sql, datos["valores"]) datos["campos"] = "" del datos["valores"][:] if "postproceso" in mapeo and len(mapeo["postproceso"]) > 0: for sql in mapeo["postproceso"]: print("POST PROCESO:{}".format(sql)) cursorOrigen.execute(sql) if "postprocesodest" in mapeo and len(mapeo["postprocesodest"]) > 0: for sql in mapeo["postprocesodest"]: print("POST PROCESO:{}".format(sql)) cursorDestino.execute(sql) connDestino.commit() print("{} ==> {}: OK".format(mapeo["tabla_orig"], mapeo["tabla_dest"])) except Exception as error: add_msg(datos, "ERROR:: " + str(error)) if connOrigen is not None: connOrigen.rollback() if task is not None: progreso += inc task.set_progress(progreso) status = {'status': 'ok', 'resultado': 'Los datos han sido copiados con éxito'} task.set_progress(100) return HttpResponse(status) except ValueError as e: print('error {}'.format(e)) return HttpResponseBadRequest(e) finally: if connOrigen is not None: connOrigen.close() if connOrigen is not None: connDestino.close() else: return StreamingHttpResponse('it was GET request')