""" Module that manage the behaviour of source allocated in a dropbox source """ import re import os import json import urllib.request import urllib.parse import sqlite3 import datetime import requests from cryptography.fernet import Fernet import dateutil.parser from django.conf import settings import dropbox as dbx from tools.protocols.masterProtocol import MasterProtocol from tools.envConf.def_conf import DROPBOX_FILE_SERVER from tools import const class Dropbox(MasterProtocol): """ Main class of Google Drive protocol. Attributes ---------- CONNECTION_REQUIRED: dict Parameters required for the connection NAME: str Name of the protocol user: str User service: Resource Dropbox api service cred: Credentials Dropbox api credentials Raises ------ LoginError: The authenticated user does not exist in the system. Trigger the authentication flow. NoDirectoryOrFile: The directory or file does not exist or the permission are not enough. DirectoryOrFileExists: Directory or file already exists. Parameters ---------- user: str User """ LoginError = type('LoginError', (Exception,), {}) NoDirectoryOrFile = type('NoDirectoryOrFile', (Exception,), {}) DirectoryOrFileExists = type('DirectoryOrFileExists', (Exception, ), {}) CONNECTION_REQUIRED = {'user': 'user', 'password': 'password', 'ip': 'ip', 'fileType': 'driver_type', 'filePath': 'source', 'port': 'port', 'layerName': 'layer_name', 'subtype': 'type'} NAME = const.DROPBOX_KEY def __init__(self, ip, port, user, *_, **__): super().__init__() self.user = user env_conf = settings.ENV_CONF self.__dropbox_server_url = env_conf[DROPBOX_FILE_SERVER] self.__connect() self.__access_token = '' self.__expired_at = datetime.date(2000, 1, 1) def __connect(self): """ Method to test the connection Raises ------ LoginError: The credentials of the user are not found. """ conn = sqlite3.connect(settings.CERTS_DB_PATH) res = list( conn.execute('SELECT "DropboxID" FROM users WHERE name="{}"'.format(self.user))) conn.close() if not res: raise self.LoginError("User {} has not dropbox authentication".format(self.user)) @staticmethod def __decrypt(user_cred): """ Method to decrypt the credentials saved in json string format. Parameters ---------- user_cred: str Encrypted string Returns ------- dict: Decrypted credentials in json format. """ with open(settings.CIPHER_KEY, 'rb') as key_file: key = key_file.read() fernet = Fernet(key) cred_json = json.loads(fernet.decrypt(user_cred).decode()) return cred_json @staticmethod def __encrypt(user_cred): """ Method to encrypt the credentials. Parameters ---------- user_cred: dict Credentials dictionary Returns ------- str: Encrypted string """ with open(settings.CIPHER_KEY, 'rb') as key_file: key = key_file.read() fernet = Fernet(key) cred_json = fernet.encrypt(json.dumps(user_cred).encode()).decode() return cred_json @staticmethod def __refresh_token(cred): """ Method to get new access_token when is expired Parameters ---------- cred: dict Dropbox credentials dict """ with open(settings.DROPBOX_API_CREDENTIALS_PATH, 'r') as app_cred_file: app_cred = json.load(app_cred_file) # Refresh token res = requests.post("https://{}:{}@api.dropboxapi.com/oauth2/token?" "refresh_token={}&grant_type=refresh_token" .format(app_cred["app_key"], app_cred['app_secret'], cred['refresh_token'])) new_cred = res.json() cred['access_token'] = new_cred['access_token'] cred['expires_at'] = (datetime.datetime.utcnow() + datetime.timedelta(seconds=new_cred['expires_in'])).isoformat() def __get_token(self): """ Method to get access_token. If an access token exists and is valid return it else a new one is generated. Returns ------- str: Valid access_token """ if self.__access_token and datetime.datetime.utcnow() < self.__expired_at: return self.__access_token conn = sqlite3.connect(settings.CERTS_DB_PATH) res = list(conn.execute('SELECT "cred" FROM dropbox WHERE id=(SELECT DropboxID FROM users' ' WHERE name="{}")'.format(self.user))) if not res: raise self.LoginError("The user {} exist but the credentials not exists" .format(self.user)) cred = res[0][0] cred = self.__decrypt(cred.encode()) # Check if token is expired if dateutil.parser.parse(cred['expires_at']) <= (datetime.datetime.utcnow()): self.__refresh_token(cred) # Save new creds conn.execute('UPDATE dropbox SET cred="{cred}" WHERE id=(SELECT DropboxID ' 'FROM users WHERE name="{user}")' .format(cred=self.__encrypt(cred), user=self.user)) conn.commit() conn.close() self.__access_token = cred['access_token'] self.__expired_at = dateutil.parser.parse(cred['expires_at']) return self.__access_token def __exists(self, path): """ Check if path exist on dropbox Parameters ---------- path:str Account absolute path Returns ------- bool: Whether or not the file exists. """ with dbx.Dropbox(oauth2_access_token=self.__get_token()) as dbx_conn: try: dbx_conn.files_get_metadata(path) except dbx_conn.exceptions.ApiError: return False return True def authorize(self, header=False): """ Method to authorize a request. If header=True a dict is returned to act as a header. - header=True {"Authorization": "Bearer access_token"} - header=False authorization="Bearer access_token" (url encoded) Parameters ---------- header: bool True for dict and False for url parameter Returns ------- dict or str: Dict or string """ token = self.__get_token() if header: return {'Authorization': "Bearer {}".format(token)} return "authorization={}".format(urllib.parse.quote_plus("Bearer {}". format(token))) def exists(self, layer): """ Check if layer exists Parameters ---------- layer: Layer Layer to check Returns ------- bool: Whether or not the layer exists. """ return self.__exists(layer['source']) def is_file(self, path): """ Check if `path` is a file. Parameters ---------- path: str Path to the file Returns ------- bool: Whether or not `path` is a file. """ with dbx.Dropbox(oauth2_access_token=self.__get_token()) as dbx_conn: metadata = dbx_conn.files_get_metadata(path) if metadata.__class__.__name__ == "FileMetadata": return True return False def gdal_layer(self, layer, with_vsi): """ Return the gdal connection string. Parameters ---------- layer: Layer Layer to get gdal connection with_vsi: bool Add the virtual driver prefix. i.e. /vsicurl/ Returns ------- str: Gdal connection string. """ return self.gdal_url(layer['source'], with_vsi=with_vsi, driver_prefix=layer.driver.gdal_prefix) def gdal_url(self, source, with_vsi=False, driver_prefix=''): """ Get gdal connection string of `source` with the features of the protocol Parameters ---------- source: str Absolute in dropbox account space. with_vsi: bool Add virtual drive prefix, i.e. /vsicurl/ driver_prefix: str Add specific virtual drive. like ZIP /vsizip/ Returns ------- str: Gdal connection string. """ with dbx.Dropbox(oauth2_access_token=self.__get_token()) as dbx_conn: file_id = dbx_conn.files_get_metadata(source).id return "{}{}?user={}&file={}".format('{}/vsicurl/'. format(driver_prefix)*with_vsi, self.__dropbox_server_url, self.user, urllib.parse.quote_plus( json.dumps({"path": file_id}))) def join(self, *args): """ Join args paths Parameters ---------- args: list List with subpaths Returns ------- str: String with joined paths """ args = list(args) for arg_index, arg in enumerate(args[1:]): if arg[0] == '/': args[arg_index+1] = arg[1:] join_string = os.path.join(*args) return join_string def create_folder(self, path): """ Parameters ---------- path: str path inside the App where you want to create the folder. Returns ------- Boolean True if Success or False if Fails """ with dbx.Dropbox(oauth2_access_token=self.__get_token()) as dbx_conn: create = dbx_conn.files_create_folder_v2(path) if not create: return False return True def upload(self, src, dst): """ Parameters ---------- src: str full path where the file to upload is dst: str path inside the App where you want to save the file. Set the name. The file can be rename if you set another name Returns ------- Boolean True if Success or False if Fails """ if 'ftp://' in src or 'http://' in src: web_url = urllib.request.urlopen(src) else: web_url = open(src, 'rb') read_files = web_url.read() web_url.close() with dbx.Dropbox(oauth2_access_token=self.__get_token()) as dbx_conn: upload = dbx_conn.files_upload(read_files, dst) if not upload: return False return True def upload_dir(self, src_directory, dst_directory): """ Parameters ---------- src_directory: str full path where the file to upload is dst_directory: str path inside the App where you want to save the file. Set the name. The file can be rename if you set another name Returns ------- Boolean True if Success or False if Fails """ files = [] if 'http://' in src_directory: page = urllib.request.urlopen(src_directory) page_decode = str(page.read().decode()) regex = re.compile(r'