def my_udf(variables): import csv import psycopg2 import pandas as pd postgres_info = variables['connection_postgres'] archivo = variables['file'] conn = psycopg2.connect(host=postgres_info['ip'], port=postgres_info['port'], database=postgres_info['databaseName'], user=postgres_info['user'], password=postgres_info['password']) cursor = conn.cursor() print(archivo) df = pd.read_excel(archivo) # Obtener los nombres de las cabeceras (columnas) cabeceras = df.columns # print("Cabeceras del archivo:") # print(cabeceras) fecha = False index_fecha = False hora = False index_hora = False ubi_x = False index_ubi_x = False ubi_y = False index_ubi_y = False for (index, item) in enumerate(cabeceras): item = str(item) if item == "Fecha": fecha = True index_fecha = index continue if item == "Hora": hora = True index_hora = index continue if item == "Ubi_X": ubi_x = True index_ubi_x = index if item == "Ubi_Y": ubi_y = True index_ubi_y = index if index == 0: append_query_create_table = f'"{str(item)}" VARCHAR,' elif len(cabeceras) - 1 == index: append_query_create_table = f'{append_query_create_table} "{str(item)}" VARCHAR' if fecha and hora: append_query_create_table = f'{append_query_create_table}, "Fecha_Hora" TIMESTAMP' fecha = False hora = False if ubi_x and ubi_y: append_query_create_table = f'{append_query_create_table}, geometry geometry(Point, 4326)' ubi_x = False ubi_y = False else: append_query_create_table = f'{append_query_create_table} "{str(item)}" VARCHAR,' if fecha and hora: append_query_create_table = f'{append_query_create_table} "Fecha_Hora" TIMESTAMP,' fecha = False hora = False if ubi_x and ubi_y: append_query_create_table = f'{append_query_create_table} geometry geometry(Point, 4326),' ubi_x = False ubi_y = False query_create_table = f"""CREATE TABLE {postgres_info["schema"]}.{postgres_info["tableName"]} ({append_query_create_table});""" try: cursor.execute(query_create_table) conn.commit() #print(query_create_table) except: conn.rollback() query_type_columns = f""" SELECT column_name, data_type FROM information_schema.columns WHERE table_name = %s AND table_schema = %s; """ cursor.execute(query_type_columns, (postgres_info["tableName"], postgres_info["schema"])) type_columns = cursor.fetchall() #print(f"Index a tener en cuenta: {index_fecha}, {index_hora}, {index_ubi_x}, {index_ubi_y}") # Obtener las primeras 10 filas # print("Primeras 10 filas:") files_failed = '' for (index, row) in df.iterrows(): # .head(10) selecciona las primeras 10 filas datos_to_append = [] point = [] string_cabeceras = '' string_cabeceras_asociado = '' for (index_separado, item) in enumerate(row): item = str(item) if item == "nan": item = "" for (index_cabeceras, element) in enumerate(cabeceras): if index_separado == index_cabeceras: if element == "Ubi_X" or element == "Ubi_Y": number = float(item) if number == 0: item = '' elif abs(number) > 100: item = str(number / 10) point.append(float(item)) else: point.append(float(item)) if element == "Fecha": fecha = str(item) continue if element == "Hora": hora = (str(item).replace("p. m.","")).replace("a. m.","").replace("p.m.","").replace("a.m.","") continue datos_to_append.append(item) if len(row) - 1 == index_separado: string_cabeceras = f'{string_cabeceras} "{element}"' string_cabeceras_asociado = f'{string_cabeceras_asociado} %s' else: string_cabeceras = f'{string_cabeceras} "{element}",' string_cabeceras_asociado = f'{string_cabeceras_asociado} %s,' if fecha and hora: if fecha.find("29/03/2023") != -1: fecha = "28/03/2023" datos_to_append.append(f"{fecha} {hora}") string_cabeceras = f'{string_cabeceras} "Fecha_Hora",' string_cabeceras_asociado = f"{string_cabeceras_asociado} TO_TIMESTAMP(%s, 'DD/MM/YYYY HH24:MI')," fecha = False hora = False if point: for number in point: datos_to_append.append(number) string_cabeceras = f'{string_cabeceras}, geometry' string_cabeceras_asociado = f'{string_cabeceras_asociado}, ST_SetSRID(ST_MakePoint(%s, %s), 4326)' query_insertar_en_tabla = f"INSERT INTO {postgres_info['schema']}.{postgres_info['tableName']} ({string_cabeceras}) VALUES ({string_cabeceras_asociado});" datos_to_append = tuple(datos_to_append) # print("============ VAMOS A METER A DATABASE") # print(query_insertar_en_tabla) # print(datos_to_append) # print("===================================================") try: cursor.execute(query_insertar_en_tabla, datos_to_append) conn.commit() # print("LO HA SUBIDO BIEN") except Exception as e: conn.rollback() files_failed += f'{str(query_insertar_en_tabla)}\n' files_failed += f'{str(datos_to_append)}\n' files_failed += f'{e}\n' files_failed += f'--------------------------------------------------------------\n' # print("HA DADO ERROR") # if files_failed: # print(files_failed) # primeras_10_filas = df.head(10) # print("\nPrimeras 10 filas del archivo:") # print(primeras_10_filas) cursor.close() conn.close() connection_postgres = {"ip":"146.59.69.180", "port":30038, "databaseName":"ar", "user":"postgres", "password":"pg", "schema":"delitos", "tableName":"delitos_rojas"} file = "/home/data3/master_folder/files_uploaded/TB_Delitos.xlsx" variables = {"connection_postgres":connection_postgres, "file":file} my_udf(variables)