Added start method to parallelization

master
Luca Moretti 4 years ago
parent c1ce2de331
commit 864d55bfd6
Changed files:
  1. main_package/classes/estimators/structure_constraint_based_estimator.py (5)
  2. main_package/classes/estimators/structure_score_based_estimator.py (4)
  3. main_package/tests/estimators/test_structure_constraint_based_estimator.py (5)
  4. main_package/tests/estimators/test_structure_score_based_estimator.py (18)
  5. main_package/tests/optimizers/test_tabu_search.py (3)

main_package/classes/estimators/structure_constraint_based_estimator.py
@@ -24,6 +24,7 @@ from utility.decorators import timing,timing_write
 import multiprocessing
 from multiprocessing import Pool
+from multiprocessing import get_context
 class StructureConstraintBasedEstimator(se.StructureEstimator):
     """
@@ -241,7 +242,9 @@ class StructureConstraintBasedEstimator(se.StructureEstimator):
         self.sample_path.structure.clean_structure_edges()
         'Estimate the best parents for each node'
-        with multiprocessing.Pool(processes=cpu_count) as pool:
+        #with multiprocessing.Pool(processes=cpu_count) as pool:
+        with get_context("spawn").Pool(processes=cpu_count) as pool:
             list_edges_partial = pool.starmap(ctpc_algo, zip(
                 self.nodes,
                 total_vars_numb_array))
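
Note: the change above builds the pool from an explicit "spawn" context, so workers start in fresh interpreters instead of inheriting the parent process via fork. A minimal, self-contained sketch of the same pattern (node_worker is a hypothetical stand-in for ctpc_algo):

    import multiprocessing
    from multiprocessing import get_context

    def node_worker(node, total_vars_numb):
        # hypothetical stand-in for ctpc_algo: compute something per node
        return (node, total_vars_numb)

    if __name__ == "__main__":  # required: spawn re-imports the main module
        nodes = ["X", "Y", "Z"]
        totals = [3, 3, 3]
        cpu_count = multiprocessing.cpu_count()
        with get_context("spawn").Pool(processes=cpu_count) as pool:
            results = pool.starmap(node_worker, zip(nodes, totals))
        print(results)

Under "spawn" the worker function and its arguments must be picklable and importable from the main module, which is why a module-level function (rather than a lambda or closure) is passed to starmap.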

main_package/classes/estimators/structure_score_based_estimator.py
@@ -24,6 +24,7 @@ import optimizers.tabu_search as tabu
 from utility.decorators import timing,timing_write
+from multiprocessing import get_context
 #from numba import njit
@@ -92,7 +93,8 @@ class StructureScoreBasedEstimator(se.StructureEstimator):
         cpu_count = 1
         'Estimate the best parents for each node'
-        with multiprocessing.Pool(processes=cpu_count) as pool:
+        with get_context("spawn").Pool(processes=cpu_count) as pool:
+        #with multiprocessing.Pool(processes=cpu_count) as pool:
             list_edges_partial = pool.starmap(estimate_parents, zip(
                 self.nodes,
                 l_max_parents,

main_package/tests/estimators/test_structure_constraint_based_estimator.py
@@ -15,6 +15,8 @@ import structure_graph.sample_path as sp
 import estimators.structure_constraint_based_estimator as se
 import utility.json_importer as ji
+from multiprocessing import set_start_method
 import copy
@@ -22,7 +24,7 @@ class TestStructureConstraintBasedEstimator(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         #cls.read_files = glob.glob(os.path.join('../../data', "*.json"))
-        cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_ternary_data_15.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_ternary_data_15.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name',1)
         cls.s1 = sp.SamplePath(cls.importer)
         cls.s1.build_trajectories()
         cls.s1.build_structure()
@@ -31,6 +33,7 @@ class TestStructureConstraintBasedEstimator(unittest.TestCase):
         true_edges = copy.deepcopy(self.s1.structure.edges)
         true_edges = set(map(tuple, true_edges))
+        set_start_method("spawn")
         se1 = se.StructureConstraintBasedEstimator(self.s1,0.1,0.1)
         edges = se1.estimate_structure(disable_multiprocessing=False)
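
Note: set_start_method raises RuntimeError if the start method has already been set for the process, which can happen when more than one test case calls it. A small sketch of a guard that keeps repeated calls from failing:

    from multiprocessing import set_start_method

    try:
        set_start_method("spawn")
    except RuntimeError:
        # the start method was already configured for this process
        pass

    # Alternatively, set_start_method("spawn", force=True) overrides a previously
    # configured method; either way it must run before any Pool or Process is created.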

main_package/tests/estimators/test_structure_score_based_estimator.py
@@ -16,6 +16,9 @@ import structure_graph.sample_path as sp
 import estimators.structure_score_based_estimator as se
 import utility.json_importer as ji
+from multiprocessing import set_start_method
 class TestStructureScoreBasedEstimator(unittest.TestCase):
@@ -23,7 +26,7 @@ class TestStructureScoreBasedEstimator(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         #cls.read_files = glob.glob(os.path.join('../../data', "*.json"))
-        cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_ternary_data_15.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
         cls.s1 = sp.SamplePath(cls.importer)
         cls.s1.build_trajectories()
         cls.s1.build_structure()
@@ -34,12 +37,17 @@ class TestStructureScoreBasedEstimator(unittest.TestCase):
         true_edges = copy.deepcopy(self.s1.structure.edges)
         true_edges = set(map(tuple, true_edges))
+        set_start_method("spawn")
         se1 = se.StructureScoreBasedEstimator(self.s1)
         edges = se1.estimate_structure(
-            max_parents = None,
-            iterations_number = 100,
-            patience = None
-        )
+            max_parents = None,
+            iterations_number = 100,
+            patience = 35,
+            tabu_length = 15,
+            tabu_rules_duration = 15,
+            optimizer = 'tabu',
+            disable_multiprocessing=False
+        )
         self.assertEqual(edges, true_edges)

main_package/tests/optimizers/test_tabu_search.py
@@ -42,7 +42,8 @@ class TestTabuSearch(unittest.TestCase):
             patience = None,
             tabu_length = 15,
             tabu_rules_duration = 15,
-            optimizer = 'tabu'
+            optimizer = 'tabu',
+            disable_multiprocessing=True
         )
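
Note: the tests above only toggle disable_multiprocessing; how the library handles the flag is not shown in this diff. As an assumption, such a flag is usually wired as a serial fallback around the spawn pool, roughly like this hypothetical helper (run_per_node and worker are illustrative names):

    from multiprocessing import get_context

    def run_per_node(worker, nodes, extra_args, cpu_count, disable_multiprocessing=False):
        if disable_multiprocessing:
            # serial path: same results, no worker processes
            return [worker(node, arg) for node, arg in zip(nodes, extra_args)]
        # parallel path: explicit "spawn" context, as in the estimator changes above
        with get_context("spawn").Pool(processes=cpu_count) as pool:
            return pool.starmap(worker, zip(nodes, extra_args))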