1
0
Fork 0

Add tests to all CIM related classes

parallel_struct_est
philpMartin 4 years ago
parent 462e29bb17
commit e3511dc05a
  1. 33
      main_package/classes/conditional_intensity_matrix.py
  2. 39
      main_package/classes/network_graph.py
  3. 6
      main_package/classes/parameters_estimator.py
  4. 37
      main_package/classes/set_of_cims.py
  5. 14
      main_package/classes/sets_of_cims_container.py
  6. 41
      main_package/tests/test_cim.py
  7. 2
      main_package/tests/test_json_importer.py
  8. 130
      main_package/tests/test_networkgraph.py
  9. 83
      main_package/tests/test_setofcims.py
  10. 24
      main_package/tests/test_sets_of_cims_container.py
  11. 2
      main_package/tests/test_structure.py

@ -3,26 +3,29 @@ import numpy as np
class ConditionalIntensityMatrix:
def __init__(self, dimension, state_residence_times, state_transition_matrix):
self.state_residence_times = state_residence_times
self.state_transition_matrix = state_transition_matrix
def __init__(self, state_residence_times, state_transition_matrix):
self._state_residence_times = state_residence_times
self._state_transition_matrix = state_transition_matrix
#self.cim = np.zeros(shape=(dimension, dimension), dtype=float)
self.cim = self.state_transition_matrix.astype(np.float)
self._cim = self.state_transition_matrix.astype(np.float)
def update_state_transition_count(self, element_indx):
#print(element_indx)
#self.state_transition_matrix[element_indx[0]][element_indx[1]] += 1
self.state_transition_matrix[element_indx] += 1
def compute_cim_coefficients(self):
np.fill_diagonal(self._cim, self._cim.diagonal() * -1)
#print(self._cim)
self._cim = ((self._cim.T + 1) / (self._state_residence_times + 1)).T
#np.fill_diagonal(self.state_transition_matrix, 0)
def update_state_residence_time_for_state(self, state, time):
#print("Time updating In state", state, time)
self.state_residence_times[state] += time
@property
def state_residence_times(self):
return self._state_residence_times
@property
def state_transition_matrix(self):
return self._state_transition_matrix
def compute_cim_coefficients(self):
np.fill_diagonal(self.cim, self.cim.diagonal() * -1)
self.cim = ((self.cim.T + 1) / (self.state_residence_times + 1)).T
#np.fill_diagonal(self.state_transition_matrix, 0)
@property
def cim(self):
return self._cim
def __repr__(self):
return 'CIM:\n' + str(self.cim)

@ -115,13 +115,14 @@ class NetworkGraph():
return fancy_indx
def build_time_scalar_indexing_structure_for_a_node(self, node_id, parents_id):
#print(node_id)
#print("Parents_id", parents_id)
T_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1].astype(np.int)])
def build_time_scalar_indexing_structure_for_a_node(self, node_indx, parents_indxs):
#print(node_indx)
#print("Parents_id", parents_indxs)
#T_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1].astype(np.int)])
T_vector = np.array([self.get_states_number_by_indx(node_indx)])
#print(T_vector)
#print("Here ", self.graph_struct.variables_frame.iloc[parents_id[0], 1])
T_vector = np.append(T_vector, [self.graph_struct.get_states_number_by_indx(x) for x in parents_id])
T_vector = np.append(T_vector, [self.graph_struct.get_states_number_by_indx(x) for x in parents_indxs])
#print(T_vector)
T_vector = T_vector.cumprod().astype(np.int)
return T_vector
@ -130,32 +131,34 @@ class NetworkGraph():
def build_time_scalar_indexing_structure(self):
parents_indexes_list = self._fancy_indexing
for node_indx, p_indxs in zip(self.graph_struct.list_of_nodes_indexes(), parents_indexes_list):
if p_indxs.size == 0:
self._time_scalar_indexing_structure.append(np.array([self.get_states_number_by_indx(node_indx)],
dtype=np.int))
else:
#if not p_indxs:
#self._time_scalar_indexing_structure.append(np.array([self.get_states_number_by_indx(node_indx)],
#dtype=np.int))
#else:
self._time_scalar_indexing_structure.append(
self.build_time_scalar_indexing_structure_for_a_node(node_indx, p_indxs))
def build_transition_scalar_indexing_structure_for_a_node(self, node_id, parents_id):
M_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1],
self.graph_struct.variables_frame.iloc[node_id, 1].astype(np.int)])
M_vector = np.append(M_vector, [self.graph_struct.get_states_number_by_indx(x) for x in parents_id])
def build_transition_scalar_indexing_structure_for_a_node(self, node_indx, parents_indxs):
#M_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1],
#self.graph_struct.variables_frame.iloc[node_id, 1].astype(np.int)])
M_vector = np.array([self.get_states_number_by_indx(node_indx),
self.get_states_number_by_indx(node_indx)])
M_vector = np.append(M_vector, [self.graph_struct.get_states_number_by_indx(x) for x in parents_indxs])
M_vector = M_vector.cumprod().astype(np.int)
return M_vector
def build_transition_scalar_indexing_structure(self):
parents_indexes_list = self._fancy_indexing
for node_indx, p_indxs in enumerate(parents_indexes_list):
for node_indx, p_indxs in zip(self.graph_struct.list_of_nodes_indexes(), parents_indexes_list):
self._transition_scalar_indexing_structure.append(
self.build_transition_scalar_indexing_structure_for_a_node(node_indx, p_indxs))
def build_time_columns_filtering_structure(self):
parents_indexes_list = self._fancy_indexing
for node_indx, p_indxs in enumerate(parents_indexes_list):
if p_indxs.size == 0:
self._time_filtering.append(np.append(p_indxs, np.array([node_indx], dtype=np.int)))
else:
for node_indx, p_indxs in zip(self.graph_struct.list_of_nodes_indexes(), parents_indexes_list):
#if p_indxs.size == 0:
#self._time_filtering.append(np.append(p_indxs, np.array([node_indx], dtype=np.int)))
#else:
self._time_filtering.append(np.append(np.array([node_indx], dtype=np.int), p_indxs))
def build_transition_columns_filtering_structure(self):

@ -6,7 +6,7 @@ import numba as nb
import numpy as np
import network_graph as ng
import sample_path as sp
import amalgamated_cims as acims
import sets_of_cims_container as acims
class ParametersEstimator:
@ -14,10 +14,10 @@ class ParametersEstimator:
def __init__(self, sample_path, net_graph):
self.sample_path = sample_path
self.net_graph = net_graph
self.amalgamated_cims_struct = None
self.sets_of_cims_struct = None
def init_amalgamated_cims_struct(self):
self.amalgamated_cims_struct = acims.AmalgamatedCims(self.net_graph.get_states_number_of_all_nodes_sorted(),
self.sets_of_cims_struct = acims.SetsOfCimsContainer(self.net_graph.get_states_number_of_all_nodes_sorted(),
self.net_graph.get_nodes(),
self.net_graph.get_ordered_by_indx_parents_values_for_all_nodes())

@ -1,5 +1,4 @@
import numpy as np
from numba import njit, int32
import conditional_intensity_matrix as cim
@ -23,48 +22,15 @@ class SetOfCims:
self.build_actual_cims_structure()
def build_actual_cims_structure(self):
#cims_number = 1
#for state_number in self.parents_states_number:
#cims_number = cims_number * state_number
if not self.parents_states_number:
#self.actual_cims = np.empty(1, dtype=cim.ConditionalIntensityMatrix)
#self.actual_cims[0] = cim.ConditionalIntensityMatrix(self.node_states_number)
self.state_residence_times = np.zeros((1, self.node_states_number), dtype=np.float)
self.transition_matrices = np.zeros((1,self.node_states_number, self.node_states_number), dtype=np.int)
else:
#self.actual_cims = np.empty(self.parents_states_number, dtype=cim.ConditionalIntensityMatrix)
#self.build_actual_cims(self.actual_cims)
#for indx, matrix in enumerate(self.actual_cims):
#self.actual_cims[indx] = cim.ConditionalIntensityMatrix(self.node_states_number)
self.state_residence_times = \
np.zeros((np.prod(self.parents_states_number), self.node_states_number), dtype=np.float)
self.transition_matrices = np.zeros([np.prod(self.parents_states_number), self.node_states_number,
self.node_states_number], dtype=np.int)
def update_state_transition(self, indexes, element_indx_tuple):
#matrix_indx = self.indexes_converter(indexes)
#print(indexes)
if not indexes:
self.actual_cims[0].update_state_transition_count(element_indx_tuple)
else:
self.actual_cims[indexes].update_state_transition_count(element_indx_tuple)
def update_state_residence_time(self, which_matrix, which_element, time):
#matrix_indx = self.indexes_converter(which_matrix)
if not which_matrix:
self.actual_cims[0].update_state_residence_time_for_state(which_element, time)
else:
#print(type(which_matrix))
#print(self.actual_cims[(2,2)])
self.actual_cims[which_matrix].update_state_residence_time_for_state(which_element, time)
def build_actual_cims(self, cim_structure):
for indx in range(len(cim_structure)):
if cim_structure[indx] is None:
cim_structure[indx] = cim.ConditionalIntensityMatrix(self.node_states_number)
else:
self.build_actual_cims(cim_structure[indx])
def get_cims_number(self):
return len(self.actual_cims)
@ -82,8 +48,7 @@ class SetOfCims:
def build_cims(self, state_res_times, transition_matrices):
for state_res_time_vector, transition_matrix in zip(state_res_times, transition_matrices):
#print(state_res_time_vector, transition_matrix)
cim_to_add = cim.ConditionalIntensityMatrix(self.node_states_number,
state_res_time_vector, transition_matrix)
cim_to_add = cim.ConditionalIntensityMatrix(state_res_time_vector, transition_matrix)
cim_to_add.compute_cim_coefficients()
#print(cim_to_add)
self.actual_cims.append(cim_to_add)

@ -1,32 +1,29 @@
import set_of_cims as socim
import numpy as np
class AmalgamatedCims:
class SetsOfCimsContainer:
"""
Aggrega un insieme di oggetti SetOfCims indicizzandoli a partire dal node_id della variabile:
{X:SetofCimsX, Y:SetOfCimsY.......}
Aggrega un insieme di oggetti SetOfCims
"""
# list_of_vars_orders contiene tutte le liste con i parent ordinati secondo il valore indx
def __init__(self, states_number_per_node, list_of_keys, list_of_parents_states_number):
def __init__(self, list_of_keys, states_number_per_node, list_of_parents_states_number):
self.sets_of_cims = []
self.init_cims_structure(list_of_keys, states_number_per_node, list_of_parents_states_number)
#self.states_per_variable = states_number
def init_cims_structure(self, keys, states_number_per_node, list_of_parents_states_number):
#print(keys)
#print(list_of_parents_states_number)
for indx, key in enumerate(keys):
self.sets_of_cims.append(
socim.SetOfCims(key, list_of_parents_states_number[indx], states_number_per_node[indx]))
def get_set_of_cims(self, node_indx):
return self.sets_of_cims[node_indx]
def get_cims_of_node(self, node_indx, cim_indx):
return self.sets_of_cims[node_indx].get_cim(cim_indx)
"""
def get_vars_order(self, node):
return self.actual_cims[node][1]
@ -35,3 +32,4 @@ class AmalgamatedCims:
def update_state_residence_time_for_matrix(self, which_node, which_matrix, which_element, time):
self.sets_of_cims[which_node].update_state_residence_time(which_matrix, which_element, time)
"""

@ -0,0 +1,41 @@
import unittest
import numpy as np
import conditional_intensity_matrix as cim
class TestConditionalIntensityMatrix(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.state_res_times = np.random.rand(1, 3)[0]
cls.state_res_times = cls.state_res_times * 1000
cls.state_transition_matrix = np.random.randint(1, 10000, (3, 3))
for i in range(0, len(cls.state_res_times)):
cls.state_transition_matrix[i, i] = 0
cls.state_transition_matrix[i, i] = np.sum(cls.state_transition_matrix[i])
def test_init(self):
c1 = cim.ConditionalIntensityMatrix(self.state_res_times, self.state_transition_matrix)
self.assertTrue(np.array_equal(self.state_res_times, c1.state_residence_times))
self.assertTrue(np.array_equal(self.state_transition_matrix, c1.state_transition_matrix))
self.assertEqual(c1.cim.dtype, np.float)
self.assertEqual(self.state_transition_matrix.shape, c1.cim.shape)
def test_compute_cim_coefficients(self):
c1 = cim.ConditionalIntensityMatrix(self.state_res_times, self.state_transition_matrix)
c2 = self.state_transition_matrix.astype(np.float)
np.fill_diagonal(c2, c2.diagonal() * -1)
for i in range(0, len(self.state_res_times)):
for j in range(0, len(self.state_res_times)):
c2[i, j] = (c2[i, j] + 1) / (self.state_res_times[i] + 1)
c1.compute_cim_coefficients()
for i in range(0, len(c1.state_residence_times)):
self.assertTrue(np.isclose(np.sum(c1.cim[i]), 0.0, 1e-02, 1e-01))
for i in range(0, len(self.state_res_times)):
for j in range(0, len(self.state_res_times)):
self.assertTrue(np.isclose(c1.cim[i, j], c2[i, j], 1e-02, 1e-01))
if __name__ == '__main__':
unittest.main()

@ -1,3 +1,5 @@
import sys
sys.path.append("/Users/Zalum/Desktop/Tesi/CTBN_Project/main_package/classes/")
import unittest
import numpy as np
import pandas as pd

@ -1,16 +1,17 @@
import unittest
import networkx as nx
import numpy as np
import sample_path as sp
import network_graph as ng
class TestNetworkGraph(unittest.TestCase):
def setUp(self):
self.s1 = sp.SamplePath('../data', 'samples', 'dyn.str', 'variables', 'Time', 'Name')
self.s1.build_trajectories()
self.s1.build_structure()
@classmethod
def setUpClass(cls):
cls.s1 = sp.SamplePath('../data', 'samples', 'dyn.str', 'variables', 'Time', 'Name')
cls.s1.build_trajectories()
cls.s1.build_structure()
def test_init(self):
g1 = ng.NetworkGraph(self.s1.structure)
@ -30,14 +31,14 @@ class TestNetworkGraph(unittest.TestCase):
for e in self.s1.structure.list_of_edges():
self.assertIn(tuple(e), g1.get_edges())
def test_get_ordered_by_indx_set_of_parents(self):
"""def test_get_ordered_by_indx_set_of_parents(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
g1.add_edges(self.s1.structure.list_of_edges())
sorted_par_list_aggregated_info = g1.get_ordered_by_indx_set_of_parents(g1.get_nodes()[2])
self.test_aggregated_par_list_data(g1,g1.get_nodes()[2], sorted_par_list_aggregated_info)
self.test_aggregated_par_list_data(g1, g1.get_nodes()[2], sorted_par_list_aggregated_info)"""
def test_aggregated_par_list_data(self, graph, node_id, sorted_par_list_aggregated_info):
def aux_aggregated_par_list_data(self, graph, node_id, sorted_par_list_aggregated_info):
for indx, element in enumerate(sorted_par_list_aggregated_info):
if indx == 0:
self.assertEqual(graph.get_parents_by_id(node_id), element)
@ -57,15 +58,124 @@ class TestNetworkGraph(unittest.TestCase):
g1.add_edges(self.s1.structure.list_of_edges())
sorted_list_of_par_lists = g1.get_ord_set_of_par_of_all_nodes()
for node, par_list in zip(g1.get_nodes_sorted_by_indx(), sorted_list_of_par_lists):
self.test_aggregated_par_list_data(g1, node, par_list)
self.aux_aggregated_par_list_data(g1, node, par_list)
def test_get_ordered_by_indx_parents_values_for_all_nodes(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
g1.add_edges(self.s1.structure.list_of_edges())
g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes()
print(g1.get_ordered_by_indx_parents_values_for_all_nodes())
#print(g1.get_ordered_by_indx_parents_values_for_all_nodes())
parents_values_list = g1.get_ordered_by_indx_parents_values_for_all_nodes()
for pv1, aggr in zip(parents_values_list, g1.aggregated_info_about_nodes_parents):
self.assertEqual(pv1, aggr[2])
def test_get_states_number_of_all_nodes_sorted(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
g1.add_edges(self.s1.structure.list_of_edges())
nodes_cardinality_list = g1.get_states_number_of_all_nodes_sorted()
for val, node in zip(nodes_cardinality_list, g1.get_nodes_sorted_by_indx()):
self.assertEqual(val, g1.get_states_number(node))
def test_build_fancy_indexing_structure_no_offset(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
g1.add_edges(self.s1.structure.list_of_edges())
g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes()
fancy_indx = g1.build_fancy_indexing_structure(0)
for par_indxs, aggr in zip(fancy_indx, g1.aggregated_info_about_nodes_parents):
self.assertEqual(par_indxs, aggr[1])
def test_build_fancy_indexing_structure_offset(self):
pass #TODO il codice di netgraph deve gestire questo caso
def aux_build_time_scalar_indexing_structure_for_a_node(self, graph, node_indx, parents_indxs):
time_scalar_indexing = graph.build_time_scalar_indexing_structure_for_a_node(node_indx, parents_indxs)
self.assertEqual(len(time_scalar_indexing), len(parents_indxs) + 1)
merged_list = parents_indxs[:]
merged_list.insert(0, node_indx)
#print(merged_list)
vals_list = []
for node in merged_list:
vals_list.append(graph.get_states_number_by_indx(node))
t_vec = np.array(vals_list)
t_vec = t_vec.cumprod()
#print(t_vec)
self.assertTrue(np.array_equal(time_scalar_indexing, t_vec))
def aux_build_transition_scalar_indexing_structure_for_a_node(self, graph, node_indx, parents_indxs):
transition_scalar_indexing = graph.build_transition_scalar_indexing_structure_for_a_node(node_indx,
parents_indxs)
print(transition_scalar_indexing)
self.assertEqual(len(transition_scalar_indexing), len(parents_indxs) + 2)
merged_list = parents_indxs[:]
merged_list.insert(0, node_indx)
merged_list.insert(0, node_indx)
vals_list = []
for node in merged_list:
vals_list.append(graph.get_states_number_by_indx(node))
m_vec = np.array([vals_list])
m_vec = m_vec.cumprod()
self.assertTrue(np.array_equal(transition_scalar_indexing, m_vec))
def test_build_transition_scalar_indexing_structure(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
g1.add_edges(self.s1.structure.list_of_edges())
g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes()
fancy_indx = g1.build_fancy_indexing_structure(0)
print(fancy_indx)
for node_id, p_indxs in zip(g1.graph_struct.list_of_nodes_indexes(), fancy_indx):
self.aux_build_transition_scalar_indexing_structure_for_a_node(g1, node_id, p_indxs)
def test_build_time_scalar_indexing_structure(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
g1.add_edges(self.s1.structure.list_of_edges())
g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes()
fancy_indx = g1.build_fancy_indexing_structure(0)
#print(fancy_indx)
for node_id, p_indxs in zip(g1.graph_struct.list_of_nodes_indexes(), fancy_indx):
self.aux_build_time_scalar_indexing_structure_for_a_node(g1, node_id, p_indxs)
def test_build_time_columns_filtering_structure(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
g1.add_edges(self.s1.structure.list_of_edges())
g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes()
g1._fancy_indexing = g1.build_fancy_indexing_structure(0)
g1.build_time_columns_filtering_structure()
print(g1.time_filtering)
t_filter = []
for node_id, p_indxs in zip(g1.get_nodes_sorted_by_indx(), g1._fancy_indexing):
single_filter = []
single_filter.append(g1.get_node_indx(node_id))
single_filter.extend(p_indxs)
t_filter.append(np.array(single_filter))
#print(t_filter)
for a1, a2 in zip(g1.time_filtering, t_filter):
self.assertTrue(np.array_equal(a1, a2))
def test_build_transition_columns_filtering_structure(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
g1.add_edges(self.s1.structure.list_of_edges())
g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes()
g1._fancy_indexing = g1.build_fancy_indexing_structure(0)
g1.build_transition_columns_filtering_structure()
print(g1.transition_filtering)
m_filter = []
for node_id, p_indxs in zip(g1.get_nodes_sorted_by_indx(), g1._fancy_indexing):
single_filter = []
single_filter.append(g1.get_node_indx(node_id) + g1.graph_struct.total_variables_number)
single_filter.append(g1.get_node_indx(node_id))
single_filter.extend(p_indxs)
m_filter.append(np.array(single_filter))
print(m_filter)
for a1, a2 in zip(g1.transition_filtering, m_filter):
self.assertTrue(np.array_equal(a1, a2))
#TODO mancano i test sulle property e sui getters_vari
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,83 @@
import unittest
import numpy as np
import itertools
import set_of_cims as soci
class TestSetOfCims(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.node_id = 'X'
cls.possible_cardinalities = [2, 3]
#cls.possible_states = [[0,1], [0, 1, 2]]
cls.node_states_number = range(2, 4)
def test_init(self):
# empty parent set
for sn in self.node_states_number:
self.aux_test_init(self.node_id, [], sn)
# one parent
for sn in self.node_states_number:
for p in itertools.product(self.possible_cardinalities, repeat=1):
self.aux_test_init(self.node_id, list(p), sn)
#two parents
for sn in self.node_states_number:
for p in itertools.product(self.possible_cardinalities, repeat=2):
self.aux_test_init(self.node_id, list(p), sn)
def test_indexes_converter(self):
# empty parent set
for sn in self.node_states_number:
self.aux_test_indexes_converter(self.node_id, [], sn)
# one parent
for sn in self.node_states_number:
for p in itertools.product(self.possible_cardinalities, repeat=1):
self.aux_test_init(self.node_id, list(p), sn)
# two parents
for sn in self.node_states_number:
for p in itertools.product(self.possible_cardinalities, repeat=2):
self.aux_test_init(self.node_id, list(p), sn)
def aux_test_indexes_converter(self, node_id, parents_states_number, node_states_number):
sofcims = soci.SetOfCims(node_id, parents_states_number, node_states_number)
if not parents_states_number:
self.assertEqual(sofcims.indexes_converter([]), 0)
else:
parents_possible_values = []
for cardi in parents_states_number:
parents_possible_values.extend(range(0, cardi))
for p in itertools.permutations(parents_possible_values, len(parents_states_number)):
self.assertEqual(sofcims.indexes_converter(list(p)), np.ravel_multi_index(list(p), parents_states_number))
def test_build_cims(self):
state_res_times_list = []
transition_matrices_list = []
so1 = soci.SetOfCims('X',[3], 3)
for i in range(0, 3):
state_res_times = np.random.rand(1, 3)[0]
state_res_times = state_res_times * 1000
state_transition_matrix = np.random.randint(1, 10000, (3, 3))
state_res_times_list.append(state_res_times)
transition_matrices_list.append(state_transition_matrix)
so1.build_cims(state_res_times_list, transition_matrices_list)
self.assertEqual(len(state_res_times_list), so1.get_cims_number())
self.assertIsNone(so1.transition_matrices)
self.assertIsNone(so1.state_residence_times)
def aux_test_init(self, node_id, parents_states_number, node_states_number):
sofcims = soci.SetOfCims(node_id, parents_states_number, node_states_number)
self.assertEqual(sofcims.node_id, node_id)
self.assertTrue(np.array_equal(sofcims.parents_states_number, parents_states_number))
self.assertEqual(sofcims.node_states_number, node_states_number)
self.assertFalse(sofcims.actual_cims)
self.assertEqual(sofcims.state_residence_times.shape[0], np.prod(np.array(parents_states_number)))
self.assertEqual(len(sofcims.state_residence_times[0]),node_states_number)
self.assertEqual(sofcims.transition_matrices.shape[0], np.prod(np.array(parents_states_number)))
self.assertEqual(len(sofcims.transition_matrices[0][0]), node_states_number)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,24 @@
import unittest
import sets_of_cims_container as scc
import set_of_cims as sc
class TestSetsOfCimsContainer(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.variables = ['X', 'Y', 'Z']
cls.states_per_node = [3, 3, 3]
cls.parents_states_list = [[], [3], [3, 3]]
def test_init(self):
c1 = scc.SetsOfCimsContainer(self.variables, self.states_per_node, self.parents_states_list)
self.assertEqual(len(c1.sets_of_cims), len(self.variables))
for set_of_cims in c1.sets_of_cims:
self.assertIsInstance(set_of_cims, sc.SetOfCims)
if __name__ == '__main__':
unittest.main()

@ -1,3 +1,5 @@
import sys
sys.path.append("/Users/Zalum/Desktop/Tesi/CTBN_Project/main_package/classes/")
import unittest
import pandas as pd
import structure as st