""" Module that control the authentication for dropbox. The authentication flow is as follow: 1.- A HTTP post is made to MG/authenticate/dropbox (goes to POST examples). 2.- Follow the url returned by this method. Logging with the wanted dropbox account and allow all the permissions. 3.- The authentication flow is completed. Behind the scene: 1.- When step one is received the dropbox library are used to make the registration url. Then this url is sent to the user. 2.- When user finish the login process an HTTP GET is received in MG/authentication/dropbox. This request has the information needed to finished the OAuth process and get the authorization credentials to access the client's dropbox. 3.- The credentials are saved to disk. Warnings -------- - The credentials are removed when the server is restarted, so the authentication flow must be relaunched. Notes ----- - The credentials are saved in a sqlite3 file with the following structure: * Tables: - users: +----------+--------------+ | name | DropboxID | +----------+--------------+ - name (TEXT PRIMARY KEY): Name of the authenticated user. - DropboxID (TEXT): Unique identification for a dropbox element. - dropbox +----------+--------------+-----------+----------+--------+ | id | user_id | account_id | scope | cred | +----------+--------------+-----------+----------+--------+ - id (TEXT PRIMARY KEY): Id of the authentication parameters. Must be the same as DropboxID. - user_id (TEXT): Dropbox ID of the user - account_id (TEXT): Dropbox ID of the account. - scope (TEXT): Scopes. - cred (TEXT): Encrypted credentials in a json string format. """ import json import os.path import uuid import sqlite3 from django.conf import settings from django.http import HttpRequest, HttpResponse from cryptography.fernet import Fernet import dropbox as dbx from tools.service.http_loaders import invisible_out, invisible_in from tools.parser import NoneParser AUTH_PATH = os.path.join(settings.BASE_DIR, 'MG', 'authenticate', 'certs') def post(_, user: str = None, request=None): """ Trigger the authentication flow. After do the post follow the url returned. Parameters ---------- user: str User to register request: str Kind of request Returns ------- str: Url to complete the authentication flow. Examples -------- >>> import requests >>> parameters={'user':'test'} >>> response = requests.post(':/MG/authenticate/dropbox', ... json=parameters) >>> response.json() "https://www.dropbox.com/oauth2/authorize?response_type=code&redirect_uri=http%3A%2F%2F 127.0.0.1%3A1114%2FMG%2Fauthenticate%2Fdropbox&state=90kB7zdLEs3A861wLD8ihw%3D%3D%7C%7B %22user%22%3A+%22test%22%7D&client_id=t6rpf1d5nr54enk&token_access_type=offline" """ user_ = json.loads(request.body.decode('utf8'))["user"] host = request.get_host() # Read app key and secret with open(settings.DROPBOX_API_CREDENTIALS_PATH, 'r') as app_cred_file: app_cred = json.load(app_cred_file) auth_flow = dbx.DropboxOAuth2Flow(app_cred['app_key'], '{}://{}/MG/authenticate/dropbox'.format( request.scheme, host), {}, 'dropbox-auth-csrf-token', token_access_type='offline', consumer_secret=app_cred['app_secret']) authorize_url = auth_flow.start(url_state=json.dumps({'user': user_})) return authorize_url POST_REQUEST = { 'function': post, 'http_p': invisible_in, 'parser': NoneParser } def get(_, user: str = None, request: HttpRequest = None): """ This functionality is only internal. This method would be called from dropbox consent screen. """ parameters = request.GET.dict() parameters_splitted = parameters['state'].split('|') user_ = json.loads(parameters_splitted[1])['user'] # Read app key and secret with open(settings.DROPBOX_API_CREDENTIALS_PATH, 'r') as app_cred_file: app_cred = json.load(app_cred_file) oauth_result = dbx.DropboxOAuth2Flow( app_cred['app_key'], '{}://{}/MG/authenticate/dropbox' .format(request.scheme, request.get_host()), {'dropbox-auth-csrf-token': parameters_splitted[0]}, 'dropbox-auth-csrf-token', token_access_type='offline', consumer_secret=app_cred['app_secret']).finish(parameters) credentials = {'access_token': oauth_result.access_token, 'expires_at': oauth_result.expires_at.isoformat(), 'refresh_token': oauth_result.refresh_token} # Encrypt file with open(settings.CIPHER_KEY, 'rb') as key_file: key = key_file.read() fernet = Fernet(key) # checking user credentials conn = sqlite3.connect(settings.CERTS_DB_PATH) res = list(conn.execute( 'SELECT DropboxID FROM users where name=\'{}\''.format(user_))) if res: dropboxid = res[0][0] conn.execute( 'REPLACE INTO dropbox (id, user_id, account_id, scope, cred) VALUES ' '("{dropbox_id}", "{user_id}", "{account_id}", "{scope}", "{cred}");' .format(dropbox_id=dropboxid, user_id=oauth_result.user_id, account_id=oauth_result.account_id, scope=oauth_result.scope, cred=fernet.encrypt(json.dumps( credentials).encode()).decode())) else: dropboxid = uuid.uuid4().hex conn.execute( 'INSERT INTO dropbox (id, user_id, account_id, scope, cred) VALUES ' '("{dropbox_id}", "{user_id}", "{account_id}", "{scope}", "{cred}");' .format(dropbox_id=dropboxid, user_id=oauth_result.user_id, account_id=oauth_result.account_id, scope=oauth_result.scope, cred=fernet.encrypt( json.dumps(credentials).encode()).decode())) conn.execute('INSERT OR REPLACE INTO users (name, DropboxID) VALUES ' '("{name}", "{dropbox_id}");' .format(dropbox_id=dropboxid, name=user_)) conn.commit() conn.close() return HttpResponse('Created') GET_REQUEST = { 'function': get, 'http_p': invisible_in, 'http_rm': invisible_out, 'parser': NoneParser }