import re import subprocess from django.conf import settings # Fix gdal 2.40 and 3.3 integration problems try: import gdal import osr import ogr except ModuleNotFoundError: from osgeo import gdal, ogr, osr import numpy as np from subprocess import Popen, PIPE class CustomSet(list): def add(self, element): chosen_element = [j for j in self[0:] if j[0] == element[0]] if not chosen_element: self.append(element) else: chosen_element = chosen_element[0] chosen_element[1] = element[1] class GdalOptions(object): def __init__(self): self.parameters_dict = {} def update_by_string(self, parameters_string): elem_parameters = re.findall(r'-(.*?) (?=(?:-|$))', parameters_string+" ") for elem in elem_parameters: if "=" in elem: multi = True else: multi = False split = elem.split(" ") key = split[0] value = ' '.join(split[1:]) if not value: value = 'True' if key in self.parameters_dict.keys(): if multi: sub_key, sub_value = value.split('=') self.parameters_dict[key].add([sub_key, sub_value]) else: self.parameters_dict[key] = value else: if multi: sub_key, sub_value = value.split('=') self.parameters_dict[key] = CustomSet() self.parameters_dict[key].add([sub_key, sub_value]) else: self.parameters_dict[key] = value def export_params(self): exporting_string = "" for key, value in self.parameters_dict.items(): if isinstance(value, list): for val in value: exporting_string += "-{} {} ".format(key, "=".join(val)) else: if value.lower() == "false": continue if value.lower() == "true": value = "" exporting_string += "-{} {} ".format(key, value) return exporting_string @classmethod def parse_parameters(cls, parameters, default): # Load defaults options = cls() options.update_by_string(default) # Load requested options if parameters: options.update_by_string(parameters) # params_string = options.export_params() return options def __getitem__(self, item): return self.parameters_dict[item] def keys(self): return self.parameters_dict.keys() def gdal_command_progress(command, task,initial=0,finish=100): command = command.replace("-v", "") popen = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True) progress=initial sum_string="" while popen.poll() is None: sum_string += popen.stdout.read(1).decode() results = re.findall('\.([0-9]{2,3})\.', sum_string) if len(results) > 0: progress= (finish - initial) / 100 * float(results[-1]) + initial task.set_progress(progress) if settings.DEBUG: print("\rProgress: {}".format(progress), end="") popen.communicate() return popen def polygon_from_points(points): ring = ogr.Geometry(ogr.wkbLinearRing) for point in points: ring.AddPoint(point[0], point[1]) else: ring.AddPoint(*points[0]) polygon = ogr.Geometry(ogr.wkbPolygon) polygon.AddGeometry(ring) return polygon def is_overlapping(raster, polygon): raster = gdal.Open(raster) polygon = polygon.Clone() if not raster: return False # Get raster geometry upx, xres, xskew, upy, yskew, yres = raster.GetGeoTransform() cols = raster.RasterXSize rows = raster.RasterYSize ulx = upx + 0 * xres + 0 * xskew uly = upy + 0 * yskew + 0 * yres llx = upx + 0 * xres + rows * xskew lly = upy + 0 * yskew + rows * yres lrx = upx + cols * xres + rows * xskew lry = upy + cols * yskew + rows * yres urx = upx + cols * xres + 0 * xskew ury = upy + cols * yskew + 0 * yres raster_geometry = polygon_from_points([[ulx, uly], [llx, lly], [lrx, lry], [urx, ury]]) raster_sr = osr.SpatialReference(wkt=raster.GetProjection()) polygon_sr = polygon.GetSpatialReference() trans = osr.CoordinateTransformation(polygon_sr, raster_sr) polygon.Transform(trans) overlaps = raster_geometry.Contains(polygon) or raster_geometry.Intersect( polygon) return overlaps def square_grid_polygon(meters, master_polygon, gen_grid=None, offset=0): tiles = [] meters = np.arcsin(meters / 6371000) * 180 / np.pi offset = np.arcsin(offset / 6371000) * 180 / np.pi master_polygon = master_polygon.Clone() master_pol_bb = master_polygon.GetEnvelope() uly_AOI = master_pol_bb[3] lry_AOI = master_pol_bb[2] lrx_AOI = master_pol_bb[1] ulx_AOI = master_pol_bb[0] AOI_width = lrx_AOI - ulx_AOI AOI_height = uly_AOI - lry_AOI width_tiles = int(AOI_width / meters) height_tiles = int(AOI_height / meters) ulx = ulx_AOI - offset for _ in range(width_tiles): brx = ulx + meters + 2*offset uly = uly_AOI + offset for _ in range(height_tiles): bry = uly - meters - 2*offset polygon = polygon_from_points([[ulx, uly], [brx, uly], [brx, bry], [ulx, bry]]) if master_polygon.Intersects(polygon) or master_polygon.Overlaps( polygon) or master_polygon.Contains(polygon): tiles.append(polygon) uly = bry + 2*offset else: bry = lry_AOI uly = lry_AOI + meters + 2*offset polygon = polygon_from_points([[ulx, uly], [brx, uly], [brx, bry], [ulx, bry]]) if master_polygon.Intersects(polygon) or master_polygon.Overlaps( polygon) or master_polygon.Contains(polygon): tiles.append(polygon) ulx = brx - 2*offset else: brx = lrx_AOI ulx = lrx_AOI - meters - 2*offset uly = uly_AOI + offset for _ in range(height_tiles): bry = uly - meters - 2 * offset polygon = polygon_from_points([[ulx, uly], [brx, uly], [brx, bry], [ulx, bry]]) if master_polygon.Intersects(polygon) or master_polygon.Overlaps( polygon) or master_polygon.Contains(polygon): tiles.append(polygon) uly = bry + 2 * offset else: bry = lry_AOI uly = lry_AOI + meters + 2 * offset polygon = polygon_from_points([[ulx, uly], [brx, uly], [brx, bry], [ulx, bry]]) if master_polygon.Intersects(polygon) or master_polygon.Overlaps( polygon) or master_polygon.Contains(polygon): tiles.append(polygon) if gen_grid: driver = ogr.GetDriverByName('ESRI Shapefile') output = gen_grid datasource = driver.CreateDataSource(output) layer = datasource.CreateLayer(output, geom_type=ogr.wkbPolygon, srs=master_polygon.GetSpatialReference()) for tile in tiles: feature = ogr.Feature(layer.GetLayerDefn()) feature.SetGeometry(tile) layer.CreateFeature(feature) feature.Destroy() datasource.Destroy() return tiles def get_EPSG(spatialRef): if "unknown" in spatialRef and "GRS80" in spatialRef and\ "UTM" in spatialRef: spatialRef_wkt= spatialRef.replace( "unknown", "European_Terrestrial_Reference_System_1989") # spatialRef = layer.GetSpatialRef() # spatialRef.ImportFromWkt(spatialRef_new) # spatialRef_wkt = str(spatialRef.ExportToWkt()) else: spatialRef_wkt = spatialRef p = Popen( "gdalsrsinfo -e '{}'".format(spatialRef_wkt), shell=True, # NOTE: don't use a list argument with shell=True on POSIX stdin=PIPE, stdout=PIPE, stderr=PIPE, bufsize=-1, universal_newlines=True) data2SGR = [] stdout_srsinfo = list(p.stdout) for index, line in enumerate(stdout_srsinfo): if 'Confidence in this match' in line: linesplit = line.split(': ') array = [int(linesplit[1][:-2]), stdout_srsinfo[index + 2][:-1]] data2SGR.append(array) array = [] max_index = [] max_value = max([x[0] for x in data2SGR]) posibleEPSG = [] for i in data2SGR: if i[0] == max_value: split_EPSG = i[1].split(':') pEPSG = [split_EPSG[1], len(split_EPSG[1])] posibleEPSG.append(pEPSG) pEPSG = [] max_value_EPSG = max([x[1] for x in posibleEPSG]) for i in posibleEPSG: if i[1] == max_value_EPSG: EPSG = i[0] return EPSG