""" Module controlling the behaviour of Postgresql driver. """ import os import tempfile import geoserver # Fix gdal 2.40 and 3.3 integration problems try: import ogr import gdal except ModuleNotFoundError: from osgeo import ogr from osgeo import gdal import psycopg2 import psycopg2.extras from tools import const from tools.system_tools import fetch_method from MG.tools.postgis_api import PostGis, External_PostGis from MG.tools.geoserver_api import Geoserver from .masterDriver import MasterDriver class PostgreSQL(MasterDriver): """ Class to control Postgresql driver """ format = 'pg' gdal_driver = 'PostgreSQL' driver_type = const.VECTOR_KEY name = const.POSTGRESQL_KEY def to_local(self, layer, *args, **kwargs): """ Get remote source to out local environment Parameters ---------- layer: :obj:`Layer` args: optional Arguments kwargs: optional Arguments Returns ------- success: dict New parameters of the layer """ user = layer.user # No sabemos muy bien esto... # if ((layer['driver_type'] == PostGis.PG_KEY) and # (layer['ip'] == PostGis(user).ip) and # (layer['port'] == PostGis(user).port) and # (layer['database_name'] == # PostGis(user).dbname) # and (layer['schema'] == user)): # status = {'status': 'Ya existe la capa', # 'layer': layer['table_view_name']} # return {'status': status} # Ñapa para cuando se crea una tabla colaborativa en # nuestra base de datos if layer.parameters['domain'] == 'SRM': int_pg = PostGis('public') user = int_pg.schema_colaborative layer.user = int_pg.schema_colaborative layer.parameters['ip'] = int_pg.ip layer.parameters['port'] = int_pg.port layer.parameters['user'] = int_pg.user layer.parameters['password'] = int_pg.passw layer.parameters['database_name'] = int_pg.dbname_colaborative layer.parameters['schema'] = int_pg.schema_colaborative layer.parameters['table_view_name'] = layer['table_view_name'] layer.parameters['domain'] = False foreign_table_name = "PG_{}_ft".format(layer['table_view_name']) server_name = "{}_PG_{}_{}".format(user, layer['schema'], layer['table_view_name']) status_wrapper_pg = PostGis('public'). \ wrapper_postgresql_pg(user, server_name, layer.protocol.ip, layer.protocol.port, layer.protocol.user, layer.protocol.password, layer.protocol.dbname, layer['schema'], foreign_table_name) postgis = PostGis(user) new_parameters = {'ip': postgis.ip, 'port': postgis.port, 'protocol': const.POSTGRESQL_KEY, 'driver_type': const.POSTGRESQL_KEY, 'user': postgis.user, 'password': postgis.passw, 'database_name': postgis.dbname, 'table_view_name': status_wrapper_pg['layer'], 'layer_name': layer['table_view_name'], 'schema': user, 'schema_origin':layer['schema']} return new_parameters def download(self, *args, **kwargs): """ Method to download the source from remote to local file. 
    def download(self, *args, **kwargs):
        """ Method to download the source from remote to a local file.

        Parameters
        ----------
        layer: :obj:`Layer`
            Layer from the remote source
        dst: str
            Path to save the file
        args: list
            Arguments
        kwargs: dict
            Arguments

        Returns
        -------
        success: str
            Path where the file has been saved
        """
        # Downloading a Postgres table is the same thing the foreign data
        # wrapper already did in "to_local".
        return None

    def upload(self, layer, orig, dst, *args, **kwargs):
        """ Method to upload a file from our local system to the remote one

        Parameters
        ----------
        layer: :obj:`Layer`
            Layer from the remote source
        orig: str
            Path in the local system
        dst: str
            Path in the remote system
        args: list
            Arguments
        kwargs: dict
            Arguments

        Returns
        -------
        success: None
        """
        is_append = kwargs['is_append']
        is_truncate = kwargs['is_truncate']
        layer.update_driver_and_protocol(layer.local_parameters)
        gdal_input_layer = layer.gdal_url('{}.{}'.format(layer.user, orig))
        layer.update_driver_and_protocol(layer.cloud_parameters)
        ogr_string = "ogr2ogr {} -lco LAUNDER=NO -nln \"{}\" -f \"{}\" " \
                     "\"{}\" \"{}\"" \
            .format("-overwrite" * (not is_append) +
                    "-preserve_fid -append" * is_append +
                    " --config OGR_TRUNCATE YES" * is_truncate,
                    dst.split('.')[-1],
                    const.POSTGRESQL_KEY,
                    layer.gdal_url('{}.'.format(layer['schema'])),
                    gdal_input_layer)
        if layer.local_parameters['table_view_name']:
            layer.local_parameters['table_view_name'] += ":{}".format(orig)
        else:
            layer.local_parameters['table_view_name'] = orig
        os.system(ogr_string)
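    # For reference, the command assembled above ends up looking like this
    # (all values illustrative):
    #
    #   ogr2ogr -overwrite -lco LAUNDER=NO -nln "my_table" -f "PostgreSQL" \
    #       "PG:host=... port=... dbname=... schemas=alice ..." \
    #       "PG:host=... dbname=... tables=alice.my_table"
    #
    # In append mode "-overwrite" is replaced by "-preserve_fid -append",
    # plus "--config OGR_TRUNCATE YES" when truncating.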
    def remove(self, source, *args):
        """ Remove sources

        Parameters
        ----------
        source: str
            Path to remove
        args: list
            Arguments

        Returns
        -------
        bool
            True if successful, False otherwise.
        """
        layer = args[0]
        postgis_obj = PostGis('Public')
        # First, check what kind of object the source is.
        schemas = list(postgis_obj.list_schemas())
        if source in schemas:
            postgis_obj.delete_schema(source)
            return None
        foreign_tables = list(postgis_obj.list_foreign_tables())
        if source in foreign_tables:
            # Remove, if it exists, the table in the user's schema.
            # The method "to_local" has been executed earlier.
            servers_to_remove = postgis_obj.get_foreign_server_from_ft(
                layer.user, source)
            for server in servers_to_remove:
                postgis_obj.delete_server_data_Wraper(server)
            postgis_obj.delete_tables(layer.user, source)
        elif '_ft' not in layer["table_view_name"]:
            try:
                postgis_obj.send_sql_command("""
                    DROP VIEW IF EXISTS "{}"."{}" CASCADE;
                    """.format(layer['schema'], layer["table_view_name"]))
                return True
            except psycopg2.errors.WrongObjectType:
                return False
        else:
            try:
                server_name = list(PostGis(layer.user).send_sql_command(
                    """
                    SELECT foreign_server_name
                    FROM information_schema.foreign_tables
                    WHERE foreign_table_name LIKE '%{layer}%';
                    """.format(layer=layer["table_view_name"])))
                PostGis('public').delete_server_data_Wraper(server_name[0][0])
                return True
            except Exception:
                return False

    def translate(self, src_layer, dst_driver):
        """ Translate a layer from one driver to another

        Parameters
        ----------
        src_layer: :obj:`Layer`
            Origin layer
        dst_driver: :obj:`MasterDriver`
            Destination driver

        Returns
        -------
        success: list
            Resulting files
        """
        tmp_dir = tempfile.mkdtemp(dir=const.TMP_DIRECTORY)
        result_files = []
        table_view_names = []
        for source in src_layer.attached_sources:
            gdal_layer_source = src_layer.gdal_url('{}.{}'.format(
                src_layer.user, source))
            file_result = os.path.join(tmp_dir, source)
            file_result = dst_driver.check_source(file_result)
            table_view_names.append(source)
            command = 'ogr2ogr -overwrite -f "{output_format}" ' \
                      '"{output_connection}" "{input_connection}"'. \
                format(output_format=dst_driver.gdal_driver,
                       output_connection=file_result,
                       input_connection=gdal_layer_source)
            result_files.append(file_result)
            os.system(command)
        zipped = len(result_files) > 1
        if zipped:
            output_result = src_layer['source'].split('/')[-1].split('.')[0]
            zip_output = os.path.join(tmp_dir,
                                      '{}.zip'.format(output_result))
            result_names = [name.split(os.sep)[-1].split('.')[0] + "*"
                            for name in result_files]
            command = "cd {} && zip -q {} {}".format(
                tmp_dir, zip_output.split(os.sep)[-1],
                ' '.join(result_names))
            os.system(command)
            # Update the layer
            new_cloud_parameters = {
                'driver_type': const.ZIP_KEY,
                'source': "{}/{}.zip".format(
                    '/'.join(src_layer['source'].split('/')[:-1]),
                    output_result)
            }
            src_layer.cloud_parameters.update(new_cloud_parameters)
            result_files = [zip_output]
        src_layer.residual_sources.append(tmp_dir)
        return result_files
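    # Example of the conversion command built above, for a hypothetical
    # GeoJSON destination driver (paths and table names illustrative):
    #
    #   ogr2ogr -overwrite -f "GeoJSON" "/tmp/tmpXXXX/roads.geojson" \
    #       "PG:host=... dbname=... tables=alice.roads"
    #
    # When more than one table is attached, the resulting files are bundled
    # into a single zip archive, as done in the "zipped" branch.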
    def publish(self, layer, autorefresh, autorefresh_latency):
        """ Publish on Geoserver

        Parameters
        ----------
        layer: :obj:`Layer`
            Origin layer
        autorefresh: bool
            Synchronized?
        autorefresh_latency: float
            Synchronization interval

        Returns
        -------
        success: list
            URLs of the layer services created
        """
        postgis_obj = PostGis('Public')
        ip_pg = postgis_obj.ip
        port_pg = postgis_obj.port
        user_pg = postgis_obj.user
        passw_pg = postgis_obj.passw
        database_pg = postgis_obj.dbname
        schema_pg = layer.user
        tables_pg = layer.attached_sources
        if 'table_view_name' in layer.cloud_parameters:
            out_layer = layer.cloud_parameters['table_view_name']
        else:
            out_layer = layer.cloud_parameters['source']. \
                split('/')[-1].split('.')[0]
        service = layer['options']
        user = layer.user
        urls = []
        layer_names = []
        out_layers = ["{}_{}".format(out_layer, table_pg)
                      for table_pg in tables_pg] \
            if len(tables_pg) > 1 else [out_layer]
        geoserver_obj = Geoserver()
        status_layer = ""
        for table_pg, out_layer in zip(tables_pg, out_layers):
            if autorefresh:
                try:
                    out_layer_w_suffix = "{}_vw".format(out_layer)
                    External_PostGis(ip_pg, port_pg, user_pg, passw_pg,
                                     database_pg, schema_pg, table_pg). \
                        create_view_pg(out_layer_w_suffix)
                    out_layer = out_layer_w_suffix
                    status_layer = geoserver_obj.publish_vec_pg(
                        user, ip_pg, port_pg, user_pg, passw_pg,
                        database_pg, schema_pg, out_layer)
                except geoserver_obj.cat.UploadError:
                    status_layer = 'The layer already exists'
            else:
                if not autorefresh_latency:
                    autorefresh_latency = 0
                try:
                    out_layer_w_suffix = "{}_mv".format(out_layer)
                    External_PostGis(ip_pg, port_pg, user_pg, passw_pg,
                                     database_pg, schema_pg, table_pg) \
                        .create_materialized_view_pg(out_layer_w_suffix)
                    out_layer = out_layer_w_suffix
                    status_layer = geoserver_obj.publish_vec_pg(
                        user, ip_pg, port_pg, user_pg, passw_pg,
                        database_pg, schema_pg, out_layer)
                    # Very dirty workaround so it does not fail ##############
                    if int(autorefresh_latency) >= 60:
                        autorefresh_latency = 59
                    ##########################################################
                    fetch_method(
                        "/MG/routines/create",
                        {"user": user,
                         "geom": 'MG/pg/refresh_concurretly_pg',
                         "params": {'user': user,
                                    "ip_pg": layer['ip'],
                                    "port_pg": layer['port'],
                                    "user_pg": layer['user'],
                                    "passw_pg": layer['password'],
                                    "database_pg": layer['database_name'],
                                    "schema_pg": layer['schema'],
                                    "table_pg": out_layer},
                         "period": 'minute.every({})'.format(
                             autorefresh_latency)})
                except geoserver.catalog.UploadError:
                    status_layer = 'The layer already exists'
            url = geoserver_obj.get_service(service, layer.user, out_layer)
            urls.append(url)
            layer_names.append(out_layer)
        result = {"status": status_layer, 'url': urls,
                  'layer': layer_names}
        return result
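    # The scheduled routine registered above is expected to run something
    # like the following on each tick (illustrative SQL; CONCURRENTLY
    # requires a unique index on the materialized view):
    #
    #   REFRESH MATERIALIZED VIEW CONCURRENTLY "alice"."roads_mv";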
""" try: ext_pg = External_PostGis(layer.protocol.ip, layer.protocol.port, layer.protocol.user, layer.protocol.password, layer.protocol.dbname, layer['schema'], layer['table_view_name']) ext_pg.test_connection() existence = ext_pg.test_table() exist = existence.replace("(", "").replace(",)", "") if exist == 'True': return True else: return False except ConnectionRefusedError: return False def create(self, layer, params, overwrite): """ With the execution of the wrapper, the foreign table is obtained from which the attributes that the user indicates will be extracted, for subsequent preview. Parameters ---------- layer: :obj: `Layer` params: dict Dict with creation view clauses overwrite: bool Notes ----- This method will return None for all layers except Postgres, since once the wrapper is done, all layers are foreign tables. Returns ------- list List that contain the attributes and layer name """ user = layer.user ext_pg = External_PostGis(layer.protocol.ip, layer.protocol.port, layer.protocol.user, layer.protocol.password, layer.protocol.dbname, layer['schema'], layer['table_view_name']) list_tables_pg = ext_pg.list_all_by_schema(user) list_tables = [] list_content = [] output_list = [] list_attributes_view = [] list_attr_quote = [] list_attr_quote_view = [] for i in list_tables_pg: list_tables.append(i[0]) if params["view"] in list_tables: if overwrite is False or overwrite == 'false': erase = [False] else: try: ext_pg.send_sql_command(""" DROP VIEW "{}"."{}" CASCADE; """.format(user, params["view"])) erase = [True] except psycopg2.OperationalError: erase = [True] else: erase = [True] if erase == [True]: attributes = params["select"] list_attributes = attributes.split(",") if params["where"] is None or params["where"] == '' \ or params["where"] == ' ': where = '' else: where = 'where ' + params["where"] where = where.replace("'", "''") if params["groupBy"] == '' or params["groupBy"] == ' ' \ or params["groupBy"] is None: group_by = '' else: group_by = 'group by ' + params["groupBy"] if params["having"] == '' or params["having"] == ' ' \ or params["having"] is None: having = '' else: having = 'having ' + params["having"] if params["orderBy"] == '' or params["orderBy"] == ' ' \ or params["orderBy"] is None: order_by = '' else: order_by = 'order by ' + params["orderBy"] if params["limit"] == '' or params["limit"] == ' ' \ or params["limit"] is None: limit = '' else: limit = 'limit ' + params["limit"] for i in list_attributes: list_attr_quote.append(i.strip()) ext_pg.create_view_by_clauses( user, params["view"], ','.join(list_attr_quote), user, layer["table_view_name"], where, group_by, having, order_by, limit) list_attr_view = list(ext_pg.list_attributes_each_item( user, params["view"])) command = "SELECT f_geometry_column FROM " \ "geometry_columns WHERE " \ "f_table_name='{table_name}' AND " \ "f_table_schema='{schema}'"\ .format(table_name=layer['table_view_name'], schema=user ) geom_columns = list(ext_pg.send_sql_command(command)) geom_columns = geom_columns[0] if len(geom_columns) != 0 \ else geom_columns for i in list_attr_view: if i[0] in geom_columns: continue list_attributes_view.append(i[0]) for i in list_attributes_view: list_attr_quote_view.append('"{i}"'.format(i=i.strip())) content_pg = ext_pg.select_by_attribute( user, params["view"], ','.join(list_attr_quote_view), limit=9) for i in content_pg: list_content.append(i) output_list.append(erase) output_list.append(list_attributes_view) output_list.append(list_content) return output_list def list(self, layer): """ Get the list 
    def list(self, layer):
        """ Get the list of attributes and the layer name of the source

        Parameters
        ----------
        layer: :obj:`Layer`

        Returns
        -------
        dict
            Dict containing layers and attributes, e.g.
            {"layers": [{"name": 'layer1',
                         'attributes': [attr1, attr2, ...]}]}
        """
        ext_pg = External_PostGis(layer.protocol.ip,
                                  layer.protocol.port,
                                  layer.protocol.user,
                                  layer.protocol.password,
                                  layer.protocol.dbname,
                                  layer['schema'],
                                  '')
        list_tables_pg = ext_pg.list_all_by_schema(layer['schema'])
        list_tables = []
        list_attributes = []
        for i in list_tables_pg:
            list_tables.append(i[0])
        if layer['table_view_name'] in list_tables:
            list_attributes_pg = ext_pg.list_attributes_each_item(
                layer['schema'], layer['table_view_name'])
            for i in list_attributes_pg:
                list_attributes.append(i[0])
            data = {'layers': [{'name': layer['table_view_name'],
                                'attributes': list_attributes}]}
        else:
            data = {'layers': [{'name': list_tables,
                                'attributes': list_attributes}]}
        return data

    def preview(self, _):
        """ Get the info to preview

        Parameters
        ----------
        _: :obj:`Layer`

        Returns
        -------
        bool
            True if successful.
        """
        return True

    def prelist(self, layer):
        """ List the layers of the source to preview

        Parameters
        ----------
        layer: :obj:`Layer`

        Returns
        -------
        list
            List that contains the layer names
        """
        conn = ogr.Open(layer.gdal_url(source='.', with_vsi=True))
        output_list = []
        schemas_tables = []
        schemas_tables_d = []
        schemas_tables_thematic_d = []
        result_schemas_tables = []
        head = ["schemas", "tables"]
        for ogr_layer in conn:
            schemas_tables_d.append(ogr_layer.GetName())
            schemas_tables.append(
                ogr_layer.GetName().split('.')
                if len(ogr_layer.GetName().split('.')) == 2
                else ['public', ogr_layer.GetName()])
        if layer['type'] == 'THEMATIC':
            # List every table (including those without geometry) and keep
            # only the ones that did not appear in the default listing.
            gdal.SetConfigOption("PG_LIST_ALL_TABLES", "YES")
            conn_th = ogr.Open(layer.gdal_url(source='.', with_vsi=True))
            for ogr_layer in conn_th:
                schemas_tables_thematic_d.append(ogr_layer.GetName())
            result_schemas_tables_thematic = list(
                set(schemas_tables_thematic_d) - set(schemas_tables_d))
            for layer_thematic in result_schemas_tables_thematic:
                result_schemas_tables.append(
                    layer_thematic.split('.')
                    if len(layer_thematic.split('.')) == 2
                    else ['public', layer_thematic])
        elif layer['type'] == 'VECTOR':
            result_schemas_tables = schemas_tables
        output_list.append(head)
        output_list.append(result_schemas_tables)
        return output_list
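
# Minimal, standalone sketch of the OGR listing that prelist() performs.
# The connection string is an illustrative placeholder, not a project
# default; ogr.Open returns None when the connection fails.
if __name__ == '__main__':
    demo_conn = ogr.Open("PG:host=localhost port=5432 dbname=gis "
                         "user=demo password=demo")
    if demo_conn is not None:
        for demo_layer in demo_conn:
            print(demo_layer.GetName())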