1
0
Fork 0

Add share memory trajectories

parallel_struct_est
Filippo Martini 4 years ago
parent cc118cc176
commit f8e3d0fe2b
  1. 9
      PyCTBN/PyCTBN/parameters_estimator.py
  2. 16
      PyCTBN/PyCTBN/structure_estimator.py

@ -17,9 +17,10 @@ class ParametersEstimator:
:_single_set_of_cims: the set of cims object that will hold the cims of the node :_single_set_of_cims: the set of cims object that will hold the cims of the node
""" """
def __init__(self, trajectories: Trajectory, net_graph: NetworkGraph): def __init__(self, times, trajectories, net_graph: NetworkGraph):
"""Constructor Method """Constructor Method
""" """
self._times = times
self._trajectories = trajectories self._trajectories = trajectories
self._net_graph = net_graph self._net_graph = net_graph
self._single_set_of_cims = None self._single_set_of_cims = None
@ -45,13 +46,13 @@ class ParametersEstimator:
node_indx = self._net_graph.get_node_indx(node_id) node_indx = self._net_graph.get_node_indx(node_id)
state_res_times = self._single_set_of_cims._state_residence_times state_res_times = self._single_set_of_cims._state_residence_times
transition_matrices = self._single_set_of_cims._transition_matrices transition_matrices = self._single_set_of_cims._transition_matrices
self.compute_state_res_time_for_node(node_indx, self._trajectories.times, self.compute_state_res_time_for_node(node_indx, self._times,
self._trajectories.trajectory, self._trajectories,
self._net_graph.time_filtering, self._net_graph.time_filtering,
self._net_graph.time_scalar_indexing_strucure, self._net_graph.time_scalar_indexing_strucure,
state_res_times) state_res_times)
self.compute_state_transitions_for_a_node(node_indx, self.compute_state_transitions_for_a_node(node_indx,
self._trajectories.complete_trajectory, self._trajectories,
self._net_graph.transition_filtering, self._net_graph.transition_filtering,
self._net_graph.transition_scalar_indexing_structure, self._net_graph.transition_scalar_indexing_structure,
transition_matrices) transition_matrices)

@ -9,6 +9,7 @@ from networkx.readwrite import json_graph
from scipy.stats import chi2 as chi2_dist from scipy.stats import chi2 as chi2_dist
from scipy.stats import f as f_dist from scipy.stats import f as f_dist
import multiprocessing import multiprocessing
from multiprocessing import shared_memory
from .cache import Cache from .cache import Cache
@ -40,6 +41,15 @@ class StructureEstimator:
""" """
self._sample_path = sample_path self._sample_path = sample_path
self._nodes = np.array(self._sample_path.structure.nodes_labels) self._nodes = np.array(self._sample_path.structure.nodes_labels)
self._shm_times = multiprocessing.shared_memory.SharedMemory(create=True,
size=self._sample_path.trajectories.times.nbytes)
self._shm_trajectories = multiprocessing.shared_memory.SharedMemory(create=True,
size=self._sample_path.trajectories.complete_trajectory.nbytes)
self._times = np.ndarray(self._sample_path.trajectories.times.shape, self._sample_path.trajectories.times.dtype, self._shm_times.buf)
self._times[:] = self._sample_path.trajectories.times[:]
self._trajectories = np.ndarray(self._sample_path.trajectories.complete_trajectory.shape, self._sample_path.trajectories.complete_trajectory.dtype, self._shm_trajectories.buf)
self._trajectories[:] = self._sample_path.trajectories.complete_trajectory[:]
self._nodes_vals = self._sample_path.structure.nodes_values self._nodes_vals = self._sample_path.structure.nodes_values
self._nodes_indxs = self._sample_path.structure.nodes_indexes self._nodes_indxs = self._sample_path.structure.nodes_indexes
self._complete_graph = self.build_complete_graph(self._sample_path.structure.nodes_labels) self._complete_graph = self.build_complete_graph(self._sample_path.structure.nodes_labels)
@ -112,7 +122,7 @@ class StructureEstimator:
s1 = Structure(l1, indxs1, vals1, eds1, tot_vars_count) s1 = Structure(l1, indxs1, vals1, eds1, tot_vars_count)
g1 = NetworkGraph(s1) g1 = NetworkGraph(s1)
g1.fast_init(test_child) g1.fast_init(test_child)
p1 = ParametersEstimator(self._sample_path.trajectories, g1) p1 = ParametersEstimator(self._times, self._trajectories, g1)
p1.fast_init(test_child) p1.fast_init(test_child)
sofc1 = p1.compute_parameters_for_node(test_child) sofc1 = p1.compute_parameters_for_node(test_child)
cache.put(set(p_set), sofc1) cache.put(set(p_set), sofc1)
@ -130,7 +140,7 @@ class StructureEstimator:
s2 = Structure(l2, indxs2, vals2, eds2, tot_vars_count) s2 = Structure(l2, indxs2, vals2, eds2, tot_vars_count)
g2 = NetworkGraph(s2) g2 = NetworkGraph(s2)
g2.fast_init(test_child) g2.fast_init(test_child)
p2 = ParametersEstimator(self._sample_path.trajectories, g2) p2 = ParametersEstimator(self._times, self._trajectories, g2)
p2.fast_init(test_child) p2.fast_init(test_child)
sofc2 = p2.compute_parameters_for_node(test_child) sofc2 = p2.compute_parameters_for_node(test_child)
cache.put(set(p_set), sofc2) cache.put(set(p_set), sofc2)
@ -247,7 +257,7 @@ class StructureEstimator:
"""Save the estimated Structure to a .json file in the path where the data are loaded from. """Save the estimated Structure to a .json file in the path where the data are loaded from.
The file is named as the input dataset but the `results_` word is appended to the results file. The file is named as the input dataset but the `results_` word is appended to the results file.
""" """
res = json_graph.node_link_data(self._result_graph_graph) res = json_graph.node_link_data(self._result_graph)
name = self._sample_path._importer.file_path.rsplit('/', 1)[-1] + str(self._sample_path._importer.dataset_id()) name = self._sample_path._importer.file_path.rsplit('/', 1)[-1] + str(self._sample_path._importer.dataset_id())
name = 'results_' + name name = 'results_' + name
with open(name, 'w') as f: with open(name, 'w') as f: