"""Fiona's GDAL/AWS environment""" from functools import wraps, total_ordering from inspect import getfullargspec import logging import os import re import threading import warnings import attr from fiona._env import ( GDALDataFinder, GDALEnv, PROJDataFinder, calc_gdal_version_num, get_gdal_config, get_gdal_release_name, get_gdal_version_num, set_gdal_config, set_proj_data_search_path, ) from fiona.errors import EnvError, FionaDeprecationWarning, GDALVersionError from fiona.session import Session, DummySession class ThreadEnv(threading.local): def __init__(self): self._env = None # Initialises in each thread # When the outermost 'fiona.Env()' executes '__enter__' it # probes the GDAL environment to see if any of the supplied # config options already exist, the assumption being that they # were set with 'osgeo.gdal.SetConfigOption()' or possibly # 'fiona.env.set_gdal_config()'. The discovered options are # reinstated when the outermost Fiona environment exits. # Without this check any environment options that are present in # the GDAL environment and are also passed to 'fiona.Env()' # will be unset when 'fiona.Env()' tears down, regardless of # their value. For example: # # from osgeo import gdal import fiona # # gdal.SetConfigOption('key', 'value') # with fiona.Env(key='something'): # pass # # The config option 'key' would be unset when 'Env()' exits. # A more comprehensive solution would also leverage # https://trac.osgeo.org/gdal/changeset/37273 but this gets # Fiona + older versions of GDAL halfway there. One major # assumption is that environment variables are not set directly # with 'osgeo.gdal.SetConfigOption()' OR # 'fiona.env.set_gdal_config()' inside of a 'fiona.Env()'. self._discovered_options = None local = ThreadEnv() log = logging.getLogger(__name__) class Env: """Abstraction for GDAL and AWS configuration The GDAL library is stateful: it has a registry of format drivers, an error stack, and dozens of configuration options. Fiona's approach to working with GDAL is to wrap all the state up using a Python context manager (see PEP 343, https://www.python.org/dev/peps/pep-0343/). When the context is entered GDAL drivers are registered, error handlers are configured, and configuration options are set. When the context is exited, drivers are removed from the registry and other configurations are removed. Example: with fiona.Env(GDAL_CACHEMAX=512) as env: # All drivers are registered, GDAL's raster block cache # size is set to 512MB. # Commence processing... ... # End of processing. # At this point, configuration options are set to their # previous (possible unset) values. A boto3 session or boto3 session constructor arguments `aws_access_key_id`, `aws_secret_access_key`, `aws_session_token` may be passed to Env's constructor. In the latter case, a session will be created as soon as needed. AWS credentials are configured for GDAL as needed. """ @classmethod def default_options(cls): """Default configuration options Parameters ---------- None Returns ------- dict """ return { "CHECK_WITH_INVERT_PROJ": True, "GTIFF_IMPLICIT_JPEG_OVR": False, "FIONA_ENV": True, } def __init__( self, session=None, aws_unsigned=False, profile_name=None, session_class=Session.aws_or_dummy, **options ): """Create a new GDAL/AWS environment. Note: this class is a context manager. GDAL isn't configured until the context is entered via `with fiona.Env():` Parameters ---------- session : optional A Session object. aws_unsigned : bool, optional Do not sign cloud requests. profile_name : str, optional A shared credentials profile name, as per boto3. session_class : Session, optional A sub-class of Session. **options : optional A mapping of GDAL configuration options, e.g., `CPL_DEBUG=True, CHECK_WITH_INVERT_PROJ=False`. Returns ------- Env Notes ----- We raise EnvError if the GDAL config options AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY are given. AWS credentials are handled exclusively by boto3. Examples -------- >>> with Env(CPL_DEBUG=True, CPL_CURL_VERBOSE=True): ... with fiona.open("zip+https://example.com/a.zip") as col: ... print(col.profile) For access to secured cloud resources, a Fiona Session or a foreign session object may be passed to the constructor. >>> import boto3 >>> from fiona.session import AWSSession >>> boto3_session = boto3.Session(...) >>> with Env(AWSSession(boto3_session)): ... with fiona.open("zip+s3://example/a.zip") as col: ... print(col.profile """ aws_access_key_id = options.pop("aws_access_key_id", None) # Warn deprecation in 1.9, remove in 2.0. if aws_access_key_id: warnings.warn( "Passing abstract session keyword arguments is deprecated. " "Pass a Fiona AWSSession object instead.", FionaDeprecationWarning, ) aws_secret_access_key = options.pop("aws_secret_access_key", None) aws_session_token = options.pop("aws_session_token", None) region_name = options.pop("region_name", None) if not {"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"}.isdisjoint(options): raise EnvError( "GDAL's AWS config options can not be directly set. " "AWS credentials are handled exclusively by boto3." ) if session: # Passing a session via keyword argument is the canonical # way to configure access to secured cloud resources. # Warn deprecation in 1.9, remove in 2.0. if not isinstance(session, Session): warnings.warn( "Passing a boto3 session is deprecated. Pass a Fiona AWSSession object instead.", FionaDeprecationWarning, ) session = Session.aws_or_dummy(session=session) self.session = session elif aws_access_key_id or profile_name or aws_unsigned: self.session = Session.aws_or_dummy( aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, aws_session_token=aws_session_token, region_name=region_name, profile_name=profile_name, aws_unsigned=aws_unsigned, ) elif {"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"}.issubset(os.environ.keys()): self.session = Session.from_environ() else: self.session = DummySession() self.options = options.copy() self.context_options = {} @classmethod def from_defaults(cls, *args, **kwargs): """Create an environment with default config options Parameters ---------- args : optional Positional arguments for Env() kwargs : optional Keyword arguments for Env() Returns ------- Env Notes ----- The items in kwargs will be overlaid on the default values. """ options = Env.default_options() options.update(**kwargs) return Env(*args, **options) def credentialize(self): """Get credentials and configure GDAL Note well: this method is a no-op if the GDAL environment already has credentials, unless session is not None. Returns ------- None """ cred_opts = self.session.get_credential_options() self.options.update(**cred_opts) setenv(**cred_opts) def drivers(self): """Return a mapping of registered drivers.""" return local._env.drivers() def _dump_open_datasets(self): """Writes descriptions of open datasets to stderr For debugging and testing purposes. """ return local._env._dump_open_datasets() def __enter__(self): if local._env is None: self._has_parent_env = False # See note directly above where _discovered_options is globally # defined. This MUST happen before calling 'defenv()'. local._discovered_options = {} # Don't want to reinstate the "RASTERIO_ENV" option. probe_env = {k for k in self.options.keys() if k != "RASTERIO_ENV"} for key in probe_env: val = get_gdal_config(key, normalize=False) if val is not None: local._discovered_options[key] = val defenv(**self.options) self.context_options = {} else: self._has_parent_env = True self.context_options = getenv() setenv(**self.options) self.credentialize() return self def __exit__(self, exc_type=None, exc_val=None, exc_tb=None): delenv() if self._has_parent_env: defenv() setenv(**self.context_options) else: # See note directly above where _discovered_options is globally # defined. while local._discovered_options: key, val = local._discovered_options.popitem() set_gdal_config(key, val, normalize=False) local._discovered_options = None def defenv(**options): """Create a default environment if necessary.""" if not local._env: local._env = GDALEnv() local._env.update_config_options(**options) local._env.start() def getenv(): """Get a mapping of current options.""" if not local._env: raise EnvError("No GDAL environment exists") else: return local._env.options.copy() def hasenv(): return bool(local._env) def setenv(**options): """Set options in the existing environment.""" if not local._env: raise EnvError("No GDAL environment exists") else: local._env.update_config_options(**options) def hascreds(): warnings.warn("Please use Env.session.hascreds() instead", FionaDeprecationWarning) return local._env is not None and all( key in local._env.get_config_options() for key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] ) def delenv(): """Delete options in the existing environment.""" if not local._env: raise EnvError("No GDAL environment exists") else: local._env.clear_config_options() local._env.stop() local._env = None class NullContextManager: def __init__(self): pass def __enter__(self): return self def __exit__(self, *args): pass def env_ctx_if_needed(): """Return an Env if one does not exist Returns ------- Env or a do-nothing context manager """ if local._env: return NullContextManager() else: return Env.from_defaults() def ensure_env(f): """A decorator that ensures an env exists before a function calls any GDAL C functions. Parameters ---------- f : function A function. Returns ------- A function wrapper. Notes ----- If there is already an existing environment, the wrapper does nothing and immediately calls f with the given arguments. """ @wraps(f) def wrapper(*args, **kwargs): if local._env: return f(*args, **kwargs) else: with Env.from_defaults(): return f(*args, **kwargs) return wrapper def ensure_env_with_credentials(f): """Ensures a config environment exists and has credentials. Parameters ---------- f : function A function. Returns ------- A function wrapper. Notes ----- The function wrapper checks the first argument of f and credentializes the environment if the first argument is a URI with scheme "s3". If there is already an existing environment, the wrapper does nothing and immediately calls f with the given arguments. """ @wraps(f) def wrapper(*args, **kwds): if local._env: env_ctor = Env else: env_ctor = Env.from_defaults fp_arg = kwds.get("fp", None) or args[0] if isinstance(fp_arg, str): session_cls = Session.cls_from_path(fp_arg) if local._env and session_cls.hascreds(getenv()): session_cls = DummySession session = session_cls() else: session = DummySession() with env_ctor(session=session): return f(*args, **kwds) return wrapper @attr.s(slots=True) @total_ordering class GDALVersion: """Convenience class for obtaining GDAL major and minor version components and comparing between versions. This is highly simplistic and assumes a very normal numbering scheme for versions and ignores everything except the major and minor components. """ major = attr.ib(default=0, validator=attr.validators.instance_of(int)) minor = attr.ib(default=0, validator=attr.validators.instance_of(int)) def __eq__(self, other): return (self.major, self.minor) == tuple(other.major, other.minor) def __lt__(self, other): return (self.major, self.minor) < tuple(other.major, other.minor) def __repr__(self): return f"GDALVersion(major={self.major}, minor={self.minor})" def __str__(self): return f"{self.major}.{self.minor}" @classmethod def parse(cls, input): """ Parses input tuple or string to GDALVersion. If input is a GDALVersion instance, it is returned. Parameters ---------- input: tuple of (major, minor), string, or instance of GDALVersion Returns ------- GDALVersion instance """ if isinstance(input, cls): return input if isinstance(input, tuple): return cls(*input) elif isinstance(input, str): # Extract major and minor version components. # alpha, beta, rc suffixes ignored match = re.search(r"^\d+\.\d+", input) if not match: raise ValueError( "value does not appear to be a valid GDAL version " "number: {}".format(input) ) major, minor = (int(c) for c in match.group().split(".")) return cls(major=major, minor=minor) raise TypeError("GDALVersion can only be parsed from a string or tuple") @classmethod def runtime(cls): """Return GDALVersion of current GDAL runtime""" return cls.parse(get_gdal_release_name()) def at_least(self, other): other = self.__class__.parse(other) return self >= other def require_gdal_version( version, param=None, values=None, is_max_version=False, reason="" ): """A decorator that ensures the called function or parameters are supported by the runtime version of GDAL. Raises GDALVersionError if conditions are not met. Examples: \b @require_gdal_version('2.2') def some_func(): calling `some_func` with a runtime version of GDAL that is < 2.2 raises a GDALVersionErorr. \b @require_gdal_version('2.2', param='foo') def some_func(foo='bar'): calling `some_func` with parameter `foo` of any value on GDAL < 2.2 raises a GDALVersionError. \b @require_gdal_version('2.2', param='foo', values=('bar',)) def some_func(foo=None): calling `some_func` with parameter `foo` and value `bar` on GDAL < 2.2 raises a GDALVersionError. Parameters ------------ version: tuple, string, or GDALVersion param: string (optional, default: None) If `values` are absent, then all use of this parameter with a value other than default value requires at least GDAL `version`. values: tuple, list, or set (optional, default: None) contains values that require at least GDAL `version`. `param` is required for `values`. is_max_version: bool (optional, default: False) if `True` indicates that the version provided is the maximum version allowed, instead of requiring at least that version. reason: string (optional: default: '') custom error message presented to user in addition to message about GDAL version. Use this to provide an explanation of what changed if necessary context to the user. Returns --------- wrapped function """ if values is not None: if param is None: raise ValueError("require_gdal_version: param must be provided with values") if not isinstance(values, (tuple, list, set)): raise ValueError( "require_gdal_version: values must be a tuple, list, or set" ) version = GDALVersion.parse(version) runtime = GDALVersion.runtime() inequality = ">=" if runtime < version else "<=" reason = f"\n{reason}" if reason else reason def decorator(f): @wraps(f) def wrapper(*args, **kwds): if (runtime < version and not is_max_version) or ( is_max_version and runtime > version ): if param is None: raise GDALVersionError( "GDAL version must be {} {}{}".format( inequality, str(version), reason ) ) # normalize args and kwds to dict argspec = getfullargspec(f) full_kwds = kwds.copy() if argspec.args: full_kwds.update(dict(zip(argspec.args[: len(args)], args))) if argspec.defaults: defaults = dict( zip(reversed(argspec.args), reversed(argspec.defaults)) ) else: defaults = {} if param in full_kwds: if values is None: if param not in defaults or ( full_kwds[param] != defaults[param] ): raise GDALVersionError( 'usage of parameter "{}" requires ' "GDAL {} {}{}".format( param, inequality, version, reason ) ) elif full_kwds[param] in values: raise GDALVersionError( 'parameter "{}={}" requires ' "GDAL {} {}{}".format( param, full_kwds[param], inequality, version, reason ) ) return f(*args, **kwds) return wrapper return decorator # Patch the environment if needed, such as in the installed wheel case. if "GDAL_DATA" not in os.environ: path = GDALDataFinder().search_wheel() if path: log.debug("GDAL data found in package: path=%r.", path) set_gdal_config("GDAL_DATA", path) # See https://github.com/mapbox/rasterio/issues/1631. elif GDALDataFinder().find_file("header.dxf"): log.debug("GDAL data files are available at built-in paths.") else: path = GDALDataFinder().search() if path: set_gdal_config("GDAL_DATA", path) log.debug("GDAL data found in other locations: path=%r.", path) if 'PROJ_DATA' in os.environ: # PROJ 9.1+ path = os.environ["PROJ_DATA"] set_proj_data_search_path(path) elif "PROJ_LIB" in os.environ: # PROJ < 9.1 path = os.environ["PROJ_LIB"] set_proj_data_search_path(path) elif PROJDataFinder().search_wheel(): path = PROJDataFinder().search_wheel() log.debug("PROJ data found in package: path=%r.", path) set_proj_data_search_path(path) # See https://github.com/mapbox/rasterio/issues/1631. elif PROJDataFinder().has_data(): log.debug("PROJ data files are available at built-in paths.") else: path = PROJDataFinder().search() if path: log.debug("PROJ data found in other locations: path=%r.", path) set_proj_data_search_path(path)