diff --git a/PyCTBN/PyCTBN/structure_estimator.py b/PyCTBN/PyCTBN/structure_estimator.py index 8c51d52..26e2fec 100644 --- a/PyCTBN/PyCTBN/structure_estimator.py +++ b/PyCTBN/PyCTBN/structure_estimator.py @@ -42,16 +42,19 @@ class StructureEstimator: #self._sample_path = sample_path self._nodes = np.array(sample_path.structure.nodes_labels) self._tot_vars_number = sample_path.total_variables_count - self._shm_times = multiprocessing.shared_memory.SharedMemory(create=True, + self._shm_times = multiprocessing.shared_memory.SharedMemory(name='sh_times', create=True, size=sample_path.trajectories.times.nbytes) - self._shm_trajectories = multiprocessing.shared_memory.SharedMemory(create=True, - size=sample_path.trajectories.complete_trajectory.nbytes) - self._times = np.ndarray(sample_path.trajectories.times.shape, sample_path.trajectories.times.dtype, self._shm_times.buf) + self._shm_trajectories = multiprocessing.shared_memory.SharedMemory(name='sh_traj', create=True, + size=sample_path.trajectories.complete_trajectory.nbytes) + self._times = np.ndarray(sample_path.trajectories.times.shape, sample_path.trajectories.times.dtype, + self._shm_times.buf) self._times[:] = sample_path.trajectories.times[:] self._trajectories = np.ndarray(sample_path.trajectories.complete_trajectory.shape, sample_path.trajectories.complete_trajectory.dtype, self._shm_trajectories.buf) self._trajectories[:] = sample_path.trajectories.complete_trajectory[:] + + self._nodes_vals = sample_path.structure.nodes_values self._nodes_indxs = sample_path.structure.nodes_indexes self._complete_graph = self.build_complete_graph(sample_path.structure.nodes_labels) @@ -86,8 +89,11 @@ class StructureEstimator: result_graph.add_edges_from(edges) return result_graph - def complete_test(self, test_parent: str, test_child: str, parent_set: typing.List, child_states_numb: int, - tot_vars_count: int, cache: Cache) -> bool: + @staticmethod + def complete_test(test_parent: str, test_child: str, parent_set: typing.List, child_states_numb: int, + tot_vars_count: int, cache: Cache, nodes: np.ndarray, + nodes_indxs: np.ndarray, nodes_vals: np.ndarray, times :np.ndarray, trajectories: np.ndarray, + exp_alfa: float, chi_alfa: float) -> bool: """Performs a complete independence test on the directed graphs G1 = {test_child U parent_set} G2 = {G1 U test_parent} (added as an additional parent of the test_child). Generates all the necessary structures and datas to perform the tests. @@ -111,43 +117,26 @@ class StructureEstimator: parents = np.array(parent_set) parents = np.append(parents, test_parent) - sorted_parents = self._nodes[np.isin(self._nodes, parents)] + sorted_parents = nodes[np.isin(nodes, parents)] cims_filter = sorted_parents != test_parent - """ - sofc1 = cache.find(set(p_set)) - - if not sofc1: - bool_mask1 = np.isin(self._nodes, complete_info) - l1 = list(self._nodes[bool_mask1]) - indxs1 = self._nodes_indxs[bool_mask1] - vals1 = self._nodes_vals[bool_mask1] - eds1 = list(itertools.product(parent_set,test_child)) - s1 = Structure(l1, indxs1, vals1, eds1, tot_vars_count) - g1 = NetworkGraph(s1) - g1.fast_init(test_child) - p1 = ParametersEstimator(g1) - p1.fast_init(test_child) - sofc1 = p1.compute_parameters_for_node(test_child, self._times, self._trajectories) - cache.put(set(p_set), sofc1) - """ #sofc2 = None p_set.insert(0, test_parent) sofc2 = cache.find(set(p_set)) if not sofc2: complete_info.append(test_parent) - bool_mask2 = np.isin(self._nodes, complete_info) - l2 = list(self._nodes[bool_mask2]) - indxs2 = self._nodes_indxs[bool_mask2] - vals2 = self._nodes_vals[bool_mask2] + bool_mask2 = np.isin(nodes, complete_info) + l2 = list(nodes[bool_mask2]) + indxs2 = nodes_indxs[bool_mask2] + vals2 = nodes_vals[bool_mask2] eds2 = list(itertools.product(p_set, test_child)) s2 = Structure(l2, indxs2, vals2, eds2, tot_vars_count) g2 = NetworkGraph(s2) g2.fast_init(test_child) p2 = ParametersEstimator(g2) p2.fast_init(test_child) - sofc2 = p2.compute_parameters_for_node(test_child, self._times, self._trajectories) + sofc2 = p2.compute_parameters_for_node(test_child, times, trajectories) cache.put(set(p_set), sofc2) del p_set[0] @@ -157,18 +146,19 @@ class StructureEstimator: g2.fast_init(test_child) p2 = ParametersEstimator(g2) p2.fast_init(test_child) - sofc1 = p2.compute_parameters_for_node(test_child, self._times, self._trajectories) + sofc1 = p2.compute_parameters_for_node(test_child, times, trajectories) cache.put(set(p_set), sofc1) for cim1, p_comb in zip(sofc1.actual_cims, sofc1.p_combs): cond_cims = sofc2.filter_cims_with_mask(cims_filter, p_comb) for cim2 in cond_cims: - if not self.independence_test(child_states_numb, cim1, cim2): + if not StructureEstimator.independence_test(child_states_numb, cim1, cim2, exp_alfa, chi_alfa): return False return True - def independence_test(self, child_states_numb: int, cim1: ConditionalIntensityMatrix, - cim2: ConditionalIntensityMatrix) -> bool: + @staticmethod + def independence_test(child_states_numb: int, cim1: ConditionalIntensityMatrix, + cim2: ConditionalIntensityMatrix, exp_alfa, chi_test_alfa) -> bool: """Compute the actual independence test using two cims. It is performed first the exponential test and if the null hypothesis is not rejected, it is performed also the chi_test. @@ -189,7 +179,6 @@ class StructureEstimator: C1 = cim1.cim C2 = cim2.cim F_stats = C2.diagonal() / C1.diagonal() - exp_alfa = self._exp_test_sign for val in range(0, child_states_numb): if F_stats[val] < f_dist.ppf(exp_alfa / 2, r1s[val], r2s[val]) or \ F_stats[val] > f_dist.ppf(1 - exp_alfa / 2, r1s[val], r2s[val]): @@ -197,7 +186,7 @@ class StructureEstimator: M1_no_diag = M1[~np.eye(M1.shape[0], dtype=bool)].reshape(M1.shape[0], -1) M2_no_diag = M2[~np.eye(M2.shape[0], dtype=bool)].reshape( M2.shape[0], -1) - chi_2_quantile = chi2_dist.ppf(1 - self._chi_test_alfa, child_states_numb - 1) + chi_2_quantile = chi2_dist.ppf(1 - chi_test_alfa, child_states_numb - 1) Ks = np.sqrt(r1s / r2s) Ls = np.sqrt(r2s / r1s) for val in range(0, child_states_numb): @@ -207,7 +196,11 @@ class StructureEstimator: return False return True - def one_iteration_of_CTPC_algorithm(self, var_id: str, cache: Cache, tot_vars_count: int) -> typing.List: + @staticmethod + def one_iteration_of_CTPC_algorithm(var_id: str, child_states_numb: int, + u: typing.List, cache: Cache, + tot_vars_count: int, nodes, nodes_indxs, + nodes_vals, tests_alfas: typing.Tuple, shm_dims: typing.List) -> typing.List: """Performs an iteration of the CTPC algorithm using the node ``var_id`` as ``test_child``. :param var_id: the node label of the test child @@ -215,20 +208,23 @@ class StructureEstimator: :param tot_vars_count: the number of _nodes in the net :type tot_vars_count: int """ - u = list(self._complete_graph.predecessors(var_id)) - #child_states_numb = self._sample_path.structure.get_states_number(var_id) - child_states_numb = self._nodes_vals[np.where(self._nodes == var_id)][0] - #print("Child States Numb", child_states_numb) + existing_shm_times = shared_memory.SharedMemory(name='sh_times') + times = np.ndarray(shm_dims[0], dtype=np.float, buffer=existing_shm_times.buf) + existing_shm_traj = shared_memory.SharedMemory(name='sh_traj') + trajectory = np.ndarray(shm_dims[1], dtype=np.int64, buffer=existing_shm_traj.buf) b = 0 while b < len(u): parent_indx = 0 while parent_indx < len(u): removed = False - S = self.generate_possible_sub_sets_of_size(u, b, u[parent_indx]) + S = StructureEstimator.generate_possible_sub_sets_of_size(u, b, u[parent_indx]) test_parent = u[parent_indx] for parents_set in S: - if self.complete_test(test_parent, var_id, parents_set, child_states_numb, tot_vars_count, cache): - #self._complete_graph.remove_edge(test_parent, var_id) + if StructureEstimator.complete_test(test_parent, var_id, parents_set, + child_states_numb, tot_vars_count, cache, + nodes, nodes_indxs, nodes_vals, times, trajectory, + tests_alfas[0], tests_alfas[1]): + u.remove(test_parent) removed = True break @@ -236,10 +232,15 @@ class StructureEstimator: parent_indx += 1 b += 1 #print("Parent set for node ", var_id, " : ", u) + del times + existing_shm_times.close() + del trajectory + existing_shm_traj.close() cache.clear() return u - def generate_possible_sub_sets_of_size(self, u: typing.List, size: int, parent_label: str) -> \ + @staticmethod + def generate_possible_sub_sets_of_size(u: typing.List, size: int, parent_label: str) -> \ typing.Iterator: """Creates a list containing all possible subsets of the list ``u`` of size ``size``, that do not contains a the node identified by ``parent_label``. @@ -260,16 +261,29 @@ class StructureEstimator: def ctpc_algorithm(self, multi_processing: bool) -> None: """Compute the CTPC algorithm over the entire net. """ - ctpc_algo = self.one_iteration_of_CTPC_algorithm - #total_vars_numb = self._sample_path.total_variables_count + ctpc_algo = StructureEstimator.one_iteration_of_CTPC_algorithm total_vars_numb_list = [self._tot_vars_number] * len(self._nodes) + parents_list = [list(self._complete_graph.predecessors(var_id)) for var_id in self._nodes] + nodes_array_list = [self._nodes] * len(self._nodes) + nodes_indxs_array_list = [self._nodes_indxs] * len(self._nodes) + nodes_vals_array_list = [self._nodes_vals] * len(self._nodes) + tests_alfa_dims_list = [(self._exp_test_sign, self._chi_test_alfa)] * len(self._nodes) + datas_dims_list = [[self._times.shape, self._trajectories.shape]] * len(self._nodes) if multi_processing: cpu_count = multiprocessing.cpu_count() else: cpu_count = 1 print("CPU COUNT", cpu_count) with multiprocessing.Pool(processes=cpu_count) as pool: - parent_sets = pool.starmap(ctpc_algo, zip(self._nodes, self._caches, total_vars_numb_list)) + parent_sets = pool.starmap(ctpc_algo, zip(self._nodes, self._nodes_vals, parents_list, + self._caches, total_vars_numb_list, + nodes_array_list, nodes_indxs_array_list, nodes_vals_array_list, + tests_alfa_dims_list, datas_dims_list)) + + self._shm_times.close() + self._shm_times.unlink() + self._shm_trajectories.close() + self._shm_trajectories.unlink() self._result_graph = self.build_result_graph(self._nodes, parent_sets) def save_results(self) -> None: diff --git a/PyCTBN/tests/test_structure_estimator.py b/PyCTBN/tests/test_structure_estimator.py index 62ade8c..f1c5265 100644 --- a/PyCTBN/tests/test_structure_estimator.py +++ b/PyCTBN/tests/test_structure_estimator.py @@ -83,6 +83,8 @@ class TestStructureEstimator(unittest.TestCase): lp_wrapper = lp(se1.ctpc_algorithm) lp_wrapper(MULTI_PROCESSING) lp.print_stats() + #paralell_time = timeit.timeit(se1.ctpc_algorithm, MULTI_PROCESSING, number=1) + #print("EXEC TIME:", paralell_time) print(se1._result_graph.edges) print(self.s1.structure.edges) for ed in self.s1.structure.edges: