1
0
Fork 0

Modify Shared Memory objects in StructureEst; Refactor almost all class methods to static

parallel_struct_est
Filippo Martini 4 years ago
parent 5021d425b6
commit 4befa5f3e5
  1. 108
      PyCTBN/PyCTBN/structure_estimator.py
  2. 2
      PyCTBN/tests/test_structure_estimator.py

@ -42,16 +42,19 @@ class StructureEstimator:
#self._sample_path = sample_path #self._sample_path = sample_path
self._nodes = np.array(sample_path.structure.nodes_labels) self._nodes = np.array(sample_path.structure.nodes_labels)
self._tot_vars_number = sample_path.total_variables_count self._tot_vars_number = sample_path.total_variables_count
self._shm_times = multiprocessing.shared_memory.SharedMemory(create=True, self._shm_times = multiprocessing.shared_memory.SharedMemory(name='sh_times', create=True,
size=sample_path.trajectories.times.nbytes) size=sample_path.trajectories.times.nbytes)
self._shm_trajectories = multiprocessing.shared_memory.SharedMemory(create=True, self._shm_trajectories = multiprocessing.shared_memory.SharedMemory(name='sh_traj', create=True,
size=sample_path.trajectories.complete_trajectory.nbytes) size=sample_path.trajectories.complete_trajectory.nbytes)
self._times = np.ndarray(sample_path.trajectories.times.shape, sample_path.trajectories.times.dtype, self._shm_times.buf) self._times = np.ndarray(sample_path.trajectories.times.shape, sample_path.trajectories.times.dtype,
self._shm_times.buf)
self._times[:] = sample_path.trajectories.times[:] self._times[:] = sample_path.trajectories.times[:]
self._trajectories = np.ndarray(sample_path.trajectories.complete_trajectory.shape, self._trajectories = np.ndarray(sample_path.trajectories.complete_trajectory.shape,
sample_path.trajectories.complete_trajectory.dtype, self._shm_trajectories.buf) sample_path.trajectories.complete_trajectory.dtype, self._shm_trajectories.buf)
self._trajectories[:] = sample_path.trajectories.complete_trajectory[:] self._trajectories[:] = sample_path.trajectories.complete_trajectory[:]
self._nodes_vals = sample_path.structure.nodes_values self._nodes_vals = sample_path.structure.nodes_values
self._nodes_indxs = sample_path.structure.nodes_indexes self._nodes_indxs = sample_path.structure.nodes_indexes
self._complete_graph = self.build_complete_graph(sample_path.structure.nodes_labels) self._complete_graph = self.build_complete_graph(sample_path.structure.nodes_labels)
@ -86,8 +89,11 @@ class StructureEstimator:
result_graph.add_edges_from(edges) result_graph.add_edges_from(edges)
return result_graph return result_graph
def complete_test(self, test_parent: str, test_child: str, parent_set: typing.List, child_states_numb: int, @staticmethod
tot_vars_count: int, cache: Cache) -> bool: def complete_test(test_parent: str, test_child: str, parent_set: typing.List, child_states_numb: int,
tot_vars_count: int, cache: Cache, nodes: np.ndarray,
nodes_indxs: np.ndarray, nodes_vals: np.ndarray, times :np.ndarray, trajectories: np.ndarray,
exp_alfa: float, chi_alfa: float) -> bool:
"""Performs a complete independence test on the directed graphs G1 = {test_child U parent_set} """Performs a complete independence test on the directed graphs G1 = {test_child U parent_set}
G2 = {G1 U test_parent} (added as an additional parent of the test_child). G2 = {G1 U test_parent} (added as an additional parent of the test_child).
Generates all the necessary structures and datas to perform the tests. Generates all the necessary structures and datas to perform the tests.
@ -111,43 +117,26 @@ class StructureEstimator:
parents = np.array(parent_set) parents = np.array(parent_set)
parents = np.append(parents, test_parent) parents = np.append(parents, test_parent)
sorted_parents = self._nodes[np.isin(self._nodes, parents)] sorted_parents = nodes[np.isin(nodes, parents)]
cims_filter = sorted_parents != test_parent cims_filter = sorted_parents != test_parent
"""
sofc1 = cache.find(set(p_set))
if not sofc1:
bool_mask1 = np.isin(self._nodes, complete_info)
l1 = list(self._nodes[bool_mask1])
indxs1 = self._nodes_indxs[bool_mask1]
vals1 = self._nodes_vals[bool_mask1]
eds1 = list(itertools.product(parent_set,test_child))
s1 = Structure(l1, indxs1, vals1, eds1, tot_vars_count)
g1 = NetworkGraph(s1)
g1.fast_init(test_child)
p1 = ParametersEstimator(g1)
p1.fast_init(test_child)
sofc1 = p1.compute_parameters_for_node(test_child, self._times, self._trajectories)
cache.put(set(p_set), sofc1)
"""
#sofc2 = None #sofc2 = None
p_set.insert(0, test_parent) p_set.insert(0, test_parent)
sofc2 = cache.find(set(p_set)) sofc2 = cache.find(set(p_set))
if not sofc2: if not sofc2:
complete_info.append(test_parent) complete_info.append(test_parent)
bool_mask2 = np.isin(self._nodes, complete_info) bool_mask2 = np.isin(nodes, complete_info)
l2 = list(self._nodes[bool_mask2]) l2 = list(nodes[bool_mask2])
indxs2 = self._nodes_indxs[bool_mask2] indxs2 = nodes_indxs[bool_mask2]
vals2 = self._nodes_vals[bool_mask2] vals2 = nodes_vals[bool_mask2]
eds2 = list(itertools.product(p_set, test_child)) eds2 = list(itertools.product(p_set, test_child))
s2 = Structure(l2, indxs2, vals2, eds2, tot_vars_count) s2 = Structure(l2, indxs2, vals2, eds2, tot_vars_count)
g2 = NetworkGraph(s2) g2 = NetworkGraph(s2)
g2.fast_init(test_child) g2.fast_init(test_child)
p2 = ParametersEstimator(g2) p2 = ParametersEstimator(g2)
p2.fast_init(test_child) p2.fast_init(test_child)
sofc2 = p2.compute_parameters_for_node(test_child, self._times, self._trajectories) sofc2 = p2.compute_parameters_for_node(test_child, times, trajectories)
cache.put(set(p_set), sofc2) cache.put(set(p_set), sofc2)
del p_set[0] del p_set[0]
@ -157,18 +146,19 @@ class StructureEstimator:
g2.fast_init(test_child) g2.fast_init(test_child)
p2 = ParametersEstimator(g2) p2 = ParametersEstimator(g2)
p2.fast_init(test_child) p2.fast_init(test_child)
sofc1 = p2.compute_parameters_for_node(test_child, self._times, self._trajectories) sofc1 = p2.compute_parameters_for_node(test_child, times, trajectories)
cache.put(set(p_set), sofc1) cache.put(set(p_set), sofc1)
for cim1, p_comb in zip(sofc1.actual_cims, sofc1.p_combs): for cim1, p_comb in zip(sofc1.actual_cims, sofc1.p_combs):
cond_cims = sofc2.filter_cims_with_mask(cims_filter, p_comb) cond_cims = sofc2.filter_cims_with_mask(cims_filter, p_comb)
for cim2 in cond_cims: for cim2 in cond_cims:
if not self.independence_test(child_states_numb, cim1, cim2): if not StructureEstimator.independence_test(child_states_numb, cim1, cim2, exp_alfa, chi_alfa):
return False return False
return True return True
def independence_test(self, child_states_numb: int, cim1: ConditionalIntensityMatrix, @staticmethod
cim2: ConditionalIntensityMatrix) -> bool: def independence_test(child_states_numb: int, cim1: ConditionalIntensityMatrix,
cim2: ConditionalIntensityMatrix, exp_alfa, chi_test_alfa) -> bool:
"""Compute the actual independence test using two cims. """Compute the actual independence test using two cims.
It is performed first the exponential test and if the null hypothesis is not rejected, It is performed first the exponential test and if the null hypothesis is not rejected,
it is performed also the chi_test. it is performed also the chi_test.
@ -189,7 +179,6 @@ class StructureEstimator:
C1 = cim1.cim C1 = cim1.cim
C2 = cim2.cim C2 = cim2.cim
F_stats = C2.diagonal() / C1.diagonal() F_stats = C2.diagonal() / C1.diagonal()
exp_alfa = self._exp_test_sign
for val in range(0, child_states_numb): for val in range(0, child_states_numb):
if F_stats[val] < f_dist.ppf(exp_alfa / 2, r1s[val], r2s[val]) or \ if F_stats[val] < f_dist.ppf(exp_alfa / 2, r1s[val], r2s[val]) or \
F_stats[val] > f_dist.ppf(1 - exp_alfa / 2, r1s[val], r2s[val]): F_stats[val] > f_dist.ppf(1 - exp_alfa / 2, r1s[val], r2s[val]):
@ -197,7 +186,7 @@ class StructureEstimator:
M1_no_diag = M1[~np.eye(M1.shape[0], dtype=bool)].reshape(M1.shape[0], -1) M1_no_diag = M1[~np.eye(M1.shape[0], dtype=bool)].reshape(M1.shape[0], -1)
M2_no_diag = M2[~np.eye(M2.shape[0], dtype=bool)].reshape( M2_no_diag = M2[~np.eye(M2.shape[0], dtype=bool)].reshape(
M2.shape[0], -1) M2.shape[0], -1)
chi_2_quantile = chi2_dist.ppf(1 - self._chi_test_alfa, child_states_numb - 1) chi_2_quantile = chi2_dist.ppf(1 - chi_test_alfa, child_states_numb - 1)
Ks = np.sqrt(r1s / r2s) Ks = np.sqrt(r1s / r2s)
Ls = np.sqrt(r2s / r1s) Ls = np.sqrt(r2s / r1s)
for val in range(0, child_states_numb): for val in range(0, child_states_numb):
@ -207,7 +196,11 @@ class StructureEstimator:
return False return False
return True return True
def one_iteration_of_CTPC_algorithm(self, var_id: str, cache: Cache, tot_vars_count: int) -> typing.List: @staticmethod
def one_iteration_of_CTPC_algorithm(var_id: str, child_states_numb: int,
u: typing.List, cache: Cache,
tot_vars_count: int, nodes, nodes_indxs,
nodes_vals, tests_alfas: typing.Tuple, shm_dims: typing.List) -> typing.List:
"""Performs an iteration of the CTPC algorithm using the node ``var_id`` as ``test_child``. """Performs an iteration of the CTPC algorithm using the node ``var_id`` as ``test_child``.
:param var_id: the node label of the test child :param var_id: the node label of the test child
@ -215,20 +208,23 @@ class StructureEstimator:
:param tot_vars_count: the number of _nodes in the net :param tot_vars_count: the number of _nodes in the net
:type tot_vars_count: int :type tot_vars_count: int
""" """
u = list(self._complete_graph.predecessors(var_id)) existing_shm_times = shared_memory.SharedMemory(name='sh_times')
#child_states_numb = self._sample_path.structure.get_states_number(var_id) times = np.ndarray(shm_dims[0], dtype=np.float, buffer=existing_shm_times.buf)
child_states_numb = self._nodes_vals[np.where(self._nodes == var_id)][0] existing_shm_traj = shared_memory.SharedMemory(name='sh_traj')
#print("Child States Numb", child_states_numb) trajectory = np.ndarray(shm_dims[1], dtype=np.int64, buffer=existing_shm_traj.buf)
b = 0 b = 0
while b < len(u): while b < len(u):
parent_indx = 0 parent_indx = 0
while parent_indx < len(u): while parent_indx < len(u):
removed = False removed = False
S = self.generate_possible_sub_sets_of_size(u, b, u[parent_indx]) S = StructureEstimator.generate_possible_sub_sets_of_size(u, b, u[parent_indx])
test_parent = u[parent_indx] test_parent = u[parent_indx]
for parents_set in S: for parents_set in S:
if self.complete_test(test_parent, var_id, parents_set, child_states_numb, tot_vars_count, cache): if StructureEstimator.complete_test(test_parent, var_id, parents_set,
#self._complete_graph.remove_edge(test_parent, var_id) child_states_numb, tot_vars_count, cache,
nodes, nodes_indxs, nodes_vals, times, trajectory,
tests_alfas[0], tests_alfas[1]):
u.remove(test_parent) u.remove(test_parent)
removed = True removed = True
break break
@ -236,10 +232,15 @@ class StructureEstimator:
parent_indx += 1 parent_indx += 1
b += 1 b += 1
#print("Parent set for node ", var_id, " : ", u) #print("Parent set for node ", var_id, " : ", u)
del times
existing_shm_times.close()
del trajectory
existing_shm_traj.close()
cache.clear() cache.clear()
return u return u
def generate_possible_sub_sets_of_size(self, u: typing.List, size: int, parent_label: str) -> \ @staticmethod
def generate_possible_sub_sets_of_size(u: typing.List, size: int, parent_label: str) -> \
typing.Iterator: typing.Iterator:
"""Creates a list containing all possible subsets of the list ``u`` of size ``size``, """Creates a list containing all possible subsets of the list ``u`` of size ``size``,
that do not contains a the node identified by ``parent_label``. that do not contains a the node identified by ``parent_label``.
@ -260,16 +261,29 @@ class StructureEstimator:
def ctpc_algorithm(self, multi_processing: bool) -> None: def ctpc_algorithm(self, multi_processing: bool) -> None:
"""Compute the CTPC algorithm over the entire net. """Compute the CTPC algorithm over the entire net.
""" """
ctpc_algo = self.one_iteration_of_CTPC_algorithm ctpc_algo = StructureEstimator.one_iteration_of_CTPC_algorithm
#total_vars_numb = self._sample_path.total_variables_count
total_vars_numb_list = [self._tot_vars_number] * len(self._nodes) total_vars_numb_list = [self._tot_vars_number] * len(self._nodes)
parents_list = [list(self._complete_graph.predecessors(var_id)) for var_id in self._nodes]
nodes_array_list = [self._nodes] * len(self._nodes)
nodes_indxs_array_list = [self._nodes_indxs] * len(self._nodes)
nodes_vals_array_list = [self._nodes_vals] * len(self._nodes)
tests_alfa_dims_list = [(self._exp_test_sign, self._chi_test_alfa)] * len(self._nodes)
datas_dims_list = [[self._times.shape, self._trajectories.shape]] * len(self._nodes)
if multi_processing: if multi_processing:
cpu_count = multiprocessing.cpu_count() cpu_count = multiprocessing.cpu_count()
else: else:
cpu_count = 1 cpu_count = 1
print("CPU COUNT", cpu_count) print("CPU COUNT", cpu_count)
with multiprocessing.Pool(processes=cpu_count) as pool: with multiprocessing.Pool(processes=cpu_count) as pool:
parent_sets = pool.starmap(ctpc_algo, zip(self._nodes, self._caches, total_vars_numb_list)) parent_sets = pool.starmap(ctpc_algo, zip(self._nodes, self._nodes_vals, parents_list,
self._caches, total_vars_numb_list,
nodes_array_list, nodes_indxs_array_list, nodes_vals_array_list,
tests_alfa_dims_list, datas_dims_list))
self._shm_times.close()
self._shm_times.unlink()
self._shm_trajectories.close()
self._shm_trajectories.unlink()
self._result_graph = self.build_result_graph(self._nodes, parent_sets) self._result_graph = self.build_result_graph(self._nodes, parent_sets)
def save_results(self) -> None: def save_results(self) -> None:

@ -83,6 +83,8 @@ class TestStructureEstimator(unittest.TestCase):
lp_wrapper = lp(se1.ctpc_algorithm) lp_wrapper = lp(se1.ctpc_algorithm)
lp_wrapper(MULTI_PROCESSING) lp_wrapper(MULTI_PROCESSING)
lp.print_stats() lp.print_stats()
#paralell_time = timeit.timeit(se1.ctpc_algorithm, MULTI_PROCESSING, number=1)
#print("EXEC TIME:", paralell_time)
print(se1._result_graph.edges) print(se1._result_graph.edges)
print(self.s1.structure.edges) print(self.s1.structure.edges)
for ed in self.s1.structure.edges: for ed in self.s1.structure.edges: