From e3511dc05aec70c20b2d54464342e1cc8480c6d4 Mon Sep 17 00:00:00 2001 From: philpMartin Date: Wed, 22 Jul 2020 23:06:16 +0200 Subject: [PATCH] Add tests to all CIM related classes --- .../classes/conditional_intensity_matrix.py | 33 +++-- main_package/classes/network_graph.py | 39 +++--- main_package/classes/parameters_estimator.py | 6 +- main_package/classes/set_of_cims.py | 37 +---- ...ated_cims.py => sets_of_cims_container.py} | 14 +- main_package/tests/test_cim.py | 41 ++++++ main_package/tests/test_json_importer.py | 2 + main_package/tests/test_networkgraph.py | 130 ++++++++++++++++-- main_package/tests/test_setofcims.py | 83 +++++++++++ .../tests/test_sets_of_cims_container.py | 24 ++++ main_package/tests/test_structure.py | 2 + 11 files changed, 321 insertions(+), 90 deletions(-) rename main_package/classes/{amalgamated_cims.py => sets_of_cims_container.py} (80%) create mode 100644 main_package/tests/test_cim.py create mode 100644 main_package/tests/test_setofcims.py create mode 100644 main_package/tests/test_sets_of_cims_container.py diff --git a/main_package/classes/conditional_intensity_matrix.py b/main_package/classes/conditional_intensity_matrix.py index d33a39a..035c48b 100644 --- a/main_package/classes/conditional_intensity_matrix.py +++ b/main_package/classes/conditional_intensity_matrix.py @@ -3,26 +3,29 @@ import numpy as np class ConditionalIntensityMatrix: - def __init__(self, dimension, state_residence_times, state_transition_matrix): - self.state_residence_times = state_residence_times - self.state_transition_matrix = state_transition_matrix + def __init__(self, state_residence_times, state_transition_matrix): + self._state_residence_times = state_residence_times + self._state_transition_matrix = state_transition_matrix #self.cim = np.zeros(shape=(dimension, dimension), dtype=float) - self.cim = self.state_transition_matrix.astype(np.float) + self._cim = self.state_transition_matrix.astype(np.float) - def update_state_transition_count(self, element_indx): - #print(element_indx) - #self.state_transition_matrix[element_indx[0]][element_indx[1]] += 1 - self.state_transition_matrix[element_indx] += 1 + def compute_cim_coefficients(self): + np.fill_diagonal(self._cim, self._cim.diagonal() * -1) + #print(self._cim) + self._cim = ((self._cim.T + 1) / (self._state_residence_times + 1)).T + #np.fill_diagonal(self.state_transition_matrix, 0) - def update_state_residence_time_for_state(self, state, time): - #print("Time updating In state", state, time) - self.state_residence_times[state] += time + @property + def state_residence_times(self): + return self._state_residence_times + @property + def state_transition_matrix(self): + return self._state_transition_matrix - def compute_cim_coefficients(self): - np.fill_diagonal(self.cim, self.cim.diagonal() * -1) - self.cim = ((self.cim.T + 1) / (self.state_residence_times + 1)).T - #np.fill_diagonal(self.state_transition_matrix, 0) + @property + def cim(self): + return self._cim def __repr__(self): return 'CIM:\n' + str(self.cim) diff --git a/main_package/classes/network_graph.py b/main_package/classes/network_graph.py index 07cb80b..ae4a04d 100644 --- a/main_package/classes/network_graph.py +++ b/main_package/classes/network_graph.py @@ -115,13 +115,14 @@ class NetworkGraph(): return fancy_indx - def build_time_scalar_indexing_structure_for_a_node(self, node_id, parents_id): - #print(node_id) - #print("Parents_id", parents_id) - T_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1].astype(np.int)]) + def build_time_scalar_indexing_structure_for_a_node(self, node_indx, parents_indxs): + #print(node_indx) + #print("Parents_id", parents_indxs) + #T_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1].astype(np.int)]) + T_vector = np.array([self.get_states_number_by_indx(node_indx)]) #print(T_vector) #print("Here ", self.graph_struct.variables_frame.iloc[parents_id[0], 1]) - T_vector = np.append(T_vector, [self.graph_struct.get_states_number_by_indx(x) for x in parents_id]) + T_vector = np.append(T_vector, [self.graph_struct.get_states_number_by_indx(x) for x in parents_indxs]) #print(T_vector) T_vector = T_vector.cumprod().astype(np.int) return T_vector @@ -130,32 +131,34 @@ class NetworkGraph(): def build_time_scalar_indexing_structure(self): parents_indexes_list = self._fancy_indexing for node_indx, p_indxs in zip(self.graph_struct.list_of_nodes_indexes(), parents_indexes_list): - if p_indxs.size == 0: - self._time_scalar_indexing_structure.append(np.array([self.get_states_number_by_indx(node_indx)], - dtype=np.int)) - else: + #if not p_indxs: + #self._time_scalar_indexing_structure.append(np.array([self.get_states_number_by_indx(node_indx)], + #dtype=np.int)) + #else: self._time_scalar_indexing_structure.append( self.build_time_scalar_indexing_structure_for_a_node(node_indx, p_indxs)) - def build_transition_scalar_indexing_structure_for_a_node(self, node_id, parents_id): - M_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1], - self.graph_struct.variables_frame.iloc[node_id, 1].astype(np.int)]) - M_vector = np.append(M_vector, [self.graph_struct.get_states_number_by_indx(x) for x in parents_id]) + def build_transition_scalar_indexing_structure_for_a_node(self, node_indx, parents_indxs): + #M_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1], + #self.graph_struct.variables_frame.iloc[node_id, 1].astype(np.int)]) + M_vector = np.array([self.get_states_number_by_indx(node_indx), + self.get_states_number_by_indx(node_indx)]) + M_vector = np.append(M_vector, [self.graph_struct.get_states_number_by_indx(x) for x in parents_indxs]) M_vector = M_vector.cumprod().astype(np.int) return M_vector def build_transition_scalar_indexing_structure(self): parents_indexes_list = self._fancy_indexing - for node_indx, p_indxs in enumerate(parents_indexes_list): + for node_indx, p_indxs in zip(self.graph_struct.list_of_nodes_indexes(), parents_indexes_list): self._transition_scalar_indexing_structure.append( self.build_transition_scalar_indexing_structure_for_a_node(node_indx, p_indxs)) def build_time_columns_filtering_structure(self): parents_indexes_list = self._fancy_indexing - for node_indx, p_indxs in enumerate(parents_indexes_list): - if p_indxs.size == 0: - self._time_filtering.append(np.append(p_indxs, np.array([node_indx], dtype=np.int))) - else: + for node_indx, p_indxs in zip(self.graph_struct.list_of_nodes_indexes(), parents_indexes_list): + #if p_indxs.size == 0: + #self._time_filtering.append(np.append(p_indxs, np.array([node_indx], dtype=np.int))) + #else: self._time_filtering.append(np.append(np.array([node_indx], dtype=np.int), p_indxs)) def build_transition_columns_filtering_structure(self): diff --git a/main_package/classes/parameters_estimator.py b/main_package/classes/parameters_estimator.py index 700df7b..0162c82 100644 --- a/main_package/classes/parameters_estimator.py +++ b/main_package/classes/parameters_estimator.py @@ -6,7 +6,7 @@ import numba as nb import numpy as np import network_graph as ng import sample_path as sp -import amalgamated_cims as acims +import sets_of_cims_container as acims class ParametersEstimator: @@ -14,10 +14,10 @@ class ParametersEstimator: def __init__(self, sample_path, net_graph): self.sample_path = sample_path self.net_graph = net_graph - self.amalgamated_cims_struct = None + self.sets_of_cims_struct = None def init_amalgamated_cims_struct(self): - self.amalgamated_cims_struct = acims.AmalgamatedCims(self.net_graph.get_states_number_of_all_nodes_sorted(), + self.sets_of_cims_struct = acims.SetsOfCimsContainer(self.net_graph.get_states_number_of_all_nodes_sorted(), self.net_graph.get_nodes(), self.net_graph.get_ordered_by_indx_parents_values_for_all_nodes()) diff --git a/main_package/classes/set_of_cims.py b/main_package/classes/set_of_cims.py index f361eed..cc75015 100644 --- a/main_package/classes/set_of_cims.py +++ b/main_package/classes/set_of_cims.py @@ -1,5 +1,4 @@ import numpy as np -from numba import njit, int32 import conditional_intensity_matrix as cim @@ -23,48 +22,15 @@ class SetOfCims: self.build_actual_cims_structure() def build_actual_cims_structure(self): - #cims_number = 1 - #for state_number in self.parents_states_number: - #cims_number = cims_number * state_number if not self.parents_states_number: - #self.actual_cims = np.empty(1, dtype=cim.ConditionalIntensityMatrix) - #self.actual_cims[0] = cim.ConditionalIntensityMatrix(self.node_states_number) self.state_residence_times = np.zeros((1, self.node_states_number), dtype=np.float) self.transition_matrices = np.zeros((1,self.node_states_number, self.node_states_number), dtype=np.int) else: - #self.actual_cims = np.empty(self.parents_states_number, dtype=cim.ConditionalIntensityMatrix) - #self.build_actual_cims(self.actual_cims) - #for indx, matrix in enumerate(self.actual_cims): - #self.actual_cims[indx] = cim.ConditionalIntensityMatrix(self.node_states_number) self.state_residence_times = \ np.zeros((np.prod(self.parents_states_number), self.node_states_number), dtype=np.float) self.transition_matrices = np.zeros([np.prod(self.parents_states_number), self.node_states_number, self.node_states_number], dtype=np.int) - def update_state_transition(self, indexes, element_indx_tuple): - #matrix_indx = self.indexes_converter(indexes) - #print(indexes) - if not indexes: - self.actual_cims[0].update_state_transition_count(element_indx_tuple) - else: - self.actual_cims[indexes].update_state_transition_count(element_indx_tuple) - - def update_state_residence_time(self, which_matrix, which_element, time): - #matrix_indx = self.indexes_converter(which_matrix) - - if not which_matrix: - self.actual_cims[0].update_state_residence_time_for_state(which_element, time) - else: - #print(type(which_matrix)) - #print(self.actual_cims[(2,2)]) - self.actual_cims[which_matrix].update_state_residence_time_for_state(which_element, time) - - def build_actual_cims(self, cim_structure): - for indx in range(len(cim_structure)): - if cim_structure[indx] is None: - cim_structure[indx] = cim.ConditionalIntensityMatrix(self.node_states_number) - else: - self.build_actual_cims(cim_structure[indx]) def get_cims_number(self): return len(self.actual_cims) @@ -82,8 +48,7 @@ class SetOfCims: def build_cims(self, state_res_times, transition_matrices): for state_res_time_vector, transition_matrix in zip(state_res_times, transition_matrices): #print(state_res_time_vector, transition_matrix) - cim_to_add = cim.ConditionalIntensityMatrix(self.node_states_number, - state_res_time_vector, transition_matrix) + cim_to_add = cim.ConditionalIntensityMatrix(state_res_time_vector, transition_matrix) cim_to_add.compute_cim_coefficients() #print(cim_to_add) self.actual_cims.append(cim_to_add) diff --git a/main_package/classes/amalgamated_cims.py b/main_package/classes/sets_of_cims_container.py similarity index 80% rename from main_package/classes/amalgamated_cims.py rename to main_package/classes/sets_of_cims_container.py index dfd154a..a8520a7 100644 --- a/main_package/classes/amalgamated_cims.py +++ b/main_package/classes/sets_of_cims_container.py @@ -1,32 +1,29 @@ import set_of_cims as socim -import numpy as np -class AmalgamatedCims: + +class SetsOfCimsContainer: """ - Aggrega un insieme di oggetti SetOfCims indicizzandoli a partire dal node_id della variabile: - {X:SetofCimsX, Y:SetOfCimsY.......} + Aggrega un insieme di oggetti SetOfCims """ # list_of_vars_orders contiene tutte le liste con i parent ordinati secondo il valore indx - def __init__(self, states_number_per_node, list_of_keys, list_of_parents_states_number): + def __init__(self, list_of_keys, states_number_per_node, list_of_parents_states_number): self.sets_of_cims = [] self.init_cims_structure(list_of_keys, states_number_per_node, list_of_parents_states_number) #self.states_per_variable = states_number def init_cims_structure(self, keys, states_number_per_node, list_of_parents_states_number): - #print(keys) - #print(list_of_parents_states_number) for indx, key in enumerate(keys): self.sets_of_cims.append( socim.SetOfCims(key, list_of_parents_states_number[indx], states_number_per_node[indx])) - def get_set_of_cims(self, node_indx): return self.sets_of_cims[node_indx] def get_cims_of_node(self, node_indx, cim_indx): return self.sets_of_cims[node_indx].get_cim(cim_indx) +""" def get_vars_order(self, node): return self.actual_cims[node][1] @@ -35,3 +32,4 @@ class AmalgamatedCims: def update_state_residence_time_for_matrix(self, which_node, which_matrix, which_element, time): self.sets_of_cims[which_node].update_state_residence_time(which_matrix, which_element, time) +""" diff --git a/main_package/tests/test_cim.py b/main_package/tests/test_cim.py new file mode 100644 index 0000000..e23d7c2 --- /dev/null +++ b/main_package/tests/test_cim.py @@ -0,0 +1,41 @@ +import unittest +import numpy as np + +import conditional_intensity_matrix as cim + + +class TestConditionalIntensityMatrix(unittest.TestCase): + + @classmethod + def setUpClass(cls) -> None: + cls.state_res_times = np.random.rand(1, 3)[0] + cls.state_res_times = cls.state_res_times * 1000 + cls.state_transition_matrix = np.random.randint(1, 10000, (3, 3)) + for i in range(0, len(cls.state_res_times)): + cls.state_transition_matrix[i, i] = 0 + cls.state_transition_matrix[i, i] = np.sum(cls.state_transition_matrix[i]) + + def test_init(self): + c1 = cim.ConditionalIntensityMatrix(self.state_res_times, self.state_transition_matrix) + self.assertTrue(np.array_equal(self.state_res_times, c1.state_residence_times)) + self.assertTrue(np.array_equal(self.state_transition_matrix, c1.state_transition_matrix)) + self.assertEqual(c1.cim.dtype, np.float) + self.assertEqual(self.state_transition_matrix.shape, c1.cim.shape) + + def test_compute_cim_coefficients(self): + c1 = cim.ConditionalIntensityMatrix(self.state_res_times, self.state_transition_matrix) + c2 = self.state_transition_matrix.astype(np.float) + np.fill_diagonal(c2, c2.diagonal() * -1) + for i in range(0, len(self.state_res_times)): + for j in range(0, len(self.state_res_times)): + c2[i, j] = (c2[i, j] + 1) / (self.state_res_times[i] + 1) + c1.compute_cim_coefficients() + for i in range(0, len(c1.state_residence_times)): + self.assertTrue(np.isclose(np.sum(c1.cim[i]), 0.0, 1e-02, 1e-01)) + for i in range(0, len(self.state_res_times)): + for j in range(0, len(self.state_res_times)): + self.assertTrue(np.isclose(c1.cim[i, j], c2[i, j], 1e-02, 1e-01)) + + +if __name__ == '__main__': + unittest.main() diff --git a/main_package/tests/test_json_importer.py b/main_package/tests/test_json_importer.py index 6226970..d48420e 100644 --- a/main_package/tests/test_json_importer.py +++ b/main_package/tests/test_json_importer.py @@ -1,3 +1,5 @@ +import sys +sys.path.append("/Users/Zalum/Desktop/Tesi/CTBN_Project/main_package/classes/") import unittest import numpy as np import pandas as pd diff --git a/main_package/tests/test_networkgraph.py b/main_package/tests/test_networkgraph.py index effcd31..e76c16e 100644 --- a/main_package/tests/test_networkgraph.py +++ b/main_package/tests/test_networkgraph.py @@ -1,16 +1,17 @@ import unittest import networkx as nx +import numpy as np import sample_path as sp import network_graph as ng class TestNetworkGraph(unittest.TestCase): - - def setUp(self): - self.s1 = sp.SamplePath('../data', 'samples', 'dyn.str', 'variables', 'Time', 'Name') - self.s1.build_trajectories() - self.s1.build_structure() + @classmethod + def setUpClass(cls): + cls.s1 = sp.SamplePath('../data', 'samples', 'dyn.str', 'variables', 'Time', 'Name') + cls.s1.build_trajectories() + cls.s1.build_structure() def test_init(self): g1 = ng.NetworkGraph(self.s1.structure) @@ -30,14 +31,14 @@ class TestNetworkGraph(unittest.TestCase): for e in self.s1.structure.list_of_edges(): self.assertIn(tuple(e), g1.get_edges()) - def test_get_ordered_by_indx_set_of_parents(self): + """def test_get_ordered_by_indx_set_of_parents(self): g1 = ng.NetworkGraph(self.s1.structure) g1.add_nodes(self.s1.structure.list_of_nodes_labels()) g1.add_edges(self.s1.structure.list_of_edges()) sorted_par_list_aggregated_info = g1.get_ordered_by_indx_set_of_parents(g1.get_nodes()[2]) - self.test_aggregated_par_list_data(g1,g1.get_nodes()[2], sorted_par_list_aggregated_info) + self.test_aggregated_par_list_data(g1, g1.get_nodes()[2], sorted_par_list_aggregated_info)""" - def test_aggregated_par_list_data(self, graph, node_id, sorted_par_list_aggregated_info): + def aux_aggregated_par_list_data(self, graph, node_id, sorted_par_list_aggregated_info): for indx, element in enumerate(sorted_par_list_aggregated_info): if indx == 0: self.assertEqual(graph.get_parents_by_id(node_id), element) @@ -57,15 +58,124 @@ class TestNetworkGraph(unittest.TestCase): g1.add_edges(self.s1.structure.list_of_edges()) sorted_list_of_par_lists = g1.get_ord_set_of_par_of_all_nodes() for node, par_list in zip(g1.get_nodes_sorted_by_indx(), sorted_list_of_par_lists): - self.test_aggregated_par_list_data(g1, node, par_list) + self.aux_aggregated_par_list_data(g1, node, par_list) def test_get_ordered_by_indx_parents_values_for_all_nodes(self): g1 = ng.NetworkGraph(self.s1.structure) g1.add_nodes(self.s1.structure.list_of_nodes_labels()) g1.add_edges(self.s1.structure.list_of_edges()) g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes() - print(g1.get_ordered_by_indx_parents_values_for_all_nodes()) + #print(g1.get_ordered_by_indx_parents_values_for_all_nodes()) + parents_values_list = g1.get_ordered_by_indx_parents_values_for_all_nodes() + for pv1, aggr in zip(parents_values_list, g1.aggregated_info_about_nodes_parents): + self.assertEqual(pv1, aggr[2]) + + def test_get_states_number_of_all_nodes_sorted(self): + g1 = ng.NetworkGraph(self.s1.structure) + g1.add_nodes(self.s1.structure.list_of_nodes_labels()) + g1.add_edges(self.s1.structure.list_of_edges()) + nodes_cardinality_list = g1.get_states_number_of_all_nodes_sorted() + for val, node in zip(nodes_cardinality_list, g1.get_nodes_sorted_by_indx()): + self.assertEqual(val, g1.get_states_number(node)) + + def test_build_fancy_indexing_structure_no_offset(self): + g1 = ng.NetworkGraph(self.s1.structure) + g1.add_nodes(self.s1.structure.list_of_nodes_labels()) + g1.add_edges(self.s1.structure.list_of_edges()) + g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes() + fancy_indx = g1.build_fancy_indexing_structure(0) + for par_indxs, aggr in zip(fancy_indx, g1.aggregated_info_about_nodes_parents): + self.assertEqual(par_indxs, aggr[1]) + + def test_build_fancy_indexing_structure_offset(self): + pass #TODO il codice di netgraph deve gestire questo caso + + def aux_build_time_scalar_indexing_structure_for_a_node(self, graph, node_indx, parents_indxs): + time_scalar_indexing = graph.build_time_scalar_indexing_structure_for_a_node(node_indx, parents_indxs) + self.assertEqual(len(time_scalar_indexing), len(parents_indxs) + 1) + merged_list = parents_indxs[:] + merged_list.insert(0, node_indx) + #print(merged_list) + vals_list = [] + for node in merged_list: + vals_list.append(graph.get_states_number_by_indx(node)) + t_vec = np.array(vals_list) + t_vec = t_vec.cumprod() + #print(t_vec) + self.assertTrue(np.array_equal(time_scalar_indexing, t_vec)) + + def aux_build_transition_scalar_indexing_structure_for_a_node(self, graph, node_indx, parents_indxs): + transition_scalar_indexing = graph.build_transition_scalar_indexing_structure_for_a_node(node_indx, + parents_indxs) + print(transition_scalar_indexing) + self.assertEqual(len(transition_scalar_indexing), len(parents_indxs) + 2) + merged_list = parents_indxs[:] + merged_list.insert(0, node_indx) + merged_list.insert(0, node_indx) + vals_list = [] + for node in merged_list: + vals_list.append(graph.get_states_number_by_indx(node)) + m_vec = np.array([vals_list]) + m_vec = m_vec.cumprod() + self.assertTrue(np.array_equal(transition_scalar_indexing, m_vec)) + + def test_build_transition_scalar_indexing_structure(self): + g1 = ng.NetworkGraph(self.s1.structure) + g1.add_nodes(self.s1.structure.list_of_nodes_labels()) + g1.add_edges(self.s1.structure.list_of_edges()) + g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes() + fancy_indx = g1.build_fancy_indexing_structure(0) + print(fancy_indx) + for node_id, p_indxs in zip(g1.graph_struct.list_of_nodes_indexes(), fancy_indx): + self.aux_build_transition_scalar_indexing_structure_for_a_node(g1, node_id, p_indxs) + + def test_build_time_scalar_indexing_structure(self): + g1 = ng.NetworkGraph(self.s1.structure) + g1.add_nodes(self.s1.structure.list_of_nodes_labels()) + g1.add_edges(self.s1.structure.list_of_edges()) + g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes() + fancy_indx = g1.build_fancy_indexing_structure(0) + #print(fancy_indx) + for node_id, p_indxs in zip(g1.graph_struct.list_of_nodes_indexes(), fancy_indx): + self.aux_build_time_scalar_indexing_structure_for_a_node(g1, node_id, p_indxs) + + def test_build_time_columns_filtering_structure(self): + g1 = ng.NetworkGraph(self.s1.structure) + g1.add_nodes(self.s1.structure.list_of_nodes_labels()) + g1.add_edges(self.s1.structure.list_of_edges()) + g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes() + g1._fancy_indexing = g1.build_fancy_indexing_structure(0) + g1.build_time_columns_filtering_structure() + print(g1.time_filtering) + t_filter = [] + for node_id, p_indxs in zip(g1.get_nodes_sorted_by_indx(), g1._fancy_indexing): + single_filter = [] + single_filter.append(g1.get_node_indx(node_id)) + single_filter.extend(p_indxs) + t_filter.append(np.array(single_filter)) + #print(t_filter) + for a1, a2 in zip(g1.time_filtering, t_filter): + self.assertTrue(np.array_equal(a1, a2)) + def test_build_transition_columns_filtering_structure(self): + g1 = ng.NetworkGraph(self.s1.structure) + g1.add_nodes(self.s1.structure.list_of_nodes_labels()) + g1.add_edges(self.s1.structure.list_of_edges()) + g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes() + g1._fancy_indexing = g1.build_fancy_indexing_structure(0) + g1.build_transition_columns_filtering_structure() + print(g1.transition_filtering) + m_filter = [] + for node_id, p_indxs in zip(g1.get_nodes_sorted_by_indx(), g1._fancy_indexing): + single_filter = [] + single_filter.append(g1.get_node_indx(node_id) + g1.graph_struct.total_variables_number) + single_filter.append(g1.get_node_indx(node_id)) + single_filter.extend(p_indxs) + m_filter.append(np.array(single_filter)) + print(m_filter) + for a1, a2 in zip(g1.transition_filtering, m_filter): + self.assertTrue(np.array_equal(a1, a2)) +#TODO mancano i test sulle property e sui getters_vari if __name__ == '__main__': unittest.main() diff --git a/main_package/tests/test_setofcims.py b/main_package/tests/test_setofcims.py new file mode 100644 index 0000000..c685b15 --- /dev/null +++ b/main_package/tests/test_setofcims.py @@ -0,0 +1,83 @@ +import unittest +import numpy as np +import itertools + +import set_of_cims as soci + + +class TestSetOfCims(unittest.TestCase): + + @classmethod + def setUpClass(cls) -> None: + cls.node_id = 'X' + cls.possible_cardinalities = [2, 3] + #cls.possible_states = [[0,1], [0, 1, 2]] + cls.node_states_number = range(2, 4) + + def test_init(self): + # empty parent set + for sn in self.node_states_number: + self.aux_test_init(self.node_id, [], sn) + # one parent + for sn in self.node_states_number: + for p in itertools.product(self.possible_cardinalities, repeat=1): + self.aux_test_init(self.node_id, list(p), sn) + #two parents + for sn in self.node_states_number: + for p in itertools.product(self.possible_cardinalities, repeat=2): + self.aux_test_init(self.node_id, list(p), sn) + + def test_indexes_converter(self): + # empty parent set + for sn in self.node_states_number: + self.aux_test_indexes_converter(self.node_id, [], sn) + # one parent + for sn in self.node_states_number: + for p in itertools.product(self.possible_cardinalities, repeat=1): + self.aux_test_init(self.node_id, list(p), sn) + # two parents + for sn in self.node_states_number: + for p in itertools.product(self.possible_cardinalities, repeat=2): + self.aux_test_init(self.node_id, list(p), sn) + + def aux_test_indexes_converter(self, node_id, parents_states_number, node_states_number): + sofcims = soci.SetOfCims(node_id, parents_states_number, node_states_number) + if not parents_states_number: + self.assertEqual(sofcims.indexes_converter([]), 0) + else: + parents_possible_values = [] + for cardi in parents_states_number: + parents_possible_values.extend(range(0, cardi)) + for p in itertools.permutations(parents_possible_values, len(parents_states_number)): + self.assertEqual(sofcims.indexes_converter(list(p)), np.ravel_multi_index(list(p), parents_states_number)) + + def test_build_cims(self): + state_res_times_list = [] + transition_matrices_list = [] + so1 = soci.SetOfCims('X',[3], 3) + for i in range(0, 3): + state_res_times = np.random.rand(1, 3)[0] + state_res_times = state_res_times * 1000 + state_transition_matrix = np.random.randint(1, 10000, (3, 3)) + state_res_times_list.append(state_res_times) + transition_matrices_list.append(state_transition_matrix) + so1.build_cims(state_res_times_list, transition_matrices_list) + self.assertEqual(len(state_res_times_list), so1.get_cims_number()) + self.assertIsNone(so1.transition_matrices) + self.assertIsNone(so1.state_residence_times) + + def aux_test_init(self, node_id, parents_states_number, node_states_number): + sofcims = soci.SetOfCims(node_id, parents_states_number, node_states_number) + self.assertEqual(sofcims.node_id, node_id) + self.assertTrue(np.array_equal(sofcims.parents_states_number, parents_states_number)) + self.assertEqual(sofcims.node_states_number, node_states_number) + self.assertFalse(sofcims.actual_cims) + self.assertEqual(sofcims.state_residence_times.shape[0], np.prod(np.array(parents_states_number))) + self.assertEqual(len(sofcims.state_residence_times[0]),node_states_number) + self.assertEqual(sofcims.transition_matrices.shape[0], np.prod(np.array(parents_states_number))) + self.assertEqual(len(sofcims.transition_matrices[0][0]), node_states_number) + + + +if __name__ == '__main__': + unittest.main() diff --git a/main_package/tests/test_sets_of_cims_container.py b/main_package/tests/test_sets_of_cims_container.py new file mode 100644 index 0000000..ad38d4f --- /dev/null +++ b/main_package/tests/test_sets_of_cims_container.py @@ -0,0 +1,24 @@ +import unittest + +import sets_of_cims_container as scc +import set_of_cims as sc + + +class TestSetsOfCimsContainer(unittest.TestCase): + + @classmethod + def setUpClass(cls) -> None: + cls.variables = ['X', 'Y', 'Z'] + cls.states_per_node = [3, 3, 3] + cls.parents_states_list = [[], [3], [3, 3]] + + def test_init(self): + c1 = scc.SetsOfCimsContainer(self.variables, self.states_per_node, self.parents_states_list) + self.assertEqual(len(c1.sets_of_cims), len(self.variables)) + for set_of_cims in c1.sets_of_cims: + self.assertIsInstance(set_of_cims, sc.SetOfCims) + + + +if __name__ == '__main__': + unittest.main() diff --git a/main_package/tests/test_structure.py b/main_package/tests/test_structure.py index 41da770..1596732 100644 --- a/main_package/tests/test_structure.py +++ b/main_package/tests/test_structure.py @@ -1,3 +1,5 @@ +import sys +sys.path.append("/Users/Zalum/Desktop/Tesi/CTBN_Project/main_package/classes/") import unittest import pandas as pd import structure as st