# """ # Module that manage the behaviour of source allocated in a google drive source # """ # import os # import os.path # import json # import io # import sqlite3 # import urllib.parse # from django.conf import settings # from google.oauth2.credentials import Credentials # from googleapiclient.discovery import build # from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload # from cryptography.fernet import Fernet # # from tools import const # from tools.envConf.def_conf import GDRIVE_FILE_SERVER # from tools.protocols.masterProtocol import MasterProtocol # from .tools.mime_types import MIME_TYPES # # # SCOPES = ['https://www.googleapis.com/auth/drive.file', # 'https://www.googleapis.com/auth/drive.install', # 'https://www.googleapis.com/auth/userinfo.email'] # GDRIVE_FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder' # # # class GoogleDrive(MasterProtocol): # """ # Main class of Google Drive protocol. # # Attributes # ---------- # CONNECTION_REQUIRED: dict # Parameters required for the connection # NAME: str # Name of the protocol # user: str # User # service: Resource # Google drive api service # cred: Credentials # Google drive api credentials # # Raises # ------ # LoginError: # The authenticated user does not exist in the system. Trigger the authentication flow. # NoDirectoryOrFile: # The directory or file does not exist or the permission are not enough. # DirectoryOrFileExists: # Directory or file already exists. # # Parameters # ---------- # user: str # User # check_main_folder: bool # Check or not the existance of the main folder. # """ # LoginError = type('LoginError', (Exception,), {}) # NoDirectoryOrFile = type('NoDirectoryOrFile', (Exception,), {}) # DirectoryOrFileExists = type('DirectoryOrFileExists', (Exception,), {}) # CONNECTION_REQUIRED = {'user': 'user', 'password': 'password', 'ip': 'ip', # 'fileType': 'driver_type', 'filePath': 'source', # 'port': 'port', 'layerName': 'layer_name', 'subtype': 'type'} # NAME = const.GOOGLEDRIVE_KEY # # def __init__(self, ip, port, user="", check_main_folder=True, **__): # super().__init__() # env_conf = settings.ENV_CONF # self.__driver_server_url = env_conf[GDRIVE_FILE_SERVER] # self.user = user # self.user_creds_path = os.path.join(settings.BASE_DIR, 'MG', 'authenticate', 'certs', # self.user, 'drive.json') # self.service, self.cred = self.__connect() # self.__item = {} # self.__actual_path = [] # # # Check application folder # if check_main_folder and not self.__exists(['IpsilumCoreGms']): # raise self.DirectoryOrFileExists('Main folder not found. Please follow the ' # 'configuration steps.') # # def __get_item(self, path): # """ # Get google drive item by path # Parameters # ---------- # path: list # List of subsequent path. i.e. ['master_folder', 'subfolder', 'file.ext'] # # Returns # ------- # dict: # Item from google drive. # # Raises # ------ # NoDirectoryOrFile: # `path` does not exists. # # """ # if self.__item and self.__actual_path == path: # return self.__item # item = self.get_element_by_path('/'.join(path)) # if item is None: # raise self.NoDirectoryOrFile('Directory or file ({}) not found'.format(path)) # self.__item = item # self.__actual_path = path # return self.__item # # def __exists(self, path): # """ # Check if path exists in google drive # Parameters # ---------- # path: list # List of subsequent path. i.e. ['master_folder', 'subfolder', 'file.ext'] # # Returns # ------- # bool: # Whether exists or not # """ # if not self.__get_item(path): # return False # return True # # def exists(self, layer): # """ # Check if the source of the layer exists. # Parameters # ---------- # layer: Layer # Layer to check # # Returns # ------- # bool: # Whether exists or not # """ # return self.__exists(layer['source'].split('/')) # # @staticmethod # def __decrypt(user_cred): # """ # Method to decrypt the credentials saved in json string format. # Parameters # ---------- # user_cred: str # Encrypted string # # Returns # ------- # dict: # Decrypted credentials in json format. # """ # with open(settings.CIPHER_KEY, 'rb') as key_file: # key = key_file.read() # fernet = Fernet(key) # cred_json = json.loads(fernet.decrypt(user_cred).decode()) # return cred_json # # def __connect(self): # """ # Connect to google drive. Get the service needed to interact with google drive. # Returns # ------- # Resource: # Google drive api service. # # Raises # ------ # LoginError: # Authorization certificates not found # """ # conn = sqlite3.connect(settings.CERTS_DB_PATH) # res = list(conn.execute('SELECT "GDriveID" from users where name="{}"'.format(self.user))) # if not res: # raise self.LoginError("Authorization certificates not found. Please visit the docs " # "to trigger the login flow") # user_cred = list(conn.execute('SELECT cred from gdrive where' # ' id="{}"'.format(res[0][0])))[0][0] # conn.close() # cred_json = self.__decrypt(user_cred.encode()) # self.cred = Credentials.from_authorized_user_info(cred_json, SCOPES) # return build('drive', 'v3', credentials=self.cred, cache_discovery=False), self.cred # # def __check_element(self, item, searched_path, query_parameters_orig): # """ # Check if the item belongs to the path which is searched. For that the parents # are checked in a recursive manner. # Parameters # ---------- # item: dict # Google drive item representation # searched_path: str # File path to search # query_parameters_orig: # Original query parameter of google drive api # # Returns # ------- # bool: # Whether or not the element belong to the path # """ # if not searched_path or ('parents' not in item.keys() and len(searched_path) == 1): # return True # # Get item parent # parent = item['parents'][0] # result = self.service.files().get(fileId=parent, fields='*').execute() # # if result['name'] == searched_path[0]: # return self.__check_element(result, searched_path[1:], query_parameters_orig) # return False # # def get_element_by_path(self, path) -> dict or None: # """ # Get google drive item by path # # Parameters # ---------- # path: str # Path to search # # Returns # ------- # dict or None: # Dictionary item or None if `path` not exists. # # """ # url_parsed = urllib.parse.urlparse(path) # query_parameters = urllib.parse.parse_qs(url_parsed.query) # query_parameters = {query_param: query_parameters[query_param][0] # for query_param in query_parameters} # absolute_path = url_parsed.path.split('/')[::-1] # children = absolute_path[0] # query_parameters.update({'pageSize': 1000, 'fields': '*'}) # query_parameters_orig = query_parameters.copy() # query = "name='{}' and {}".format(children, query_parameters['q']) \ # if 'q' in query_parameters.keys() else "name='{}'".format(children) # query_parameters.update({'q': query}) # results = self.service.files().list(**query_parameters).execute() # items = results.get('files', []) # for item in items: # if self.__check_element(item, absolute_path[1:], query_parameters_orig): # self.__item = item # self.__actual_path = path # return item # return None # # def list_files(self, directory, suffix=None): # """ # List files inside a directory # Parameters # ---------- # directory: str # Path to the directory # suffix: str # Filter to the files found. # # Returns # ------- # list: # List with the files found. # """ # item = self.__get_item(directory.split('/')) # if item is None: # raise self.NoDirectoryOrFile('Directory {} does not exists'.format(directory)) # if item['mimeType'] != GDRIVE_FOLDER_MIME_TYPE: # print('It is not a folder') # return None # query_parameters = { # "q": "mimeType!='{}' and '{}' in parents {}".format( # GDRIVE_FOLDER_MIME_TYPE, # item['id'], # "and name contains '.{}'".format(suffix) * # bool(suffix))} # query_parameters.update({'pageSize': 1000, 'fields': 'files(id, name)'}) # results = self.service.files().list(**query_parameters).execute() # return [result['name'] for result in results['files']] # # def is_file(self, path): # """ # Check if `path` is a file. # Parameters # ---------- # path: str # Path to the file # # Returns # ------- # bool: # Whether or not `path` is a file. # """ # item = self.__get_item(path.split('/')) # if item['mimeType'] != GDRIVE_FOLDER_MIME_TYPE: # return True # return False # # def gdal_layer(self, layer, with_vsi): # """ # Get gdal connection string of the source # Parameters # ---------- # layer: Layer # Layer to get the gdal connection # with_vsi: bool # Add virtual driver gdal prefix or not. # Returns # ------- # str: # Gdal connection string # """ # return self.gdal_url(layer['source'], with_vsi=with_vsi, # driver_prefix=layer.driver.gdal_prefix) # # def gdal_url(self, source, with_vsi=False, driver_prefix=''): # """ # Get the gdal connection url of source. # Parameters # ---------- # source: str # Path to the source. # with_vsi: bool # Add virtual driver gdal prefix or not. # driver_prefix: str # Specific driver prefix (/vsizip/) # # Returns # ------- # str: # Gdal connection string # # """ # vsi_string = "" # if with_vsi: # vsi_string = '{}/vsicurl/'.format(driver_prefix) # url = "{drive_server_url}/{user}/{source}".format( # user=self.user, # drive_server_url=self.__driver_server_url, # source=source) # gdal_url = "{vsi_string}{url}".format(vsi_string=vsi_string, url=url) # return gdal_url # # @staticmethod # def join(*args): # """ # Join args paths # Parameters # ---------- # args: list # List with subpaths # # Returns # ------- # str: # String with joined paths # """ # return '/'.join(*args) # # def mkdir(self, directory, p=False): # """ # Make directory or trees of directory # Parameters # ---------- # directory: str # Path # p: bool # Linux mkdir p parameter # """ # directory = directory.split('/') # if self.__exists(directory): # raise self.DirectoryOrFileExists('Folder {} already exists'.format(directory)) # if not p and not self.__exists(directory[:-1]): # raise self.NoDirectoryOrFile('Folder or file {} not found' # .format(directory[:-1])) # self.__mktreedir(directory) # # def __mktreedir(self, directory): # """ # Make tree of directories # Parameters # ---------- # directory: list # Path # """ # for i in range(1, len(directory)): # element_path = directory[0:i+1] # if self.__exists(element_path): # continue # self.__mkdir(element_path) # # def __mkdir(self, path): # """ # Make a single folder. The parent of the folder must exist. # Parameters # ---------- # path: list # Path # """ # parent_item = self.__get_item(path[0:-1]) # # file_metadata = { # 'name': path[-1], # 'mimeType': GDRIVE_FOLDER_MIME_TYPE, # 'parents': [parent_item['id']] # } # self.service.files().create(body=file_metadata).execute() # # def upload(self, src, dst, regex=""): # """ # Upload a single file or a folder structure. # Parameters # ---------- # src: str # Path from upload # dst: str # Path to upload # regex: str # Regex pattern to apply in file names. # """ # if not os.path.exists(src): # raise self.NoDirectoryOrFile('Origin path ({}) does not exist' # .format(src)) # if self.__exists(dst.split('/')): # raise self.DirectoryOrFileExists('Destination path ({}) already exists'.format(dst)) # # name = dst.split('/')[-1] # parent = dst.split('/')[0:-1] # if not self.__exists(parent): # self.mkdir('/'.join(parent), p=True) # # if os.path.isdir(src): # self.__upload_dir(src, dst.split('/'), regex) # return # # self.__upload(src, parent, name) # # def __upload_dir(self, src_directory, dst_directory, _): # """ # Upload a directory # Parameters # ---------- # src_directory: str # Path # dst_directory list # Path # _: str # Regex pattern # """ # walk_gen = os.walk(src_directory) # for dirpath, _, filenames in walk_gen: # if dirpath != src_directory: # dst_directory += [dirpath.replace(src_directory, '').split('/')[-1]] # self.__mkdir(dst_directory) # for filename in filenames: # self.__upload(os.path.join(dirpath, filename), dst_directory, filename) # # def __upload(self, src, dst, name): # """ # Upload a single file # Parameters # ---------- # src: str # Origin path # dst: list # Destination path # name: str # Name of the upload file. # """ # extension = name.split('.')[-1] # mime_type = MIME_TYPES[extension] if extension in MIME_TYPES.keys() else '' # parent = self.get_element_by_path('/'.join(dst))['id'] # file_metadata = {'name': name, # 'mimeType': mime_type, # 'parents': [parent]} # media = MediaFileUpload(src) # self.service.files().create(body=file_metadata, # media_body=media).execute() # # def download(self, src, dst, regex=""): # """ # Download a file or a directory tree. # Parameters # ---------- # src: str # Origin path # dst: str # Destination path # regex: str # Regex pattern. # """ # if not self.__exists(src.split('/')): # raise self.NoDirectoryOrFile('Origin path ({}) does not exist' # .format(src)) # if os.path.exists(dst): # raise self.DirectoryOrFileExists('Destination path ({}) already exists'.format(dst)) # # parent = '/'.join(dst.split(os.sep)[0:-1]) # if not os.path.exists(parent): # os.makedirs(parent, exist_ok=True) # # if not self.is_file(src): # self.__download_dir(src.split('/'), dst, regex) # return # # self.__download(src.split('/'), dst) # # def __download_dir(self, src, dst, regex): # """ # Download a directory # Parameters # ---------- # src: list # Origin path # dst: str # Destination path # regex: str # Regex pattern # """ # if not os.path.exists(dst): # os.makedirs(dst, exist_ok=True) # results = self.list_files('/'.join(src), regex) # for _file in results: # self.__download(src + [_file], os.path.join(dst, _file)) # # def __download(self, src, dst): # """ # Download a single file. # Parameters # ---------- # src: list # Origin path # dst: str # Destination path # """ # item = self.get_element_by_path('/'.join(src)) # request = self.service.files().get_media(fileId=item['id']) # in_memory_file = io.BytesIO() # downloader = MediaIoBaseDownload(in_memory_file, request) # done = False # while done is False: # _, done = downloader.next_chunk() # in_memory_file.seek(0) # with open(dst, 'wb') as dst_file: # dst_file.write(in_memory_file.read()) # # @staticmethod # def format_parameters(_, parameters): # """ # Format parameter in a google drive manner # Parameters # ---------- # _ # parameters: dict # Parameters of the layer. # # Returns # ------- # dict: # Formatted parameters. # """ # # Check if source has a beginning slash # parameters['source'] = parameters['source'][1:] if parameters['source'][0] == '/'\ # else parameters['source'] # return parameters