""" Module that control the behaviour of zip files and container of zip files. """ import os import tempfile import shutil import zipfile # Fix gdal 2.40 and 3.3 integration problems try: import ogr except ModuleNotFoundError: from osgeo import ogr from tools import const from MG.tools.postgis_api import PostGis from .masterDriver import MasterDriver class _ZIP(MasterDriver): """ General class for ZIP files """ format = 'zip' gdal_prefix = '/vsizip/' gdal_driver = 'ZIP' driver_type = const.BOTH_TYPE_KEY name = const.ZIP_KEY def to_local(self, *_, **__): """ Get remote source to out local environment Parameters ---------- _: optional Arguments __: optional Arguments Returns ------- success: dict New parameters of the layer """ return None def translate(self, *_, **__): """ Translate layer from one driver to another Parameters ---------- _: __: Returns ------- success: list Resulting files """ return None def download(self, *_, **__): """ Method to download the source from remote to local file. Parameters ---------- _: list Arguments __: dict Arguments Returns ------- success: str Path where the fil has been saved """ return None def upload(self, layer, orig, *_, **__): """ Method to upload file from out local system to remote Parameters ---------- layer: :obj:`Layer` Layer from remote source orig: str Path in local system _: list Arguments _: dict Arguments Returns ------- success: None """ layer.protocol.upload(orig, layer.get_source()) def remove(self, source, *_): """ Remove sources Parameters ---------- source: str Path to remove _: list Arguments Returns ------- success: None """ # Everything on this driver are going to be files or folder if os.path.isdir(source): shutil.rmtree(source) elif os.path.isfile(source): os.remove(source) def publish(self, *_, **__): """ Publish on Geoserver Parameters ---------- _ __ Returns ------- """ return None def check_source(self, source): """ Check that source follow the naming rules of the driver Parameters ---------- source: str Path to the source Returns ------- success: str Formatted path to the source """ # Checking dst path has_extension = len(source.split('.')) >= 2 if not has_extension: # Adding extension source = "{}.{}".format(source, self.format) return source def get_source(self, layer, *_, only_name=False, **__): """ Get source path with the formar of the driver Parameters ---------- layer: :obj: `Layer` only_name: bool Get only the name or full path _: list Arguments __: dict Arguments Returns ------- success: str Formatted source """ if only_name: return layer['source'].split('/')[-1].split('.')[0] return layer['source'] class ZIPVector(_ZIP): """ Class to control the zip with Shapefiles inside. """ def to_local(self, layer, *_, **__): """ Get remote source to out local environment Parameters ---------- layer: :obj:`Layer` Layer from remote source _: optional Arguments __: optional Arguments Returns ------- success: dict New parameters of the layer """ url = layer.gdal_layer(with_vsi=False) out_layer = layer['source'].split('/')[-1].split('.')[0] user = layer.user server_name = "{}_SHPzip_{}_{}".format(user, out_layer, layer['layer_name']).\ replace(' ', '_') foreign_table_name = "SHPzip_{}_{}_ft".format(out_layer, layer['layer_name']).\ replace(' ', '_') layer_name = layer['layer_name'].replace(' ', '_').lower() status_wrapper_zip = PostGis('public').wrapper_shp_zip_limit_pg( user, server_name, url, user, layer_name, foreign_table_name) # Update parameters of layer with the new connection and source postgis = PostGis('public') new_parameters = {'ip': postgis.ip, 'port': postgis.port, 'protocol': const.POSTGRESQL_KEY, 'driver_type': const.POSTGRESQL_KEY, 'user': postgis.user, 'password': postgis.passw, 'database_name': postgis.dbname, 'table_view_name': status_wrapper_zip['layer'], 'layer_name': out_layer, 'schema': user} return new_parameters def download(self, layer, dst, *args, **kwargs): """ Method to download the source from remote to local file. Parameters ---------- layer: :obj:`Layer` Layer from remote source dst: str Path to save the file _: list Arguments __: dict Arguments Returns ------- success: str Path where the fil has been saved """ dst = self.check_source(dst) src_path = layer.cloud_parameters['source'] if '.' in src_path: src_path = src_path else: src_path = '{}/{}'.format(src_path, layer.cloud_parameters['layer_name']. split(':')[0]) layer.protocol.download(src_path, dst) # Removing, if exists, table in user's schema. # The method "to_local" has been executed earlier. postgis_obj = PostGis(layer.user) postgis_obj.get_foreign_server_from_ft( layer.user, layer.local_parameters['table_view_name']) return dst def translate(self, src_layer, dst_driver): """ Translate layer from one driver to another Parameters ---------- src_layer: :obj:`Layer` Origin layer dst_driver: :obj:`Layer` Destiny layer Returns ------- success: list Resulting files """ tmp_dir = tempfile.mkdtemp(dir=const.TMP_DIRECTORY) result_files = [] for source in src_layer.attached_sources: gdal_layer_source = src_layer.gdal_url(source, with_vsi=True) file_name = source.split(os.sep)[-1].split('.')[0] file_result = os.path.join(tmp_dir, file_name) file_result = dst_driver.check_source(file_result) command = 'ogr2ogr -overwrite -f "{output_format}" ' \ '"{output_connection}" "{input_connection}"'.\ format(output_format=dst_driver.gdal_driver, output_connection=file_result, input_connection=gdal_layer_source) result_files.append(file_result) os.system(command) def check(self, layer): """ Check that source Parameters ---------- layer: :obj: `Layer` Returns ------- bool True if successful, False otherwise. """ data_source = ogr.Open(layer.gdal_layer(), 0) if not data_source: return False return True def create(self, *_): """ With the execution of the wrapper, the foreign table is obtained from which the attributes that the user indicates will be extracted, for subsequent preview. Parameters ---------- _: Notes ----- This method will return None for all layers except Postgres, since once the wrapper is done, all layers are foreign tables. Returns ------- success: None """ return None def list(self, layer): """ Get the list of attributes and layer name of the source Parameters ---------- layer: :obj: `Layer` Returns ------- dict dict containing layer and attributes. {"layers": [{"name": 'layer1', 'attributes': [attr1, attr2, ...]}]} """ data_source = ogr.Open(layer.gdal_layer(), 0) data = {'layers': []} for layer_ogr in data_source: layer_data = dict() layer_data['name'] = layer_ogr.GetName() layer_data['attributes'] = [] layer_definition = layer_ogr.GetLayerDefn() for j in range(layer_definition.GetFieldCount()): field_definition = layer_definition.GetFieldDefn(j) attribute_layer = field_definition.name layer_data['attributes'].append(attribute_layer) if layer_definition.GetGeomFieldCount() >= 1: layer_data['attributes'].append('geom') data['layers'].append(layer_data) return data def preview(self, _): """ Get the info to preview Parameters ---------- _: Returns ------- bool True if successful. """ return True def prelist(self, layer): """ list the layers of sources to preview Parameters ---------- layer: :obj: `Layer` Returns ------- list List that contain the layer name """ data_source = ogr.Open(layer.gdal_layer(), 0) head = ["layers"] layers = [] output_list = [] for lay in data_source: layers.append([lay.GetName()]) output_list.append(head) output_list.append(layers) return output_list class ZIPGeneric(_ZIP): """ Method to download the source from remote to local file. Parameters ---------- layer: :obj:`Layer` Layer from remote source dst: str Path to save the file args: list Arguments kwargs: dict Arguments Returns ------- success: str Path where the fil has been saved """ def download(self, layer, dst, *args, **kwargs): """ Method to download the layer Parameters ---------- layer: Layer Layer to download dst: str Path where save the file _ __ Returns ------- str: The path to the file. """ dst = self.check_source(dst) src_path = layer.cloud_parameters['source'] if '.' in src_path: src_path = src_path else: src_path = '{}/{}'.format(src_path, layer.cloud_parameters['layer_name']. split(':')[0]) layer.protocol.download(src_path, dst) # Unzipping source os.system("unzip -d {} {}".format(os.path.dirname(dst), dst)) return os.path.dirname(dst) def check(self, _): """ Check that Generic Zip exist. Always return true Parameters ---------- _: Returns ------- bool Only true if successful. """ return True def create(self, *_): """ With the execution of the wrapper, the foreign table is obtained from which the attributes that the user indicates will be extracted, for subsequent preview. Parameters ---------- _: Notes ----- This method will return None for all layers except Postgres, since once the wrapper is done, all layers are foreign tables. Returns ------- success: None """ return None def list(self, layer): """ Get the list of attributes and layer name of the source Parameters ---------- layer: :obj: `Layer` Returns ------- dict dict containing layer and attributes. {"layers": [{"name": 'layer1', 'attributes': [attr1, attr2, ...]}]} """ path = layer.get_source() url = layer.gdal_url(source=path) name_ext = url.split("/")[-1] name_layer = name_ext.split(".")[0] att = [const.ZIP_KEY] data = {'layers': [{'name': name_layer, 'attributes': att}]} return data def preview(self, layer): """ Get info to preview the layer Parameters ---------- layer: :obj: `Layer` Returns ------- list List that contain the name layer and attributes """ content_files = [] headers = ['files'] url = layer.gdal_layer(with_vsi=False) name_ext = url.split("/")[-1] tempdir = tempfile.TemporaryDirectory() file = layer["source"] temp_folder = """{}/{}""".format(tempdir.name, name_ext) layer.protocol.download(file, temp_folder) with zipfile.ZipFile(tempdir.name + '/' + name_ext, 'r') as zip_f: name_list = zip_f.namelist() for i in name_list: content_files.append(i) del tempdir output_list = [headers, [content_files]] return output_list def prelist(self, layer): """ list the layers of sources to preview Parameters ---------- layer: :obj: `Layer` Returns ------- list List that contain the layer name """ path = layer.get_source() url = layer.gdal_url(source=path) name_ext = url.split("/")[-1] name_layer = [name_ext.split(".")[0]] output_list = [] head = ["layers"] output_list.append(head) output_list.append([name_layer]) return output_list class ContainerZip(ZIPVector, ZIPGeneric): """ Control de containers of zips, like folders of zips. """ def prelist(self, layer): head = ['layers'] layers = [] for _file in layer.list_files(): ext = _file.split('.')[-1] if ext == self.format: if '/' in _file: if '/ftp/' in _file: file = _file.replace('/ftp/{}/public/'.format( layer['user']), '').rsplit('/', 1)[1] source = '{}'.format(_file.replace( '/ftp/{}/public'.format(layer['user']), '')) else: file = _file.rsplit('/', 1)[1] source = '{}'.format(_file) else: file = _file source = '{}/{}'.format(layer['source'], _file) layer_new = layer.copy() layer_new.parameters['source'] = source layer_new.update_driver_and_protocol(layer_new.parameters) if layer_new['type'] == 'GENERIC': res = ZIPGeneric.prelist(self, layer_new) else: res = ZIPVector.prelist(self, layer_new) for layer_name in res[1]: layers.append(["{}:{}".format(file, layer_name[0])]) return [head, layers] def check(self, layer): layer_new = layer.copy() file, layer_name = layer_new.parameters['layer_name'].split(':') layer_new.parameters["source"] = "{}/{}".format( layer_new.parameters['source'], file) layer_new.parameters["layer_name"] = layer_name layer_new.update_driver_and_protocol(layer_new.parameters) if layer_new['type'] == 'GENERIC': return ZIPGeneric.check(self, layer_new) return ZIPVector.check(self, layer_new) def to_local(self, layer, *args, **kwargs): layer_new = layer.copy() file, layer_name = layer_new.parameters['layer_name'].split(':') layer_new.parameters["source"] = "{}/{}".format( layer_new.parameters['source'], file) layer_new.parameters["layer_name"] = layer_name layer_new.update_driver_and_protocol(layer_new.parameters) if layer_new['type'] == 'GENERIC': return None return ZIPVector.to_local(self, layer_new, *args, **kwargs) def list(self, layer): layer_new = layer.copy() file, layer_name = layer_new.parameters['layer_name'].split(':') layer_new.parameters["source"] = "{}/{}".format( layer_new.parameters['source'], file) layer_new.parameters["layer_name"] = layer_name layer_new.update_driver_and_protocol(layer_new.parameters) if layer_new['type'] == 'GENERIC': data = ZIPGeneric.list(self, layer_new) else: data = ZIPVector.list(self, layer_new) for layer_data in data['layers']: if layer_data['name'] == layer_new["layer_name"]: layer_data['name'] = layer['layer_name'] break return data def preview(self, layer): layer_new = layer.copy() file, layer_name = layer_new.parameters['layer_name'].split(':') layer_new.parameters["source"] = "{}/{}".format( layer_new.parameters['source'], file) layer_new.parameters["layer_name"] = layer_name layer_new.update_driver_and_protocol(layer_new.parameters) if layer_new['type'] == 'GENERIC': return ZIPGeneric.preview(self, layer_new) return True def download(self, layer, dst, *args, **kwargs): layer_new = layer.copy() file, layer_name = layer_new.parameters['layer_name'].split(':') layer_new.parameters["source"] = "{}/{}".format( layer_new.parameters['source'], file) layer_new.parameters["layer_name"] = layer_name layer_new.update_driver_and_protocol(layer_new.parameters) if layer_new['type'] == 'GENERIC': return ZIPGeneric.download(self, layer_new, dst, *args, **kwargs) return ZIPVector.download(self, layer_new, dst, *args, **kwargs)