1
0
Fork 0

Refactor shared_memory init in StructureEstimator

parallel_struct_est
Filippo Martini 4 years ago
parent 4befa5f3e5
commit f1eed08e07
  1. 34
      PyCTBN/PyCTBN/structure_estimator.py

@ -42,19 +42,8 @@ class StructureEstimator:
#self._sample_path = sample_path #self._sample_path = sample_path
self._nodes = np.array(sample_path.structure.nodes_labels) self._nodes = np.array(sample_path.structure.nodes_labels)
self._tot_vars_number = sample_path.total_variables_count self._tot_vars_number = sample_path.total_variables_count
self._shm_times = multiprocessing.shared_memory.SharedMemory(name='sh_times', create=True, self._times = sample_path.trajectories.times
size=sample_path.trajectories.times.nbytes) self._trajectories = sample_path.trajectories.complete_trajectory
self._shm_trajectories = multiprocessing.shared_memory.SharedMemory(name='sh_traj', create=True,
size=sample_path.trajectories.complete_trajectory.nbytes)
self._times = np.ndarray(sample_path.trajectories.times.shape, sample_path.trajectories.times.dtype,
self._shm_times.buf)
self._times[:] = sample_path.trajectories.times[:]
self._trajectories = np.ndarray(sample_path.trajectories.complete_trajectory.shape,
sample_path.trajectories.complete_trajectory.dtype, self._shm_trajectories.buf)
self._trajectories[:] = sample_path.trajectories.complete_trajectory[:]
self._nodes_vals = sample_path.structure.nodes_values self._nodes_vals = sample_path.structure.nodes_values
self._nodes_indxs = sample_path.structure.nodes_indexes self._nodes_indxs = sample_path.structure.nodes_indexes
self._complete_graph = self.build_complete_graph(sample_path.structure.nodes_labels) self._complete_graph = self.build_complete_graph(sample_path.structure.nodes_labels)
@ -262,6 +251,17 @@ class StructureEstimator:
"""Compute the CTPC algorithm over the entire net. """Compute the CTPC algorithm over the entire net.
""" """
ctpc_algo = StructureEstimator.one_iteration_of_CTPC_algorithm ctpc_algo = StructureEstimator.one_iteration_of_CTPC_algorithm
shm_times = multiprocessing.shared_memory.SharedMemory(name='sh_times', create=True,
size=self._.times.nbytes)
shm_trajectories = multiprocessing.shared_memory.SharedMemory(name='sh_traj', create=True,
size=self._trajectories.nbytes)
times_arr = np.ndarray(self._times.shape, self._times.dtype,
shm_times.buf)
times_arr[:] = self._times[:]
trajectories_arr = np.ndarray(self._trajectories.shape,
self._trajectories.dtype, shm_trajectories.buf)
trajectories_arr[:] = self._trajectories[:]
total_vars_numb_list = [self._tot_vars_number] * len(self._nodes) total_vars_numb_list = [self._tot_vars_number] * len(self._nodes)
parents_list = [list(self._complete_graph.predecessors(var_id)) for var_id in self._nodes] parents_list = [list(self._complete_graph.predecessors(var_id)) for var_id in self._nodes]
nodes_array_list = [self._nodes] * len(self._nodes) nodes_array_list = [self._nodes] * len(self._nodes)
@ -280,10 +280,10 @@ class StructureEstimator:
nodes_array_list, nodes_indxs_array_list, nodes_vals_array_list, nodes_array_list, nodes_indxs_array_list, nodes_vals_array_list,
tests_alfa_dims_list, datas_dims_list)) tests_alfa_dims_list, datas_dims_list))
self._shm_times.close() shm_times.close()
self._shm_times.unlink() shm_times.unlink()
self._shm_trajectories.close() shm_trajectories.close()
self._shm_trajectories.unlink() shm_trajectories.unlink()
self._result_graph = self.build_result_graph(self._nodes, parent_sets) self._result_graph = self.build_result_graph(self._nodes, parent_sets)
def save_results(self) -> None: def save_results(self) -> None: