""" Module that manages the behaviour of sources allocated on an FTP server """
import ftplib
import os
import posixpath
import random
import re
import shutil
import tempfile
import urllib.parse
from copy import deepcopy  # no longer used below; kept in case other modules import it from here

from tools.protocols.masterProtocol import MasterProtocol
from tools import const
try:
    from AI.training.calls.create_map import Create_map
except ImportError:
    # Fall back to the copy bundled with the protocols package.
    from tools.protocols.create_map import Create_map


class FTP(MasterProtocol):
    """Protocol handler for sources stored on an FTP server.

    Every public operation re-opens the control connection and logs in
    again before talking to the server, so an instance can be kept around
    for a long time without caring about server-side idle timeouts.
    Connection and login failures are reported through ``False`` return
    values rather than exceptions.
    """

    ConnectionError = type('ConnectionError', (Exception,), {})
    LoginError = type('LoginError', (Exception,), {})
    NotEnoughPrivileges = type('NotEnoughPrivileges', (Exception, ), {})
    NoDirectoryOrFile = type('NoDirectoryOrFile', (Exception,), {})
    DirectoryOrFileExists = type('DirectoryOrFileExists', (Exception, ), {})

    # Maps external parameter names to the internal keys used for them.
    CONNECTION_REQUIRED = {'user': 'user',
                           'password': 'password',
                           'ip': 'ip',
                           'fileType': 'driver_type',
                           'filePath': 'source',
                           'domain': 'domain',
                           'port': 'port',
                           'layerName': 'layer_name',
                           'subtype': 'type'}
    NAME = const.FTP_KEY
    VSI_CURL_STREAMING_STRING = '/vsicurl_streaming/'

    # Local folder where qualifying uploads are mirrored before being sent.
    _LOCAL_DETECTION_DIR = '/home/master_folder/IA/detection'
    # Remote folder that receives every .zip / .geojson upload.
    _REMOTE_OUTPUT_DIR = '/IA_output'
    # Scratch file used by write_test().
    _WRITE_TEST_FILE = '/write_test.txt'

    def __init__(self, ip, port, user="", password=""):
        """Store the connection settings and probe the server once.

        :param ip: host name or address of the FTP server.
        :param port: control port; a falsy value selects the default, 21.
            Be careful to pass the port the FTP server really listens on.
        :param user: login name (empty for anonymous access).
        :param password: login password.
        """
        super().__init__()
        self.ip = ip
        self.port = int(port) if port else 21
        self.user = user
        self.password = password
        self.conn = ftplib.FTP()
        self.connect(ip, self.port)
        self.login(user, password)
        self.write_test()

    # ------------------------------------------------------------------
    # Session handling
    # ------------------------------------------------------------------
    def _open_session(self):
        """Reconnect and authenticate; return True when both succeeded.

        ``login`` already reconnects, so calling it alone replaces the
        former ``connect(...) and login(...)`` guard with a single TCP
        connection instead of two.
        """
        return self.login(self.user, self.password) is not False

    def _remote_path(self, dst):
        """Return *dst* anchored at the server's current directory."""
        return posixpath.join(str(self.conn.pwd()), dst.lstrip('/'))

    def connect(self, ip, port):
        """Open the control connection to ``ip:port``.

        :return: True on success, False when the server is unreachable.
        """
        try:
            # ftplib overwrites its socket on connect(); close the previous
            # one explicitly so repeated reconnects do not leak sockets.
            self.conn.close()
            # Port 0 tells ftplib to use its own default port.
            self.conn.connect(ip, port or 0)
            return True
        except OSError:
            # Covers refused/reset connections and timeouts alike.
            return False

    def login(self, user, password):
        """Reconnect to the configured server and authenticate.

        :return: True when logged in, False when either the connection or
            the authentication failed. (The connection failure used to
            yield None, which callers testing ``is not False`` mistook for
            success.)
        """
        if self.connect(self.ip, self.port) is False:
            return False
        try:
            self.conn.login(user, password)
            return True
        except (ftplib.error_perm, AttributeError, TimeoutError):
            return False

    def write_test(self):
        """Check write permission by uploading and deleting a scratch file.

        :return: True when the file could be stored and removed again,
            False otherwise (no session, upload failed, permission denied).
        """
        if not self._open_session():
            return False
        payload = 'Write test {}'.format(random.randint(12458, 85848682))
        # The context manager guarantees the temporary file is released.
        with tempfile.NamedTemporaryFile('w+b') as tmp:
            tmp.write(payload.encode())
            tmp.flush()
            try:
                if not self.upload(tmp.name, self._WRITE_TEST_FILE):
                    return False
                # upload() stores relative to the working directory, so the
                # clean-up has to target that very same location.
                self.conn.delete(self._remote_path(self._WRITE_TEST_FILE))
            except ftplib.error_perm:
                return False
        return True

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def exists(self, layer):
        """Tell whether a remote path is present in its parent directory.

        :param layer: an object exposing ``get_source()`` or, as used by
            ``mkdir``, a plain path string.
        :return: True when the last path component is listed in the parent
            directory (or the path is ``/``), False otherwise or when no
            session could be opened.
        """
        if not self._open_session():
            return False
        path = layer.get_source() if hasattr(layer, 'get_source') else layer
        if path == '/':
            return True
        parent, _, name = path.rpartition('/')
        try:
            names = [entry[0] for entry in self.conn.mlsd(parent)]
        except ftplib.error_perm:
            # Server without MLSD support: fall back to NLST, which may
            # return either bare names or full paths.
            try:
                names = [entry.rsplit('/', 1)[-1]
                         for entry in self.conn.nlst(parent)]
            except ftplib.error_perm:
                # The parent cannot be listed, so the entry is not there.
                return False
        return name in names

    def list_files(self, directory, suffix=None):
        """List the entries of *directory*, optionally filtered.

        :param directory: remote directory to list.
        :param suffix: when given, only entries matching the regular
            expression ``<suffix>(f?)`` are returned.
        :return: list of entries as reported by NLST; empty when no
            session could be opened.
        """
        if not self._open_session():
            return []
        entries = self.conn.nlst(directory)
        if not suffix:
            return entries
        # NOTE(review): suffix is interpolated as a regex and searched
        # anywhere in the entry, so '.tif' also matches 'xtiffany.txt'.
        # Confirm with callers before escaping/anchoring it.
        pattern = re.compile('{}(f?)'.format(suffix))
        dot_entries = {'{}/..'.format(directory), '{}/.'.format(directory)}
        return [entry for entry in entries
                if entry not in dot_entries and pattern.search(entry)]

    def is_file(self, path):
        """Return True when listing *path* yields exactly one entry.

        This is a heuristic: a directory holding a single entry is
        reported as a file too.
        """
        try:
            return len(self.list_files(path)) == 1
        except (ftplib.error_perm, AttributeError):
            return False

    def is_directory(self, path):
        """Return True when listing *path* yields more than one entry."""
        if not self._open_session():
            return False
        try:
            return len(self.conn.nlst(path)) > 1
        except ftplib.error_perm:
            # Same outcome as is_file(): an unlistable path is not one.
            return False

    # ------------------------------------------------------------------
    # GDAL helpers
    # ------------------------------------------------------------------
    def gdal_layer(self, layer, with_vsi):
        """Build the GDAL URL of the dataset described by *layer*.

        :param layer: mapping-like layer providing ``source``,
            ``driver_type``, optionally ``layer_name``, plus a
            ``cloud_parameters`` container.
        :param with_vsi: prefix the URL with ``/vsicurl_streaming/``.
        :return: the URL, or None when no session could be opened.
        """
        if not self._open_session():
            return None
        source = layer['source']
        try:
            if '.' in source:
                # NOTE(review): the listing result is not used; the call is
                # retained so server-side errors surface exactly as before.
                self.conn.nlst(source.rsplit('/', 1)[0])
                layer_name = source
                if '/ftp/' in source:
                    layer_name = source.replace(
                        '/ftp/{}/public/'.format(self.user), '')
            else:
                # Shapefiles must be in a separate folder to be read,
                # at least for now.
                listing = self.conn.nlst(source)
                is_shape = layer['driver_type'] == 'SHAPE'
                if is_shape:
                    listing = [entry for entry in listing if '.shp' in entry]
                # Fallback so the name is always bound, even when neither
                # an explicit layer name nor a usable listing is available.
                layer_name = source
                if 'layer_name' in layer.cloud_parameters:
                    layer_name = '{}/{}'.format(source, layer['layer_name'])
                if len(listing) == 1:
                    layer_name = '{}/{}'.format(source, listing[0])
                elif is_shape and len(listing) > 1:
                    # Several shapefiles: the last one listed wins.
                    layer_name = '{}/{}'.format(source, listing[-1])
        finally:
            self.conn.close()
        return self.gdal_url(layer_name, with_vsi=with_vsi,
                             driver_prefix=self.VSI_CURL_STREAMING_STRING)

    def gdal_url(self, source, with_vsi=False, driver_prefix=''):
        """Return ``[prefix]ftp://[user:password@]ip:port/source``.

        :param source: remote path; one leading ``/`` is dropped.
        :param with_vsi: when True, prepend *driver_prefix*.
        :param driver_prefix: GDAL virtual file system prefix.
        """
        if source.startswith('/'):
            source = source[1:]
        if self.user:
            # safe='' also escapes '/', which would otherwise end the
            # authority part of the URL prematurely.
            login_string = '{}:{}@'.format(
                urllib.parse.quote(self.user, safe=''),
                urllib.parse.quote(self.password, safe=''))
        else:
            login_string = ''
        vsi_string = '{}'.format(driver_prefix) if with_vsi else ''
        return "{}ftp://{}{}:{}/{}".format(vsi_string, login_string, self.ip,
                                           self.port, source)

    # ------------------------------------------------------------------
    # Path helpers and transfers
    # ------------------------------------------------------------------
    def join(self, *args):
        """Join remote path components with ``/``.

        Leading slashes of every component but the first are removed so
        that a later component never restarts the path; empty components
        are tolerated.
        """
        parts = list(args[:1]) + [arg.lstrip('/') for arg in args[1:]]
        return posixpath.join(*parts)

    def mkdir(self, directory, p=False):
        """Create *directory* on the server.

        :param directory: remote directory to create.
        :param p: when True, create every missing component in turn
            (like ``mkdir -p``), relative to the working directory.
        :raises NoDirectoryOrFile: when the server refuses the operation.
        """
        try:
            if not p:
                self.conn.mkd(directory)
                return
            partial_path = ''
            for folder in directory.split('/'):
                if not folder:
                    # Leading, trailing or doubled slashes.
                    continue
                partial_path = posixpath.join(partial_path, folder)
                if not self.exists(partial_path):
                    self.conn.mkd(partial_path)
        except ftplib.error_perm as err:
            raise self.NoDirectoryOrFile(
                '{} not found'.format(directory)) from err

    def _stage_local_copy(self, src):
        """Best effort: mirror *src* locally and build its map."""
        try:
            shutil.copy2(src, self._LOCAL_DETECTION_DIR)
            print('Lo ha copiado')
        except OSError:
            print('No lo ha podido copiar')
        try:
            file_name = os.path.basename(src)
            print(file_name)
            map_builder = Create_map()
            map_builder.create(
                '{}/{}'.format(self._LOCAL_DETECTION_DIR, file_name))
            print('El mapa se ha creado')
        except Exception:
            # Map creation is optional and must never block the upload.
            print('No ha podido crear el mapa')

    def _redirect_output(self, dst):
        """Send .zip / .geojson destinations to the shared output folder."""
        if '/' in dst and ('.zip' in dst or '.geojson' in dst):
            dst = '{}/{}'.format(self._REMOTE_OUTPUT_DIR,
                                 dst.rsplit('/', 1)[1])
            print(dst)
        return dst

    def upload(self, src, dst):
        """Upload the local file *src* to the remote path *dst*.

        Before the transfer, files whose path starts with ``tmp`` are
        copied to the global local folder and the map file is generated
        for them. Destinations containing ``.zip`` or ``.geojson`` are
        redirected to ``/IA_output``.

        :return: True when the server confirmed the transfer, False when
            no session could be opened.
        """
        # NOTE(review): only paths that *start* with 'tmp' qualify;
        # absolute ones such as '/tmp/x' do not. Confirm this is intended.
        if os.path.isfile(src) and src.startswith('tmp'):
            self._stage_local_copy(src)

        dst = self._redirect_output(dst)

        if not self._open_session():
            return False
        with open(src, 'rb') as file_fp:
            remote_path = self._remote_path(dst)
            try:
                print(dst.rsplit('/', 1)[0])
                self.conn.mkd(posixpath.dirname(remote_path))
            except ftplib.all_errors:
                # Most likely the folder is already there.
                print('Ya está creada la carpeta')
            response = self.conn.storbinary('STOR ' + remote_path, file_fp)
        # storbinary() raises unless the server answered with a 2xx
        # completion code, so do not depend on the exact reply wording.
        return response.startswith('2') or 'Transfer complete' in response

    def upload_dir(self, src_directory, dst_directory):
        """Upload every file below *src_directory* into *dst_directory*.

        The local directory layout is recreated on the server; directories
        that hold no files directly are not created on their own.
        """
        for dirpath, _, filenames in os.walk(src_directory):
            if not filenames:
                continue
            relative_path = os.path.relpath(dirpath, src_directory)
            if relative_path == os.curdir:
                remote_dir = dst_directory
            else:
                remote_dir = self.join(dst_directory,
                                       relative_path.replace(os.sep, '/'))
            self.mkdir(remote_dir, p=True)
            for file in filenames:
                self.upload(os.path.join(dirpath, file),
                            self.join(remote_dir, file))

    def download(self, file, path):
        """Download the remote *file* into the local *path*.

        Nothing is written when no session could be opened.
        """
        if not self._open_session():
            return
        with open(path, 'wb') as handle:
            self.conn.retrbinary('RETR {}'.format(file), handle.write)

    def format_parameters(self, _, parameters):
        """Normalise *parameters* in place and return them.

        Runs of spaces in ``parameters['source']`` are replaced with a
        single underscore.
        """
        parameters['source'] = re.sub(' +', '_', parameters['source'])
        return parameters