"""Static-base repositories: load, update and extract Sentinel-2 imagery.

A ``StaticBase`` subclass wraps an on-disk image repository; the Sentinel
implementation searches the Sentinel-2 tile grid for tiles overlapping an
AOI, downloads missing products via an external script and merges product
bands into GeoTIFFs.
"""
import os

# Fix gdal 2.40 and 3.3 integration problems: newer GDAL releases only
# expose the bindings through the ``osgeo`` package.
try:
    import gdal
    import ogr
    import osr
except ModuleNotFoundError:
    from osgeo import gdal, ogr, osr
import pickle
import datetime
import dateutil.relativedelta  # submodule import needed for relativedelta()
import sys
import subprocess
import re
import time

TMP_PATH = os.path.join(os.path.abspath(''), 'MG', 'update', 'StaticBase', 'tmp')
"""str: Path to tmp directory

Used to write transient files that will be removed afterwards.
"""

SENTINEL_TILE_GRID = os.path.join(os.path.abspath(''), 'MG', 'update', 'StaticBase', 'SENTINEL2_TILE_GRID.kml')
"""str: Path to sentinel 2 tile grid kml

kml containing the tile grid of sentinel 2 mission.
https://sentinel.esa.int/documents/247904/1955685/S2A_OPER_GIP_TILPAR_MPC__20151209T095117_V20150622T000000_21000101T000000_B00.kml
"""

DOWNLOAD_SENTINEL_SCRIPT = os.path.join(os.path.abspath(''), 'MG', 'update', 'StaticBase', 'sentinel_download.py')
"""str: Path to sentinel download script

Tool to download sentinel images
"""

APIHUB_AUTH = os.path.join(os.path.abspath(''), 'MG', 'update', 'StaticBase', 'apihub.txt')
"""str: Path to user password file.

File with user and password
"""


class StaticBase(object):
    """Superclass that handles and loads all static base types."""

    TMP_PATH = TMP_PATH

    @classmethod
    def load(cls, static_database_dict):
        """
        Load repositories.

        Parameters
        ----------
        static_database_dict: dict
            Dict with key pair values indicating the static database type and
            their parameters.

        Returns
        -------
        static_database_dict: dict
            Dict containing the repositories loaded as StaticBase objects.

        Examples
        --------
        >>> static_database_dict = {'sentinel': {'global_sr': 'EPSG:4326', \
                                                 'cloud_max_coverage': 10, \
                                                 'path': '/home/sentinel', \
                                                 'sentinel2_tile_grid': '/home/sentinel/tile_grid.kml'}}
        >>> StaticBase.load(static_database_dict)  # doctest: +SKIP
        {'sentinel': <StaticBaseSentinel object>}
        """
        for database in static_database_dict:
            # Resolve the subclass by naming convention, e.g. 'sentinel' ->
            # StaticBaseSentinel.  A globals() lookup replaces the previous
            # eval() call: same resolution, without arbitrary-code execution.
            database_class = globals()['StaticBase{}'.format(database.title())]
            database_object = database_class.load(static_database_dict[database])
            static_database_dict[database] = database_object
        return static_database_dict


class StaticBaseSentinel(StaticBase):

    def __init__(self, global_sr=None, cloud_coverage_max=5, path=None,
                 sentinel2_tile_grid=SENTINEL_TILE_GRID, apihub_path=APIHUB_AUTH):
        """
        Subclass of StaticBase that handles the download of sentinel images.

        Parameters
        ----------
        global_sr: osgeo.osr.SpatialReference
            System reference of the sentinel repository
        cloud_coverage_max: int
            Maximum cloud coverage percentage.  Images with cloud coverage
            above this value are considered cloudy and may be replaced by a
            nearby date (see ``check_cloud_from_sentinel``).
        path: str
            Path where repository is located
        sentinel2_tile_grid: str
            Path where the tile grid of sentinel 2 mission is located.
        apihub_path: str
            Path to the file holding the API hub user and password.

        Examples
        --------
        >>> StaticBaseSentinel(global_sr=osr.SpatialReference(), \
                               cloud_coverage_max=5, \
                               path='/path/to/sentinel/repository', \
                               sentinel2_tile_grid='/path/to/sentinel2_tile_grid.kml')  # doctest: +SKIP
        """
        self.global_sr = global_sr
        self.source = 'sentinel'
        self.tile_grid = sentinel2_tile_grid
        self.cloud_coverage_max = cloud_coverage_max
        self.apihub_path = apihub_path
        self.entries = []
        # NOTE(review): when ``path`` is falsy, ``sentinel_path`` is never
        # set and methods that read it will raise AttributeError — this
        # mirrors the original behavior; confirm whether that is intended.
        if path:
            self.sentinel_path = path

    @classmethod
    def load(cls, base_params):
        """
        Instantiate a StaticBaseSentinel from a dict of parameters.

        Parameters
        ----------
        base_params: dict
            Parameters of StaticBaseSentinel initialization.

        Returns
        -------
        sentinel_base: StaticBaseSentinel
            Loaded repository.

        Examples
        --------
        >>> StaticBaseSentinel.load({'cloud_coverage_max': 10, \
                                     'path': '/home/sentinel', \
                                     'sentinel2_tile_grid': '/home/sentinel/tile_grid.kml'})  # doctest: +SKIP
        """
        # An empty SpatialReference is created on purpose; importing
        # EPSG:4326 was deliberately left disabled in the original code.
        global_sr = osr.SpatialReference()
        # global_sr.ImportFromEPSG(4326)
        sentinel_base = cls(global_sr=global_sr, **base_params)
        # sentinel_base.load_tiles()
        return sentinel_base

    def update(self, task, AOI, date=None, check_cloud=False):
        """
        Update sentinel tiles overlapping with AOI within date.

        Parameters
        ----------
        task: object
            Task handle used for pause control and progress reporting
            (reads ``FLAG_PAUSE``, writes ``status_description``, calls
            ``set_progress``).
        AOI: str
            Path to the vector file (e.g. kml) positioning the Area of
            Interest; it must contain exactly one feature.
        date: str, optional
            'month/year' date of the image to be downloaded.  Defaults to
            the current month.
        check_cloud: bool
            If True, cloud coverage is checked and overly cloudy entries are
            replaced by nearby dates.  If False, no cloud check is done.

        Returns
        -------
        entries: list of SentinelTile
            Entries matching the AOI and date (cloud-checked replacements
            when ``check_cloud`` is True).
        """
        # Read the AOI and reproject it into the repository reference system.
        aoi_file = ogr.Open(AOI)
        aoi_file_layer = aoi_file.GetLayer()
        aoi_file_sr = aoi_file_layer.GetSpatialRef()
        sr_trans = osr.CoordinateTransformation(aoi_file_sr, self.global_sr)
        features = list(aoi_file_layer)
        assert len(features) == 1
        AOI_geom = features[0].GetGeometryRef()
        AOI_geom.Transform(sr_trans)
        if not date:
            # Bug fix: ``datetime.date.month``/``year`` are class attributes,
            # not the current date — use today's values.
            today = datetime.date.today()
            date = '{}/{}'.format(today.month, today.year)
        entries = getattr(self, 'get_from_{}'.format(self.source))(task, AOI_geom, date)
        if check_cloud:
            # Replace entries whose cloud coverage exceeds the limit.
            # Bug fix: pass the AOI *geometry* (the checker's download path
            # calls GetEnvelope(), a geometry method), and actually return
            # the checked entries instead of discarding them.
            checker = getattr(self, 'check_cloud_from_{}'.format(self.source))
            entries = [checker(entry, AOI_geom) for entry in entries]
        return entries

    def load_tiles(self):
        """
        Load all tiles within the repository into ``self.entries``.

        Returns
        -------
        None
        """
        # Every directory that directly contains files is taken as one tile.
        entries_to_load = [dirpath
                           for dirpath, _dirnames, filenames in os.walk(self.sentinel_path)
                           if filenames]
        self.entries = SentinelTile.create_entries(entries_to_load)

    def get_from_sentinel(self, task, aoi, date):
        """
        Get images from sentinel, downloading the ones missing from the repo.

        Parameters
        ----------
        task: object
            Task handle used for pause control and progress reporting.
        aoi: osgeo.ogr.Geometry
            Area of Interest
        date: str
            Formatted string date, month/year

        Returns
        -------
        exist_entries: list of SentinelTile
            Entries that already existed in the repository.  Missing tiles
            are downloaded but (as in the original code) not included here.
        """
        month, year = date.split('/')
        date = [int(month), int(year)]
        global_sr = self.global_sr
        tile_grid_file = ogr.Open(self.tile_grid)
        tile_grid_layer = tile_grid_file.GetLayer()
        tile_grid_sr = tile_grid_layer.GetSpatialRef()
        if global_sr:
            trans = osr.CoordinateTransformation(tile_grid_sr, global_sr)
        else:
            # Identity transformation keeps the loop below uniform.
            trans = osr.CoordinateTransformation(tile_grid_sr, tile_grid_sr)
        # Get sentinel tiles which overlap the AOI.  Tiles whose envelope
        # spans the whole -180..180 range give spurious intersections near
        # the antimeridian and are skipped.
        tiles = []
        for sentinel_tile in tile_grid_layer:
            sentinel_tile_geom = sentinel_tile.GetGeometryRef()
            sentinel_tile_geom.Transform(trans)
            bb = sentinel_tile_geom.GetEnvelope()
            if aoi.Intersects(sentinel_tile_geom) and not (bb[0] == -180 and bb[1] == 180):
                tiles.append(sentinel_tile.GetField('name'))
        # Check which images already exist within the repo.
        download_images = []
        exist_entries = []
        for tile_name in tiles:
            entries = self.search_by(['tile', 'date'],
                                     [tile_name, datetime.date(date[1], date[0], 12)])
            if not entries:
                download_images.append([date, tile_name])
            exist_entries.extend(entries)
        # Download images that do not exist within the repo.  (The original
        # placed this in a for-else clause, which always runs when the loop
        # has no break — flattened here for clarity.)
        if download_images:
            print('{} sentinel images do not exists within the image repo.\n'
                  'Must be downloaded, this can take several minutes.'.format(len(download_images)))
        total_files = len(download_images)
        for current_file, download_image in enumerate(download_images, start=1):
            while task.FLAG_PAUSE:
                time.sleep(0.1)  # yield the CPU instead of busy-waiting
            date = datetime.datetime.strptime("{}/{}".format(*download_image[0]), '%m/%Y').date()
            task.status_description = "Downloading tile {} within {}/{}".format(
                download_image[1], download_image[0][0], download_image[0][1])
            self._download(download_image[1], date, aoi)
            task.set_progress(current_file / total_files * 100)
        return exist_entries

    def check_cloud_from_sentinel(self, entry, AOI_geom):
        """
        Check cloud coverage of a sentinel image.

        If the image's cloud coverage is greater than
        ``self.cloud_coverage_max``, nearby months (+-4) are searched and the
        first acceptable image — or, failing that, the least cloudy one — is
        returned instead.

        Parameters
        ----------
        entry: SentinelTile
            Entry of the repository to check cloud coverage.
        AOI_geom: osgeo.ogr.Geometry
            Geometry locating the zone of interest.

        Returns
        -------
        entry: SentinelTile
            The original entry if acceptable, otherwise the replacement with
            the lowest cloud coverage found.
        """
        if entry.cloud <= self.cloud_coverage_max:
            return entry
        # Search outward from the requested month: +1, -1, +2, -2, ...
        months_to_find = [1, -1, 2, -2, 3, -3, 4, -4]
        cloud_values = []
        candidates = []
        for month_offset in months_to_find:
            # Bug fix: relativedelta lives in dateutil.relativedelta and the
            # relative-shift keyword is the plural ``months`` (singular
            # ``month`` would set an absolute month instead of shifting).
            date = entry.date + dateutil.relativedelta.relativedelta(months=month_offset)
            found = self.search_by(['tile', 'date'], [entry.tile, date])
            if not found:
                # Bug fix: download the shifted date, not the original one.
                candidate = self._download(entry.tile, date, AOI_geom)
            else:
                candidate = found[0]
            cloud_values.append(candidate.cloud)
            candidates.append(candidate)
            if candidate.cloud < self.cloud_coverage_max:
                print('WARNING: The requested date ({}) does not have the minimum cloud coverage ({}). The image '
                      'has been replaced by date ({})'.format(entry.date, self.cloud_coverage_max, candidate.date))
                return candidate
        # No optimal cloud photo found: select the least cloudy candidate.
        best = candidates[cloud_values.index(min(cloud_values))]
        print('WARNING: The requested date ({}) does not have the minimum cloud coverage ({}). No optimum cloud '
              'coverage found within ten months, selecting the best result. The image '
              'has been replaced by date ({})'.format(entry.date, self.cloud_coverage_max, best.date))
        return best

    def _download(self, tile, date, AOI_geom):
        """
        Download a sentinel image from the web.

        A script with path DOWNLOAD_SENTINEL_SCRIPT is used.  See
        ``python DOWNLOAD_SENTINEL_SCRIPT --help`` for usage info.

        Parameters
        ----------
        tile: str
            Sentinel tile id.
        date: datetime.date
            Date to get the image (only month and year will be used)
        AOI_geom: osgeo.ogr.Geometry
            Geometry locating the zone of interest.

        Returns
        -------
        entry: SentinelTile
            Image downloaded.
        """
        # OGR envelopes are (minX, maxX, minY, maxY) = (lonmin, lonmax, latmin, latmax).
        lonmin, lonmax, latmin, latmax = AOI_geom.GetEnvelope()
        month = str(date.month)
        year = str(date.year)
        download_image_path = os.path.join(os.path.abspath(''), self.sentinel_path, tile, year, month)
        command_string = '{} {} -a {} --latmin {} --latmax {} --lonmin {} --lonmax {} --id {}-{}-01 ' \
                         '--if {}-{}-28 -t {} -m 100 -r 1000 -w {} -s S2 --sg {}'.format(
                             sys.executable, DOWNLOAD_SENTINEL_SCRIPT, self.apihub_path,
                             latmin, latmax, lonmin, lonmax, year, month, year, month,
                             tile, download_image_path, self.tile_grid)
        # Portable replacement for os.system('mkdir -p ...').
        os.makedirs(download_image_path, exist_ok=True)
        actual_path = os.path.abspath('')
        os.chdir(download_image_path)
        try:
            os.system(command_string)
        finally:
            # Always restore the working directory, even if the script fails.
            os.chdir(actual_path)
        new_entries = SentinelTile.create_entries([download_image_path])
        self.entries.extend(new_entries)
        # Bug fix: create_entries returns a list, but callers expect a single
        # SentinelTile (check_cloud_from_sentinel reads ``.cloud`` on it).
        return new_entries[0]

    def search_by(self, fields, values):
        """
        Search entries within the repo by attribute values.

        Parameters
        ----------
        fields: list of str
            Any SentinelTile attribute names.
        values: list
            Values to match, positionally paired with ``fields``.

        Returns
        -------
        found_entries: list of SentinelTile
            Entries whose attributes equal all the requested values.

        Examples
        --------
        >>> repo.search_by(['tile', 'date'], ['29SMB', datetime.date(2019, 8, 12)])  # doctest: +SKIP
        [<SentinelTile object>]
        """
        # getattr/all replaces the previous eval()-built predicate: same
        # semantics (empty ``fields`` matches everything), no eval.
        return [entry for entry in self.entries
                if all(getattr(entry, field) == value
                       for field, value in zip(fields, values))]


class SentinelTile(object):

    def __init__(self, path, name, cloud, date, tile, level):
        """
        Class to handle parameters of each sentinel image.

        Parameters
        ----------
        path: str
            Path to the directory containing the sentinel data.
        name: str
            File name of the product zip inside ``path``.
        cloud: float
            Cloud coverage percentage of sentinel data.
        date: datetime.date
            Date of sentinel image.
        tile: str
            Sentinel tile ID.
        level: str
            Product level identifier (e.g. '1C' or '2A'), used by extract().
        """
        self.path = path
        self.name = name
        self.cloud = cloud
        self.date = date
        self.tile = tile
        self.level = level

    @classmethod
    def create_entries(cls, images_path):
        """
        Create SentinelTile instances from image directories.

        Parameters
        ----------
        images_path: list of str
            Paths to directories each containing a 'features.pick' metadata
            file and a '*.zip' product.

        Returns
        -------
        entries: list of SentinelTile
            Created entries.
        """
        entries = []
        for image_dir in images_path:
            # Parse features.pick.  NOTE(review): pickle.load assumes the
            # repository files are trusted; never point this at external data.
            with open(os.path.join(image_dir, 'features.pick'), 'rb') as features_file:
                features = pickle.load(features_file)
            # features['date'] is (month, year); day 12 is a fixed placeholder.
            date = datetime.date(int(features['date'][1]), int(features['date'][0]), 12)
            _, _, filenames = next(os.walk(image_dir))
            zip_name = [filename for filename in filenames
                        if filename.split('.')[-1] in ['zip']][0]
            entries.append(cls(image_dir, zip_name, features['cloudcoverpercentage'],
                               date, features['tile'], features['level']))
        return entries

    def exists(self):
        """
        Check if the entry exists on disk.

        Returns
        -------
        bool
            True if the entry's path exists.
        """
        return os.path.exists(self.path)

    def extract(self):
        """Dispatch to the extraction routine matching the product level."""
        # Removed leftover debug print of the level.
        getattr(self, '_extract_{}'.format(self.level.lower()))()

    def _extract_1c(self):
        """Extract and merge the bands of a Level-1C product."""
        self._extract_bands(r'.*_B0?[0-9]+A?\.jp2', r'.*_(B0?[0-9]+A?)\.jp2')

    def _extract_2a(self):
        """Extract and merge the bands of a Level-2A product."""
        # Level-2A band file names carry an extra (resolution) suffix.
        self._extract_bands(r'.*_B0?[0-9]+A?_.*\.jp2', r'.*_(B0?[0-9]+A?)_.*\.jp2')

    def _extract_bands(self, file_pattern, band_pattern):
        """
        Shared extraction body for 1C/2A products (they differed only in the
        two band regexes): locate the band .jp2 files of the first three
        subdatasets via gdalinfo, merge them into one GeoTIFF and copy the
        dataset metadata onto it.  Sets ``self.name`` to the merged .tif.

        Parameters
        ----------
        file_pattern: str
            Regex matching a band file path on the 'Files:' line itself.
        band_pattern: str
            Regex capturing the band id (e.g. 'B02') from a file path.
        """
        ds = gdal.Open(os.path.join(self.path, self.name))
        subdatasets = ds.GetSubDatasets()
        files = {}
        for subdataset in subdatasets[0:3]:
            info_lines = subprocess.check_output(['gdalinfo', subdataset[0]]).decode().split('\n')
            in_files_section = False
            for line in info_lines:
                if re.match('^Size.*', line):
                    # 'Size' marks the end of the Files section.
                    in_files_section = False
                    break
                if re.match('^Files: .*', line):
                    in_files_section = True
                    file = re.findall('^Files:(.*)', line)[0]
                    try:
                        # NOTE(review): as in the original, this pattern has
                        # no capture group, so the whole match is the key.
                        band = re.findall(file_pattern, file)[0]
                    except IndexError:
                        continue
                    files[band] = file
                if in_files_section:
                    # Continuation lines list one file each, indented.
                    file = line.replace(' ', '')
                    try:
                        band = re.findall(band_pattern, file)[0]
                    except IndexError:
                        continue
                    files[band] = file
        # Ordering: move the lexicographically-last key (presumably B8A) to
        # position 8 so it sits between B08 and B09 — TODO confirm intent.
        keys_ordered = sorted(files.keys())
        keys_ordered.insert(8, keys_ordered[-1])
        keys_ordered.pop(-1)
        file_to_merge = [files[key] for key in keys_ordered]
        result_name = os.path.join(self.path, ''.join(self.name.split('.')[0:-1]))
        # Merge all bands as separate 10m-resolution layers.
        command = "gdal_merge.py -ps 10 10 -separate -o {}.tif {}".format(
            result_name, " ".join(file_to_merge))
        os.system(command)
        # Propagate the original dataset metadata into the merged GeoTIFF.
        metadata = ds.GetMetadata()
        metadata_string = ''
        for metadata_key in metadata:
            metadata_string += '-mo \"{}={}\" '.format(metadata_key, metadata[metadata_key])
        self.name = result_name + '.tif'
        os.system("gdal_edit.py {} {}".format(metadata_string, self.name))