From 28e3000ed1226aaebec04a753aed39d01d7ff267 Mon Sep 17 00:00:00 2001 From: Luca Moretti Date: Sun, 20 Dec 2020 21:08:14 +0100 Subject: [PATCH] Added possible implementation of a parallel constraint-based approach --- .../structure_constraint_based_estimator.py | 32 +++++++- .../optimizers/constraint_based_optimizer.py | 30 +++++-- ...st_structure_constraint_based_estimator.py | 80 +++---------------- 3 files changed, 62 insertions(+), 80 deletions(-) diff --git a/main_package/classes/estimators/structure_constraint_based_estimator.py b/main_package/classes/estimators/structure_constraint_based_estimator.py index db533c2..34eaa10 100644 --- a/main_package/classes/estimators/structure_constraint_based_estimator.py +++ b/main_package/classes/estimators/structure_constraint_based_estimator.py @@ -21,6 +21,9 @@ import optimizers.constraint_based_optimizer as optimizer from utility.decorators import timing,timing_write +import multiprocessing +from multiprocessing import Pool + class StructureConstraintBasedEstimator(se.StructureEstimator): """ @@ -209,7 +212,7 @@ class StructureConstraintBasedEstimator(se.StructureEstimator): node_id = var_id, structure_estimator = self, tot_vars_count = tot_vars_count) - optimizer_obj.optimize_structure() + return optimizer_obj.optimize_structure() @timing def ctpc_algorithm(self): @@ -222,12 +225,33 @@ class StructureConstraintBasedEstimator(se.StructureEstimator): """ ctpc_algo = self.one_iteration_of_CTPC_algorithm total_vars_numb = self.sample_path.total_variables_count - [ctpc_algo(n, total_vars_numb) for n in self.nodes] + n_nodes= len(self.nodes) + + total_vars_numb_array = [total_vars_numb] * n_nodes + + 'get the number of CPU' + cpu_count = multiprocessing.cpu_count() + + #if disable_multiprocessing: + #cpu_count = 1 + + 'Remove all the edges from the structure' + self.sample_path.structure.clean_structure_edges() + + 'Estimate the best parents for each node' + with multiprocessing.Pool(processes=cpu_count) as pool: + list_edges_partial = pool.starmap(ctpc_algo, zip( + self.nodes, + total_vars_numb_array)) + #list_edges_partial = [ctpc_algo(n,total_vars_numb) for n in self.nodes] + + return set(itertools.chain.from_iterable(list_edges_partial)) + + @timing def estimate_structure(self): - self.ctpc_algorithm() - return set(self.complete_graph.edges) + return self.ctpc_algorithm() diff --git a/main_package/classes/optimizers/constraint_based_optimizer.py b/main_package/classes/optimizers/constraint_based_optimizer.py index 7fefa02..dc6b6cc 100644 --- a/main_package/classes/optimizers/constraint_based_optimizer.py +++ b/main_package/classes/optimizers/constraint_based_optimizer.py @@ -12,6 +12,8 @@ from random import choice from abc import ABC +import copy + from optimizers.optimizer import Optimizer from estimators import structure_estimator as se @@ -48,7 +50,16 @@ class ConstraintBasedOptimizer(Optimizer): """ print("##################TESTING VAR################", self.node_id) - u = list(self.structure_estimator.complete_graph.predecessors(self.node_id)) + + graph = ng.NetworkGraph(self.structure_estimator.sample_path.structure) + + other_nodes = [node for node in self.structure_estimator.sample_path.structure.nodes_labels if node != self.node_id] + + for possible_parent in other_nodes: + graph.add_edges([(possible_parent,self.node_id)]) + + + u = other_nodes #tests_parents_numb = len(u) #complete_frame = self.complete_graph_frame #test_frame = complete_frame.loc[complete_frame['To'].isin([self.node_id])] @@ -57,22 +68,26 @@ class ConstraintBasedOptimizer(Optimizer): while b < len(u): #for parent_id in u: parent_indx = 0 - while parent_indx < len(u): + list_parent= copy.deepcopy(u) + for possible_parent in list_parent: removed = False #if not list(self.structure_estimator.generate_possible_sub_sets_of_size(u, b, u[parent_indx])): #break - S = self.structure_estimator.generate_possible_sub_sets_of_size(u, b, u[parent_indx]) + S = self.structure_estimator.generate_possible_sub_sets_of_size(u, b, possible_parent) #print("U Set", u) #print("S", S) - test_parent = u[parent_indx] + test_parent = possible_parent #print("Test Parent", test_parent) for parents_set in S: #print("Parent Set", parents_set) #print("Test Parent", test_parent) if self.structure_estimator.complete_test(test_parent, self.node_id, parents_set, child_states_numb, self.tot_vars_count): #print("Removing EDGE:", test_parent, self.node_id) - self.structure_estimator.complete_graph.remove_edge(test_parent, self.node_id) - u.remove(test_parent) + graph.remove_edges([(test_parent, self.node_id)]) + other_nodes.remove(test_parent) + print(f"TEST PARENT: {test_parent}") + if u.__contains__(test_parent): + u.remove(test_parent) removed = True break #else: @@ -80,4 +95,5 @@ class ConstraintBasedOptimizer(Optimizer): if not removed: parent_indx += 1 b += 1 - self.structure_estimator.cache.clear() \ No newline at end of file + self.structure_estimator.cache.clear() + return graph.edges diff --git a/main_package/tests/estimators/test_structure_constraint_based_estimator.py b/main_package/tests/estimators/test_structure_constraint_based_estimator.py index cc1487c..7f8621a 100644 --- a/main_package/tests/estimators/test_structure_constraint_based_estimator.py +++ b/main_package/tests/estimators/test_structure_constraint_based_estimator.py @@ -15,85 +15,27 @@ import structure_graph.sample_path as sp import estimators.structure_constraint_based_estimator as se import utility.json_importer as ji +import copy -class TestStructureConstraintBasedEstimator(unittest.TestCase): +class TestStructureConstraintBasedEstimator(unittest.TestCase): @classmethod def setUpClass(cls): - cls.read_files = glob.glob(os.path.join('../../data', "*.json")) - cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name') + #cls.read_files = glob.glob(os.path.join('../../data', "*.json")) + cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_binary_data_01_15.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name') cls.s1 = sp.SamplePath(cls.importer) cls.s1.build_trajectories() cls.s1.build_structure() - def test_init(self): - exp_alfa = 0.1 - chi_alfa = 0.1 - se1 = se.StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa) - self.assertEqual(self.s1, se1.sample_path) - self.assertTrue(np.array_equal(se1.nodes, np.array(self.s1.structure.nodes_labels))) - self.assertTrue(np.array_equal(se1.nodes_indxs, self.s1.structure.nodes_indexes)) - self.assertTrue(np.array_equal(se1.nodes_vals, self.s1.structure.nodes_values)) - self.assertEqual(se1.exp_test_sign, exp_alfa) - self.assertEqual(se1.chi_test_alfa, chi_alfa) - self.assertIsInstance(se1.complete_graph, nx.DiGraph) - self.assertIsInstance(se1.cache, ch.Cache) - - def test_build_complete_graph(self): - exp_alfa = 0.1 - chi_alfa = 0.1 - nodes_numb = len(self.s1.structure.nodes_labels) - se1 = se.StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa) - cg = se1.build_complete_graph(self.s1.structure.nodes_labels) - self.assertEqual(len(cg.edges), nodes_numb*(nodes_numb - 1)) - for node in self.s1.structure.nodes_labels: - no_self_loops = self.s1.structure.nodes_labels[:] - no_self_loops.remove(node) - for n2 in no_self_loops: - self.assertIn((node, n2), cg.edges) - - def test_generate_possible_sub_sets_of_size(self): - exp_alfa = 0.1 - chi_alfa = 0.1 - nodes_numb = len(self.s1.structure.nodes_labels) - se1 = se.StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa) - - for node in self.s1.structure.nodes_labels: - for b in range(nodes_numb): - sets = se1.generate_possible_sub_sets_of_size(self.s1.structure.nodes_labels, b, node) - sets2 = se1.generate_possible_sub_sets_of_size(self.s1.structure.nodes_labels, b, node) - self.assertEqual(len(list(sets)), math.floor(math.factorial(nodes_numb - 1) / - (math.factorial(b)*math.factorial(nodes_numb -1 - b)))) - for sset in sets2: - self.assertFalse(node in sset) + def test_structure(self): + true_edges = copy.deepcopy(self.s1.structure.edges) + true_edges = set(map(tuple, true_edges)) - def test_time(self): - se1 = se.StructureConstraintBasedEstimator(self.s1, 0.1, 0.1) - lp = LineProfiler() - lp.add_function(se1.complete_test) - lp.add_function(se1.one_iteration_of_CTPC_algorithm) - lp.add_function(se1.independence_test) - lp_wrapper = lp(se1.ctpc_algorithm) - lp_wrapper() - lp.print_stats() - print(se1.complete_graph.edges) - print(self.s1.structure.edges) - for ed in self.s1.structure.edges: - self.assertIn(tuple(ed), se1.complete_graph.edges) - tuples_edges = [tuple(rec) for rec in self.s1.structure.edges] - spurious_edges = [] - for ed in se1.complete_graph.edges: - if not(ed in tuples_edges): - spurious_edges.append(ed) - print("Spurious Edges:",spurious_edges) - se1.save_results() + se1 = se.StructureConstraintBasedEstimator(self.s1,0.1,0.1) + edges = se1.ctpc_algorithm() + - def test_memory(self): - se1 = se.StructureConstraintBasedEstimator(self.s1, 0.1, 0.1) - se1.ctpc_algorithm() - current_process = psutil.Process(os.getpid()) - mem = current_process.memory_info().rss - print("Average Memory Usage in MB:", mem / 10**6) + self.assertEqual(edges, true_edges) if __name__ == '__main__': unittest.main()