# Source code for pyehm.plugins.stonesoup

# -*- coding: utf-8 -*-
import numpy as np
from stonesoup.dataassociator.probability import JPDA
from stonesoup.types.detection import MissedDetection
from stonesoup.types.hypothesis import SingleProbabilityHypothesis
from stonesoup.types.multihypothesis import MultipleHypothesis
from stonesoup.types.numeric import Probability

from pyehm.core import EHM, EHM2


class JPDAWithEHM(JPDA):
    """Joint Probabilistic Data Association with Efficient Hypothesis Management (EHM)

    This is a faster alternative of the standard :class:`~.JPDA` algorithm, which makes use of
    Efficient Hypothesis Management (EHM) to efficiently compute the joint associations. See
    Maskell et al. (2004) [EHM1]_ for more details.
    """

    def associate(self, tracks, detections, timestamp, **kwargs):
        """Associate tracks and detections

        Parameters
        ----------
        tracks : set of :class:`stonesoup.types.track.Track`
            Tracks which detections will be associated to.
        detections : set of :class:`stonesoup.types.detection.Detection`
            Detections to be associated to tracks.
        timestamp : :class:`datetime.datetime`
            Timestamp to be used for missed detections and to predict to.

        Returns
        -------
        : mapping of :class:`stonesoup.types.track.Track` :
                :class:`stonesoup.types.hypothesis.Hypothesis`
            Mapping of track to Hypothesis
        """
        # Calculate MultipleHypothesis for each Track over all available Detections
        hypotheses = {
            track: self.hypothesiser.hypothesise(track, detections, timestamp)
            for track in tracks}

        multi_hypotheses = self._compute_multi_hypotheses(tracks, detections, hypotheses,
                                                          timestamp)
        return multi_hypotheses

    @staticmethod
    def _run_ehm(validation_matrix, likelihood_matrix):
        # Hook point: subclasses (e.g. JPDAWithEHM2) override this to swap in a
        # different EHM variant while reusing the rest of the association logic.
        return EHM.run(validation_matrix, likelihood_matrix)

    @staticmethod
    def _calc_validation_and_likelihood_matrices(tracks, detections, hypotheses):
        """ Compute the validation and likelihood matrices

        Parameters
        ----------
        tracks: list of :class:`stonesoup.types.track.Track`
            Current tracked objects
        detections : list of :class:`stonesoup.types.detection.Detection`
            Retrieved measurements
        hypotheses: dict
            Key value pairs of tracks with associated detections

        Returns
        -------
        :class:`numpy.ndarray`
            An indicator matrix of shape (num_tracks, num_detections + 1) indicating the possible
            (aka. valid) associations between tracks and detections. The first column corresponds
            to the null hypothesis (hence contains all ones).
        :class:`numpy.ndarray`
            A matrix of shape (num_tracks, num_detections + 1) containing the unnormalised
            likelihoods for all combinations of tracks and detections. The first column
            corresponds to the null hypothesis.
        """
        # Ensure tracks and detections are lists (not sets) so indices are stable
        tracks, detections = list(tracks), list(detections)

        # Construct validation and likelihood matrices.
        # Both matrices have shape (num_tracks, num_detections + 1), where the first column
        # corresponds to the null (missed detection) hypothesis.
        num_tracks, num_detections = len(tracks), len(detections)
        likelihood_matrix = np.zeros((num_tracks, num_detections + 1))
        for i, track in enumerate(tracks):
            for hyp in hypotheses[track]:
                if not hyp:
                    # A falsy hypothesis is the missed-detection hypothesis -> column 0
                    likelihood_matrix[i, 0] = hyp.weight
                else:
                    # Locate the detection this hypothesis refers to; column j+1 because
                    # column 0 is reserved for the null hypothesis
                    j = next(d_i for d_i, detection in enumerate(detections)
                             if hyp.measurement == detection)
                    likelihood_matrix[i, j + 1] = hyp.weight
        # Valid associations are exactly those with non-zero likelihood
        validation_matrix = likelihood_matrix > 0

        return validation_matrix.astype(int), likelihood_matrix.astype(float)

    @classmethod
    def _compute_multi_hypotheses(cls, tracks, detections, hypotheses, time):
        # Tracks and detections must be in a list so we can keep track of their order
        track_list = list(tracks)
        detection_list = list(detections)

        # Calculate validation and likelihood matrices
        validation_matrix, likelihood_matrix = \
            cls._calc_validation_and_likelihood_matrices(track_list, detection_list, hypotheses)

        # Run EHM to compute the joint association probability matrix
        assoc_prob_matrix = cls._run_ehm(validation_matrix, likelihood_matrix)

        # Calculate MultiMeasurementHypothesis for each Track over all
        # available Detections with probabilities drawn from the association matrix
        new_hypotheses = dict()

        for i, track in enumerate(track_list):

            single_measurement_hypotheses = list()

            # Null measurement hypothesis
            # NOTE(review): assumes the hypothesiser always yields a missed-detection
            # hypothesis for every track; if it did not, null_hypothesis would be None
            # and the attribute accesses below would raise — confirm with hypothesiser.
            null_hypothesis = next((hyp for hyp in hypotheses[track] if not hyp), None)
            prob_misdetect = Probability(assoc_prob_matrix[i, 0])
            single_measurement_hypotheses.append(
                SingleProbabilityHypothesis(
                    null_hypothesis.prediction,
                    MissedDetection(timestamp=time),
                    measurement_prediction=null_hypothesis.measurement_prediction,
                    probability=prob_misdetect))

            # True detection hypotheses
            for hypothesis in hypotheses[track]:
                if not hypothesis:
                    continue
                # Get the detection index (+1 because column 0 is the null hypothesis)
                j = next(d_i + 1 for d_i, detection in enumerate(detection_list)
                         if hypothesis.measurement == detection)
                pro_detect_assoc = Probability(assoc_prob_matrix[i, j])
                single_measurement_hypotheses.append(
                    SingleProbabilityHypothesis(
                        hypothesis.prediction,
                        hypothesis.measurement,
                        measurement_prediction=hypothesis.measurement_prediction,
                        probability=pro_detect_assoc))

            new_hypotheses[track] = MultipleHypothesis(single_measurement_hypotheses, True, 1)

        return new_hypotheses
class JPDAWithEHM2(JPDAWithEHM):
    """Joint Probabilistic Data Association with Efficient Hypothesis Management 2 (EHM2)

    This is an enhanced version of the :class:`~.JPDAWithEHM` algorithm, that makes use of the
    Efficient Hypothesis Management 2 (EHM2) algorithm to efficiently compute the joint
    associations. See Horridge et al. (2006) [EHM2]_ for more details.
    """

    @staticmethod
    def _run_ehm(validation_matrix, likelihood_matrix):
        # Override the base-class hook to use the EHM2 algorithm instead of EHM
        return EHM2.run(validation_matrix, likelihood_matrix)