""" Module that control and implements the behaviour of the layers. a layer is the basic structure which GMS works with. A layer has two main concepts: - Driver: one of tools/drivers module, controls how the file works - Protocol: one of tools/protocols, controls how interact with the filesystem, allow the "filesystem" operations. TODO: Deep explanation of layer's behaviour """ import random import os import uuid from typing import Type import shutil from tools import const from MG.tools.postgis_api import PostGis from .drivers.driver import Drivers from .drivers.masterDriver import MasterDriver from .protocols.protocol import Protocol class MasterLayer: """ Master class from which the rest inherits. Attributes ---------- DEFINED_TYPES: list of str Allowed layer types. Raises ------ BadLayerType: Layer type specified does not exist in DEFINED_TYPES list. BadProtocolError: Protocol does not exist in protocol types definition. NoRemoteSourceError: The remote source could not be reachable. """ DEFINED_TYPES = [const.LAYER_INPUT_KEY, const.LAYER_OUTPUT_KEY, const.LAYER_NONLOCAL_OUTPUT_KEY, const.LAYER_TMP_KEY, const.LAYER_NONLOCAL_INPUT_KEY] bad_layer_type = type('BadLayerType', (Exception, ), {}) bad_protocol_error = type('BadProtocolError', (Exception,), {}) layer_do_not_exist_on_remote = type('NoRemoteSourceError', (Exception,), {}) def __init__(self, user, layer_params): self.parent_parameters = layer_params self.parameters = layer_params.copy() self.user = user self.residuals = [] self.residual_objects = [] self.residual_sources = [] self.residual_tables = [] self.residual_schemas = [] self.attached_sources = [] self.protocol = Type[Protocol] self.driver = Type[MasterDriver] self.update_driver_and_protocol(layer_params) self.local_parameters = {} self.cloud_parameters = self.format_parameters(layer_params.copy()) self.parameters = self.format_parameters(layer_params.copy()) def format_parameters(self, parameters): """ Format parameters according to layer protocol Parameters ---------- parameters: dict A raw parameters dictionary Returns ------- dict: Formatted parameters """ return self.protocol.format_parameters(self, parameters) def update_driver_and_protocol(self, new_parameters): """ Update driver and protocol of the layer Parameters ---------- new_parameters: dict A dictionary containing the new parameters Raises ------ bad_protocol_error: If the selected protocol does not exist. """ self.parameters.update(new_parameters) if 'protocol' not in self.parameters: return None # Check ip and port ############# ip = self['ip'] if 'ip' in self.parameters.keys() and self['ip'] else self['domain'] port = self['port'] if 'port' in self.parameters.keys() else '' if self['user'] == self.user: self.parameters['user'] = '' ################################### if self['protocol'] == const.POSTGRESQL_KEY: self.driver = Drivers().get_by_name(const.POSTGRESQL_KEY, _type=self['type']) self.protocol = Protocol()[const.POSTGRESQL_KEY](ip, port, self['user'], self['password'], self['database_name']) elif self['protocol'] == const.FIWARE_KEY: self.driver = Drivers().get_by_name(const.FIWARE_KEY, _type=self['type']) self.protocol = Protocol()[const.FIWARE_KEY](ip, port, self['user'], self['password'], self['url_token'], self['service'], self['subservice'], self['application_id'], self['application_secret']) elif self['protocol'] in list(Protocol.PROTOCOLS.keys()): self.protocol = Protocol()[self['protocol']](ip, port, self['user'], self['password']) self.driver = Drivers().get_by_name(self['driver_type'], _type=self['type'], container=not self.is_file()) else: raise self.bad_protocol_error('{} protocol do no exists'.format(self['protocol'])) def to_local(self): """ Do a wrapper to our PostgreSQL database to work with it. Raises ------ layer_do_not_exist_on_remote: If the file, table or something else does not exist on remote server. """ # First of all check if exists if not self.exists(): raise self.layer_do_not_exist_on_remote("{} not exists on {}".format(self.get_source(), self.protocol.ip)) # Wrapper new_parameters = self.driver.to_local(self) # Updating driver and protocol to work in local if new_parameters: self.update_driver_and_protocol(new_parameters) self.local_parameters = new_parameters.copy() def download(self, directory, prefix='', **kwargs): """ Download a remote source to local. This action change the layer protocol. The destiny protocol is LOCAL. Parameters ---------- directory: str Path to save downloaded layer. prefix: str Prefix to add before random string name Returns ------- str: Path to downloaded source """ # Change to cloud protocol. self.update_driver_and_protocol(self.cloud_parameters) dst_path = os.path.join(directory, '{}{}'.format(prefix, uuid.uuid1())) dst = self.driver.download(self, dst_path, **kwargs) # Change parameters to LOCAL protocol local_driver_parameters = {'protocol': const.LOCAL_KEY, 'source': dst, 'driver_type': self.cloud_parameters['driver_type']} # Change to LOCAL protocol. self.update_driver_and_protocol(local_driver_parameters) return self['source'] @classmethod def optional(cls): """ Instance of an optional layer Returns ------- Layer: The instance of a new layer. """ return cls(None, {'protocol': const.LOCAL_KEY, 'ip': '0.0.0.0', 'port': 999999, 'user': None, 'password': None, 'type': const.VECTOR_KEY, 'driver_type': const.KML_KEY, 'source': 'empty.kml', 'layer_name': 'empty'}) @classmethod def set_layer_from_type(cls, layer_type): """ Select layer type from it's name Parameters ---------- layer_type: str Name type of the layer Returns ------- class: Class of the selected layer type. Raises ------ bad_layer_type: If the layer type is not found. """ if layer_type.lower() not in cls.DEFINED_TYPES: raise cls.bad_layer_type('Layer type "{}" is not allowed'.format(layer_type)) for subclass in cls.__subclasses__(): if subclass.LAYER_TYPE == layer_type.lower(): return subclass return None def gdal_layer(self, with_vsi=True): """ Get connection string to open layer's source with gdal. Parameters ---------- with_vsi: bool Add gdal virtual drivers prefix or not. Returns ------- str: Connection gdal query """ return self.protocol.gdal_layer(self, with_vsi) def gdal_url(self, source, with_vsi=False): """ Get connection string of source with the parameters of the layer's protocol Parameters ---------- source: str Source to open with_vsi: bool Add gdal virtual drivers prefix or not. Returns ------- str: Connection gdal query """ source = self.driver.check_source(source) return self.protocol.gdal_url(source, with_vsi=with_vsi, driver_prefix=self.driver.gdal_prefix) def is_file(self): """ Return if the source is a file or a table. Returns ------- bool: True if is a source or table, false otherwise. """ return self.protocol.is_file(self['source']) def list_files(self): """ List all files in a source (OLD) Returns ------- list: List of files """ return self.protocol.list_files(self['source'], suffix=self.driver.format) def publish(self, autorefresh=True, autorefresh_latency=0): """ Publish a layer Parameters ---------- autorefresh: bool Autorefresh the publication autorefresh_latency: float Latency of refreshing Returns ------- dict: Dict with status, urls and layer keys. """ return self.driver.publish(self, autorefresh=autorefresh, autorefresh_latency=autorefresh_latency) def check(self): """ Check if a layer exists Returns ------- bool: True if exist, False otherwise. """ return self.exists() and self.driver.check(self) def list(self): """ List the sources of the connection (NEW) Returns ------- list: List with the sources of the connection """ return self.driver.list(self) def create(self, params, overwrite): """ Create the wrapper of source Parameters ---------- params: dict Sql parameters to create the wrapper overwrite: bool Overwrite the view if exists. Returns ------- list: List that contain the attributes and layer name """ return self.driver.create(self, params=params, overwrite=overwrite) def remove(self, layer): """ Remove a layer Parameters ---------- layer: Layer object Returns ------- bool: True if deleted are did without problem, False otherwise. """ return self.driver.remove(self, layer) def preview(self): """ Preview a layer. Vector types: selects a random sample of entities. Raster types: pixel value statistics Returns ------- list: List with the samples or the statistics """ return self.driver.preview(self) def copy(self): """ Method to copy the layer Returns ------- """ return Layer(self.user, self.parameters.copy()) def prelist(self): """ List parent connection. Returns ------- list: List of parent connection. """ return self.driver.prelist(self) @staticmethod def _remove_file_dir(source): """ Remove file or a directory. Parameters ---------- source: str Path to file or directory """ if os.path.isdir(source): os.system('rm -r {source}/* && rm -r {source}'.format(source=source)) else: os.system('rm {}'.format(source)) def _remove_table(self, source): """ Remove table Parameters ---------- source: str Table name """ PostGis(self.user).delete_tables(self.user, source) def _remove_schema(self, schema): """ Remove schema Parameters ---------- schema: str Schema name """ PostGis(self.user).delete_schema(schema) def attach_sources(self, *sources): """ Attach sources to layer Parameters ---------- sources: list of str List of sources """ self.attached_sources.extend(*sources) def to_cloud(self, is_append=False, is_truncate=False): """ Export result to local environment from remote environment Parameters ---------- is_append: bool Flag to overwrite or not the remote result. - is_append=True: overwrite=False - is_append=False: overwrite=True is_truncate: bool Flag to add --config OGR_TRUNCATE_YES to ogr2ogr command. """ dst_driver_string = self.cloud_parameters['driver_type']\ if 'driver_type' in self.cloud_parameters.keys() else const.POSTGRESQL_KEY dst_driver = Drivers().get_by_name(dst_driver_string, _type=self['type']) files = self.attached_sources if not dst_driver.__class__ == Drivers().get_by_name(const.POSTGRESQL_KEY, _type=const.VECTOR_KEY).__class__: files = self.driver.translate(self, dst_driver) # Change layer configuration to cloud parameters self.update_driver_and_protocol(self.cloud_parameters) dsts = [self.driver.get_source(self)] if len(files) == 1 else \ ['{}_{}'.format(self.driver.get_source(self, only_name=True), file.split('/')[-1].split('.')[0]) for file in files] for file, dst in zip(files, dsts): self.driver.upload(self, file, dst, is_append=is_append, is_truncate=is_truncate) # Change layer configuration to local parameters and set local sources as layer source self.update_driver_and_protocol(self.local_parameters) self.driver.set_source(self, *self.attached_sources) def clean_residuals(self): """ Remove all the items added to residuals* objects: - residuals: sources to remove - residuals_object: python objects to remove - residual_sources: files and folders - residual_tables: tables - residual_schemas: schemas """ for res in self.residuals: if isinstance(res, str): self.driver.remove(res) else: del res # TODO: Think about a better residuals policy. for res in self.residual_objects: del res for res in self.residual_sources: if os.path.isdir(res): shutil.rmtree(res, ignore_errors=True) continue _ = os.remove(res) if os.path.exists(res) else False postgis_obj = PostGis('public') for res in self.residual_tables: postgis_obj.delete_tables(self.user, res,) for res in self.residual_schemas: postgis_obj.delete_schema(res) # ###################################################### def clean_layer(self): """ Method to clean the layer. Remove the attached sources. """ for attached_source in self.attached_sources: self.driver.remove(attached_source, self) self.attached_sources = [] def get_source(self): """ Get the formatted id of the source, ruled by the layer's driver. Returns ------- str: Formatted source """ return self.driver.get_source(self) def exists(self): """ check whether the layer exists. Returns ------- bool: True if exists, False otherwise """ return self.protocol.exists(self) def __getitem__(self, item): return self.parameters[item] class Layer(MasterLayer): """ Input layer behaviour """ LAYER_TYPE = const.LAYER_INPUT_KEY def clean_layer(self): self.driver.remove(self.driver.get_source(self, only_name=True), self) class NonLocalLayer(MasterLayer): """ Input layer behaviour """ LAYER_TYPE = const.LAYER_NONLOCAL_INPUT_KEY def to_local(self): None def clean_layer(self): self.driver.remove(self.driver.get_source(self, only_name=True), self) class OutLayer(MasterLayer): """ Output layer behaviour """ LAYER_TYPE = const.LAYER_OUTPUT_KEY def __init__(self, user, layer_params): super().__init__(user, layer_params) self.postgis = PostGis('public') self.to_local() @staticmethod def _random_string(): """ Method to get a random string of length 10 Returns ------- str: Random string """ possible = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890' string = ''.join(random.sample(possible, 10)) return string def to_local(self): #Check type of source if self.driver.driver_type == const.VECTOR_KEY: try: table_view_name = self['table_view_name'] except KeyError: try: table_view_name = self['source'].split('/')[-1].split('.')[0] except KeyError: table_view_name = self['entity_name'].split('/')[-1].split('.')[0] local_parameters = {'ip': self.postgis.ip, 'protocol': const.POSTGRESQL_KEY, 'port': self.postgis.port, 'database_name': self.postgis.dbname, 'user': self.postgis.user, 'password': self.postgis.passw, 'schema': self.user, 'table_view_name': table_view_name} elif self.driver.driver_type == const.RASTER_KEY: local_parameters = {'protocol': const.LOCAL_KEY, 'driver_type': self['driver_type']} else: local_parameters = {'protocol': const.LOCAL_KEY} self.local_parameters = local_parameters self.update_driver_and_protocol(local_parameters) def is_file(self): """ Return if the source is a file or a table. Returns ------- bool: True if is a source or table, false otherwise. """ return self['source'].split('/')[-1].split('.') != "" class NonLocalOutLayer(MasterLayer): """ Output layer behaviour without to local() """ LAYER_TYPE = const.LAYER_NONLOCAL_OUTPUT_KEY def __init__(self, user, layer_params): super().__init__(user, layer_params) @staticmethod def _random_string(): """ Method to get a random string of length 10 Returns ------- str: Random string """ possible = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890' string = ''.join(random.sample(possible, 10)) return string def is_file(self): """ Return if the source is a file or a table. Returns ------- bool: True if is a source or table, false otherwise. """ return self['source'].split('/')[-1].split('.') != "" class TmpLayer(MasterLayer): """ Temporal layer behaviour """ LAYER_TYPE = const.LAYER_TMP_KEY def __init__(self, user, layer_params): random_string = self._random_string() # Update default params ############### params = dict() if 'source' not in layer_params or 'table_view_name' not in layer_params: params = {'protocol': const.LOCAL_KEY, 'ip': '0.0.0.0', 'port': 999999, 'user': None, 'password': None, 'type': const.VECTOR_KEY, 'driver_type': const.KML_KEY, 'source': os.path.join(const.TMP_DIRECTORY, random_string), 'table_view_name': random_string, 'autorefresh': False, 'layer_name': random_string} layer_params.update(params) super().__init__(user, layer_params) @staticmethod def _random_string(): possible = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890' string = ''.join(random.sample(possible, 10)) return string def to_local(self): return None def to_cloud(self, is_append=False, is_truncate=False): return None def attach_sources(self, *sources): """ Method to attach source to a temporal layer. Is slightly different that other layers. Parameters ---------- sources Returns ------- """ # Check source and set default parameters source = sources[0][0] sql_source = len(source.split('.')) == 1 and not os.path.isdir(source[0]) if sql_source: postgis_obj = PostGis('public') parameters = {'ip': postgis_obj.ip, 'protocol': const.POSTGRESQL_KEY, 'port': postgis_obj.port, 'database_name': postgis_obj.dbname, 'user': postgis_obj.user, 'password': postgis_obj.passw, 'schema': self.user, 'type': const.VECTOR_KEY} else: # Get data type if isinstance(source, str): source = [source] if os.path.isdir(source[0]): # Directory source walk_gen = os.walk(source[0]) _, _, filename = walk_gen.__next__() extension = filename[0].split('.')[-1] driver = Drivers().get_by_field('format', extension) parameters = { 'ip': '0.0.0.0', 'port': 0, 'user': 'user', 'password': 'passw', 'protocol': const.LOCAL_KEY, 'driver_type': driver, 'source': ':'.join(source), 'type': Drivers().get_by_name(driver).driver_type } else: # File source extension = source[0].split('.')[-1] driver = Drivers().get_by_field('format', extension) parameters = { 'ip': '0.0.0.0', 'port': 0, 'user': 'user', 'password': 'passw', 'protocol': const.LOCAL_KEY, 'driver_type': driver, 'source': ':'.join(source), 'type': Drivers().get_by_name(driver).driver_type } self.update_driver_and_protocol(parameters) self.attached_sources.extend(*sources)