"""Abstraction for sessions in various clouds.""" import logging import os from rasterio.path import parse_path, UnparsedPath log = logging.getLogger(__name__) try: import boto3 except ImportError: log.debug("Could not import boto3, continuing with reduced functionality.") boto3 = None class Session(object): """Base for classes that configure access to secured resources. Attributes ---------- credentials : dict Keys and values for session credentials. Notes ----- This class is not intended to be instantiated. """ @classmethod def hascreds(cls, config): """Determine if the given configuration has proper credentials Parameters ---------- cls : class A Session class. config : dict GDAL configuration as a dict. Returns ------- bool """ return NotImplemented def get_credential_options(self): """Get credentials as GDAL configuration options Returns ------- dict """ return NotImplemented @staticmethod def from_foreign_session(session, cls=None): """Create a session object matching the foreign `session`. Parameters ---------- session : obj A foreign session object. cls : Session class, optional The class to return. Returns ------- Session """ if not cls: return DummySession() else: return cls(session) @staticmethod def cls_from_path(path): """Find the session class suited to the data at `path`. Parameters ---------- path : str A dataset path or identifier. Returns ------- class """ if not path: return DummySession path = parse_path(path) if isinstance(path, UnparsedPath) or path.is_local: return DummySession elif ( path.scheme == "s3" or "amazonaws.com" in path.path ) and not "X-Amz-Signature" in path.path: if boto3 is not None: return AWSSession else: log.info("boto3 not available, falling back to a DummySession.") return DummySession elif path.scheme == "oss" or "aliyuncs.com" in path.path: return OSSSession elif path.path.startswith("/vsiswift/"): return SwiftSession elif path.scheme == "az": return AzureSession # This factory can be extended to other cloud providers here. # elif path.scheme == "cumulonimbus": # for example. # return CumulonimbusSession(*args, **kwargs) else: return DummySession @staticmethod def from_path(path, *args, **kwargs): """Create a session object suited to the data at `path`. Parameters ---------- path : str A dataset path or identifier. args : sequence Positional arguments for the foreign session constructor. kwargs : dict Keyword arguments for the foreign session constructor. Returns ------- Session """ return Session.cls_from_path(path)(*args, **kwargs) @staticmethod def aws_or_dummy(*args, **kwargs): """Create an AWSSession if boto3 is available, else DummySession Parameters ---------- path : str A dataset path or identifier. args : sequence Positional arguments for the foreign session constructor. kwargs : dict Keyword arguments for the foreign session constructor. Returns ------- Session """ if boto3 is not None: return AWSSession(*args, **kwargs) else: return DummySession(*args, **kwargs) @staticmethod def from_environ(*args, **kwargs): """Create a session object suited to the environment. Parameters ---------- path : str A dataset path or identifier. args : sequence Positional arguments for the foreign session constructor. kwargs : dict Keyword arguments for the foreign session constructor. Returns ------- Session """ try: session = Session.aws_or_dummy(*args, **kwargs) session.credentials except RuntimeError as exc: log.warning("Credentials in environment have expired. Creating a DummySession.") session = DummySession(*args, **kwargs) return session class DummySession(Session): """A dummy session. Attributes ---------- credentials : dict The session credentials. """ def __init__(self, *args, **kwargs): self._session = None self.credentials = {} @classmethod def hascreds(cls, config): """Determine if the given configuration has proper credentials Parameters ---------- cls : class A Session class. config : dict GDAL configuration as a dict. Returns ------- bool """ return True def get_credential_options(self): """Get credentials as GDAL configuration options Returns ------- dict """ return {} class AWSSession(Session): """Configures access to secured resources stored in AWS S3. """ def __init__( self, session=None, aws_unsigned=False, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, region_name=None, profile_name=None, endpoint_url=None, requester_pays=False): """Create a new AWS session Parameters ---------- session : optional A boto3 session object. aws_unsigned : bool, optional (default: False) If True, requests will be unsigned. aws_access_key_id : str, optional An access key id, as per boto3. aws_secret_access_key : str, optional A secret access key, as per boto3. aws_session_token : str, optional A session token, as per boto3. region_name : str, optional A region name, as per boto3. profile_name : str, optional A shared credentials profile name, as per boto3. endpoint_url: str, optional An endpoint_url, as per GDAL's AWS_S3_ENPOINT requester_pays : bool, optional True if the requester agrees to pay transfer costs (default: False) """ if session: self._session = session else: self._session = boto3.Session( aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, aws_session_token=aws_session_token, region_name=region_name, profile_name=profile_name) self.requester_pays = requester_pays self.unsigned = bool(os.getenv("AWS_NO_SIGN_REQUEST", aws_unsigned)) self.endpoint_url = endpoint_url self._creds = ( self._session._session.get_credentials() if not self.unsigned and self._session else None ) @classmethod def hascreds(cls, config): """Determine if the given configuration has proper credentials Parameters ---------- cls : class A Session class. config : dict GDAL configuration as a dict. Returns ------- bool """ return ('AWS_ACCESS_KEY_ID' in config and 'AWS_SECRET_ACCESS_KEY' in config) or 'AWS_NO_SIGN_REQUEST' in config @property def credentials(self): """The session credentials as a dict""" res = {} if self._creds: # pragma: no branch frozen_creds = self._creds.get_frozen_credentials() if frozen_creds.access_key: # pragma: no branch res['aws_access_key_id'] = frozen_creds.access_key if frozen_creds.secret_key: # pragma: no branch res['aws_secret_access_key'] = frozen_creds.secret_key if frozen_creds.token: res['aws_session_token'] = frozen_creds.token if self._session.region_name: res['aws_region'] = self._session.region_name if self.requester_pays: res['aws_request_payer'] = 'requester' if self.endpoint_url: res['aws_s3_endpoint'] = self.endpoint_url return res def get_credential_options(self): """Get credentials as GDAL configuration options Returns ------- dict """ if self.unsigned: opts = {'AWS_NO_SIGN_REQUEST': 'YES'} if 'aws_region' in self.credentials: opts['AWS_REGION'] = self.credentials['aws_region'] return opts else: return {k.upper(): v for k, v in self.credentials.items()} class OSSSession(Session): """Configures access to secured resources stored in Alibaba Cloud OSS. """ def __init__(self, oss_access_key_id=None, oss_secret_access_key=None, oss_endpoint=None): """Create new Alibaba Cloud OSS session Parameters ---------- oss_access_key_id: string, optional (default: None) An access key id oss_secret_access_key: string, optional (default: None) An secret access key oss_endpoint: string, optional (default: None) the region attached to the bucket """ self._creds = { "oss_access_key_id": oss_access_key_id, "oss_secret_access_key": oss_secret_access_key, "oss_endpoint": oss_endpoint } @classmethod def hascreds(cls, config): """Determine if the given configuration has proper credentials Parameters ---------- cls : class A Session class. config : dict GDAL configuration as a dict. Returns ------- bool """ return 'OSS_ACCESS_KEY_ID' in config and 'OSS_SECRET_ACCESS_KEY' in config @property def credentials(self): """The session credentials as a dict""" return self._creds def get_credential_options(self): """Get credentials as GDAL configuration options Returns ------- dict """ return {k.upper(): v for k, v in self.credentials.items()} class GSSession(Session): """Configures access to secured resources stored in Google Cloud Storage """ def __init__(self, google_application_credentials=None): """Create new Google Cloude Storage session Parameters ---------- google_application_credentials: string Path to the google application credentials JSON file. """ if google_application_credentials is not None: self._creds = {'google_application_credentials': google_application_credentials} else: self._creds = {} @classmethod def hascreds(cls, config): """Determine if the given configuration has proper credentials Parameters ---------- cls : class A Session class. config : dict GDAL configuration as a dict. Returns ------- bool """ return 'GOOGLE_APPLICATION_CREDENTIALS' in config @property def credentials(self): """The session credentials as a dict""" return self._creds def get_credential_options(self): """Get credentials as GDAL configuration options Returns ------- dict """ return {k.upper(): v for k, v in self.credentials.items()} class SwiftSession(Session): """Configures access to secured resources stored in OpenStack Swift Object Storage. """ def __init__(self, session=None, swift_storage_url=None, swift_auth_token=None, swift_auth_v1_url=None, swift_user=None, swift_key=None): """Create new OpenStack Swift Object Storage Session. Three methods are possible: 1. Create session by the swiftclient library. 2. The SWIFT_STORAGE_URL and SWIFT_AUTH_TOKEN (this method is recommended by GDAL docs). 3. The SWIFT_AUTH_V1_URL, SWIFT_USER and SWIFT_KEY (This depends on the swiftclient library). Parameters ---------- session: optional A swiftclient connection object swift_storage_url: the storage URL swift_auth_token: the value of the x-auth-token authorization token swift_storage_url: string, optional authentication URL swift_user: string, optional user name to authenticate as swift_key: string, optional key/password to authenticate with Examples -------- >>> import rasterio >>> from rasterio.session import SwiftSession >>> fp = '/vsiswift/bucket/key.tif' >>> conn = Connection(authurl='http://127.0.0.1:7777/auth/v1.0', user='test:tester', key='testing') >>> session = SwiftSession(conn) >>> with rasterio.Env(session): >>> with rasterio.open(fp) as src: >>> print(src.profile) """ if swift_storage_url and swift_auth_token: self._creds = { "swift_storage_url": swift_storage_url, "swift_auth_token": swift_auth_token } else: from swiftclient.client import Connection if session: self._session = session else: self._session = Connection( authurl=swift_auth_v1_url, user=swift_user, key=swift_key ) self._creds = { "swift_storage_url": self._session.get_auth()[0], "swift_auth_token": self._session.get_auth()[1] } @classmethod def hascreds(cls, config): """Determine if the given configuration has proper credentials Parameters ---------- cls : class A Session class. config : dict GDAL configuration as a dict. Returns ------- bool """ return 'SWIFT_STORAGE_URL' in config and 'SWIFT_AUTH_TOKEN' in config @property def credentials(self): """The session credentials as a dict""" return self._creds def get_credential_options(self): """Get credentials as GDAL configuration options Returns ------- dict """ return {k.upper(): v for k, v in self.credentials.items()} class AzureSession(Session): """Configures access to secured resources stored in Microsoft Azure Blob Storage. """ def __init__(self, azure_storage_connection_string=None, azure_storage_account=None, azure_storage_access_key=None, azure_unsigned=False): """Create new Microsoft Azure Blob Storage session Parameters ---------- azure_storage_connection_string: string A connection string contains both an account name and a secret key. azure_storage_account: string An account name azure_storage_access_key: string A secret key azure_unsigned : bool, optional (default: False) If True, requests will be unsigned. """ self.unsigned = bool(os.getenv("AZURE_NO_SIGN_REQUEST", azure_unsigned)) self.storage_account = os.getenv("AZURE_STORAGE_ACCOUNT", azure_storage_account) if azure_storage_connection_string: self._creds = { "azure_storage_connection_string": azure_storage_connection_string } elif not self.unsigned: self._creds = { "azure_storage_account": self.storage_account, "azure_storage_access_key": azure_storage_access_key } else: self._creds = { "azure_storage_account": self.storage_account } @classmethod def hascreds(cls, config): """Determine if the given configuration has proper credentials Parameters ---------- cls : class A Session class. config : dict GDAL configuration as a dict. Returns ------- bool """ return ( 'AZURE_STORAGE_CONNECTION_STRING' in config or ('AZURE_STORAGE_ACCOUNT' in config and 'AZURE_STORAGE_ACCESS_KEY' in config) or ('AZURE_STORAGE_ACCOUNT' in config and 'AZURE_NO_SIGN_REQUEST' in config) ) @property def credentials(self): """The session credentials as a dict""" return self._creds def get_credential_options(self): """Get credentials as GDAL configuration options Returns ------- dict """ if self.unsigned: return { 'AZURE_NO_SIGN_REQUEST': 'YES', 'AZURE_STORAGE_ACCOUNT': self.storage_account } else: return {k.upper(): v for k, v in self.credentials.items()}