1
0
Fork 0

Trajectory Generator

master
Pietro 4 years ago
parent e160b88167
commit a7794f9f2c
  1. 1
      .gitignore
  2. 3
      PyCTBN/PyCTBN/estimators/parameters_estimator.py
  3. 8
      PyCTBN/PyCTBN/structure_graph/conditional_intensity_matrix.py
  4. 11
      PyCTBN/PyCTBN/structure_graph/set_of_cims.py
  5. 71
      PyCTBN/PyCTBN/structure_graph/trajectory_generator.py
  6. 9
      PyCTBN/tests/structure_graph/test_cim.py
  7. 9
      PyCTBN/tests/structure_graph/test_setofcims.py
  8. 6
      PyCTBN/tests/utility/test_cache.py

1
.gitignore vendored

@ -7,3 +7,4 @@ __pycache__
**/results_data
**/.scannerwork
**/build
test.py

@ -33,7 +33,8 @@ class ParametersEstimator(object):
"""
p_vals = self._net_graph._aggregated_info_about_nodes_parents[2]
node_states_number = self._net_graph.get_states_number(node_id)
self._single_set_of_cims = SetOfCims(node_id, p_vals, node_states_number, self._net_graph.p_combs)
self._single_set_of_cims = SetOfCims(node_id = node_id, parents_states_number = p_vals,
node_states_number = node_states_number, p_combs = self._net_graph.p_combs)
def compute_parameters_for_node(self, node_id: str) -> SetOfCims:
"""Compute the CIMS of the node identified by the label ``node_id``.

@ -11,12 +11,16 @@ class ConditionalIntensityMatrix(object):
:type state_transition_matrix: numpy.ndArray
:_cim: the actual cim of the node
"""
def __init__(self, state_residence_times: np.array, state_transition_matrix: np.array):
def __init__(self, state_residence_times: np.array = None, state_transition_matrix: np.array = None,
cim: np.array = None):
"""Constructor Method
"""
self._state_residence_times = state_residence_times
self._state_transition_matrix = state_transition_matrix
self._cim = self.state_transition_matrix.astype(np.float64)
if cim is not None:
self._cim = cim
else:
self._cim = self.state_transition_matrix.astype(np.float64)
def compute_cim_coefficients(self) -> None:
"""Compute the coefficients of the matrix _cim by using the following equality q_xx' = M[x, x'] / T[x].

@ -23,7 +23,8 @@ class SetOfCims(object):
:_actual_cims: the cims of the node
"""
def __init__(self, node_id: str, parents_states_number: typing.List, node_states_number: int, p_combs: np.ndarray):
def __init__(self, node_id: str, parents_states_number: typing.List, node_states_number: int, p_combs: np.ndarray,
cims: np.ndarray = None):
"""Constructor Method
"""
self._node_id = node_id
@ -33,7 +34,11 @@ class SetOfCims(object):
self._state_residence_times = None
self._transition_matrices = None
self._p_combs = p_combs
self.build_times_and_transitions_structures()
if cims is not None:
self._actual_cims = cims
else:
self.build_times_and_transitions_structures()
def build_times_and_transitions_structures(self) -> None:
"""Initializes at the correct dimensions the state residence times matrix and the state transition matrices.
@ -57,7 +62,7 @@ class SetOfCims(object):
:type transition_matrices: numpy.ndArray
"""
for state_res_time_vector, transition_matrix in zip(state_res_times, transition_matrices):
cim_to_add = ConditionalIntensityMatrix(state_res_time_vector, transition_matrix)
cim_to_add = ConditionalIntensityMatrix(state_residence_times = state_res_time_vector, state_transition_matrix = transition_matrix)
cim_to_add.compute_cim_coefficients()
self._actual_cims.append(cim_to_add)
self._actual_cims = np.array(self._actual_cims)

@ -0,0 +1,71 @@
from ..utility.abstract_importer import AbstractImporter
from .conditional_intensity_matrix import ConditionalIntensityMatrix
from .set_of_cims import SetOfCims
from .trajectory import Trajectory
import numpy as np
import pandas as pd
import re
from numpy import random
class TrajectoryGenerator(object):
def __init__(self, importer: AbstractImporter):
self._importer = importer
self._importer.import_data(0)
self._vnames = self._importer._df_variables.iloc[:, 0].to_list()
self._parents = {}
for v in self._vnames:
self._parents[v] = self._importer._df_structure.where(self._importer._df_structure["To"] == v).dropna()["From"].tolist()
self._cims = {}
sampled_cims = self._importer._raw_data[0]["dyn.cims"]
for v in sampled_cims.keys():
p_combs = []
v_cims = []
for comb in sampled_cims[v].keys():
p_combs.append(np.array(re.findall(r"=(\d)", comb)).astype("int"))
cim = pd.DataFrame(sampled_cims[v][comb]).to_numpy()
v_cims.append(ConditionalIntensityMatrix(cim = cim))
sof = SetOfCims(node_id = v, parents_states_number = [self._importer._df_variables.where(self._importer._df_variables["Name"] == p)["Value"] for p in self._parents[v]],
node_states_number = self._importer._df_variables.where(self._importer._df_variables["Name"] == v)["Value"], p_combs = p_combs, cims = v_cims)
self._cims[v] = sof
def CTBN_Sample(self, t_end):
#Replace v_values with sigma (pandas dataframe)
t = 0
sigma = pd.DataFrame(columns = (["Time"] + self._vnames))
sigma.loc[len(sigma)] = [0] + [0 for v in self._vnames]
time = np.full(len(self._vnames), np.NaN)
while True:
for i in range(0, time.size):
if np.isnan(time[i]):
# Probability to transition from current state v_values[i] to (1 - v_values[i])
current_values = sigma.loc[len(sigma) - 1]
cim = self._cims[self._vnames[i]].filter_cims_with_mask(np.array([True for p in self._parents[self._vnames[i]]]),
[current_values.at[p] for p in self._parents[self._vnames[i]]])[0].cim
param = cim[current_values.at[self._vnames[i]]][1 - current_values.at[self._vnames[i]]]
time[i] = t + random.exponential(scale = param)
# next = index of the variable that will transition first
next = time.argmin()
t = time[next]
if t >= t_end:
return sigma
else:
new_row = pd.DataFrame(sigma[-1:].values, columns = sigma.columns)
new_row.loc[0].at[self._vnames[next]] = 1 - new_row.loc[0].at[self._vnames[next]]
new_row.loc[0].at["Time"] = round(t, 4)
sigma = sigma.append(new_row, ignore_index = True)
""" sup = sigma.loc[len(sigma) - 1]
sup.at[self._vnames[next]] = 1 - sup.at[self._vnames[next]]
sup.at["Time"] = round(t, 4)
sigma.loc[len(sigma)] = sup """
# undefine variable time
time[next] = np.NaN

@ -17,14 +17,16 @@ class TestConditionalIntensityMatrix(unittest.TestCase):
cls.state_transition_matrix[i, i] = np.sum(cls.state_transition_matrix[i])
def test_init(self):
c1 = ConditionalIntensityMatrix(self.state_res_times, self.state_transition_matrix)
c1 = ConditionalIntensityMatrix(state_residence_times = self.state_res_times,
state_transition_matrix = self.state_transition_matrix)
self.assertTrue(np.array_equal(self.state_res_times, c1.state_residence_times))
self.assertTrue(np.array_equal(self.state_transition_matrix, c1.state_transition_matrix))
self.assertEqual(c1.cim.dtype, np.float)
self.assertEqual(self.state_transition_matrix.shape, c1.cim.shape)
def test_compute_cim_coefficients(self):
c1 = ConditionalIntensityMatrix(self.state_res_times, self.state_transition_matrix)
c1 = ConditionalIntensityMatrix(state_residence_times = self.state_res_times,
state_transition_matrix = self.state_transition_matrix)
c2 = self.state_transition_matrix.astype(np.float)
np.fill_diagonal(c2, c2.diagonal() * -1)
for i in range(0, len(self.state_res_times)):
@ -38,7 +40,8 @@ class TestConditionalIntensityMatrix(unittest.TestCase):
self.assertTrue(np.isclose(c1.cim[i, j], c2[i, j], 1e01, 1e-01))
def test_repr(self):
c1 = ConditionalIntensityMatrix(self.state_res_times, self.state_transition_matrix)
c1 = ConditionalIntensityMatrix(state_residence_times = self.state_res_times,
state_transition_matrix = self.state_transition_matrix)
print(c1)

@ -49,7 +49,8 @@ class TestSetOfCims(unittest.TestCase):
def test_filter_cims_with_mask(self):
p_combs = self.build_p_comb_structure_for_a_node(self.possible_cardinalities)
sofc1 = SetOfCims('X', self.possible_cardinalities, 3, p_combs)
sofc1 = SetOfCims(node_id = 'X', parents_states_number = self.possible_cardinalities, node_states_number = 3,
p_combs = p_combs)
state_res_times_list = []
transition_matrices_list = []
for i in range(len(p_combs)):
@ -73,7 +74,8 @@ class TestSetOfCims(unittest.TestCase):
def aux_test_build_cims(self, node_id, p_values, node_states, p_combs):
state_res_times_list = []
transition_matrices_list = []
so1 = SetOfCims(node_id, p_values, node_states, p_combs)
so1 = SetOfCims(node_id = node_id, parents_states_number = p_values, node_states_number = node_states,
p_combs = p_combs)
for i in range(len(p_combs)):
state_res_times = np.random.rand(1, node_states)[0]
state_res_times = state_res_times * 1000
@ -87,7 +89,8 @@ class TestSetOfCims(unittest.TestCase):
self.assertIsNone(so1._state_residence_times)
def aux_test_init(self, node_id, parents_states_number, node_states_number, p_combs):
sofcims = SetOfCims(node_id, parents_states_number, node_states_number, p_combs)
sofcims = SetOfCims(node_id = node_id, parents_states_number = parents_states_number,
node_states_number = node_states_number, p_combs = p_combs)
self.assertEqual(sofcims._node_id, node_id)
self.assertTrue(np.array_equal(sofcims._p_combs, p_combs))
self.assertTrue(np.array_equal(sofcims._parents_states_number, parents_states_number))

@ -16,13 +16,13 @@ class TestCache(unittest.TestCase):
def test_put(self):
c1 = Cache()
pset1 = {'X', 'Y'}
sofc1 = SetOfCims('Z', [], 3, np.array([]))
sofc1 = SetOfCims(node_id = 'Z', parents_states_number = [], node_states_number = 3, p_combs = np.array([]))
c1.put(pset1, sofc1)
self.assertEqual(1, len(c1._actual_cache))
self.assertEqual(1, len(c1._list_of_sets_of_parents))
self.assertEqual(sofc1, c1._actual_cache[0])
pset2 = {'X'}
sofc2 = SetOfCims('Z', [], 3, np.array([]))
sofc2 = SetOfCims(node_id = 'Z', parents_states_number = [], node_states_number = 3, p_combs = np.array([]))
c1.put(pset2, sofc2)
self.assertEqual(2, len(c1._actual_cache))
self.assertEqual(2, len(c1._list_of_sets_of_parents))
@ -31,7 +31,7 @@ class TestCache(unittest.TestCase):
def test_find(self):
c1 = Cache()
pset1 = {'X', 'Y'}
sofc1 = SetOfCims('Z', [], 3, np.array([]))
sofc1 = SetOfCims(node_id = 'Z', parents_states_number = [], node_states_number = 3, p_combs = np.array([]))
c1.put(pset1, sofc1)
self.assertEqual(1, len(c1._actual_cache))
self.assertEqual(1, len(c1._list_of_sets_of_parents))