
Added possible implementation of a parallel constraint-based approach

master
Luca Moretti 4 years ago
parent c1b073119f
commit 28e3000ed1
  1. main_package/classes/estimators/structure_constraint_based_estimator.py (32 changed lines)
  2. main_package/classes/optimizers/constraint_based_optimizer.py (26 changed lines)
  3. main_package/tests/estimators/test_structure_constraint_based_estimator.py (78 changed lines)
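
The substance of the change is in ctpc_algorithm: instead of calling one_iteration_of_CTPC_algorithm on each node sequentially, the nodes are dispatched to a multiprocessing.Pool via starmap, each worker returns the edges it kept for its node, and the partial results are unioned into the final structure. The snippet below is a minimal, self-contained sketch of that fan-out/merge pattern, not the project's code; estimate_parents_for_node is a placeholder standing in for the per-node CTPC iteration.

import itertools
import multiprocessing


def estimate_parents_for_node(node_id, tot_vars_count):
    # Placeholder for the real per-node estimator: it would run the
    # conditional-independence tests and return the surviving edges
    # (parent, node_id) for this node.
    return [("parent_of_" + node_id, node_id)]


def estimate_structure(nodes, tot_vars_count):
    cpu_count = multiprocessing.cpu_count()
    # One (node, tot_vars_count) argument tuple per worker call.
    args = zip(nodes, [tot_vars_count] * len(nodes))
    with multiprocessing.Pool(processes=cpu_count) as pool:
        partial_edge_lists = pool.starmap(estimate_parents_for_node, args)
    # Union of the per-node results, as ctpc_algorithm now does.
    return set(itertools.chain.from_iterable(partial_edge_lists))


if __name__ == '__main__':
    print(estimate_structure(['X', 'Y', 'Z'], tot_vars_count=3))

Because the workers run in separate processes, each one returns its result instead of mutating a shared graph; this is also why the optimizer now returns graph.edges and the estimator rebuilds the final edge set from the partial lists.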

main_package/classes/estimators/structure_constraint_based_estimator.py

@@ -21,6 +21,9 @@ import optimizers.constraint_based_optimizer as optimizer
 from utility.decorators import timing,timing_write
+import multiprocessing
+from multiprocessing import Pool

 class StructureConstraintBasedEstimator(se.StructureEstimator):
     """
@@ -209,7 +212,7 @@ class StructureConstraintBasedEstimator(se.StructureEstimator):
                                             node_id = var_id,
                                             structure_estimator = self,
                                             tot_vars_count = tot_vars_count)
-        optimizer_obj.optimize_structure()
+        return optimizer_obj.optimize_structure()

     @timing
     def ctpc_algorithm(self):
@@ -222,12 +225,33 @@ class StructureConstraintBasedEstimator(se.StructureEstimator):
         """
         ctpc_algo = self.one_iteration_of_CTPC_algorithm
         total_vars_numb = self.sample_path.total_variables_count
-        [ctpc_algo(n, total_vars_numb) for n in self.nodes]
+        n_nodes = len(self.nodes)
+        total_vars_numb_array = [total_vars_numb] * n_nodes
+
+        'get the number of CPU'
+        cpu_count = multiprocessing.cpu_count()
+        #if disable_multiprocessing:
+        #cpu_count = 1
+
+        'Remove all the edges from the structure'
+        self.sample_path.structure.clean_structure_edges()
+
+        'Estimate the best parents for each node'
+        with multiprocessing.Pool(processes=cpu_count) as pool:
+            list_edges_partial = pool.starmap(ctpc_algo, zip(
+                                                            self.nodes,
+                                                            total_vars_numb_array))
+        #list_edges_partial = [ctpc_algo(n, total_vars_numb) for n in self.nodes]
+
+        return set(itertools.chain.from_iterable(list_edges_partial))

     @timing
     def estimate_structure(self):
-        self.ctpc_algorithm()
-        return set(self.complete_graph.edges)
+        return self.ctpc_algorithm()

main_package/classes/optimizers/constraint_based_optimizer.py

@@ -12,6 +12,8 @@ from random import choice
 from abc import ABC
+import copy

 from optimizers.optimizer import Optimizer
 from estimators import structure_estimator as se
@@ -48,7 +50,16 @@ class ConstraintBasedOptimizer(Optimizer):
         """
         print("##################TESTING VAR################", self.node_id)
-        u = list(self.structure_estimator.complete_graph.predecessors(self.node_id))
+        graph = ng.NetworkGraph(self.structure_estimator.sample_path.structure)
+        other_nodes = [node for node in self.structure_estimator.sample_path.structure.nodes_labels if node != self.node_id]
+        for possible_parent in other_nodes:
+            graph.add_edges([(possible_parent,self.node_id)])
+        u = other_nodes
         #tests_parents_numb = len(u)
         #complete_frame = self.complete_graph_frame
         #test_frame = complete_frame.loc[complete_frame['To'].isin([self.node_id])]
@@ -57,21 +68,25 @@ class ConstraintBasedOptimizer(Optimizer):
         while b < len(u):
             #for parent_id in u:
             parent_indx = 0
-            while parent_indx < len(u):
+            list_parent = copy.deepcopy(u)
+            for possible_parent in list_parent:
                 removed = False
                 #if not list(self.structure_estimator.generate_possible_sub_sets_of_size(u, b, u[parent_indx])):
                 #break
-                S = self.structure_estimator.generate_possible_sub_sets_of_size(u, b, u[parent_indx])
+                S = self.structure_estimator.generate_possible_sub_sets_of_size(u, b, possible_parent)
                 #print("U Set", u)
                 #print("S", S)
-                test_parent = u[parent_indx]
+                test_parent = possible_parent
                 #print("Test Parent", test_parent)
                 for parents_set in S:
                     #print("Parent Set", parents_set)
                     #print("Test Parent", test_parent)
                     if self.structure_estimator.complete_test(test_parent, self.node_id, parents_set, child_states_numb, self.tot_vars_count):
                         #print("Removing EDGE:", test_parent, self.node_id)
-                        self.structure_estimator.complete_graph.remove_edge(test_parent, self.node_id)
+                        graph.remove_edges([(test_parent, self.node_id)])
+                        other_nodes.remove(test_parent)
+                        print(f"TEST PARENT: {test_parent}")
                         if u.__contains__(test_parent):
                             u.remove(test_parent)
                         removed = True
                         break
@@ -81,3 +96,4 @@ class ConstraintBasedOptimizer(Optimizer):
                     parent_indx += 1
             b += 1
         self.structure_estimator.cache.clear()
+        return graph.edges
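
For reference, the loop above is the per-node portion of the CTPC test: every other variable starts as a candidate parent of self.node_id, and for separating sets of growing size b a candidate is dropped as soon as one conditional independence test succeeds; the edges that survive are what gets returned to the estimator. A schematic version of that logic, with a stubbed-out is_independent callback standing in for the estimator's complete_test, could look like this:

import itertools


def estimate_parents(node, all_nodes, is_independent):
    # Schematic per-node CTPC pruning; `is_independent(parent, node, sep_set)`
    # is a stand-in for the estimator's conditional independence test.
    u = [n for n in all_nodes if n != node]          # candidate parents
    b = 0
    while b < len(u):
        for test_parent in list(u):                  # iterate over a copy, u may shrink
            others = [p for p in u if p != test_parent]
            # try every separating set of size b drawn from the other candidates
            for sep_set in itertools.combinations(others, b):
                if is_independent(test_parent, node, sep_set):
                    u.remove(test_parent)            # independence found: drop this edge
                    break
        b += 1
    return {(parent, node) for parent in u}          # surviving parent edges

Returning the surviving edges per node, rather than editing a shared complete_graph, is what makes the per-node calls safe to run in separate worker processes.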

main_package/tests/estimators/test_structure_constraint_based_estimator.py

@@ -15,85 +15,27 @@ import structure_graph.sample_path as sp
 import estimators.structure_constraint_based_estimator as se
 import utility.json_importer as ji
+import copy

 class TestStructureConstraintBasedEstimator(unittest.TestCase):

     @classmethod
     def setUpClass(cls):
-        cls.read_files = glob.glob(os.path.join('../../data', "*.json"))
-        cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        #cls.read_files = glob.glob(os.path.join('../../data', "*.json"))
+        cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_binary_data_01_15.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
         cls.s1 = sp.SamplePath(cls.importer)
         cls.s1.build_trajectories()
         cls.s1.build_structure()
-    def test_init(self):
-        exp_alfa = 0.1
-        chi_alfa = 0.1
-        se1 = se.StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa)
-        self.assertEqual(self.s1, se1.sample_path)
-        self.assertTrue(np.array_equal(se1.nodes, np.array(self.s1.structure.nodes_labels)))
-        self.assertTrue(np.array_equal(se1.nodes_indxs, self.s1.structure.nodes_indexes))
-        self.assertTrue(np.array_equal(se1.nodes_vals, self.s1.structure.nodes_values))
-        self.assertEqual(se1.exp_test_sign, exp_alfa)
-        self.assertEqual(se1.chi_test_alfa, chi_alfa)
-        self.assertIsInstance(se1.complete_graph, nx.DiGraph)
-        self.assertIsInstance(se1.cache, ch.Cache)
-    def test_build_complete_graph(self):
-        exp_alfa = 0.1
-        chi_alfa = 0.1
-        nodes_numb = len(self.s1.structure.nodes_labels)
-        se1 = se.StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa)
-        cg = se1.build_complete_graph(self.s1.structure.nodes_labels)
-        self.assertEqual(len(cg.edges), nodes_numb*(nodes_numb - 1))
-        for node in self.s1.structure.nodes_labels:
-            no_self_loops = self.s1.structure.nodes_labels[:]
-            no_self_loops.remove(node)
-            for n2 in no_self_loops:
-                self.assertIn((node, n2), cg.edges)
-    def test_generate_possible_sub_sets_of_size(self):
-        exp_alfa = 0.1
-        chi_alfa = 0.1
-        nodes_numb = len(self.s1.structure.nodes_labels)
-        se1 = se.StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa)
+    def test_structure(self):
+        true_edges = copy.deepcopy(self.s1.structure.edges)
+        true_edges = set(map(tuple, true_edges))
-        for node in self.s1.structure.nodes_labels:
-            for b in range(nodes_numb):
-                sets = se1.generate_possible_sub_sets_of_size(self.s1.structure.nodes_labels, b, node)
-                sets2 = se1.generate_possible_sub_sets_of_size(self.s1.structure.nodes_labels, b, node)
-                self.assertEqual(len(list(sets)), math.floor(math.factorial(nodes_numb - 1) /
-                                                 (math.factorial(b)*math.factorial(nodes_numb -1 - b))))
-                for sset in sets2:
-                    self.assertFalse(node in sset)
+        se1 = se.StructureConstraintBasedEstimator(self.s1,0.1,0.1)
+        edges = se1.ctpc_algorithm()
-    def test_time(self):
-        se1 = se.StructureConstraintBasedEstimator(self.s1, 0.1, 0.1)
-        lp = LineProfiler()
-        lp.add_function(se1.complete_test)
-        lp.add_function(se1.one_iteration_of_CTPC_algorithm)
-        lp.add_function(se1.independence_test)
-        lp_wrapper = lp(se1.ctpc_algorithm)
-        lp_wrapper()
-        lp.print_stats()
-        print(se1.complete_graph.edges)
-        print(self.s1.structure.edges)
-        for ed in self.s1.structure.edges:
-            self.assertIn(tuple(ed), se1.complete_graph.edges)
-        tuples_edges = [tuple(rec) for rec in self.s1.structure.edges]
-        spurious_edges = []
-        for ed in se1.complete_graph.edges:
-            if not(ed in tuples_edges):
-                spurious_edges.append(ed)
-        print("Spurious Edges:",spurious_edges)
-        se1.save_results()
-    def test_memory(self):
-        se1 = se.StructureConstraintBasedEstimator(self.s1, 0.1, 0.1)
-        se1.ctpc_algorithm()
-        current_process = psutil.Process(os.getpid())
-        mem = current_process.memory_info().rss
-        print("Average Memory Usage in MB:", mem / 10**6)
+        self.assertEqual(edges, true_edges)

 if __name__ == '__main__':
     unittest.main()