parent
0303cbe786
commit
0112859665
@ -1,4 +1,5 @@ |
||||
__pycache__ |
||||
.vscode |
||||
**/__pycache__ |
||||
**/data |
||||
**/data |
||||
**/results_data |
@ -0,0 +1,121 @@ |
||||
|
||||
import itertools |
||||
import json |
||||
import typing |
||||
|
||||
import networkx as nx |
||||
import numpy as np |
||||
from networkx.readwrite import json_graph |
||||
|
||||
from math import log |
||||
|
||||
from scipy.special import gamma |
||||
from random import choice |
||||
|
||||
import set_of_cims as soCims |
||||
import network_graph as net_graph |
||||
import conditional_intensity_matrix as cim_class |
||||
|
||||
|
||||
''' |
||||
TODO: Parlare dell'idea di ciclare sulle cim senza filtrare |
||||
TODO: Parlare di gamma in scipy e math(overflow) |
||||
TODO: Problema warning overflow |
||||
''' |
||||
|
||||
class FamScoreCalculator: |
||||
""" |
||||
Has the task of calculate the FamScore of a node |
||||
""" |
||||
|
||||
def __init__(self): |
||||
pass |
||||
|
||||
def marginal_likelihood_theta(self, |
||||
node_id: str, |
||||
set_of_cims: soCims.SetOfCims, |
||||
graph:net_graph.NetworkGraph): |
||||
""" |
||||
calculate the value of the marginal likelihood over theta of the node identified by the label node_id |
||||
Parameters: |
||||
node_id: the label of the node |
||||
Returns: |
||||
the value of the marginal likelihood over theta |
||||
""" |
||||
return 2 |
||||
|
||||
def marginal_likelihood_q(self, |
||||
cims: np.array, |
||||
tau_xu:float = 1, |
||||
alpha_xu:float = 1): |
||||
""" |
||||
calculate the value of the marginal likelihood over q of the node identified by the label node_id |
||||
Parameters: |
||||
cims: np.array with all the node's cims, |
||||
tau_xu: hyperparameter over the CTBN’s q parameters |
||||
alpha_xu: hyperparameter over the CTBN’s q parameters |
||||
Returns: |
||||
the value of the marginal likelihood over q |
||||
""" |
||||
return np.prod([self.variable_cim_xu_marginal_likelihood_q(cim,tau_xu,alpha_xu) for cim in cims]) |
||||
|
||||
def variable_cim_xu_marginal_likelihood_q(self, |
||||
cim:cim_class.ConditionalIntensityMatrix, |
||||
tau_xu:float = 1, |
||||
alpha_xu:float = 1): |
||||
'get cim length' |
||||
values=len(cim.state_residence_times) |
||||
|
||||
'compute the marginal likelihood for the current cim' |
||||
return np.prod([ |
||||
self.single_cim_xu_marginal_likelihood_q( |
||||
cim.state_transition_matrix[index,index], |
||||
cim.state_residence_times[index], |
||||
tau_xu, |
||||
alpha_xu) |
||||
for index in range(values)]) |
||||
|
||||
|
||||
def single_cim_xu_marginal_likelihood_q(self, |
||||
M_suff_stats:float, |
||||
T_suff_stats:float, |
||||
tau_xu:float = 1, |
||||
alpha_xu:float = 1): |
||||
""" |
||||
calculate the marginal likelihood of the node when assumes a specif value |
||||
and a specif parents's assignment |
||||
Parameters: |
||||
cims: np.array with all the node's cims, |
||||
tau_xu: hyperparameter over the CTBN’s q parameters |
||||
alpha_xu: hyperparameter over the CTBN’s q parameters |
||||
Returns: |
||||
the marginal likelihood of the node when assumes a specif value |
||||
""" |
||||
print(M_suff_stats) |
||||
return (gamma(alpha_xu + M_suff_stats + 1)* (tau_xu**(alpha_xu+1))) \ |
||||
/ \ |
||||
(gamma(alpha_xu + 1)*((tau_xu + T_suff_stats)**(alpha_xu + M_suff_stats + 1))) |
||||
|
||||
|
||||
def get_fam_score(self, |
||||
cims: np.array, |
||||
tau_xu:float = 1, |
||||
alpha_xu:float = 1, |
||||
alpha_xxu:float = 1): |
||||
""" |
||||
calculate the FamScore value of the node identified by the label node_id |
||||
Parameters: |
||||
cims: np.array with all the node's cims, |
||||
tau_xu: hyperparameter over the CTBN’s q parameters |
||||
alpha_xu: hyperparameter over the CTBN’s q parameters |
||||
alpha_xxu: hyperparameter over the CTBN’s theta parameters |
||||
Returns: |
||||
the FamScore value of the node |
||||
""" |
||||
return log( |
||||
self.marginal_likelihood_q(cims,tau_xu,alpha_xu) |
||||
) \ |
||||
+ \ |
||||
log( |
||||
self.marginal_likelihood_theta(cims,tau_xu,alpha_xu,alpha_xxu) |
||||
) |
@ -0,0 +1,168 @@ |
||||
|
||||
import itertools |
||||
import json |
||||
import typing |
||||
|
||||
import networkx as nx |
||||
import numpy as np |
||||
from networkx.readwrite import json_graph |
||||
|
||||
from random import choice |
||||
|
||||
import cache as ch |
||||
import conditional_intensity_matrix as condim |
||||
import network_graph as ng |
||||
import parameters_estimator as pe |
||||
import sample_path as sp |
||||
import structure as st |
||||
import fam_score_calculator as fam_score |
||||
|
||||
|
||||
''' |
||||
#TODO: Insert maximum number of parents |
||||
#TODO: Evaluate if it's better to start from a complete or an empty graph |
||||
#TODO: Create a parent class StructureEstimator and Two Subclasses (Score-Based and Constraint-Based) |
||||
''' |
||||
|
||||
class StructureScoreBasedEstimator: |
||||
""" |
||||
Has the task of estimating the network structure given the trajectories in samplepath by |
||||
using a score based approach. |
||||
|
||||
:sample_path: the sample_path object containing the trajectories and the real structure |
||||
|
||||
:nodes: the nodes labels |
||||
:nodes_vals: the nodes cardinalities |
||||
:nodes_indxs: the nodes indexes |
||||
:complete_graph: the complete directed graph built using the nodes labels in nodes |
||||
:cache: the cache object |
||||
""" |
||||
|
||||
def __init__(self, sample_path: sp.SamplePath): |
||||
self.sample_path = sample_path |
||||
self.nodes = np.array(self.sample_path.structure.nodes_labels) |
||||
self.nodes_vals = self.sample_path.structure.nodes_values |
||||
self.nodes_indxs = self.sample_path.structure.nodes_indexes |
||||
self.complete_graph = self.build_complete_graph(self.sample_path.structure.nodes_labels) |
||||
self.cache = ch.Cache() |
||||
|
||||
def build_complete_graph(self, node_ids: typing.List): |
||||
""" |
||||
Builds a complete directed graph (no self loops) given the nodes labels in the list node_ids: |
||||
|
||||
Parameters: |
||||
node_ids: the list of nodes labels |
||||
Returns: |
||||
a complete Digraph Object |
||||
""" |
||||
complete_graph = nx.DiGraph() |
||||
complete_graph.add_nodes_from(node_ids) |
||||
complete_graph.add_edges_from(itertools.permutations(node_ids, 2)) |
||||
return complete_graph |
||||
|
||||
|
||||
|
||||
def estimate_structure(self): |
||||
""" |
||||
Compute the score-based algorithm to find the optimal structure |
||||
|
||||
Parameters: |
||||
node_id: the label of the node |
||||
Returns: |
||||
void |
||||
|
||||
""" |
||||
estimate_parents = self.estimate_parents |
||||
'Estimate the best parents for each node' |
||||
#[estimate_parents(n) for n in self.nodes] |
||||
estimate_parents('X') |
||||
|
||||
def estimate_parents(self,node_id:str): |
||||
""" |
||||
Use the FamScore of a node in order to find the best parent nodes |
||||
|
||||
Parameters: |
||||
void |
||||
Returns: |
||||
void |
||||
|
||||
""" |
||||
'Create the graph for the single node' |
||||
graph = ng.NetworkGraph(self.sample_path.structure) |
||||
'inizialize the graph for a single node' |
||||
graph.fast_init(node_id) |
||||
|
||||
params_estimation = pe.ParametersEstimator(self.sample_path, graph) |
||||
|
||||
'Inizialize and compute parameters for node' |
||||
params_estimation.fast_init(node_id) |
||||
SoCims = params_estimation.compute_parameters_for_node(node_id) |
||||
|
||||
'Get the node\'s parents list' |
||||
parents = graph.get_parents_by_id(node_id) |
||||
|
||||
values = graph.get_states_number(parents[0]) |
||||
|
||||
print(f" actual_cims {len(SoCims.actual_cims)} padri {len(parents)} ") |
||||
|
||||
fam_score_obj = fam_score.FamScoreCalculator() |
||||
|
||||
score = fam_score_obj.get_fam_score(SoCims.actual_cims) |
||||
|
||||
'''mask = np.array([True,True]) |
||||
|
||||
cims = SoCims.filter_cims_with_mask(mask,[1,1]) |
||||
|
||||
# print(f"-----{len(SoCims.transition_matrices)}-------") |
||||
print(f"{cims[0].state_transition_matrix}") |
||||
|
||||
cims = SoCims.filter_cims_with_mask(mask,[0,0]) |
||||
|
||||
print(f"---parents {len(parents)}---------") |
||||
print(f"{cims[0].state_transition_matrix}") |
||||
''' |
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def generate_possible_sub_sets_of_size(self, u: typing.List, size: int, parent_label: str): |
||||
""" |
||||
Creates a list containing all possible subsets of the list u of size size, |
||||
that do not contains a the node identified by parent_label. |
||||
|
||||
Parameters: |
||||
u: the list of nodes |
||||
size: the size of the subsets |
||||
parent_label: the nodes to exclude in the subsets generation |
||||
Returns: |
||||
a Map Object containing a list of lists |
||||
|
||||
""" |
||||
list_without_test_parent = u[:] |
||||
list_without_test_parent.remove(parent_label) |
||||
return map(list, itertools.combinations(list_without_test_parent, size)) |
||||
|
||||
def save_results(self): |
||||
""" |
||||
Save the estimated Structure to a .json file |
||||
|
||||
Parameters: |
||||
void |
||||
Returns: |
||||
void |
||||
""" |
||||
res = json_graph.node_link_data(self.complete_graph) |
||||
name = self.sample_path.importer.file_path.rsplit('/',1)[-1] |
||||
#print(name) |
||||
name = 'results_' + name |
||||
with open(name, 'w+') as f: |
||||
json.dump(res, f) |
||||
|
||||
|
||||
def remove_diagonal_elements(self, matrix): |
||||
m = matrix.shape[0] |
||||
strided = np.lib.stride_tricks.as_strided |
||||
s0, s1 = matrix.strides |
||||
return strided(matrix.ravel()[1:], shape=(m - 1, m), strides=(s0 + s1, s1)).reshape(m, -1) |
||||
|
@ -0,0 +1,63 @@ |
||||
import sys |
||||
sys.path.append("../classes/") |
||||
import glob |
||||
import math |
||||
import os |
||||
import unittest |
||||
|
||||
import networkx as nx |
||||
import numpy as np |
||||
import psutil |
||||
from line_profiler import LineProfiler |
||||
|
||||
import cache as ch |
||||
import sample_path as sp |
||||
import structure_score_based_estimator as se |
||||
import json_importer as ji |
||||
|
||||
|
||||
|
||||
class TestStructureScoreBasedEstimator(unittest.TestCase): |
||||
|
||||
@classmethod |
||||
def setUpClass(cls): |
||||
cls.read_files = glob.glob(os.path.join('../data', "*.json")) |
||||
cls.importer = ji.JsonImporter(cls.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name') |
||||
cls.s1 = sp.SamplePath(cls.importer) |
||||
cls.s1.build_trajectories() |
||||
cls.s1.build_structure() |
||||
|
||||
|
||||
|
||||
|
||||
def test_esecuzione(self): |
||||
se1 = se.StructureScoreBasedEstimator(self.s1) |
||||
se1.estimate_structure() |
||||
|
||||
if __name__ == '__main__': |
||||
unittest.main() |
||||
|
||||
''' |
||||
|
||||
def test_init(self): |
||||
exp_alfa = 0.1 |
||||
chi_alfa = 0.1 |
||||
se1 = se.StructureScoreBasedEstimator(self.s1) |
||||
self.assertEqual(self.s1, se1.sample_path) |
||||
self.assertTrue(np.array_equal(se1.nodes, np.array(self.s1.structure.nodes_labels))) |
||||
self.assertTrue(np.array_equal(se1.nodes_indxs, self.s1.structure.nodes_indexes)) |
||||
self.assertTrue(np.array_equal(se1.nodes_vals, self.s1.structure.nodes_values)) |
||||
self.assertIsInstance(se1.complete_graph, nx.DiGraph) |
||||
self.assertIsInstance(se1.cache, ch.Cache) |
||||
|
||||
def test_build_complete_graph(self): |
||||
nodes_numb = len(self.s1.structure.nodes_labels) |
||||
se1 = se.StructureScoreBasedEstimator(self.s1) |
||||
cg = se1.build_complete_graph(self.s1.structure.nodes_labels) |
||||
self.assertEqual(len(cg.edges), nodes_numb*(nodes_numb - 1)) |
||||
''for node in self.s1.structure.nodes_labels: |
||||
no_self_loops = self.s1.structure.nodes_labels[:] |
||||
no_self_loops.remove(node) |
||||
for n2 in no_self_loops: |
||||
self.assertIn((node, n2), cg.edges)'' |
||||
''' |
Reference in new issue