import gc
import json
import os
import typing as typ

import cv2
import numpy as np
import tensorflow as tf
from google.protobuf.json_format import MessageToDict

from .model import Model


class UnetSkynet(Model):
    """Semantic-segmentation model backed by a TensorFlow SavedModel.

    The instance is configured from a JSON file describing the entities and
    a SavedModel directory/file holding the weights. The graph itself is only
    loaded by :meth:`load` and released by :meth:`unload`.
    """

    LABEL = 'unet'

    def __init__(self, weights, conf, **kwargs):
        """
        Store the paths, prepare the session configuration and parse the
        entities configuration.

        Parameters
        ----------
        weights: str
            Path to the saved model. Must exist.
        conf: str
            Path to the JSON entities configuration. Must exist. Each value is
            either a dict with ``index`` and ``type`` keys or a list whose
            first element is such a dict.
        kwargs:
            Forwarded untouched to the parent ``Model`` constructor.

        Raises
        ------
        Whatever ``self.not_found_error`` builds, when ``weights`` or ``conf``
        does not exist on disk.
        """
        super().__init__(**kwargs)
        self.weights = weights
        self.conf = conf

        self.session_conf = tf.compat.v1.ConfigProto()
        self.session_conf.allow_soft_placement = True
        self.session_conf.gpu_options.allow_growth = True

        if not os.path.exists(self.conf) or not os.path.exists(self.weights):
            raise self.not_found_error('Some configuration file is not found, model not loaded')

        # Read the configuration once, closing the file deterministically.
        with open(self.conf, 'r', encoding="utf-8") as f:
            json_conf = json.load(f)

        indexes_map = {}
        for key, entry in json_conf.items():
            # A list-valued entry is described by its first element.
            if isinstance(entry, list):
                entry = entry[0]
            indexes_map[key] = {"index": entry['index'], "type": entry['type']}
        # The right-hand side is fully built before the 'all' key is inserted.
        indexes_map['all'] = [{"index": item["index"], "type": item["type"]}
                              for item in indexes_map.values()]
        self.indexes_map = indexes_map
        self.entities_list = sorted(json_conf)

        if 'size_pixels' not in self.attributes.keys():
            # Log-interpolate the input size between image_min_size (first
            # zoom level) and image_max_size (last zoom level).
            image_max_size = 1024
            image_min_size = 800
            zoom_levels = self['zoom_levels']
            c = (image_max_size - image_min_size) / np.log(zoom_levels[-1] / zoom_levels[0])
            d = image_max_size - np.log(zoom_levels[-1]) * c
            self.attributes['size_pixels'] = [int(np.log(x_i) * c + d) for x_i in zoom_levels]

    def detect(self, image, *args):
        """
        Run the model on one image file.

        Requires :meth:`load` to have been called. Stores the shape of the
        image as read from disk in ``self.orig_shape`` (used by
        :meth:`filter`).

        Parameters
        ----------
        image: str
            Path of the image, read with ``cv2.imread``.
        args:
            ``args[0]`` is the position in ``self['size_pixels']`` of the
            square size the image is resized to before inference.

        Returns
        -------
        dict
            ``output``: the session result squeezed on axis 0;
            ``scores``: maximum of ``output`` along axis 2;
            ``classes`` and ``masks``: argmax of ``output`` along axis 2.
        """
        image = cv2.imread(image)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Centre on mid-grey and scale by the 8-bit range.
        image = (image - 255.0 / 2) / 255.0
        self.orig_shape = image.shape

        pixel_size = self['size_pixels']
        image = cv2.resize(image, dsize=(pixel_size[args[0]], pixel_size[args[0]]))
        image = np.expand_dims(image, axis=0)

        # Pairing inputs: every input tensor of the signature gets the image.
        data = {}
        for input_data_key in self.input_data:
            data[self.input_data[input_data_key]] = image

        prediction = self.session.run(self.output_data, data)
        # NOTE(review): the axis choices below assume the squeezed result is
        # [H, W, classes] -- confirm against the exported model signature.
        output = np.squeeze(prediction, axis=0)
        scores = np.amax(output, axis=2)
        classes = np.argmax(output, axis=2)
        return {'output': output, 'masks': classes, 'classes': classes, 'scores': scores}

    def filter(self, result_dict, indexes, detection_threshold):
        """
        Prune of low confident detections

        Requires ``self.orig_shape``, which is set by :meth:`detect`.

        Parameters
        ----------
        result_dict: dict
            Dict containing the results of the detection. Result of self.detect method
        indexes: list of int
            List containing the indexes to filter. Currently not used by this
            implementation.
        detection_threshold: float
            Float between 0 and 1. Minimum detection confidence.

        Returns
        -------
        dict
            ``output``: ``result_dict['output']`` unchanged;
            ``classes``: per-pixel argmax at the original image size;
            ``masks``: ``classes`` with pixels below the threshold set to 0;
            ``scores``: per-pixel maximum with pixels below the threshold set
            to 0.
        """
        output = result_dict['output']
        # cv2 expects dsize as (width, height).
        output = cv2.resize(output, dsize=(self.orig_shape[1], self.orig_shape[0]))
        scores = np.amax(output, axis=2)
        classes = np.argmax(output, axis=2)

        # 1 where the pixel is confident enough, 0 otherwise. The builtin int
        # is used because the np.int alias was removed in NumPy 1.24.
        confident = (scores >= detection_threshold).astype(int)
        masks = np.multiply(classes, confident)
        scores = np.multiply(scores, confident)
        return {'output': result_dict['output'], 'masks': masks, 'classes': classes, 'scores': scores}

    @staticmethod
    def to_array(detections: typ.Dict, img_size: typ.List[int]) -> np.ndarray:
        """
        Convert detection into images (to burn detections mask into raster).

        Each pixel is encoded as ``100 * class + int(10 * score)``.

        Parameters
        ----------
        detections: dict
            Detection dict, output of self.detect or self.filter
        img_size: list of int
            Size of image, [W, H]

        Returns
        -------
        result_array: numpy.ndarray
            Float array of shape [H, W, 1]
            H: Height of image pixels
            W: Width of image pixels
        """
        result_array = np.zeros([img_size[1], img_size[0], 1])
        # Truncate 10 * score towards zero in one vectorised step. The cast to
        # float64 keeps the arithmetic identical to per-element Python floats.
        scores = np.asarray(detections['scores'], dtype=np.float64)
        encoded_scores = (10 * scores).astype(int)
        result_array[:, :, 0] = 100 * detections['masks'] + encoded_scores
        return result_array

    @staticmethod
    def to_raster(mask_array: np.ndarray, entity: int):
        """
        Extract a single entity from an array built by :meth:`to_array`.

        Parameters
        ----------
        mask_array: numpy.ndarray
            Encoded array with a trailing axis of length 1.
        entity: int
            Class value to keep (compared with ``mask_array // 100``).

        Returns
        -------
        numpy.ndarray
            Array without the trailing axis, holding the encoded value where
            the pixel belongs to ``entity`` and 0 elsewhere.
        """
        is_entity = (mask_array // 100).astype(np.int32) == entity
        background = np.zeros(is_entity.shape, dtype=np.int32)
        return np.squeeze(np.where(is_entity, mask_array, background), axis=-1)

    def load(self):
        """
        Load the saved model into a fresh graph/session.

        Creates ``self.graph`` and ``self.session`` (entered as default
        session), then resolves the tensors of the default serving signature
        into ``self.input_data`` (dict name -> tensor) and
        ``self.output_data`` (list of tensors).
        """
        self.graph = tf.Graph()
        self.session = tf.compat.v1.Session(graph=self.graph, config=self.session_conf)
        self.session.__enter__()
        with tf.device('gpu'):
            # Session-based loader: takes (session, tags, export_dir) and
            # returns the MetaGraphDef. Under TF2 the bare tf.saved_model.load
            # has a different signature and does not accept a session.
            meta_graph = tf.compat.v1.saved_model.loader.load(
                self.session, ('serve',), self.weights)
        signatures = meta_graph.signature_def[tf.saved_model.DEFAULT_SERVING_SIGNATURE_DEF_KEY]
        signatures_dict = MessageToDict(signatures)
        self.input_data = {
            name: self.graph.get_tensor_by_name(spec['name'])
            for name, spec in signatures_dict['inputs'].items()
        }
        self.output_data = [
            self.graph.get_tensor_by_name(spec['name'])
            for spec in signatures_dict['outputs'].values()
        ]

    def unload(self):
        """
        Release everything created by :meth:`load`.

        Drops the tensor references, finalizes and drops the graph, exits the
        session context, clears the Keras backend session and forces a
        garbage collection.
        """
        del self.input_data
        del self.output_data
        self.graph.finalize()
        del self.graph
        self.session.__exit__(None, None, None)
        del self.session
        tf.keras.backend.clear_session()
        gc.collect()