"""Utility functions.""" import re import sys from packaging.version import Version from pathlib import Path from typing import Union from urllib.parse import urlparse from pyogrio._vsi import vsimem_rmtree_toplevel as _vsimem_rmtree_toplevel def get_vsi_path_or_buffer(path_or_buffer): """Get VSI-prefixed path or bytes buffer depending on type of path_or_buffer. If path_or_buffer is a bytes object, it will be returned directly and will be read into an in-memory dataset when passed to one of the Cython functions. If path_or_buffer is a file-like object with a read method, bytes will be read from the file-like object and returned. Otherwise, it will be converted to a string, and parsed to prefix with appropriate GDAL /vsi*/ prefixes. Parameters ---------- path_or_buffer : str, pathlib.Path, bytes, or file-like A dataset path or URI, raw buffer, or file-like object with a read method. Returns ------- str or bytes """ # treat Path objects here already to ignore their read method + to avoid backslashes # on Windows. if isinstance(path_or_buffer, Path): return vsi_path(path_or_buffer) if isinstance(path_or_buffer, bytes): return path_or_buffer if hasattr(path_or_buffer, "read"): bytes_buffer = path_or_buffer.read() # rewind buffer if possible so that subsequent operations do not need to rewind if hasattr(path_or_buffer, "seekable") and path_or_buffer.seekable(): path_or_buffer.seek(0) return bytes_buffer return vsi_path(str(path_or_buffer)) def vsi_path(path: Union[str, Path]) -> str: """Ensure path is a local path or a GDAL-compatible VSI path.""" # Convert Path objects to string, but for VSI paths, keep posix style path. if isinstance(path, Path): if sys.platform == "win32" and path.as_posix().startswith("/vsi"): path = path.as_posix() else: path = str(path) # path is already in GDAL format if path.startswith("/vsi"): return path # Windows drive letters (e.g. "C:\") confuse `urlparse` as they look like # URL schemes if sys.platform == "win32" and re.match("^[a-zA-Z]\\:", path): if not path.split("!")[0].endswith(".zip"): return path # prefix then allow to proceed with remaining parsing path = f"zip://{path}" path, archive, scheme = _parse_uri(path) if scheme or archive or path.endswith(".zip"): return _construct_vsi_path(path, archive, scheme) return path # Supported URI schemes and their mapping to GDAL's VSI suffix. SCHEMES = { "file": "file", "zip": "zip", "tar": "tar", "gzip": "gzip", "http": "curl", "https": "curl", "ftp": "curl", "s3": "s3", "gs": "gs", "az": "az", "adls": "adls", "adl": "adls", # fsspec uses this "hdfs": "hdfs", "webhdfs": "webhdfs", # GDAL additionally supports oss and swift for remote filesystems, but # those are for now not added as supported URI } CURLSCHEMES = {k for k, v in SCHEMES.items() if v == "curl"} def _parse_uri(path: str): """Parse a URI. Returns a tuples of (path, archive, scheme) path : str Parsed path. Includes the hostname and query string in the case of a URI. archive : str Parsed archive path. scheme : str URI scheme such as "https" or "zip+s3". """ parts = urlparse(path, allow_fragments=False) # if the scheme is not one of GDAL's supported schemes, return raw path if parts.scheme and not all(p in SCHEMES for p in parts.scheme.split("+")): return path, "", "" # we have a URI path = parts.path scheme = parts.scheme or "" if parts.query: path += "?" + parts.query if parts.scheme and parts.netloc: path = parts.netloc + path parts = path.split("!") path = parts.pop() if parts else "" archive = parts.pop() if parts else "" return (path, archive, scheme) def _construct_vsi_path(path, archive, scheme) -> str: """Convert a parsed path to a GDAL VSI path.""" prefix = "" suffix = "" schemes = scheme.split("+") if "zip" not in schemes and (archive.endswith(".zip") or path.endswith(".zip")): schemes.insert(0, "zip") if schemes: prefix = "/".join(f"vsi{SCHEMES[p]}" for p in schemes if p and p != "file") if schemes[-1] in CURLSCHEMES: suffix = f"{schemes[-1]}://" if prefix: if archive: return "/{}/{}{}/{}".format(prefix, suffix, archive, path.lstrip("/")) else: return f"/{prefix}/{suffix}{path}" return path def _preprocess_options_key_value(options): """Preprocess options. For example, `spatial_index=True` gets converted to `SPATIAL_INDEX="YES"`. """ if not isinstance(options, dict): raise TypeError(f"Expected options to be a dict, got {type(options)}") result = {} for k, v in options.items(): if v is None: continue k = k.upper() if isinstance(v, bool): v = "ON" if v else "OFF" else: v = str(v) result[k] = v return result def _mask_to_wkb(mask): """Convert a Shapely mask geometry to WKB. Parameters ---------- mask : Shapely geometry The geometry to convert to WKB. Returns ------- WKB bytes or None Raises ------ ValueError raised if Shapely >= 2.0 is not available or mask is not a Shapely Geometry object """ if mask is None: return mask try: import shapely if Version(shapely.__version__) < Version("2.0.0"): shapely = None except ImportError: shapely = None if not shapely: raise ValueError("'mask' parameter requires Shapely >= 2.0") if not isinstance(mask, shapely.Geometry): raise ValueError("'mask' parameter must be a Shapely geometry") return shapely.to_wkb(mask) def vsimem_rmtree_toplevel(path: Union[str, Path]): """Remove the parent directory of the file path recursively. This is used for final cleanup of an in-memory dataset, which may have been created within a directory to contain sibling files. Additional VSI handlers may be chained to the left of /vsimem/ in path and will be ignored. Remark: function is defined here to be able to run tests on it. Parameters ---------- path : str or pathlib.Path path to in-memory file """ if isinstance(path, Path): path = path.as_posix() _vsimem_rmtree_toplevel(path)