1
0
Fork 0

test PoolExecutor

master
Luca Moretti 4 years ago
parent 864d55bfd6
commit aa7a7a488b
  1. 32
      main_package/classes/estimators/structure_score_based_estimator.py
  2. 9
      main_package/tests/estimators/test_structure_score_based_estimator.py

@ -10,6 +10,8 @@ from networkx.readwrite import json_graph
from random import choice from random import choice
import concurrent.futures
import copy import copy
import utility.cache as ch import utility.cache as ch
import structure_graph.conditional_intensity_matrix as condim import structure_graph.conditional_intensity_matrix as condim
@ -92,17 +94,27 @@ class StructureScoreBasedEstimator(se.StructureEstimator):
if disable_multiprocessing: if disable_multiprocessing:
cpu_count = 1 cpu_count = 1
'Estimate the best parents for each node'
with get_context("spawn").Pool(processes=cpu_count) as pool:
#with get_context("spawn").Pool(processes=cpu_count) as pool:
#with multiprocessing.Pool(processes=cpu_count) as pool: #with multiprocessing.Pool(processes=cpu_count) as pool:
list_edges_partial = pool.starmap(estimate_parents, zip(
self.nodes, 'Estimate the best parents for each node'
l_max_parents, with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
l_iterations_number, list_edges_partial = executor.map(estimate_parents,
l_patience, self.nodes,
l_tabu_length, l_max_parents,
l_tabu_rules_duration, l_iterations_number,
l_optimizer)) l_patience,
l_tabu_length,
l_tabu_rules_duration,
l_optimizer)
# list_edges_partial = [estimate_parents(n,max_parents,iterations_number,patience,tabu_length,tabu_rules_duration,optimizer) for n in self.nodes] # list_edges_partial = [estimate_parents(n,max_parents,iterations_number,patience,tabu_length,tabu_rules_duration,optimizer) for n in self.nodes]
#list_edges_partial = p.map(estimate_parents, self.nodes) #list_edges_partial = p.map(estimate_parents, self.nodes)
#list_edges_partial= estimate_parents('Q',max_parents,iterations_number,patience,tabu_length,tabu_rules_duration,optimizer) #list_edges_partial= estimate_parents('Q',max_parents,iterations_number,patience,tabu_length,tabu_rules_duration,optimizer)

@ -16,9 +16,6 @@ import structure_graph.sample_path as sp
import estimators.structure_score_based_estimator as se import estimators.structure_score_based_estimator as se
import utility.json_importer as ji import utility.json_importer as ji
from multiprocessing import set_start_method
class TestStructureScoreBasedEstimator(unittest.TestCase): class TestStructureScoreBasedEstimator(unittest.TestCase):
@ -26,7 +23,7 @@ class TestStructureScoreBasedEstimator(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
#cls.read_files = glob.glob(os.path.join('../../data', "*.json")) #cls.read_files = glob.glob(os.path.join('../../data', "*.json"))
cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_ternary_data_15.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name') cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_binary_data_10.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
cls.s1 = sp.SamplePath(cls.importer) cls.s1 = sp.SamplePath(cls.importer)
cls.s1.build_trajectories() cls.s1.build_trajectories()
cls.s1.build_structure() cls.s1.build_structure()
@ -37,7 +34,7 @@ class TestStructureScoreBasedEstimator(unittest.TestCase):
true_edges = copy.deepcopy(self.s1.structure.edges) true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges)) true_edges = set(map(tuple, true_edges))
set_start_method("spawn")
se1 = se.StructureScoreBasedEstimator(self.s1) se1 = se.StructureScoreBasedEstimator(self.s1)
edges = se1.estimate_structure( edges = se1.estimate_structure(
max_parents = None, max_parents = None,
@ -46,7 +43,7 @@ class TestStructureScoreBasedEstimator(unittest.TestCase):
tabu_length = 15, tabu_length = 15,
tabu_rules_duration = 15, tabu_rules_duration = 15,
optimizer = 'tabu', optimizer = 'tabu',
disable_multiprocessing=False disable_multiprocessing=True
) )