from mrcnn.config import Config
import tensorflow as tf
import json
import os
import mrcnn.model as modellib
import cv2
import numpy as np
import gc
import typing as typ

from .model import Model


class Skynet(Model):
    LABEL = 'mrcnn'

    def __init__(self, weights, conf, **kwargs):
        super(Skynet, self).__init__(**kwargs)
        self.weights = weights
        self.conf = conf
        # tf.ConfigProto is deprecated; use tf.compat.v1.ConfigProto instead.
        self.session_conf = tf.compat.v1.ConfigProto()
        self.session_conf.allow_soft_placement = True
        if not os.path.exists(self.conf) or not os.path.exists(self.weights):
            raise self.not_found_error('Some configuration file was not found, model not loaded')
        with open(self.conf, 'r') as f:
            json_conf = json.load(f)
        # Map each entity to its detection index and type. A config entry may
        # be a single object or a list of objects; for lists, keep the first.
        indexes_map = {
            key: {"index": json_conf[key]['index'], "type": json_conf[key]['type']}
            if not isinstance(json_conf[key], list)
            else {"index": json_conf[key][0]['index'], "type": json_conf[key][0]['type']}
            for key in json_conf
        }
        indexes_map['all'] = [{"index": indexes_map[key]["index"],
                               "type": indexes_map[key]["type"]}
                              for key in indexes_map]
        self.indexes_map = indexes_map
        with open(self.conf, 'r', encoding="utf-8") as f:
            entities = json.load(f)
        self.entities_list = sorted(list(entities.keys()))
        if 'size_pixels' not in self.attributes.keys():
            # Interpolate the input resolution log-linearly over the zoom
            # levels: size(z) = c * ln(z) + d, anchored so that the smallest
            # zoom maps to 800 px and the largest to 1024 px.
            image_max_size = 1024
            image_min_size = 800
            zoom_levels = self['zoom_levels']
            c = (image_max_size - image_min_size) / np.log(zoom_levels[-1] / zoom_levels[0])
            d = image_max_size - np.log(zoom_levels[-1]) * c
            self.attributes['size_pixels'] = [int(np.log(x_i) * c + d) for x_i in zoom_levels]

    def detect(self, image, *args):
        # args[0]: index into 'size_pixels' selecting the input resolution.
        image = cv2.imread(image)
        # Detect objects
        # set_session(self.session)
        self.orig_shape = image.shape
        pixel_size = self['size_pixels']
        image = cv2.resize(image, dsize=(pixel_size[args[0]], pixel_size[args[0]]))
        results = self.model.detect([image], verbose=0)[0]
        # Mask R-CNN returns ROIs as (y1, x1, y2, x2); reorder to (x1, y1, x2, y2).
        y1, x1, y2, x2 = np.split(results['rois'], 4, axis=1)
        bb = np.concatenate([x1, y1, x2, y2], axis=1)
        return {'masks': results['masks'],
                'scores': list(results['scores']),
                'class_ids': list(results['class_ids']),
                'bboxes': bb.tolist()}

    def filter(self, result_dict, indexes, detection_threshold):
        """
        Prune low-confidence detections.

        Parameters
        ----------
        result_dict: dict
            Dict containing the results of the detection (output of the
            self.detect method).
        indexes: list of str
            Class indexes (as strings) of the detections to keep.
        detection_threshold: float
            Float between 0 and 1. Minimum detection confidence.

        Returns
        -------
        detections_filtered: dict
            Same keys as result_dict, with the surviving masks stored as a
            list of 2D arrays resized to the original image shape.
        """
        detections_filtered = {'masks': [], 'class_ids': [], 'bboxes': [], 'scores': []}
        num_masks = result_dict['masks'].shape[2]
        for id_mask in range(num_masks):
            if result_dict['scores'][id_mask] >= float(detection_threshold) and \
                    str(result_dict['class_ids'][id_mask]) in indexes:
                masks = result_dict['masks'][:, :, id_mask]
                # Resize the mask back to the original image resolution.
                masks = cv2.resize(masks.astype(np.uint8),
                                   dsize=(self.orig_shape[1], self.orig_shape[0]))
                detections_filtered['masks'].append(masks)
                detections_filtered['class_ids'].append(result_dict['class_ids'][id_mask])
                detections_filtered['bboxes'].append(result_dict['bboxes'][id_mask])
                detections_filtered['scores'].append(result_dict['scores'][id_mask])
        return detections_filtered

    @staticmethod
    def to_array(detections: typ.Dict, img_size: typ.List[int]) -> np.ndarray:
        """
        Convert detections into an image (burn detection masks into a raster).
        The result is an [H, W, 1] tensor whose pixel values encode the class
        index and the score detected on that pixel. Class index and score are
        encoded together as an integer of three or more digits,
        following this pseudo-code:

            c = class of pixel[i, j]
            s = score of pixel[i, j] truncated to one decimal place
            r = result for pixel[i, j] = 100*c + 10*s

        Examples
        --------
        A pixel covered by a mask of class 3 with score 0.87 is encoded as
        100 * 3 + int(10 * 0.87) = 308.

        Parameters
        ----------
        detections: dict
            Detection dict whose masks are a list of 2D arrays, as returned
            by self.filter.
        img_size: list of int
            Size of image, [W, H].

        Returns
        -------
        result_array: numpy.ndarray
            3D tensor of masks, [H, W, 1]:
                H: height of the image in pixels
                W: width of the image in pixels
        """
        result_array = np.zeros([img_size[1], img_size[0], 1])
        num_masks = len(detections['masks'])
        for id_mask in range(num_masks):
            class_id = detections['class_ids'][id_mask]
            scores = detections['scores'][id_mask]
            encoded_scores = int(10 * scores)  # Careful: this truncates the score to a single digit
            result_array[:, :, 0] += np.expand_dims(
                (int(class_id) * 100 + encoded_scores) * detections['masks'][id_mask][:, :],
                axis=-1)[:, :, 0]
        return result_array

    @staticmethod
    def to_raster(mask_array: np.ndarray, entity: int) -> np.ndarray:
        # Keep the encoded pixels whose class (value // 100) equals `entity`
        # and zero out everything else; squeeze to a 2D [H, W] array.
        mask = (mask_array[:, :, :] // 100).astype(np.int32) == entity
        return np.squeeze(np.where(mask, mask_array, mask.astype(np.int32)), axis=-1)

    @staticmethod
    def visualize(image, results, path=""):
        mask = results['masks']
        image = cv2.imread(image)
        if not mask.shape[-1]:
            # No detections: write the image unchanged.
            cv2.imwrite(path, image)
            return None
        masks = np.split(mask, mask.shape[-1], axis=2)
        for id_mask, mask in enumerate(masks):
            if results['scores'][id_mask] >= 0.9:
                mask = mask.astype(np.uint8)
                contours, hierarchy = cv2.findContours(mask, cv2.RETR_TREE,
                                                       cv2.CHAIN_APPROX_SIMPLE)
                cv2.drawContours(image, contours, -1, (255, 0, 102), 3)
        for id_bbox, bbox in enumerate(results['bboxes']):
            # Boxes come from self.detect as (x1, y1, x2, y2) corner
            # coordinates, so draw directly from (x1, y1) to (x2, y2).
            image = cv2.rectangle(image, (bbox[0], bbox[1]), (bbox[2], bbox[3]),
                                  (255, 0, 102), 2)
        cv2.imwrite(path, image)
        return None

    def load(self):
        with open(self.conf, 'r') as f:
            json_classes = json.load(f)

        class InferenceConfig(Config):
            # Give the configuration a recognizable name
            NAME = "skynet"
            IMAGES_PER_GPU = 1
            NUM_CLASSES = 1 + len(json_classes)  # background + entity classes
            GPU_COUNT = 1
            DETECTION_MIN_CONFIDENCE = 0

        config = InferenceConfig()
        model_dir = os.path.join("tmp", "logs")
        graph = tf.Graph()
        # Keep a dedicated graph and session alive for the lifetime of the
        # model; both are torn down explicitly in unload().
        self.session = tf.compat.v1.Session(graph=graph, config=self.session_conf)
        self.session.__enter__()
        with tf.device('gpu'):
            model = modellib.MaskRCNN(mode="inference", config=config, model_dir=model_dir)
            model.load_weights(self.weights, by_name=True)
        self.model = model
        self.graph = graph

    def unload(self):
        # Release the model, graph and session explicitly so that repeated
        # load/unload cycles do not leak memory.
        del self.model
        self.graph.finalize()
        del self.graph
        self.session.__exit__(None, None, None)
        del self.session
        tf.keras.backend.clear_session()
        gc.collect()
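

# Minimal, self-contained sketch of the encoding performed by Skynet.to_array
# (illustration only; the detections dict below is fabricated): every masked
# pixel of a class-3 detection with score 0.87 is burned in as
# 100 * 3 + int(10 * 0.87) = 308.
if __name__ == "__main__":
    detections = {
        'masks': [np.array([[1, 0], [0, 1]], dtype=np.uint8)],  # one 2x2 mask
        'class_ids': [3],
        'scores': [0.87],
    }
    print(Skynet.to_array(detections, img_size=[2, 2])[:, :, 0])
    # [[308.   0.]
    #  [  0. 308.]]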