""" Module for .tif files. Controls the behaviour of that files. """ import tempfile import os import shutil # Fix gdal 2.40 and 3.3 integration problems try: import gdal except ModuleNotFoundError: from osgeo import gdal from tools import const from .masterDriver import MasterDriver VSI_CURL_STRING = '/vsicurl/' VSI_CURL_STREAMING_STRING = '/vsicurl_streaming/' class TIFF(MasterDriver): """ Class to handle tif files. This class corresponds with GTiff gdal driver. Attributes ---------- format: str Extension of the files associated with this driver gdal_driver: str Identification string og gdal driver used for open this files with gdal. Goes to gdal `_ raster drivers to get more information driver_type: str Raster or vector, in this case Raster name: str Name that identifies this driver, actually GeoTIFF """ format = 'tif' gdal_driver = 'GTiff' driver_type = const.RASTER_KEY name = const.GEOTIFF_KEY def to_local(self, *_, **__): """ Get remote source to our local environment Returns ------- success: dict New parameters of the layer """ return None def download(self, layer, dst, *_, **kwargs): """ Method to download the source from remote to local file. Parameters ---------- layer: :obj:`Layer` Layer from remote source dst: str Path to save the file kwargs: dict Arguments Returns ------- success: str Path where the fil has been saved """ dst = self.check_source(dst) src_path = layer['source'] layer.protocol.download(src_path, dst, **kwargs) return dst def upload(self, layer, orig, *_, **__): """ Method to upload file from out local system to remote Parameters ---------- layer: :obj:`Layer` Layer from remote source orig: str Path in local system Returns ------- success: None """ # Upload file with the desired protocol layer.protocol.upload(orig, layer.get_source()) def remove(self, source, *_): """ Remove sources Parameters ---------- source: str Path to remove Returns ------- success: None """ # Everything on this driver are going to be files or folder if os.path.isdir(source): shutil.rmtree(source) elif os.path.isfile(source): os.remove(source) def translate(self, src_layer, dst_driver): """ Translate layer from one driver to another Parameters ---------- src_layer: :obj:`Layer` Origin layer dst_driver: :obj:`Layer` Destiny layer Returns ------- success: list Resulting files """ tmp_dir = tempfile.mkdtemp(dir=const.TMP_DIRECTORY) result_files = [] name_files = [] for source in src_layer.attached_sources: gdal_layer_source = src_layer.gdal_url(source) file_name = source.split(os.sep)[-1].split('.')[0] file_result = os.path.join(tmp_dir, file_name) file_result = dst_driver.check_source(file_result) command = 'gdal_translate -of "{output_format}" ' \ '"{input_connection}" "{output_connection}"'.\ format(output_format=dst_driver.gdal_driver, output_connection=file_result, input_connection=gdal_layer_source) result_files.append(file_result) if file_name is not name_files: name_files.append(file_name) os.system(command) zipped = len(result_files) > 1 and len(name_files) > 1 if zipped: output_result = src_layer['source'].split('/')[-1].split('.')[0] zip_output = os.path.join(tmp_dir, '{}.zip'.format(output_result)) result_names = [name.split(os.sep)[-1].split('.')[0] + "*" for name in result_files] command = "cd {} && zip -q {} {}".\ format(tmp_dir, zip_output.split(os.sep)[-1], ' '.join(result_names)) os.system(command) # Update layer new_cloud_parameters = { 'driver_type': const.ZIP_KEY, 'source': "{}/{}.zip".format('/'.join( src_layer['source'].split('/')[:-1]), output_result) } src_layer.cloud_parameters.update(new_cloud_parameters) result_files = [zip_output] src_layer.residual_sources.append(tmp_dir) return result_files def publish(self, *_, **__): """ Publish on Geoserver """ return None def check_source(self, source): """ Check that source follow the naming rules of the driver Parameters ---------- source: str Path to the source Returns ------- success: str Formatted path to the source """ # Checking dst path has_extension = len(source.split('.')) >= 2 if not has_extension: # Adding extension source = "{}.{}".format(source, self.format) return source def get_source(self, layer, *_, only_name=False, **__): """ Get source path with the format of the driver Parameters ---------- layer: :obj: `Layer` only_name: bool Get only the name or full path Returns ------- success: str Formatted source """ if only_name: return layer['source'].split('/')[-1].split('.')[0] return layer['source'] def check(self, layer): """ Check that source Parameters ---------- layer: :obj: `Layer` Returns ------- bool True if successful, False otherwise. """ data_source = gdal.Open(layer.gdal_layer(), 0) if not data_source: return False return True def create(self, *_, **__): """ With the execution of the wrapper, the foreign table is obtained from which the attributes that the user indicates will be extracted, for subsequent preview. Notes ----- This method will return None for all layers except Postgres, since once the wrapper is done, all layers are foreign tables. Returns ------- success: None """ return None def list(self, layer): """ Get the list of attributes and layer name of the source Parameters ---------- layer: :obj: `Layer` Returns ------- dict dict containing layer and attributes. {"layers": [{"name": 'layer1', 'attributes': [attr1, attr2, ...]}]} """ data_source = gdal.Open(layer.gdal_layer()) url_meta = data_source.GetDescription() name_ext = url_meta.split("/")[-1] name_layer = name_ext.split(".")[0] att = [const.GEOTIFF_KEY] data = {'layers': [{'name': name_layer, 'attributes': att}]} return data def preview(self, layer): """ Get info to preview the layer Parameters ---------- layer: :obj: `Layer` Returns ------- list List that contain the name layer and attributes """ data_source = gdal.Open(layer.gdal_layer()) statistics = [] headers = ['min', 'max', 'mean', 'stdDev', 'band'] for band in range(data_source.RasterCount): band += 1 src_band = data_source.GetRasterBand(band) if src_band is None: continue stats = src_band.GetStatistics(True, True) stats.append(band) statistics.append(stats) if stats is None: continue output_list = [headers, statistics] return output_list def prelist(self, layer): """ list the layers of sources to preview Parameters ---------- layer: :obj: `Layer` Returns ------- list List that contain the layer name """ data_source = gdal.Open(layer.gdal_layer()) url_meta = data_source.GetDescription() name_ext = url_meta.split("/")[-1] name_layer = [name_ext.split(".")[0]] output_list = [] head = ["layers"] output_list.append(head) output_list.append([name_layer]) return output_list class ContainerTiff(TIFF): """ Control containers of tif files, like folders. """ def prelist(self, layer): head = ['layers'] layers = [] for _file in layer.list_files(): ext = _file.split('.')[-1] if ext == self.format: if '/' in _file: if '/ftp/' in _file: file = _file.replace('/ftp/{}/public/'.format( layer['user']), '').rsplit('/', 1)[1] source = '{}'.format(_file.replace( '/ftp/{}/public'.format(layer['user']), '')) else: file = _file.rsplit('/', 1)[1] source = '{}'.format(_file) else: file = _file source = '{}/{}'.format(layer['source'], _file) layer_new = layer.copy() layer_new.parameters['source'] = source layer_new.update_driver_and_protocol(layer_new.parameters) res = TIFF.prelist(self, layer_new) layers.append(["{}".format(file, res[1][0][0])]) return [head, layers] def check(self, layer): layer_new = layer.copy() _file = layer_new.parameters['layer_name'] layer_new.parameters["source"] = "{}/{}".format( layer_new.parameters['source'], _file) layer_new.update_driver_and_protocol(layer_new.parameters) return TIFF.check(self, layer_new) def list(self, layer): layer_new = layer.copy() _file = layer_new.parameters['layer_name'] layer_new.parameters["source"] = "{}/{}".format( layer_new.parameters['source'], _file) layer_new.update_driver_and_protocol(layer_new.parameters) data = TIFF.list(self, layer_new) data['layers'][0]['name'] = layer['layer_name'] return data def preview(self, layer): layer_new = layer.copy() _file = layer_new.parameters['layer_name'] layer_new.parameters["source"] = "{}/{}".format( layer_new.parameters['source'], _file) layer_new.update_driver_and_protocol(layer_new.parameters) data = TIFF.preview(self, layer_new) return data