import numpy as np
from .utils import EHMNetNode, EHM2NetNode, EHMNet, EHM2Tree, gen_clusters
[docs]class EHM:
"""Efficient Hypothesis Management (EHM)
An implementation of the EHM algorithm, as documented in [EHM1]_.
"""
[docs] @staticmethod
def construct_net(validation_matrix):
"""Construct the EHM net as per Section 3.1 of [EHM1]_
Parameters
----------
validation_matrix: :class:`numpy.ndarray`
An indicator matrix of shape (num_tracks, num_detections + 1) indicating the possible
(aka. valid) associations between tracks and detections. The first column corresponds
to the null hypothesis (hence contains all ones).
Returns
-------
: :class:`~.EHMNet`
The constructed net object
"""
num_tracks = validation_matrix.shape[0]
# Initialise net
root_node = EHMNetNode(layer=-1) # Root node is at layer -1
net = EHMNet([root_node], validation_matrix=validation_matrix)
# A layer in the network is created for each track (not counting the root-node layer)
num_layers = num_tracks
for i in range(num_layers):
# Get list of nodes in previous layer
parent_nodes = [node for node in net.nodes if node.layer == i - 1]
# Get indices of hypothesised detections for the track
v_detections = set(np.flatnonzero(validation_matrix[i, :]))
# Compute accumulated measurements up to next layer (i+1)
acc = set()
for ii in range(i + 1, num_layers):
acc |= set(np.flatnonzero(validation_matrix[ii, :]))
# List of nodes in current layer
children_per_identity = dict()
# For all nodes in previous layer
for parent in parent_nodes:
# Exclude any detections already considered by parent nodes (always include null)
v_detections_m1 = (v_detections - parent.identity) | {0}
# Iterate over valid detections
for j in v_detections_m1:
# Identity
identity = acc.intersection(parent.identity | {j}) - {0}
# Find valid nodes in current layer that have the same identity
try:
v_children = children_per_identity[tuple(sorted(identity))]
except KeyError:
v_children = set()
# If layer is empty or no valid nodes exist, add new node
if not len(v_children):
# Create new node
child = EHMNetNode(layer=i, identity=identity)
# Add node to net
net.add_node(child, parent, j)
# Add node to list of child nodes
try:
children_per_identity[tuple(sorted(child.identity))].add(child)
except KeyError:
children_per_identity[tuple(sorted(child.identity))] = {child}
else:
# Simply add new edge or update existing one
for child in v_children:
net.add_edge(parent, child, j)
return net
[docs] @staticmethod
def compute_association_probabilities(net, likelihood_matrix):
"""Compute the joint association weights, as described in Section 3.3 of [EHM1]_
Parameters
----------
net: :class:`~.EHMNet`
A net object representing the valid joint association hypotheses
likelihood_matrix: :class:`numpy.ndarray`
A matrix of shape (num_tracks, num_detections + 1) containing the unnormalised
likelihoods for all combinations of tracks and detections. The first column corresponds
to the null hypothesis.
Returns
-------
:class:`numpy.ndarray`
A matrix of shape (num_tracks, num_detections + 1) containing the normalised
association probabilities for all combinations of tracks and detecrtons. The first
column corresponds to the null hypothesis.
"""
num_tracks, num_detections = likelihood_matrix.shape
num_nodes = net.num_nodes
# Compute p_D (Downward-pass) - Eq. (22) of [EHM1]
p_D = np.zeros((num_nodes,))
p_D[0] = 1
for child in net.nodes[1:]:
c_i = child.ind
parents = net.get_parents(child)
for parent in parents:
p_i = parent.ind
ids = list(net.edges[(parent, child)])
p_D[c_i] += np.sum(likelihood_matrix[child.layer, ids] * p_D[p_i])
# Compute p_U (Upward-pass) - Eq. (23) of [EHM1]
p_U = np.zeros((num_nodes,))
p_U[-1] = 1
for parent in reversed(net.nodes[:-1]):
p_i = parent.ind
children = net.get_children(parent)
for child in children:
c_i = child.ind
ids = list(net.edges[(parent, child)])
p_U[p_i] += np.sum(likelihood_matrix[child.layer, ids] * p_U[c_i])
# Compute p_DT - Eq. (21) of [EHM1]
p_DT = np.zeros((num_detections, num_nodes))
for child in net.nodes:
c_i = child.ind
# v_edges = {edge: ids for edge, ids in net.edges.items() if edge[1] == child}
# for edge, ids in v_edges.items():
# p_i = edge[0].ind
# for j in ids:
# p_DT[j, c_i] += p_D[p_i]
for parent in net.get_parents(child):
p_i = parent.ind
for j in net.edges[(parent, child)]:
p_DT[j, c_i] += p_D[p_i]
# Compute p_T - Eq. (20) of [EHM1]
p_T = np.ones((num_detections, num_nodes))
p_T[:, 0] = 0
for node in net.nodes[1:]:
n_i = node.ind
for j in range(num_detections):
p_T[j, n_i] = p_U[n_i] * likelihood_matrix[node.layer, j] * p_DT[j, n_i]
# Compute association weights - Eq. (15) of [EHM1]
a_matrix = np.zeros(likelihood_matrix.shape)
for i in range(num_tracks):
node_inds = [n_i for n_i, node in enumerate(net.nodes) if node.layer == i]
for j in range(num_detections):
a_matrix[i, j] = np.sum(p_T[j, node_inds])
# Normalise
a_matrix[i, :] = a_matrix[i, :] / np.sum(a_matrix[i, :])
return a_matrix
[docs] @classmethod
def run(cls, validation_matrix, likelihood_matrix):
"""Run EHM to compute and return association probabilities
Parameters
----------
validation_matrix : :class:`numpy.ndarray`
An indicator matrix of shape (num_tracks, num_detections + 1) indicating the possible
(aka. valid) associations between tracks and detections. The first column corresponds
to the null hypothesis (hence contains all ones).
likelihood_matrix: :class:`numpy.ndarray`
A matrix of shape (num_tracks, num_detections + 1) containing the unnormalised
likelihoods for all combinations of tracks and detections. The first column corresponds
to the null hypothesis.
Returns
-------
:class:`numpy.ndarray`
A matrix of shape (num_tracks, num_detections + 1) containing the normalised
association probabilities for all combinations of tracks and detections. The first
column corresponds to the null hypothesis.
"""
# Cluster tracks into groups that share common detections
clusters, missed_tracks = gen_clusters(validation_matrix, likelihood_matrix)
# Initialise the association probabilities matrix.
assoc_prob_matrix = np.zeros(likelihood_matrix.shape)
assoc_prob_matrix[missed_tracks, 0] = 1 # Null hypothesis is certain for missed tracks
# Perform EHM for each cluster
for cluster in clusters:
# Extract track and detection indices
c_tracks = cluster.tracks
c_detections = cluster.detections
# Extract validation and likelihood matrices for cluster
c_validation_matrix = cluster.validation_matrix
c_likelihood_matrix = cluster.likelihood_matrix
# Construct the EHM net
net = cls.construct_net(c_validation_matrix)
# Compute the association probabilities
c_assoc_prob_matrix = cls.compute_association_probabilities(net, c_likelihood_matrix)
# Map the association probabilities to the main matrix
for i, track in enumerate(c_tracks):
assoc_prob_matrix[track, c_detections] = c_assoc_prob_matrix[i, :]
return assoc_prob_matrix
[docs]class EHM2(EHM):
""" Efficient Hypothesis Management 2 (EHM2)
An implementation of the EHM2 algorithm, as documented in [EHM2]_.
"""
[docs] @classmethod
def construct_net(cls, validation_matrix):
""" Construct the EHM net as per Section 4 of [EHM2]_
Parameters
----------
validation_matrix: :class:`numpy.ndarray`
An indicator matrix of shape (num_tracks, num_detections + 1) indicating the possible
(aka. valid) associations between tracks and detections. The first column corresponds
to the null hypothesis (hence contains all ones).
Returns
-------
: :class:`~.EHMNet`
The constructed net object
Raises
------
ValueError
If the provided ``validation_matrix`` is such that tracks can be divided into separate clusters. See
the :ref:`Note <note1>` below for work-around.
.. _note1:
.. note::
If the provided ``validation_matrix`` is such that tracks can be divided into separate clusters, this
method will raise a ValueError exception. To work-around this issue, you can use the
:func:`~pyehm.utils.gen_clusters` function to first generate individual clusters and then generate a net
for each cluster, as shown below:
.. code-block:: python
from pyehm.core import EHM2
from pyehm.utils import gen_clusters
validation_matrix = <Your validation matrix>
clusters, _ = gen_clusters(validation_matrix)
nets = []
for cluster in clusters:
nets.append(EHM2.construct_net(cluster.validation_matrix)
"""
num_tracks = validation_matrix.shape[0]
# Construct tree
try:
tree = cls.construct_tree(validation_matrix)
except ValueError:
raise ValueError('The provided validation matrix results in multiple clusters of tracks')
# Initialise net
root_node = EHM2NetNode(layer=0, track=0, subnet=0)
net = EHMNet([root_node], validation_matrix=validation_matrix)
# Recursively construct next layers
cls._construct_net_layer(net, tree, 1)
# Compute and cache nodes per track
for i in range(num_tracks):
net.nodes_per_track[i] = [node for node in net.nodes if node.track == i]
return net
@classmethod
def _construct_net_layer(cls, net, tree, layer):
# Get list of nodes in previous layer of subtree
try:
parent_nodes = net.nodes_per_layer_subnet[(layer - 1, tree.subtree)]
except KeyError:
parent_nodes = set()
# Get indices of hypothesised detections for the track
v_detections = set(np.flatnonzero(net.validation_matrix[tree.track, :]))
# If this is not an end layer
if tree.children:
# Process each subtree
for child_tree in tree.children:
# Compute accumulated measurements up to next layer (i+1)
acc = {0} | child_tree.detections
# List of nodes in current layer
children_per_identity = dict()
# For all nodes in previous layer
for parent in parent_nodes:
# Exclude any detections already considered by parent nodes (always include null)
v_detections_m1 = (v_detections - parent.identity) | {0}
# Iterate over valid detections
for j in v_detections_m1:
# Identity
identity = acc.intersection(parent.identity | {j}) - {0}
# Find valid nodes in current layer that have the same identity
try:
v_children = children_per_identity[tuple(sorted(identity))]
except KeyError:
v_children = set()
# If layer is empty or no valid nodes exist, add new node
if not len(v_children):
# Create new node
child = EHM2NetNode(layer=layer, subnet=child_tree.subtree, track=child_tree.track,
identity=identity)
# Add node to net
net.add_node(child, parent, j)
# Add node to list of child nodes
try:
children_per_identity[tuple(sorted(child.identity))].add(child)
except KeyError:
children_per_identity[tuple(sorted(child.identity))] = {child}
else:
# Simply add new edge or update existing one
for child in v_children:
net.add_edge(parent, child, j)
else:
# For all nodes in previous layer
for parent in parent_nodes:
# Exclude any detections already considered by parent nodes (always include null)
v_detections_m1 = (v_detections - parent.identity) | {0}
# Get leaf child, if any
try:
child = next(iter(net.nodes_per_layer_subnet[(layer, tree.subtree)]))
except (KeyError, StopIteration):
child = None
# Iterate over valid detections
for j in v_detections_m1:
# If layer is empty or no valid node exist, add new node
if not child:
# Create new node
child = EHM2NetNode(layer=layer, subnet=tree.subtree)
# Add node to net
net.add_node(child, parent, j)
else:
# Simply add new edge or update existing one
net.add_edge(parent, child, j)
# Create new layers for each sub-tree
for i, child_tree in enumerate(tree.children):
cls._construct_net_layer(net, child_tree, layer + 1)
[docs] @staticmethod
def construct_tree(validation_matrix):
""" Construct the EHM2 tree as per section 4.3 of [EHM2]_
Parameters
----------
validation_matrix: :class:`numpy.ndarray`
An indicator matrix of shape (num_tracks, num_detections + 1) indicating the possible
(aka. valid) associations between tracks and detections. The first column corresponds
to the null hypothesis (hence contains all ones).
Returns
-------
: :class:`~.EHM2Tree`
The constructed tree object
Raises
------
ValueError
If the provided ``validation_matrix`` is such that tracks can be divided into separate clusters. See
the :ref:`Note <note2>` below for work-around.
.. _note2:
.. note::
If the provided ``validation_matrix`` is such that tracks can be divided into separate clusters, this
method will raise a ValueError exception. To work-around this issue, you can use the
:func:`~pyehm.utils.gen_clusters` function to first generate individual clusters and then generate a tree
for each cluster, as shown below:
.. code-block:: python
from pyehm.core import EHM2
from pyehm.utils import gen_clusters
validation_matrix = <Your validation matrix>
clusters, _ = gen_clusters(validation_matrix)
trees = []
for cluster in clusters:
trees.append(EHM2.construct_tree(cluster.validation_matrix)
"""
num_tracks = validation_matrix.shape[0]
trees = []
last_subtree_index = -1
for i in reversed(range(num_tracks)):
# Get indices of hypothesised detections for the track (minus the null hypothesis)
v_detections = set(np.flatnonzero(validation_matrix[i, :])) - {0}
matched = []
for j, tree in enumerate(trees):
if v_detections.intersection(tree.detections):
matched.append(j)
if matched:
children = [trees[j] for j in matched]
detections = set()
for tree in children:
detections |= tree.detections
detections |= v_detections
subtree_index = np.max([c.subtree for c in children])
tree = EHM2Tree(i, children, detections, subtree_index)
trees = [trees[j] for j in range(len(trees)) if j not in matched]
else:
children = []
last_subtree_index += 1
tree = EHM2Tree(i, children, v_detections, last_subtree_index)
trees.append(tree)
if len(trees) > 1:
raise ValueError('The provided validation matrix results in multiple clusters of tracks')
tree = trees[0]
# Reverse subtree indices
max_subtree_ind = tree.subtree
for node in tree.nodes:
node.subtree = max_subtree_ind - node.subtree
return tree
[docs] @staticmethod
def compute_association_probabilities(net, likelihood_matrix):
""" Compute the joint association weights, as described in Section 4.2 of [EHM2]_
Parameters
----------
net: :class:`~.EHMNet`
A net object representing the valid joint association hypotheses
likelihood_matrix: :class:`numpy.ndarray`
A matrix of shape (num_tracks, num_detections + 1) containing the unnormalised
likelihoods for all combinations of tracks and detections. The first column corresponds
to the null hypothesis.
Returns
-------
:class:`numpy.ndarray`
A matrix of shape (num_tracks, num_detections + 1) containing the normalised
association probabilities for all combinations of tracks and detecrtons. The first
column corresponds to the null hypothesis.
"""
num_tracks, num_detections = likelihood_matrix.shape
num_nodes = net.num_nodes
nodes_forwards = net.nodes_forward
nodes_backwards = list(reversed(nodes_forwards))
# Precompute valid detections per track
v_detections_per_track = [set(np.flatnonzero(row)) for row in net.validation_matrix]
# Compute w_B (Backward-pass) - Eq. (47) of [EHM2]
w_B = np.zeros((num_nodes,))
for parent in nodes_backwards:
p_i = parent.ind
# If parent is a leaf node
if parent.track is None:
w_B[p_i] = 1
continue
weight = 0
v_detections = v_detections_per_track[parent.track] - parent.identity
for det_ind in v_detections:
v_children = net.children_per_detection[(parent, det_ind)]
weight_det = likelihood_matrix[parent.track, det_ind]
for child in v_children:
c_i = child.ind
weight_det *= w_B[c_i]
weight += weight_det
w_B[p_i] = weight
# Compute w_F (Forward-pass) - Eq. (49) of [EHM2]
w_F = np.zeros((num_nodes,))
w_F[0] = 1
for parent in nodes_forwards:
# Skip the leaf nodes
if parent.track is None:
continue
p_i = parent.ind
v_detections = v_detections_per_track[parent.track] - parent.identity
for det_ind in v_detections:
v_children = net.children_per_detection[(parent, det_ind)]
for child in v_children:
if child.track is None:
continue
c_i = child.ind
sibling_inds = list({c.ind for c in v_children} - {child.ind})
sibling_weight = np.prod(w_B[sibling_inds]) if len(sibling_inds) > 0 else 1
weight = likelihood_matrix[parent.track, det_ind] * w_F[p_i] * sibling_weight
w_F[c_i] += weight
# Compute association probs - Eq. (46) of [EHM2]
a_matrix = np.zeros(likelihood_matrix.shape)
for track in range(num_tracks):
v_detections = v_detections_per_track[track]
for detection in v_detections:
for parent in net.nodes_per_track[track]:
try:
v_children = net.children_per_detection[(parent, detection)]
except KeyError:
continue
weight = likelihood_matrix[track, detection] * w_F[parent.ind]
for child in v_children:
weight *= w_B[child.ind]
a_matrix[track, detection] += weight
a_matrix[track, :] /= np.sum(a_matrix[track, :])
return a_matrix