diff --git a/crabs/detection_tracking/config/inference_config.yaml b/crabs/detection_tracking/config/inference_config.yaml
deleted file mode 100644
index a6905b40..00000000
--- a/crabs/detection_tracking/config/inference_config.yaml
+++ /dev/null
@@ -1,11 +0,0 @@
-iou_threshold: 0.1
-score_threshold: 0.1
-# Maximum number of frames to keep alive a track without associated detections.
-max_age: 10
-# Minimum number of associated detections before a track is initialised
-min_hits: 1
-# Save the inference output as a video
-save_video: False
-# Save predicted tracks in VIA csv format and export corresponding frames.
-# This is useful to prepare for manual labelling of tracks.
-save_csv_and_frames: False
diff --git a/crabs/detection_tracking/inference_model.py b/crabs/detection_tracking/inference_model.py
deleted file mode 100644
index cf067a27..00000000
--- a/crabs/detection_tracking/inference_model.py
+++ /dev/null
@@ -1,426 +0,0 @@
-import argparse
-import csv
-import logging
-import os
-import sys
-from pathlib import Path
-from typing import Any, Optional, TextIO, Tuple
-
-import cv2
-import numpy as np
-import torch
-import torchvision.transforms.v2 as transforms
-import yaml  # type: ignore
-
-from crabs.detection_tracking.models import FasterRCNN
-from crabs.detection_tracking.sort import Sort
-from crabs.detection_tracking.tracking_utils import (
-    evaluate_mota,
-    get_ground_truth_data,
-    save_frame_and_csv,
-    write_tracked_bbox_to_csv,
-)
-from crabs.detection_tracking.visualization import draw_bbox
-
-DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-
-
-class DetectorInference:
-    """
-    A class for performing object detection or tracking inference on a video
-    using a trained model.
-
-    Parameters
-    ----------
-    args : argparse.Namespace
-        Command-line arguments containing configuration settings.
-
-    Attributes
-    ----------
-    args : argparse.Namespace
-        The command-line arguments provided.
-    video_path : str
-        The path to the input video.
-    sort_tracker : Sort
-        An instance of the SORT tracker used for tracking.
-    """
-
-    def __init__(self, args: argparse.Namespace) -> None:
-        self.args = args
-        self.config_file = args.config_file
-        self.video_path = args.video_path
-
-        self.video_file_root = f"{Path(self.video_path).stem}"
-        self.trained_model = self.load_trained_model()
-        self.load_config_yaml()
-        self.sort_tracker = Sort(
-            max_age=self.config["max_age"],
-            min_hits=self.config["min_hits"],
-            iou_threshold=self.config["iou_threshold"],
-        )
-
-    def load_config_yaml(self):
-        with open(self.config_file, "r") as f:
-            self.config = yaml.safe_load(f)
-
-    def load_trained_model(self) -> torch.nn.Module:
-        """
-        Load the trained model.
-
-        Returns
-        -------
-        torch.nn.Module
-        """
-        # Get trained model
-        trained_model = FasterRCNN.load_from_checkpoint(
-            self.args.checkpoint_path
-        )
-        trained_model.eval()
-        trained_model.to(DEVICE)
-        return trained_model
-
-    def prep_sort(self, prediction: dict) -> np.ndarray:
-        """
-        Put predictions in the format expected by SORT.
-
-        Parameters
-        ----------
-        prediction : dict
-            The dictionary containing predicted bounding boxes, scores,
-            and labels.
-
-        Returns
-        -------
-        np.ndarray:
-            An array of detections in the format expected by SORT:
-            one row per detection, [x1, y1, x2, y2, score].
-        """
-        pred_boxes = prediction[0]["boxes"].detach().cpu().numpy()
-        pred_scores = prediction[0]["scores"].detach().cpu().numpy()
-        pred_labels = prediction[0]["labels"].detach().cpu().numpy()
-
-        pred_sort = []
-        for box, score, label in zip(pred_boxes, pred_scores, pred_labels):
-            if score > self.config["score_threshold"]:
-                bbox = np.concatenate((box, [score]))
-                pred_sort.append(bbox)
-
-        return np.asarray(pred_sort)
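To make the expected shapes concrete, here is a minimal sketch of the conversion prep_sort performs, with a toy prediction dict standing in for real detector output (the dict layout follows torchvision's detection models; the values are made up):

import numpy as np
import torch

score_threshold = 0.1  # same role as config["score_threshold"]

# Toy output for one image, shaped like torchvision detection predictions
prediction = [
    {
        "boxes": torch.tensor([[10.0, 20.0, 50.0, 60.0], [5.0, 5.0, 15.0, 18.0]]),
        "scores": torch.tensor([0.9, 0.05]),
        "labels": torch.tensor([1, 1]),
    }
]

pred_sort = []
for box, score in zip(
    prediction[0]["boxes"].numpy(), prediction[0]["scores"].numpy()
):
    if score > score_threshold:
        pred_sort.append(np.concatenate((box, [score])))

dets = np.asarray(pred_sort)  # shape (n_kept, 5): x1, y1, x2, y2, score
print(dets)  # only the first box survives the threshold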
-    def load_video(self) -> None:
-        """
-        Load the input video, and prepare the output video if required.
-        """
-        self.video = cv2.VideoCapture(self.video_path)
-        if not self.video.isOpened():
-            raise Exception("Error opening video file")
-
-        # create directory to save output
-        os.makedirs(self.args.output_dir, exist_ok=True)
-
-        if self.config["save_video"]:
-            frame_width = int(self.video.get(cv2.CAP_PROP_FRAME_WIDTH))
-            frame_height = int(self.video.get(cv2.CAP_PROP_FRAME_HEIGHT))
-            cap_fps = self.video.get(cv2.CAP_PROP_FPS)
-
-            output_file = os.path.join(
-                self.args.output_dir,
-                f"{os.path.basename(self.video_file_root)}_output_video.mp4",
-            )
-            output_codec = cv2.VideoWriter_fourcc("m", "p", "4", "v")
-            self.video_output = cv2.VideoWriter(
-                output_file, output_codec, cap_fps, (frame_width, frame_height)
-            )
-
-    def prep_csv_writer(self) -> Tuple[Any, TextIO]:
-        """
-        Prepare the csv writer used to output tracking results.
-        """
-        crabs_tracks_label_dir = (
-            Path(self.args.output_dir) / "crabs_tracks_label"
-        )
-        self.tracking_output_dir = (
-            crabs_tracks_label_dir / self.video_file_root
-        )
-        # Create the subdirectory for the specific video file root
-        self.tracking_output_dir.mkdir(parents=True, exist_ok=True)
-
-        csv_file = open(
-            f"{str(self.tracking_output_dir / self.video_file_root)}.csv",
-            "w",
-        )
-        csv_writer = csv.writer(csv_file)
-
-        # write header following VIA convention
-        # https://www.robots.ox.ac.uk/~vgg/software/via/docs/face_track_annotation.html
-        csv_writer.writerow(
-            (
-                "filename",
-                "file_size",
-                "file_attributes",
-                "region_count",
-                "region_id",
-                "region_shape_attributes",
-                "region_attributes",
-            )
-        )
-
-        return csv_writer, csv_file
-
-    def evaluate_tracking(
-        self,
-        gt_boxes_list: list,
-        tracked_boxes_list: list,
-    ) -> list[float]:
-        """
-        Evaluate tracking performance using the Multi-Object Tracking
-        Accuracy (MOTA) metric.
-
-        Parameters
-        ----------
-        gt_boxes_list : list[list[float]]
-            List of ground truth bounding boxes for each frame.
-        tracked_boxes_list : list[list[float]]
-            List of tracked bounding boxes for each frame.
-
-        Returns
-        -------
-        list[float]:
-            The per-frame MOTA (Multi-Object Tracking Accuracy) scores for
-            the tracking performance.
-        """
-        mota_values = []
-        prev_frame_ids: Optional[list[list[int]]] = None
-        for gt_boxes, tracked_boxes in zip(gt_boxes_list, tracked_boxes_list):
-            mota = evaluate_mota(
-                gt_boxes,
-                tracked_boxes,
-                self.config["iou_threshold"],
-                prev_frame_ids,
-            )
-            mota_values.append(mota)
-            # Update previous frame IDs for the next iteration
-            prev_frame_ids = [[box[-1] for box in tracked_boxes]]
-
-        return mota_values
-
-    def get_prediction(self, frame: np.ndarray) -> torch.Tensor:
-        """
-        Get a prediction from the trained model for a given frame.
-
-        Parameters
-        ----------
-        frame : np.ndarray
-            The input frame for which a prediction is to be obtained.
-
-        Returns
-        -------
-        torch.Tensor:
-            The prediction tensor from the trained model.
-        """
-        transform = transforms.Compose(
-            [
-                transforms.ToImage(),
-                transforms.ToDtype(torch.float32, scale=True),
-            ]
-        )
-        img = transform(frame).to(DEVICE)
-        img = img.unsqueeze(0)
-        with torch.no_grad():
-            prediction = self.trained_model(img)
-        return prediction
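The transform pipeline in get_prediction mirrors torchvision's v2 preprocessing for detection models; a standalone sketch of the same single-frame path, with the model call stubbed out and a blank array standing in for a real frame:

import numpy as np
import torch
import torchvision.transforms.v2 as transforms

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

transform = transforms.Compose(
    [
        transforms.ToImage(),  # HWC uint8 array -> tensor image (CHW)
        transforms.ToDtype(torch.float32, scale=True),  # scale to [0, 1]
    ]
)

frame = np.zeros((1080, 1920, 3), dtype=np.uint8)  # stand-in for a video frame
img = transform(frame).to(device).unsqueeze(0)     # add the batch dimension

# with a loaded detector, inference would then be:
# with torch.no_grad():
#     prediction = trained_model(img)
print(img.shape)  # torch.Size([1, 3, 1080, 1920])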
-    def update_tracking(self, prediction: dict) -> list[list[float]]:
-        """
-        Update the tracking system with the latest prediction.
-
-        Parameters
-        ----------
-        prediction : dict
-            Dictionary containing predicted bounding boxes, scores, and labels.
-
-        Returns
-        -------
-        list[list[float]]:
-            List of tracked bounding boxes after updating the tracking system.
-        """
-        pred_sort = self.prep_sort(prediction)
-        tracked_boxes = self.sort_tracker.update(pred_sort)
-        self.tracked_list.append(tracked_boxes)
-        return tracked_boxes
-
-    def save_required_output(
-        self,
-        tracked_boxes: list[list[float]],
-        frame: np.ndarray,
-        frame_number: int,
-    ) -> None:
-        """
-        Handle the outputs based on the configuration options.
-
-        Parameters
-        ----------
-        tracked_boxes : list[list[float]]
-            List of tracked bounding boxes.
-        frame : np.ndarray
-            The current frame.
-        frame_number : int
-            The frame number.
-        """
-        frame_name = f"{self.video_file_root}_frame_{frame_number:08d}.png"
-        if self.config["save_csv_and_frames"]:
-            save_frame_and_csv(
-                frame_name,
-                self.tracking_output_dir,
-                tracked_boxes,
-                frame,
-                frame_number,
-                self.csv_writer,
-            )
-        else:
-            for bbox in tracked_boxes:
-                write_tracked_bbox_to_csv(
-                    bbox, frame, frame_name, self.csv_writer
-                )
-
-        if self.config["save_video"]:
-            frame_copy = frame.copy()
-            for bbox in tracked_boxes:
-                xmin, ymin, xmax, ymax, id = bbox
-                draw_bbox(
-                    frame_copy,
-                    (xmin, ymin),
-                    (xmax, ymax),
-                    (0, 0, 255),
-                    f"id : {int(id)}",
-                )
-            self.video_output.write(frame_copy)
-
-    def run_inference(self):
-        """
-        Run object detection + tracking on the video frames.
-        """
-        # initialisation
-        frame_number = 1
-        self.tracked_list = []
-
-        self.csv_writer, csv_file = self.prep_csv_writer()
-
-        # Loop through the frames of the video
-        while self.video.isOpened():
-            # Break if beyond the end frame (mostly for debugging)
-            if (
-                self.args.max_frames_to_read
-                and frame_number > self.args.max_frames_to_read
-            ):
-                break
-
-            # read frame
-            ret, frame = self.video.read()
-            if not ret:
-                print("No frame read. Exiting...")
-                break
-
-            prediction = self.get_prediction(frame)
-
-            # run tracking (update_tracking calls prep_sort internally)
-            tracked_boxes = self.update_tracking(prediction)
-            self.save_required_output(tracked_boxes, frame, frame_number)
-
-            # update frame number
-            frame_number += 1
-
-        if self.args.gt_dir:
-            logging.info("Evaluating the tracking")
-            gt_boxes_list = get_ground_truth_data(self.args.gt_dir)
-            mota_values = self.evaluate_tracking(
-                gt_boxes_list, self.tracked_list
-            )
-            overall_mota = np.mean(mota_values)
-            logging.info("Overall MOTA: %s", overall_mota)
-
-        # Close input video
-        self.video.release()
-
-        # Close outputs
-        if self.config["save_video"]:
-            self.video_output.release()
-
-        csv_file.close()
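The per-frame loop above reduces to a standard OpenCV capture pattern; a stripped-down sketch with detection and tracking stubbed out (the video path is a placeholder):

import cv2

video = cv2.VideoCapture("videos/clip.mp4")  # placeholder path
frame_number = 1

while video.isOpened():
    ret, frame = video.read()
    if not ret:
        break  # end of video

    # detection + tracking would happen here, e.g.:
    # prediction = get_prediction(frame)
    # tracked_boxes = tracker.update(prep_sort(prediction))

    frame_number += 1

video.release()
print(f"Read {frame_number - 1} frames")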
-
-def main(args) -> None:
-    """
-    Run detection and tracking inference on a video with the trained model.
-
-    Parameters
-    ----------
-    args : argparse.Namespace
-        Command-line arguments for inference.
-
-    Returns
-    -------
-    None
-    """
-    inference = DetectorInference(args)
-    inference.load_video()
-    inference.run_inference()
-
-
-def inference_parse_args(args):
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--checkpoint_path",
-        type=str,
-        required=True,
-        help="location of the checkpoint of the trained model",
-    )
-    parser.add_argument(
-        "--video_path",
-        type=str,
-        required=True,
-        help="location of the video to run inference on",
-    )
-    parser.add_argument(
-        "--config_file",
-        type=str,
-        default=str(
-            Path(__file__).parent / "config" / "inference_config.yaml"
-        ),
-        help=(
-            "Location of YAML config to control inference. "
-            "Default: crabs-exploration/crabs/detection_tracking/config/inference_config.yaml"
-        ),
-    )
-    parser.add_argument(
-        "--output_dir",
-        type=str,
-        default="crabs_track_output",
-        help="Directory to save the track output",
-    )
-    parser.add_argument(
-        "--max_frames_to_read",
-        type=int,
-        default=None,
-        help="Maximum number of frames to read (mostly for debugging).",
-    )
-    parser.add_argument(
-        "--gt_dir",
-        type=str,
-        default=None,
-        help="Location of the CSV file containing ground truth annotations.",
-    )
-    return parser.parse_args(args)
-
-
-def app_wrapper():
-    torch.set_float32_matmul_precision("medium")
-
-    inference_args = inference_parse_args(sys.argv[1:])
-    main(inference_args)
-
-
-if __name__ == "__main__":
-    app_wrapper()
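Because inference_parse_args takes the argv list explicitly, the CLI can be exercised without a shell; a quick sketch (paths are placeholders):

argv = [
    "--checkpoint_path", "checkpoints/last.ckpt",  # placeholder path
    "--video_path", "videos/clip.mp4",             # placeholder path
    "--max_frames_to_read", "50",
]
args = inference_parse_args(argv)

print(args.output_dir)          # "crabs_track_output" (the default)
print(args.max_frames_to_read)  # 50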
-""" - -from __future__ import print_function - -import numpy as np -from filterpy.kalman import KalmanFilter - - -def linear_assignment(cost_matrix): - try: - import lap - - _, x, y = lap.lapjv(cost_matrix, extend_cost=True) - return np.array([[y[i], i] for i in x if i >= 0]) # - except ImportError: - from scipy.optimize import linear_sum_assignment - - x, y = linear_sum_assignment(cost_matrix) - return np.array(list(zip(x, y))) - - -def iou_batch(bb_test, bb_gt): - """ - From SORT: Computes IOU between two bboxes in the form [x1,y1,x2,y2] - """ - bb_gt = np.expand_dims(bb_gt, 0) - bb_test = np.expand_dims(bb_test, 1) - - xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0]) - yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1]) - xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2]) - yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3]) - w = np.maximum(0.0, xx2 - xx1) - h = np.maximum(0.0, yy2 - yy1) - wh = w * h - o = wh / ( - (bb_test[..., 2] - bb_test[..., 0]) - * (bb_test[..., 3] - bb_test[..., 1]) - + (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1]) - - wh - ) - return o - - -def convert_bbox_to_z(bbox): - """ - Takes a bounding box in the form [x1,y1,x2,y2] and returns z in the form - [x,y,s,r] where x,y is the centre of the box and s is the scale/area and r is - the aspect ratio - """ - w = bbox[2] - bbox[0] - h = bbox[3] - bbox[1] - x = bbox[0] + w / 2.0 - y = bbox[1] + h / 2.0 - s = w * h # scale is just area - r = w / float(h) - return np.array([x, y, s, r]).reshape((4, 1)) - - -def convert_x_to_bbox(x, score=None): - """ - Takes a bounding box in the centre form [x,y,s,r] and returns it in the form - [x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right - """ - w = np.sqrt(x[2] * x[3]) - h = x[2] / w - if score is None: - return np.array( - [x[0] - w / 2.0, x[1] - h / 2.0, x[0] + w / 2.0, x[1] + h / 2.0] - ).reshape((1, 4)) - else: - return np.array( - [ - x[0] - w / 2.0, - x[1] - h / 2.0, - x[0] + w / 2.0, - x[1] + h / 2.0, - score, - ] - ).reshape((1, 5)) - - -class KalmanBoxTracker(object): - """ - This class represents the internal state of individual tracked objects - observed as bbox. - """ - - count = 0 - - def __init__(self, bbox): - """ - Initialises a tracker using initial bounding box. - """ - # define constant velocity model - self.kf = KalmanFilter(dim_x=7, dim_z=4) - self.kf.F = np.array( - [ - [1, 0, 0, 0, 1, 0, 0], - [0, 1, 0, 0, 0, 1, 0], - [0, 0, 1, 0, 0, 0, 1], - [0, 0, 0, 1, 0, 0, 0], - [0, 0, 0, 0, 1, 0, 0], - [0, 0, 0, 0, 0, 1, 0], - [0, 0, 0, 0, 0, 0, 1], - ] - ) - self.kf.H = np.array( - [ - [1, 0, 0, 0, 0, 0, 0], - [0, 1, 0, 0, 0, 0, 0], - [0, 0, 1, 0, 0, 0, 0], - [0, 0, 0, 1, 0, 0, 0], - ] - ) - - self.kf.R[2:, 2:] *= 10.0 - self.kf.P[ - 4:, 4: - ] *= 1000.0 # give high uncertainty to the unobservable initial velocities - self.kf.P *= 10.0 - self.kf.Q[-1, -1] *= 0.01 - self.kf.Q[4:, 4:] *= 0.01 - - self.kf.x[:4] = convert_bbox_to_z(bbox) - self.time_since_update = 0 - self.id = KalmanBoxTracker.count - KalmanBoxTracker.count += 1 - self.history = [] - self.hits = 0 - self.hit_streak = 0 - self.age = 0 - - def update(self, bbox): - """ - Updates the state vector with observed bbox. - """ - self.time_since_update = 0 - self.history = [] - self.hits += 1 - self.hit_streak += 1 - self.kf.update(convert_bbox_to_z(bbox)) - - def predict(self): - """ - Advances the state vector and returns the predicted bounding box estimate. 
-class KalmanBoxTracker(object):
-    """
-    This class represents the internal state of individual tracked objects
-    observed as bbox.
-    """
-
-    count = 0
-
-    def __init__(self, bbox):
-        """
-        Initialises a tracker using an initial bounding box.
-        """
-        # define constant velocity model
-        self.kf = KalmanFilter(dim_x=7, dim_z=4)
-        self.kf.F = np.array(
-            [
-                [1, 0, 0, 0, 1, 0, 0],
-                [0, 1, 0, 0, 0, 1, 0],
-                [0, 0, 1, 0, 0, 0, 1],
-                [0, 0, 0, 1, 0, 0, 0],
-                [0, 0, 0, 0, 1, 0, 0],
-                [0, 0, 0, 0, 0, 1, 0],
-                [0, 0, 0, 0, 0, 0, 1],
-            ]
-        )
-        self.kf.H = np.array(
-            [
-                [1, 0, 0, 0, 0, 0, 0],
-                [0, 1, 0, 0, 0, 0, 0],
-                [0, 0, 1, 0, 0, 0, 0],
-                [0, 0, 0, 1, 0, 0, 0],
-            ]
-        )
-
-        self.kf.R[2:, 2:] *= 10.0
-        # give high uncertainty to the unobservable initial velocities
-        self.kf.P[4:, 4:] *= 1000.0
-        self.kf.P *= 10.0
-        self.kf.Q[-1, -1] *= 0.01
-        self.kf.Q[4:, 4:] *= 0.01
-
-        self.kf.x[:4] = convert_bbox_to_z(bbox)
-        self.time_since_update = 0
-        self.id = KalmanBoxTracker.count
-        KalmanBoxTracker.count += 1
-        self.history = []
-        self.hits = 0
-        self.hit_streak = 0
-        self.age = 0
-
-    def update(self, bbox):
-        """
-        Updates the state vector with an observed bbox.
-        """
-        self.time_since_update = 0
-        self.history = []
-        self.hits += 1
-        self.hit_streak += 1
-        self.kf.update(convert_bbox_to_z(bbox))
-
-    def predict(self):
-        """
-        Advances the state vector and returns the predicted bounding box estimate.
-        """
-        if (self.kf.x[6] + self.kf.x[2]) <= 0:
-            self.kf.x[6] *= 0.0
-        self.kf.predict()
-        self.age += 1
-        if self.time_since_update > 0:
-            self.hit_streak = 0
-        self.time_since_update += 1
-        self.history.append(convert_x_to_bbox(self.kf.x))
-        return self.history[-1]
-
-    def get_state(self):
-        """
-        Returns the current bounding box estimate.
-        """
-        return convert_x_to_bbox(self.kf.x)
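The state vector is [x, y, s, r, dx, dy, ds]: box centre, area and aspect ratio, plus constant velocities for the first three. A small sketch of one tracker's predict/update cycle (requires filterpy; the boxes are made up):

import numpy as np

trk = KalmanBoxTracker(np.array([10.0, 20.0, 50.0, 60.0]))

# with no motion observed yet, the prediction stays near the initial box
print(trk.predict())  # approximately [[10. 20. 50. 60.]]

# feed a box shifted 5 px right; the filter starts estimating a velocity
trk.update(np.array([15.0, 20.0, 55.0, 60.0]))
print(trk.predict())  # drifts right of the last observation
print(trk.get_state())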
-def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
-    """
-    Assigns detections to tracked objects (both represented as bounding boxes).
-
-    Returns 3 lists: matches, unmatched_detections and unmatched_trackers
-    """
-    if len(trackers) == 0:
-        return (
-            np.empty((0, 2), dtype=int),
-            np.arange(len(detections)),
-            np.empty((0, 5), dtype=int),
-        )
-
-    iou_matrix = iou_batch(detections, trackers)
-
-    if min(iou_matrix.shape) > 0:
-        a = (iou_matrix > iou_threshold).astype(np.int32)
-        if a.sum(1).max() == 1 and a.sum(0).max() == 1:
-            matched_indices = np.stack(np.where(a), axis=1)
-        else:
-            matched_indices = linear_assignment(-iou_matrix)
-    else:
-        matched_indices = np.empty(shape=(0, 2))
-
-    unmatched_detections = []
-    for d, det in enumerate(detections):
-        if d not in matched_indices[:, 0]:
-            unmatched_detections.append(d)
-    unmatched_trackers = []
-    for t, trk in enumerate(trackers):
-        if t not in matched_indices[:, 1]:
-            unmatched_trackers.append(t)
-
-    # filter out matches with low IoU
-    matches = []
-    for m in matched_indices:
-        if iou_matrix[m[0], m[1]] < iou_threshold:
-            unmatched_detections.append(m[0])
-            unmatched_trackers.append(m[1])
-        else:
-            matches.append(m.reshape(1, 2))
-    if len(matches) == 0:
-        matches = np.empty((0, 2), dtype=int)
-    else:
-        matches = np.concatenate(matches, axis=0)
-
-    return (
-        matches,
-        np.array(unmatched_detections),
-        np.array(unmatched_trackers),
-    )
-
-
-class Sort(object):
-    def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
-        """
-        Sets key parameters for SORT
-        """
-        self.max_age = max_age
-        self.min_hits = min_hits
-        self.iou_threshold = iou_threshold
-        self.trackers = []
-        self.frame_count = 0
-
-    def update(self, dets=np.empty((0, 5))):
-        """
-        Params:
-        dets - a numpy array of detections in the format
-        [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]
-        Requires: this method must be called once for each frame even with
-        empty detections (use np.empty((0, 5)) for frames without detections).
-        Returns a similar array, where the last column is the object ID.
-
-        NOTE: The number of objects returned may differ from the number of
-        detections provided.
-        """
-        self.frame_count += 1
-        # get predicted locations from existing trackers
-        trks = np.zeros((len(self.trackers), 5))
-        to_del = []
-        ret = []
-        for t, trk in enumerate(trks):
-            pos = self.trackers[t].predict()[0]
-            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
-            if np.any(np.isnan(pos)):
-                to_del.append(t)
-        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
-        for t in reversed(to_del):
-            self.trackers.pop(t)
-        (
-            matched,
-            unmatched_dets,
-            unmatched_trks,
-        ) = associate_detections_to_trackers(dets, trks, self.iou_threshold)
-
-        # update matched trackers with assigned detections
-        for m in matched:
-            self.trackers[m[1]].update(dets[m[0], :])
-
-        # create and initialise new trackers for unmatched detections
-        for i in unmatched_dets:
-            trk = KalmanBoxTracker(dets[i, :])
-            self.trackers.append(trk)
-        i = len(self.trackers)
-        for trk in reversed(self.trackers):
-            d = trk.get_state()[0]
-            if (trk.time_since_update < 1) and (
-                trk.hit_streak >= self.min_hits
-                or self.frame_count <= self.min_hits
-            ):
-                ret.append(
-                    np.concatenate((d, [trk.id + 1])).reshape(1, -1)
-                )  # +1 as MOT benchmark requires positive IDs
-            i -= 1
-            # remove dead tracklets
-            if trk.time_since_update > self.max_age:
-                self.trackers.pop(i)
-        if len(ret) > 0:
-            return np.concatenate(ret)
-        return np.empty((0, 5))
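A minimal sketch of driving Sort frame by frame with synthetic detections; as the docstring requires, the tracker is updated on every frame, even when nothing was detected:

import numpy as np

tracker = Sort(max_age=10, min_hits=1, iou_threshold=0.1)

frames = [
    np.array([[10.0, 20.0, 50.0, 60.0, 0.9]]),  # frame 1: one detection
    np.array([[12.0, 20.0, 52.0, 60.0, 0.8]]),  # frame 2: it moved slightly
    np.empty((0, 5)),                           # frame 3: no detections
]

for dets in frames:
    tracks = tracker.update(dets)
    # each returned row is [x1, y1, x2, y2, track_id]
    print(tracks)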
diff --git a/crabs/detection_tracking/tracking_utils.py b/crabs/detection_tracking/tracking_utils.py
deleted file mode 100644
index b68cfa48..00000000
--- a/crabs/detection_tracking/tracking_utils.py
+++ /dev/null
@@ -1,355 +0,0 @@
-import csv
-import json
-import logging
-from pathlib import Path
-from typing import Any, Dict, Optional
-
-import cv2
-import numpy as np
-
-
-def calculate_iou(box1: np.ndarray, box2: np.ndarray) -> float:
-    """
-    Calculate the IoU (Intersection over Union) of two bounding boxes.
-
-    Parameters
-    ----------
-    box1 : np.ndarray
-        Coordinates [x1, y1, x2, y2] of the first bounding box, where
-        (x1, y1) is the top-left corner and (x2, y2) the bottom-right corner.
-    box2 : np.ndarray
-        Coordinates [x1, y1, x2, y2] of the second bounding box, where
-        (x1, y1) is the top-left corner and (x2, y2) the bottom-right corner.
-
-    Returns
-    -------
-    float:
-        IoU value.
-    """
-    x1_box1, y1_box1, x2_box1, y2_box1 = box1
-    x1_box2, y1_box2, x2_box2, y2_box2 = box2
-
-    # Calculate intersection coordinates
-    x1_intersect = max(x1_box1, x1_box2)
-    y1_intersect = max(y1_box1, y1_box2)
-    x2_intersect = min(x2_box1, x2_box2)
-    y2_intersect = min(y2_box1, y2_box2)
-
-    # Calculate area of the intersection rectangle
-    intersect_width = max(0, x2_intersect - x1_intersect + 1)
-    intersect_height = max(0, y2_intersect - y1_intersect + 1)
-    intersect_area = intersect_width * intersect_height
-
-    # Calculate areas of the individual bounding boxes
-    box1_area = (x2_box1 - x1_box1 + 1) * (y2_box1 - y1_box1 + 1)
-    box2_area = (x2_box2 - x1_box2 + 1) * (y2_box2 - y1_box2 + 1)
-
-    iou = intersect_area / float(box1_area + box2_area - intersect_area)
-
-    return iou
-
-
-def count_identity_switches(
-    prev_frame_ids: Optional[list[list[int]]],
-    current_frame_ids: Optional[list[list[int]]],
-) -> int:
-    """
-    Count the number of identity switches between two sets of object IDs.
-
-    Parameters
-    ----------
-    prev_frame_ids : Optional[list[list[int]]]
-        List of object IDs in the previous frame.
-    current_frame_ids : Optional[list[list[int]]]
-        List of object IDs in the current frame.
-
-    Returns
-    -------
-    int
-        The number of identity switches between the two sets of object IDs.
-    """
-    if prev_frame_ids is None or current_frame_ids is None:
-        return 0
-
-    prev_ids = set(prev_frame_ids[0])
-    current_ids = set(current_frame_ids[0])
-
-    # Count IDs that appear in only one of the two frames
-    num_switches = len(prev_ids.symmetric_difference(current_ids))
-
-    return num_switches
-
-
-def evaluate_mota(
-    gt_boxes: np.ndarray,
-    tracked_boxes: np.ndarray,
-    iou_threshold: float,
-    prev_frame_ids: Optional[list[list[int]]],
-) -> float:
-    """
-    Evaluate MOTA (Multiple Object Tracking Accuracy).
-
-    MOTA is a metric used to evaluate the performance of object tracking
-    algorithms.
-
-    Parameters
-    ----------
-    gt_boxes : np.ndarray
-        Ground truth bounding boxes of objects.
-    tracked_boxes : np.ndarray
-        Tracked bounding boxes of objects.
-    iou_threshold : float
-        Intersection over Union (IoU) threshold for considering a match.
-    prev_frame_ids : Optional[list[list[int]]]
-        IDs from the previous frame for identity switch detection.
-
-    Returns
-    -------
-    float
-        The computed MOTA (Multi-Object Tracking Accuracy) score for the
-        tracking performance.
-
-    Notes
-    -----
-    MOTA is calculated using the following formula:
-
-    MOTA = 1 - (Missed Detections + False Positives + Identity Switches) / Total Ground Truth
-
-    - Missed Detections: Instances where the ground truth objects were not
-      detected by the tracking algorithm.
-    - False Positives: Instances where the tracking algorithm produces a
-      detection where there is no corresponding ground truth object.
-    - Identity Switches: Instances where the tracking algorithm assigns a
-      different ID to an object compared to its ID in the previous frame.
-    - Total Ground Truth: The total number of ground truth objects in the
-      scene.
-
-    The MOTA score has a maximum of 1, with higher values indicating better
-    tracking performance. A MOTA of 1 indicates perfect tracking, with no
-    missed detections, false positives, or identity switches.
-    """
-    total_gt = len(gt_boxes)
-    false_positive = 0
-
-    for i, tracked_box in enumerate(tracked_boxes):
-        best_iou = 0.0
-        best_match = None
-
-        for j, gt_box in enumerate(gt_boxes):
-            if gt_box is None:
-                # already matched to a previous tracked box
-                continue
-            iou = calculate_iou(gt_box[:4], tracked_box[:4])
-            if iou > iou_threshold and iou > best_iou:
-                best_iou = iou
-                best_match = j
-        if best_match is not None:
-            # A matching ground truth box was found for this tracked box;
-            # mark the ground truth box as consumed.
-            gt_boxes[best_match] = None
-        else:
-            false_positive += 1
-
-    missed_detections = 0
-    for box in gt_boxes:
-        if box is not None and not np.all(np.isnan(box)):
-            # a ground truth box that was not matched with any tracked box
-            missed_detections += 1
-
-    tracked_ids = [[box[-1] for box in tracked_boxes]]
-
-    num_switches = count_identity_switches(prev_frame_ids, tracked_ids)
-
-    mota = 1 - (missed_detections + false_positive + num_switches) / total_gt
-    return mota
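To make the formula concrete: a frame with 4 ground truth objects, 1 missed detection, 1 false positive and 0 identity switches scores MOTA = 1 - (1 + 1 + 0) / 4 = 0.5. A tiny numeric check against evaluate_mota, using made-up boxes:

import numpy as np

# Two ground truth objects; the tracker only found the first one
gt_boxes = [
    np.array([0.0, 0.0, 10.0, 10.0, 1.0]),        # matched below
    np.array([100.0, 100.0, 110.0, 110.0, 2.0]),  # missed
]
tracked_boxes = [np.array([0.0, 0.0, 10.0, 10.0, 1.0])]

mota = evaluate_mota(
    gt_boxes, tracked_boxes, iou_threshold=0.1, prev_frame_ids=None
)
print(mota)  # 1 - (1 missed + 0 FP + 0 switches) / 2 GT = 0.5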
-def extract_bounding_box_info(row: list[str]) -> Dict[str, Any]:
-    """
-    Extract bounding box information from a row of data.
-
-    Parameters
-    ----------
-    row : list[str]
-        A list representing a row of data containing information about a
-        bounding box.
-
-    Returns
-    -------
-    Dict[str, Any]:
-        A dictionary containing the extracted bounding box information.
-    """
-    filename = row[0]
-    region_shape_attributes = json.loads(row[5])
-    region_attributes = json.loads(row[6])
-
-    x = region_shape_attributes["x"]
-    y = region_shape_attributes["y"]
-    width = region_shape_attributes["width"]
-    height = region_shape_attributes["height"]
-    track_id = region_attributes["track"]
-
-    frame_number = int(filename.split("_")[-1].split(".")[0]) - 1
-    return {
-        "frame_number": frame_number,
-        "x": x,
-        "y": y,
-        "width": width,
-        "height": height,
-        "id": track_id,
-    }
-
-
-def create_gt_list(
-    ground_truth_data: list[Dict[str, Any]], gt_boxes_list: list[np.ndarray]
-) -> list[np.ndarray]:
-    """
-    Create a list of ground truth bounding boxes organised by frame number.
-
-    Parameters
-    ----------
-    ground_truth_data : list[Dict[str, Any]]
-        A list containing ground truth bounding box data organised by frame
-        number.
-    gt_boxes_list : list[np.ndarray]
-        A list to store the ground truth bounding boxes for each frame.
-
-    Returns
-    -------
-    list[np.ndarray]:
-        A list containing ground truth bounding boxes organised by frame
-        number.
-    """
-    for data in ground_truth_data:
-        frame_number = data["frame_number"]
-        bbox = np.array(
-            [
-                data["x"],
-                data["y"],
-                data["x"] + data["width"],
-                data["y"] + data["height"],
-                data["id"],
-            ],
-            dtype=np.float32,
-        )
-        if gt_boxes_list[frame_number].size == 0:
-            # Initialise as a 2D array
-            gt_boxes_list[frame_number] = bbox.reshape(1, -1)
-        else:
-            gt_boxes_list[frame_number] = np.vstack(
-                [gt_boxes_list[frame_number], bbox]
-            )
-    return gt_boxes_list
-
-
-def get_ground_truth_data(gt_dir: str) -> list[np.ndarray]:
-    """
-    Extract ground truth bounding box data from a CSV file.
-
-    Parameters
-    ----------
-    gt_dir : str
-        The path to the CSV file containing ground truth data.
-
-    Returns
-    -------
-    list[np.ndarray]:
-        A list containing ground truth bounding box data organised by frame
-        number. Each numpy array represents the coordinates and ID of a
-        bounding box in the order: x, y, x + width, y + height, ID.
-    """
-    ground_truth_data = []
-    max_frame_number = 0
-
-    # Open the CSV file and read its contents line by line
-    with open(gt_dir, "r") as csvfile:
-        csvreader = csv.reader(csvfile)
-        next(csvreader)  # Skip the header row
-        for row in csvreader:
-            data = extract_bounding_box_info(row)
-            ground_truth_data.append(data)
-            max_frame_number = max(max_frame_number, data["frame_number"])
-
-    # Initialise a list to store the ground truth boxes for each frame
-    gt_boxes_list = [np.array([]) for _ in range(max_frame_number + 1)]
-
-    gt_boxes_list = create_gt_list(ground_truth_data, gt_boxes_list)
-    return gt_boxes_list
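The expected row layout follows the VIA CSV convention written by prep_csv_writer; a small sketch of parsing one such row (all values are illustrative):

row = [
    "clip_frame_00000001.png",  # filename (illustrative)
    "2661",                     # file_size
    '{"clip":123}',             # file_attributes
    "1",                        # region_count
    "0",                        # region_id
    '{"name":"rect","x":10,"y":20,"width":40,"height":40}',
    '{"track":"3"}',
]

info = extract_bounding_box_info(row)
print(info)
# {'frame_number': 0, 'x': 10, 'y': 20, 'width': 40, 'height': 40, 'id': '3'}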
-def write_tracked_bbox_to_csv(
-    bbox: np.ndarray,
-    frame: np.ndarray,
-    frame_name: str,
-    csv_writer: Any,
-) -> None:
-    """
-    Write a bounding box annotation to a CSV file.
-
-    Parameters
-    ----------
-    bbox : np.ndarray
-        A numpy array containing the bounding box coordinates and ID
-        (xmin, ymin, xmax, ymax, id).
-    frame : np.ndarray
-        The frame to which the bounding box belongs.
-    frame_name : str
-        The name of the frame.
-    csv_writer : Any
-        The CSV writer object to write the annotation.
-    """
-    # Bounding box geometry
-    xmin, ymin, xmax, ymax, id = bbox
-    width_box = int(xmax - xmin)
-    height_box = int(ymax - ymin)
-
-    # Add to csv
-    csv_writer.writerow(
-        (
-            frame_name,
-            frame.size,
-            '{{"clip":{}}}'.format("123"),
-            1,
-            0,
-            '{{"name":"rect","x":{},"y":{},"width":{},"height":{}}}'.format(
-                xmin, ymin, width_box, height_box
-            ),
-            '{{"track":"{}"}}'.format(int(id)),
-        )
-    )
-
-
-def save_frame_and_csv(
-    frame_name: str,
-    tracking_output_dir: Path,
-    tracked_boxes: list[list[float]],
-    frame: np.ndarray,
-    frame_number: int,
-    csv_writer: Any,
-) -> None:
-    """
-    Save tracked bounding boxes as frames and write them to a CSV file.
-
-    Parameters
-    ----------
-    frame_name : str
-        The name under which to save the frame.
-    tracking_output_dir : Path
-        The directory where tracked frames and the CSV file will be saved.
-    tracked_boxes : list[list[float]]
-        List of bounding boxes to be saved.
-    frame : np.ndarray
-        The frame image.
-    frame_number : int
-        The frame number.
-    csv_writer : Any
-        CSV writer object for writing bounding box data.
-
-    Returns
-    -------
-    None
-    """
-    for bbox in tracked_boxes:
-        # Add bbox to csv
-        write_tracked_bbox_to_csv(bbox, frame, frame_name, csv_writer)
-
-    # Save frame as PNG - once per frame
-    frame_path = tracking_output_dir / frame_name
-    img_saved = cv2.imwrite(str(frame_path), frame)
-    if not img_saved:
-        logging.error(
-            f"Didn't save {frame_name}, frame {frame_number}. Skipping."
-        )
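For reference, a sketch that exercises write_tracked_bbox_to_csv with an in-memory writer, showing the VIA-style row it emits (frame and box are made up; the printed output is wrapped here for readability):

import csv
import io

import numpy as np

buffer = io.StringIO()
writer = csv.writer(buffer)

frame = np.zeros((100, 100, 3), dtype=np.uint8)  # stand-in frame
bbox = np.array([10.0, 20.0, 50.0, 60.0, 3.0])   # xmin, ymin, xmax, ymax, id

write_tracked_bbox_to_csv(bbox, frame, "clip_frame_00000001.png", writer)
print(buffer.getvalue().strip())
# clip_frame_00000001.png,30000,"{""clip"":123}",1,0,
#   "{""name"":""rect"",""x"":10.0,""y"":20.0,""width"":40,""height"":40}",
#   "{""track"":""3""}"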