""" Module to implement Fiware services as a driver. """ import base64 import re import json import requests from MG.tools.postgis_api import PostGis from tools.protocols.masterProtocol import MasterProtocol from tools import const class FIWARE(MasterProtocol): """ Class to control the PostgreSQL protocol Attributes ---------- CONNECTION_REQUIRED: dict Mapping of required connection parameters. Only is important when GMS is used with Itasker. NAME: str Protocol identifier string Parameters ---------- ip: str Url of service port: int Port of service user: str User of service connection Context Broker password: str Password of service service: str Name of Fiware service subservice: str Name of the Fiware subservice always must include "/" application_id: str Encode id to get the token when the version is V1 application_secret: str Encode secret to get the token when the version is V1 """ LoginError = type('LoginError', (Exception,), {}) NotEnoughPrivileges = type('NotEnoughPrivileges', (Exception, ), {}) CONNECTION_REQUIRED = {'user': 'user', 'password': 'password', 'url_token': 'url_token', 'fileType': 'driver_type', 'ip': 'ip', 'port': 'port', 'layer_name': 'entity_name', 'service': 'service', 'subservice': 'subservice', 'application_id': 'application_id', 'application_secret': 'application_secret', 'subtype': 'type', 'layerName': 'layer_name'} NAME = const.FIWARE_KEY def __init__(self, ip, port, user="", password="", url_token="", service="", subservice="", application_id="", application_secret=""): super(FIWARE, self).__init__() self.ip = ip self.port = int(port) self.user = user self.password = password self.url_token = url_token self.service = service self.subservice = subservice self.application_id = application_id self.application_secret = application_secret def exists(self, layer): """ Service to check whether the entity of Fiware exists Returns ------- bool True if successful, False otherwise. """ version = self.get_version(self.ip, self.port) access_token = self.get_token(self.user, self.password) uri_cb = """http://{ip}:{port}/v2/entities?type={entity}""". \ format(ip=self.ip, port=self.port, entity=layer["entity_name"]) if version == 'KeyStone': headers_get = {'Content-Type': 'application/json', 'Fiware-Service': self.service, 'Fiware-ServicePath': self.subservice, 'X-Auth-Token': access_token} response = requests.get(url=uri_cb, headers=headers_get, verify=False) elif version == 'KeyRock': headers_get = {'Fiware-Service': self.service, 'Fiware-ServicePath': self.subservice, 'X-Auth-Token': access_token} response = requests.get(url=uri_cb, headers=headers_get) else: raise TypeError("Version Actualmente no admitida") if response.status_code in [200, 201]: return True return False def get_version(self): """ Returns ------- str String with the type of version KeyRock or KeyStone """ try: get_version = requests.get(url=self.url_token, verify=False) json_version = json.loads(get_version.content.decode('utf-8')) if 'error' in json_version: version = 'KeyRock' elif 'version' in json_version: version_split = json_version['version']['id'].split(".") if version_split[0] == 'v3': version = 'KeyStone' return version except: raise TypeError('Fiware on {} off'.format(self.url_token)) def get_token(self, user, password): """ Parameters ---------- user: str Username of Context Broker user password: str Password of Context Broker Returns ------- str Access Token """ try: version = self.get_version() if version == 'KeyStone': headers = {'Content-Type': 'application/json'} data = { "auth": { "identity": { "methods": ["password"], "password": { "user": { "domain": { "name": self.service }, "name": user, "password": password } } }, "scope": { "project": { "domain": { "name": self.service }, "name": self.subservice} } } } uri_token = self.url_token + 'auth/tokens' response = requests.post(url=uri_token, headers=headers, data=json.dumps(data), verify=False) access_token = response.headers['X-Subject-Token'] elif version == 'KeyRock': encoded_auth_token = base64.b64encode( "{}:{}".format(self.application_id, self.application_secret).encode( 'utf8')).decode('utf8') headers = {'Host': '192.168.1.43:30031', 'Authorization': "Basic {}".format(encoded_auth_token), 'Content-Type': 'application/x-www-form-urlencoded'} data = {'grant_type': 'password', 'username': user, 'password': password} uri_token = self.url_token.replace('v1/', 'oauth2/token') response = requests.post(url=uri_token, headers=headers, data=data) access_token = json.loads( response.content.decode())['access_token'] else: raise TypeError("Version actualmente no admitida") return access_token except: raise self.LoginError('Bad user or password for Fiware on {}:{}'. format(self.ip, self.port)) def get_entities(self, layer): """ Parameters ---------- layer: :obj:Layer Returns ------- dict Dict that contain the entities obtained with the GET request """ version = self.get_version() access_token = self.get_token(self.user, self.password) if layer["entity_name"] is None or layer["entity_name"] == '' \ or layer["entity_name"] == ' ': uri_cb = """http://{ip}:{port}/v2/entities""".\ format(ip=layer["ip"], port=layer["port"]) geojson_file = {"type": "FeatureCollection", "name": "entities", "features": ""} else: uri_cb = """http://{ip}:{port}/v2/entities?type={entity_name}""" \ .format(ip=layer["ip"], port=layer["port"], entity_name=layer["entity_name"]) geojson_file = {"type": "FeatureCollection", "name": layer["entity_name"], "features": ""} headers = { 'X-Auth-Token': access_token, 'Fiware-Service': layer["service"], 'Fiware-ServicePath': layer["subservice"]} if version == 'KeyStone': response = requests.get(url=uri_cb, headers=headers, verify=False) elif version == 'KeyRock': response = requests.get(url=uri_cb, headers=headers) else: raise TypeError("Version Actualmente no admitida") json_response = json.loads(response.content.decode('utf-8')) geojson_file["features"] = json_response return geojson_file def is_file(self, path): """ Service to check the connection results is a file Returns ------- success: True """ return True def is_directory(self, *_): """ Service to check the connection results is a directory Returns ------- success:True """ return False def gdal_layer(self, layer, *_): """ Service to get the gdal layer url Parameters ---------- layer: :obj:`Layer` with_vsi: bool To indicate if the gdal layer need vsi Returns ------- str Name of the entity that has been requested to the Context Broker """ exist = self.exists(layer) if exist: return layer['entity_name'] def gdal_url(self, source, *_): """ Service to get the gdal PostgreSQL connection url Parameters ---------- source: str Path of URL Returns ------- str URL to request the entity to the Context Broker """ return """http://{ip}:{port}/v2/entities?type={entity}""".\ format(ip=self.ip, port=self.port, entity=source) def join(self, *args): """ Parameters ---------- args Returns ------- """ pass def upload(self, layer, *_): """ Parameters ---------- layer: :obj:Layer Returns ------- None """ postgis_obj = PostGis('Public') schema_pg = layer.user table_pg = layer["table_view_name"] columns = list(postgis_obj.list_columns(schema_pg, table_pg)) columns_name = [] geom_val = 0 for i in columns: if i[0] in ('geom', 'geometry'): geom_val = i[0] else: columns_name.append(i[0]) if geom_val == 0: select = list(postgis_obj.send_sql_command(""" SELECT {attr} from "{schema}"."{table}" """.format(attr=','.join(columns_name), schema=schema_pg, table=table_pg))) else: select = list(postgis_obj.send_sql_command( """ SELECT {attr}, ST_AsText(ST_Transform ({geom}, 4326)) from "{schema}"."{table}" """.format(attr=','.join(columns_name), geom=geom_val, schema=schema_pg, table=table_pg))) columns_name.append(geom_val) list_select = [] for i in select: list_select.append(i) fiware_dict = {"type": "Text", "value": '', "metadata": {}} fiware_type = """ "type": "{entity_type}" """.\ format(entity_type=layer["table_view_name"]) fiware_dict_geom = {"type": "geo:json", "value": {"type": "", "coordinates": ""}} uri_cb = """http://{ip}:{port}/v2/entities""".\ format(ip=self.ip, port=self.port) if len(list_select) > 1000: list_select = list_select[:1000] for k in list_select: new_dict = "{" coord_lat_long = [] zipb_obj = zip(columns_name, k) dict_of_words = dict(zipb_obj) for i, list_col in enumerate(columns_name): if list_col in ('ogc_fid', 'fid'): new_dict += '"id" :' + '"' + str(dict_of_words[list_col]) \ + '" ,' + fiware_type + "," elif list_col in ('geom', 'geometry'): type_geom = dict_of_words[list_col].split("((")[0] if type_geom.lower() == 'polygon' or \ type_geom.lower() == 'multipolygon': type_geom = 'MultiPolygon' elif type_geom.lower() == 'point': type_geom = 'Point' else: raise TypeError("Formato no admitido") fiware_dict_geom["value"]["type"] = type_geom coordinates = dict_of_words[list_col].split("((")[1] coord = coordinates.replace("))", '').split(",") for j in coord: lat_long = j.split() if type_geom == 'MultiPolygon': if lat_long: new_lat_long = [lat_long[0] + " , " + lat_long[1]] coord_lat_long.append(new_lat_long) fiware_dict_geom["value"]["coordinates"] = \ [[coord_lat_long]] else: if lat_long: new_lat_long = lat_long[0] + " , " + lat_long[1] coord_lat_long.append(new_lat_long) fiware_dict_geom["value"]["coordinates"] = \ coord_lat_long dict_of_words = fiware_dict_geom new_dict += ' "location"' + ":" + str(fiware_dict_geom) \ + "} " else: fiware_dict['value'] = str(dict_of_words[list_col]) new_dict += ' "' + list_col + '"' + ':' + \ str(fiware_dict).replace('"', "") + ' , ' data = new_dict.replace("'", '"').replace('["', "[").\ replace('"]', "]").replace("(", "").replace(")", "") version = self.get_version(self.ip, self.port) access_token = self.get_token(self.user, self.password) headers = {'Content-Type': 'application/json', 'X-Auth-Token': access_token, 'Fiware-Service': self.service, 'Fiware-ServicePath': self.subservice} data_loads = json.loads(data) coord_length = len( data_loads["location"]["value"]["coordinates"][0][0]) if version == 'KeyStone': if coord_length > 80: pass else: response = requests.post(url=uri_cb, headers=headers, data=data, verify=False) if response.status_code not in [200, 201]: raise Exception(response.content.decode()) elif version == 'KeyRock': if coord_length > 80: pass else: response = requests.post(url=uri_cb, headers=headers, data=data) if response.status_code not in [200, 201]: raise Exception(response.content.decode()) else: raise TypeError("Version Actualmente no admitida") def download(self, file, path): """ Parameters ---------- file path Returns ------- None """ if "." in path: path_dst = """{namefile}""".format(namefile=path) else: path_dst = """{namefile}.geojson""".format(namefile=path) file_to_write = open(path_dst, "w") file_to_write.write(file) file_to_write.close() def format_parameters(self, *_, parameters): """ Parameters ---------- parameters Returns ------- parameters """ # Format entity_name parameters.update({'entity_name': re.sub(' +', '_', parameters['entity_name'])}) return parameters