1
0
Fork 0

Fixed alpha_xx' bug

master
Luca Moretti 4 years ago
parent 1b11de4503
commit 6f41a207e1
  1. 16
      main_package/classes/estimators/fam_score_calculator.py
  2. 20
      main_package/classes/estimators/structure_score_based_estimator.py
  3. 3
      main_package/tests/estimators/test_parameters_estimator.py
  4. 43
      main_package/tests/estimators/test_structure_score_based_estimator.py

@ -143,6 +143,7 @@ class FamScoreCalculator:
Returns:
the value of the marginal likelihood over q
"""
return np.sum([self.variable_cim_xu_marginal_likelihood_q(cim, tau_xu, alpha_xu) for cim in cims])
def variable_cim_xu_marginal_likelihood_q(self,
@ -206,18 +207,27 @@ class FamScoreCalculator:
def get_fam_score(self,
cims: np.array,
tau_xu: float=1,
alpha_xu: float=1,
alpha_xxu: float=1):
alpha_xu: float=1):
"""
calculate the FamScore value of the node identified by the label node_id
Parameters:
cims: np.array with all the node's cims,
tau_xu: hyperparameter over the CTBNs q parameters
alpha_xu: hyperparameter over the CTBNs q parameters
alpha_xxu: hyperparameter over the CTBNs theta parameters
Returns:
the FamScore value of the node
"""
#print("------")
#print(self.marginal_likelihood_q(cims,
# tau_xu,
# alpha_xu))
#print(self.marginal_likelihood_theta(cims,
# alpha_xu,
# alpha_xxu))
'calculate alpha_xxu as a uniform distribution'
alpha_xxu = alpha_xu /(len(cims[0].state_residence_times) - 1)
return self.marginal_likelihood_q(cims,
tau_xu,
alpha_xu) \

@ -47,7 +47,7 @@ class StructureScoreBasedEstimator(se.StructureEstimator):
super().__init__(sample_path)
@timing
def estimate_structure(self, max_parents:int = None, iterations_number:int= 40, patience:int = None ):
"""
Compute the score-based algorithm to find the optimal structure
@ -79,19 +79,22 @@ class StructureScoreBasedEstimator(se.StructureEstimator):
'get the number of CPU'
cpu_count = multiprocessing.cpu_count()
#cpu_count = 1
'Estimate the best parents for each node'
with multiprocessing.Pool(processes=cpu_count) as pool:
list_edges_partial = pool.starmap(estimate_parents, zip(self.nodes,l_max_parents,l_iterations_number,l_patience))
#list_edges_partial = [estimate_parents(n,max_parents,iterations_number,patience) for n in self.nodes]
#list_edges_partial = p.map(estimate_parents, self.nodes)
#list_edges_partial= estimate_parents('Y',max_parents,iterations_number,patience)
'Concatenate all the edges list'
set_list_edges = set(itertools.chain.from_iterable(list_edges_partial))
print('-------------------------')
#print('-------------------------')
'TODO: Pensare a un modo migliore -- set difference sembra non funzionare '
'calculate precision and recall'
n_missing_edges = 0
n_added_fake_edges = 0
@ -117,12 +120,14 @@ class StructureScoreBasedEstimator(se.StructureEstimator):
# print(true_edge)
print(f"n archi reali non trovati: {n_missing_edges}")
print(f"n archi non reali aggiunti: {n_added_fake_edges}")
# print(f"n archi reali non trovati: {n_missing_edges}")
# print(f"n archi non reali aggiunti: {n_added_fake_edges}")
print(true_edges)
print(set_list_edges)
print(f"precision: {precision} ")
print(f"recall: {recall} ")
return set_list_edges
def estimate_parents(self,node_id:str, max_parents:int = None, iterations_number:int= 40, patience:int = 10 ):
@ -163,9 +168,10 @@ class StructureScoreBasedEstimator(se.StructureEstimator):
graph.remove_edges([parent_removed])
graph.add_edges([current_edge])
added = True
#print('**************************')
current_score = self.get_score_from_graph(graph,node_id)
if current_score > actual_best_score:
'update current best score'
actual_best_score = current_score

@ -17,7 +17,7 @@ class TestParametersEstimatior(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.read_files = glob.glob(os.path.join('../../data', "*.json"))
cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_ternary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
cls.s1 = sp.SamplePath(cls.importer)
cls.s1.build_trajectories()
cls.s1.build_structure()
@ -45,6 +45,7 @@ class TestParametersEstimatior(unittest.TestCase):
sofc1 = p1.compute_parameters_for_node(node)
sampled_cims = self.aux_import_sampled_cims('dyn.cims')
sc = list(sampled_cims.values())
print(sampled_cims.values())
#print(sc[indx])
self.equality_of_cims_of_node(sc[indx], sofc1.actual_cims)

@ -9,6 +9,7 @@ import networkx as nx
import numpy as np
import psutil
from line_profiler import LineProfiler
import copy
import utility.cache as ch
import structure_graph.sample_path as sp
@ -22,45 +23,29 @@ class TestStructureScoreBasedEstimator(unittest.TestCase):
@classmethod
def setUpClass(cls):
#cls.read_files = glob.glob(os.path.join('../../data', "*.json"))
cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
cls.importer = ji.JsonImporter("../../data/networks_and_trajectories_quaternary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
cls.s1 = sp.SamplePath(cls.importer)
cls.s1.build_trajectories()
cls.s1.build_structure()
def test_esecuzione(self):
def test_structure(self):
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = se.StructureScoreBasedEstimator(self.s1)
se1.estimate_structure(
max_parents = 6,
iterations_number = 80,
edges = se1.estimate_structure(
max_parents = None,
iterations_number = 100,
patience = None
)
if __name__ == '__main__':
unittest.main()
self.assertEqual(edges, true_edges)
'''
def test_init(self):
exp_alfa = 0.1
chi_alfa = 0.1
se1 = se.StructureScoreBasedEstimator(self.s1)
self.assertEqual(self.s1, se1.sample_path)
self.assertTrue(np.array_equal(se1.nodes, np.array(self.s1.structure.nodes_labels)))
self.assertTrue(np.array_equal(se1.nodes_indxs, self.s1.structure.nodes_indexes))
self.assertTrue(np.array_equal(se1.nodes_vals, self.s1.structure.nodes_values))
self.assertIsInstance(se1.complete_graph, nx.DiGraph)
self.assertIsInstance(se1.cache, ch.Cache)
if __name__ == '__main__':
unittest.main()
def test_build_complete_graph(self):
nodes_numb = len(self.s1.structure.nodes_labels)
se1 = se.StructureScoreBasedEstimator(self.s1)
cg = se1.build_complete_graph(self.s1.structure.nodes_labels)
self.assertEqual(len(cg.edges), nodes_numb*(nodes_numb - 1))
''for node in self.s1.structure.nodes_labels:
no_self_loops = self.s1.structure.nodes_labels[:]
no_self_loops.remove(node)
for n2 in no_self_loops:
self.assertIn((node, n2), cg.edges)''
'''