1
0
Fork 0

Improved test coverage to 98%

master
Luca Moretti 4 years ago
parent 34dbbd8bf0
commit 01f53c5607
  1. BIN
      .coverage
  2. 3
      .coveragerc
  3. 1
      .gitignore
  4. BIN
      CTBN_Diagramma_Dominio.pdf
  5. BIN
      CTBN_project_dominio.pdf
  6. 16
      PyCTBN/PyCTBN/__init__.py
  7. 26
      PyCTBN/PyCTBN/estimators/parameters_estimator.py
  8. 7
      PyCTBN/PyCTBN/estimators/structure_constraint_based_estimator.py
  9. 86
      PyCTBN/PyCTBN/estimators/structure_estimator.py
  10. 5
      PyCTBN/PyCTBN/estimators/structure_score_based_estimator.py
  11. 2
      PyCTBN/PyCTBN/optimizers/tabu_search.py
  12. 18
      PyCTBN/PyCTBN/structure_graph/network_graph.py
  13. 6
      PyCTBN/PyCTBN/structure_graph/structure.py
  14. 9
      PyCTBN/PyCTBN/utility/sample_importer.py
  15. 0
      PyCTBN/__init__.py
  16. BIN
      PyCTBN/test_data/networks_and_trajectories_binary_data_01_3.json
  17. BIN
      PyCTBN/test_data/networks_and_trajectories_binary_data_02_10_1.json
  18. BIN
      PyCTBN/test_data/networks_and_trajectories_ternary_data_01_6_1.json
  19. 0
      PyCTBN/tests/estimators/__init__.py
  20. 118
      PyCTBN/tests/estimators/test_structure_constraint_based_estimator.py
  21. 59
      PyCTBN/tests/estimators/test_structure_constraint_based_estimator_server.py
  22. 118
      PyCTBN/tests/estimators/test_structure_estimator.py
  23. 171
      PyCTBN/tests/estimators/test_structure_score_based_estimator.py
  24. 79
      PyCTBN/tests/estimators/test_structure_score_based_estimator_server.py
  25. 0
      PyCTBN/tests/optimizers/__init__.py
  26. 66
      PyCTBN/tests/optimizers/test_hill_climbing_search.py
  27. 83
      PyCTBN/tests/optimizers/test_tabu_search.py
  28. 0
      PyCTBN/tests/structure_graph/__init__.py
  29. 13
      PyCTBN/tests/structure_graph/test_sample_path.py
  30. 10
      PyCTBN/tests/structure_graph/test_structure.py
  31. 5
      PyCTBN/tests/structure_graph/test_trajectory.py
  32. 0
      PyCTBN/tests/utility/__init__.py
  33. 2
      PyCTBN/tests/utility/test_json_importer.py
  34. 4
      PyCTBN/tests/utility/test_sample_importer.py
  35. 0
      basic_main.py
  36. 2219
      coverage copy.xml
  37. 1207
      coverage.xml
  38. 0
      setup.py

Binary file not shown.

@ -0,0 +1,3 @@
[run]
omit =
*/tests/*

1
.gitignore vendored

@ -6,3 +6,4 @@ __pycache__
**/dist
**/results_data
**/.scannerwork
**/build

Binary file not shown.

Binary file not shown.

@ -1,8 +1,8 @@
import PyCTBN.estimators
from PyCTBN.estimators import *
import PyCTBN.optimizers
from PyCTBN.optimizers import *
import PyCTBN.structure_graph
from PyCTBN.structure_graph import *
import PyCTBN.utility
from PyCTBN.utility import *
import PyCTBN.PyCTBN.estimators
from PyCTBN.PyCTBN.estimators import *
import PyCTBN.PyCTBN.optimizers
from PyCTBN.PyCTBN.optimizers import *
import PyCTBN.PyCTBN.structure_graph
from PyCTBN.PyCTBN.structure_graph import *
import PyCTBN.PyCTBN.utility
from PyCTBN.PyCTBN.utility import *

@ -107,32 +107,6 @@ class ParametersEstimator(object):
M_raveled[diag_indices] = 0
M_raveled[diag_indices] = np.sum(M, axis=2).ravel()
def init_sets_cims_container(self):
self.sets_of_cims_struct = acims.SetsOfCimsContainer(self.net_graph.nodes,
self.net_graph.nodes_values,
self.net_graph.get_ordered_by_indx_parents_values_for_all_nodes(),
self.net_graph.p_combs)
def compute_parameters(self):
#print(self.net_graph.get_nodes())
#print(self.amalgamated_cims_struct.sets_of_cims)
#enumerate(zip(self.net_graph.get_nodes(), self.amalgamated_cims_struct.sets_of_cims))
for indx, aggr in enumerate(zip(self.net_graph.nodes, self.sets_of_cims_struct.sets_of_cims)):
#print(self.net_graph.time_filtering[indx])
#print(self.net_graph.time_scalar_indexing_strucure[indx])
self.compute_state_res_time_for_node(self.net_graph.get_node_indx(aggr[0]), self.sample_path.trajectories.times,
self.sample_path.trajectories.trajectory,
self.net_graph.time_filtering[indx],
self.net_graph.time_scalar_indexing_strucure[indx],
aggr[1]._state_residence_times)
#print(self.net_graph.transition_filtering[indx])
#print(self.net_graph.transition_scalar_indexing_structure[indx])
self.compute_state_transitions_for_a_node(self.net_graph.get_node_indx(aggr[0]),
self.sample_path.trajectories.complete_trajectory,
self.net_graph.transition_filtering[indx],
self.net_graph.transition_scalar_indexing_structure[indx],
aggr[1]._transition_matrices)
aggr[1].build_cims(aggr[1]._state_residence_times, aggr[1]._transition_matrices)

@ -227,7 +227,12 @@ class StructureConstraintBasedEstimator(StructureEstimator):
total_vars_numb_array)
#list_edges_partial = [ctpc_algo(n,total_vars_numb) for n in self._nodes]
return set(itertools.chain.from_iterable(list_edges_partial))
'Update the graph'
edges = set(itertools.chain.from_iterable(list_edges_partial))
self._complete_graph = nx.DiGraph()
self._complete_graph.add_edges_from(edges)
return edges
def estimate_structure(self,disable_multiprocessing:bool=False):

@ -9,7 +9,7 @@ import numpy as np
from networkx.readwrite import json_graph
from abc import ABC
import os
import abc
from ..utility.cache import Cache
@ -104,11 +104,11 @@ class StructureEstimator(object):
json.dump(res, f)
def remove_diagonal_elements(self, matrix):
m = matrix.shape[0]
strided = np.lib.stride_tricks.as_strided
s0, s1 = matrix.strides
return strided(matrix.ravel()[1:], shape=(m - 1, m), strides=(s0 + s1, s1)).reshape(m, -1)
#def remove_diagonal_elements(self, matrix):
# m = matrix.shape[0]
# strided = np.lib.stride_tricks.as_strided
# s0, s1 = matrix.strides
# return strided(matrix.ravel()[1:], shape=(m - 1, m), strides=(s0 + s1, s1)).reshape(m, -1)
@abc.abstractmethod
@ -137,49 +137,47 @@ class StructureEstimator(object):
:rtype: List
"""
if not self._sample_path.has_prior_net_structure:
raise RuntimeError("Can not compute spurious edges with no prior net structure!")
return []
real_graph = nx.DiGraph()
real_graph.add_nodes_from(self._sample_path.structure.nodes_labels)
real_graph.add_edges_from(self._sample_path.structure.edges)
return nx.difference(real_graph, self._complete_graph).edges
def save_plot_estimated_structure_graph(self) -> None:
"""Plot the estimated structure in a graphical model style.
Spurious edges are colored in red.
"""
graph_to_draw = nx.DiGraph()
spurious_edges = self.spurious_edges()
non_spurious_edges = list(set(self._complete_graph.edges) - set(spurious_edges))
print(non_spurious_edges)
edges_colors = ['red' if edge in spurious_edges else 'black' for edge in self._complete_graph.edges]
graph_to_draw.add_edges_from(spurious_edges)
graph_to_draw.add_edges_from(non_spurious_edges)
pos = nx.spring_layout(graph_to_draw, k=0.5*1/np.sqrt(len(graph_to_draw.nodes())), iterations=50,scale=10)
options = {
"node_size": 2000,
"node_color": "white",
"edgecolors": "black",
'linewidths':2,
"with_labels":True,
"font_size":13,
'connectionstyle': 'arc3, rad = 0.1',
"arrowsize": 15,
"arrowstyle": '<|-',
"width": 1,
"edge_color":edges_colors,
}
nx.draw(graph_to_draw, pos, **options)
ax = plt.gca()
ax.margins(0.20)
plt.axis("off")
name = self._sample_path._importer.file_path.rsplit('/', 1)[-1]
name = name.split('.', 1)[0]
name += '_' + str(self._sample_path._importer.dataset_id())
name += '.png'
plt.savefig(name)
plt.clf()
print("Estimated Structure Plot Saved At: ", os.path.abspath(name))
def save_plot_estimated_structure_graph(self, file_path: str) -> None:
"""Plot the estimated structure in a graphical model style, use .png extension.
Spurious edges are colored in red if a prior structure is present.
:param file_path: path to save the file to
:type: string
"""
graph_to_draw = nx.DiGraph()
spurious_edges = self.spurious_edges()
non_spurious_edges = list(set(self._complete_graph.edges) - set(spurious_edges))
edges_colors = ['red' if edge in spurious_edges else 'black' for edge in self._complete_graph.edges]
graph_to_draw.add_edges_from(spurious_edges)
graph_to_draw.add_edges_from(non_spurious_edges)
pos = nx.spring_layout(graph_to_draw, k=0.5*1/np.sqrt(len(graph_to_draw.nodes())), iterations=50,scale=10)
options = {
"node_size": 2000,
"node_color": "white",
"edgecolors": "black",
'linewidths':2,
"with_labels":True,
"font_size":13,
'connectionstyle': 'arc3, rad = 0.1',
"arrowsize": 15,
"arrowstyle": '<|-',
"width": 1,
"edge_color":edges_colors,
}
nx.draw(graph_to_draw, pos, **options)
ax = plt.gca()
ax.margins(0.20)
plt.axis("off")
plt.savefig(file_path)
plt.clf()
print("Estimated Structure Plot Saved At: ", os.path.abspath(file_path))

@ -156,6 +156,11 @@ class StructureScoreBasedEstimator(StructureEstimator):
except Exception as e:
print(f"errore: {e}")
'Update the graph'
self._complete_graph = nx.DiGraph()
self._complete_graph.add_edges_from(set_list_edges)
return set_list_edges

@ -104,7 +104,7 @@ class TabuSearch(Optimizer):
self.tabu_length = len(other_nodes)
if self.tabu_rules_duration is None:
self.tabu_tabu_rules_durationength = len(other_nodes)
self.tabu_rules_duration = len(other_nodes)
'inizialize the data structures'
tabu_set = set()

@ -36,15 +36,15 @@ class NetworkGraph(object):
self._transition_filtering = None
self._p_combs_structure = None
def init_graph(self):
self.add_nodes(self._nodes_labels)
self.add_edges(self.graph_struct.edges)
self.aggregated_info_about_nodes_parents = self.get_ord_set_of_par_of_all_nodes()
self._fancy_indexing = self.build_fancy_indexing_structure(0)
self.build_scalar_indexing_structures()
self.build_time_columns_filtering_structure()
self.build_transition_columns_filtering_structure()
self._p_combs_structure = self.build_p_combs_structure()
#def init_graph(self):
# self.add_nodes(self._nodes_labels)
# self.add_edges(self.graph_struct.edges)
# self.aggregated_info_about_nodes_parents = self.get_ord_set_of_par_of_all_nodes()
# self._fancy_indexing = self.build_fancy_indexing_structure(0)
# self.build_scalar_indexing_structures()
# self.build_time_columns_filtering_structure()
# self.build_transition_columns_filtering_structure()
# self._p_combs_structure = self.build_p_combs_structure()
def fast_init(self, node_id: str) -> None:
"""Initializes all the necessary structures for parameters estimation of the node identified by the label

@ -74,11 +74,11 @@ class Structure(object):
self._edges_list = list()
def add_edge(self,edge: tuple):
self._edges_list.append(tuple)
print(self._edges_list)
self._edges_list.append(edge)
def remove_edge(self,edge: tuple):
self._edges_list.remove(tuple)
self._edges_list.remove(edge)
def contains_edge(self,edge:tuple) -> bool:
return edge in self._edges_list

@ -32,7 +32,7 @@ class SampleImporter(AbstractImporter):
'If the data are not DataFrame, it will be converted'
if isinstance(variables,list) or isinstance(variables,np.ndarray):
variables = pd.DataFrame(variables)
if isinstance(variables,list) or isinstance(variables,np.ndarray):
if isinstance(prior_net_structure,list) or isinstance(prior_net_structure,np.ndarray):
prior_net_structure=pd.DataFrame(prior_net_structure)
super(SampleImporter, self).__init__(trajectory_list =trajectory_list,
@ -48,9 +48,6 @@ class SampleImporter(AbstractImporter):
samples_list= self._df_samples_list
if isinstance(samples_list, np.ndarray):
samples_list = samples_list.tolist()
self.compute_row_delta_in_all_samples_frames(samples_list)
def build_sorter(self, sample_frame: pd.DataFrame) -> typing.List:
@ -61,5 +58,5 @@ class SampleImporter(AbstractImporter):
return columns_header
def dataset_id(self) -> object:
pass
def dataset_id(self) -> str:
return str("")

@ -50,15 +50,127 @@ class TestStructureConstraintBasedEstimator(unittest.TestCase):
cls.s1.build_trajectories()
cls.s1.build_structure()
def test_structure(self):
def test_structure_1(self):
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = StructureConstraintBasedEstimator(self.s1,0.1,0.1)
edges = se1.estimate_structure(disable_multiprocessing=False)
edges = se1.estimate_structure(True)
self.assertFalse(se1.spurious_edges())
self.assertEqual(edges, true_edges)
def test_structure_2(self):
with open("./PyCTBN/test_data/networks_and_trajectories_binary_data_02_10_1.json") as f:
raw_data = json.load(f)
trajectory_list_raw= raw_data["samples"]
trajectory_list = [pd.DataFrame(sample) for sample in trajectory_list_raw]
variables= pd.DataFrame(raw_data["variables"])
prior_net_structure = pd.DataFrame(raw_data["dyn.str"])
self.importer = SampleImporter(
trajectory_list=trajectory_list,
variables=variables,
prior_net_structure=prior_net_structure
)
self.importer.import_data()
#cls.s1 = sp.SamplePath(cls.importer)
#cls.traj = cls.s1.concatenated_samples
# print(len(cls.traj))
self.s1 = SamplePath(self.importer)
self.s1.build_trajectories()
self.s1.build_structure()
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = StructureConstraintBasedEstimator(self.s1,0.1,0.1)
edges = se1.estimate_structure(True)
'calculate precision and recall'
n_missing_edges = 0
n_added_fake_edges = 0
n_added_fake_edges = len(edges.difference(true_edges))
n_missing_edges = len(true_edges.difference(edges))
n_true_positive = len(true_edges) - n_missing_edges
precision = n_true_positive / (n_true_positive + n_added_fake_edges)
recall = n_true_positive / (n_true_positive + n_missing_edges)
self.assertGreaterEqual(precision,0.75)
self.assertGreaterEqual(recall,0.75)
def test_structure_3(self):
with open("./PyCTBN/test_data/networks_and_trajectories_ternary_data_01_6_1.json") as f:
raw_data = json.load(f)
trajectory_list_raw= raw_data["samples"]
trajectory_list = [pd.DataFrame(sample) for sample in trajectory_list_raw]
variables= pd.DataFrame(raw_data["variables"])
prior_net_structure = pd.DataFrame(raw_data["dyn.str"])
self.importer = SampleImporter(
trajectory_list=trajectory_list,
variables=variables,
prior_net_structure=prior_net_structure
)
self.importer.import_data()
#cls.s1 = sp.SamplePath(cls.importer)
#cls.traj = cls.s1.concatenated_samples
# print(len(cls.traj))
self.s1 = SamplePath(self.importer)
self.s1.build_trajectories()
self.s1.build_structure()
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = StructureConstraintBasedEstimator(self.s1,0.1,0.1)
edges = se1.estimate_structure(True)
'calculate precision and recall'
n_missing_edges = 0
n_added_fake_edges = 0
n_added_fake_edges = len(edges.difference(true_edges))
n_missing_edges = len(true_edges.difference(edges))
n_true_positive = len(true_edges) - n_missing_edges
precision = n_true_positive / (n_true_positive + n_added_fake_edges)
recall = n_true_positive / (n_true_positive + n_missing_edges)
self.assertGreaterEqual(precision,0.75)
self.assertGreaterEqual(recall,0.75)
if __name__ == '__main__':
unittest.main()

@ -1,59 +0,0 @@
import glob
import math
import os
import unittest
import networkx as nx
import numpy as np
import psutil
from line_profiler import LineProfiler
from ...PyCTBN.utility.cache import Cache
from ...PyCTBN.structure_graph.sample_path import SamplePath
from ...PyCTBN.estimators.structure_constraint_based_estimator import StructureConstraintBasedEstimator
from ...PyCTBN.utility.json_importer import JsonImporter
from multiprocessing import set_start_method
import copy
class TestStructureConstraintBasedEstimator(unittest.TestCase):
@classmethod
def setUpClass(cls):
pass
def test_structure(self):
#cls.read_files = glob.glob(os.path.join('../../data', "*.json"))
self.importer = JsonImporter("./PyCTBN/test_data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
self.s1 = SamplePath(self.importer)
self.s1.build_trajectories()
self.s1.build_structure()
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = StructureConstraintBasedEstimator(self.s1,0.1,0.1)
edges = se1.estimate_structure(disable_multiprocessing=False)
self.importer = JsonImporter("./PyCTBN/test_data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
self.s1 = SamplePath(self.importer)
self.s1.build_trajectories()
self.s1.build_structure()
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = StructureConstraintBasedEstimator(self.s1,0.1,0.1)
edges = se1.estimate_structure(disable_multiprocessing=True)
self.assertEqual(edges, true_edges)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,118 @@
import glob
import math
import os
import unittest
import json
import networkx as nx
import numpy as np
import timeit
from ...PyCTBN.utility.cache import Cache
from ...PyCTBN.structure_graph.sample_path import SamplePath
from ...PyCTBN.estimators.structure_constraint_based_estimator import StructureConstraintBasedEstimator
from ...PyCTBN.utility.json_importer import JsonImporter
class TestStructureEstimator(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.read_files = glob.glob(os.path.join('./PyCTBN/test_data', "*.json"))
cls.importer = JsonImporter('./PyCTBN/test_data/networks_and_trajectories_binary_data_01_3.json', 'samples', 'dyn.str', 'variables', 'Time', 'Name')
cls.importer.import_data(0)
cls.s1 = SamplePath(cls.importer)
cls.s1.build_trajectories()
cls.s1.build_structure()
def test_init(self):
exp_alfa = 0.1
chi_alfa = 0.1
se1 = StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa)
self.assertEqual(self.s1, se1._sample_path)
self.assertTrue(np.array_equal(se1._nodes, np.array(self.s1.structure.nodes_labels)))
self.assertTrue(np.array_equal(se1._nodes_indxs, self.s1.structure.nodes_indexes))
self.assertTrue(np.array_equal(se1._nodes_vals, self.s1.structure.nodes_values))
self.assertEqual(se1._exp_test_sign, exp_alfa)
self.assertEqual(se1._chi_test_alfa, chi_alfa)
self.assertIsInstance(se1._complete_graph, nx.DiGraph)
self.assertIsInstance(se1._cache, Cache)
def test_build_complete_graph(self):
exp_alfa = 0.1
chi_alfa = 0.1
nodes_numb = len(self.s1.structure.nodes_labels)
se1 = StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa)
cg = se1.build_complete_graph(self.s1.structure.nodes_labels)
self.assertEqual(len(cg.edges), nodes_numb*(nodes_numb - 1))
for node in self.s1.structure.nodes_labels:
no_self_loops = self.s1.structure.nodes_labels[:]
no_self_loops.remove(node)
for n2 in no_self_loops:
self.assertIn((node, n2), cg.edges)
#se1.save_plot_estimated_structure_graph()
def test_build_removable_edges_matrix(self):
exp_alfa = 0.1
chi_alfa = 0.1
known_edges = self.s1.structure.edges[0:2]
se1 = StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa, known_edges)
for edge in known_edges:
i = self.s1.structure.get_node_indx(edge[0])
j = self.s1.structure.get_node_indx(edge[1])
self.assertFalse(se1._removable_edges_matrix[i][j])
def test_generate_possible_sub_sets_of_size(self):
exp_alfa = 0.1
chi_alfa = 0.1
nodes_numb = len(self.s1.structure.nodes_labels)
se1 = StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa)
for node in self.s1.structure.nodes_labels:
for b in range(nodes_numb):
sets = StructureConstraintBasedEstimator.generate_possible_sub_sets_of_size(self.s1.structure.nodes_labels, b, node)
sets2 = StructureConstraintBasedEstimator.generate_possible_sub_sets_of_size(self.s1.structure.nodes_labels, b, node)
self.assertEqual(len(list(sets)), math.floor(math.factorial(nodes_numb - 1) /
(math.factorial(b)*math.factorial(nodes_numb -1 - b))))
for sset in sets2:
self.assertFalse(node in sset)
def test_time(self):
known_edges = []
se1 = StructureConstraintBasedEstimator(self.s1, 0.1, 0.1, known_edges,25)
exec_time = timeit.timeit(se1.ctpc_algorithm, number=1)
print("Execution Time: ", exec_time)
for ed in self.s1.structure.edges:
self.assertIn(tuple(ed), se1._complete_graph.edges)
#print("Spurious Edges:", se1.spurious_edges())
#se1.save_plot_estimated_structure_graph()
def test_save_results(self):
se1 = StructureConstraintBasedEstimator(self.s1, 0.1, 0.1)
se1.ctpc_algorithm()
se1.save_results()
name = self.s1._importer.file_path.rsplit('/', 1)[-1]
name = name.split('.', 1)[0]
name += '_' + str(self.s1._importer.dataset_id())
name += '.json'
file_name = 'results_' + name
with open(file_name) as f:
js_graph = json.load(f)
result_graph = nx.json_graph.node_link_graph(js_graph)
self.assertFalse(nx.difference(se1._complete_graph, result_graph).edges)
os.remove(file_name)
def test_adjacency_matrix(self):
se1 = StructureConstraintBasedEstimator(self.s1, 0.1, 0.1)
se1.ctpc_algorithm()
adj_matrix = nx.adj_matrix(se1._complete_graph).toarray().astype(bool)
self.assertTrue(np.array_equal(adj_matrix, se1.adjacency_matrix()))
def test_save_plot_estimated_graph(self):
se1 = StructureConstraintBasedEstimator(self.s1, 0.1, 0.1)
edges = se1.estimate_structure(disable_multiprocessing=True)
se1.save_plot_estimated_structure_graph('./networks_and_trajectories_ternary_data_3.png')
if __name__ == '__main__':
unittest.main()

@ -1,5 +1,4 @@
import sys
sys.path.append("../../PyCTBN/")
import glob
import math
import os
@ -54,14 +53,38 @@ class TestStructureScoreBasedEstimator(unittest.TestCase):
cls.s1.build_trajectories()
cls.s1.build_structure()
def test_structure_monoprocesso(self):
with open("./PyCTBN/test_data/networks_and_trajectories_binary_data_01_3.json") as f:
raw_data = json.load(f)
trajectory_list_raw= raw_data[0]["samples"]
trajectory_list = [pd.DataFrame(sample) for sample in trajectory_list_raw]
variables= pd.DataFrame(raw_data[0]["variables"])
prior_net_structure = pd.DataFrame(raw_data[0]["dyn.str"])
self.importer = SampleImporter(
trajectory_list=trajectory_list,
variables=variables,
prior_net_structure=prior_net_structure
)
self.importer.import_data()
#cls.s1 = sp.SamplePath(cls.importer)
#cls.traj = cls.s1.concatenated_samples
def test_structure(self):
# print(len(cls.traj))
self.s1 = SamplePath(self.importer)
self.s1.build_trajectories()
self.s1.build_structure()
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = StructureScoreBasedEstimator(self.s1,known_edges = [('X','Q')])
se1 = StructureScoreBasedEstimator(self.s1)
edges = se1.estimate_structure(
max_parents = None,
iterations_number = 100,
@ -75,6 +98,146 @@ class TestStructureScoreBasedEstimator(unittest.TestCase):
self.assertEqual(edges, true_edges)
def test_structure_1(self):
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = StructureScoreBasedEstimator(self.s1)
edges = se1.estimate_structure(
max_parents = None,
iterations_number = 100,
patience = 35,
tabu_length = 15,
tabu_rules_duration = 15,
optimizer = 'hill',
disable_multiprocessing=False
)
self.assertEqual(edges, true_edges)
def test_structure_2(self):
with open("./PyCTBN/test_data/networks_and_trajectories_binary_data_02_10_1.json") as f:
raw_data = json.load(f)
trajectory_list_raw= raw_data["samples"]
trajectory_list = [pd.DataFrame(sample) for sample in trajectory_list_raw]
variables= pd.DataFrame(raw_data["variables"])
prior_net_structure = pd.DataFrame(raw_data["dyn.str"])
self.importer = SampleImporter(
trajectory_list=trajectory_list,
variables=variables,
prior_net_structure=prior_net_structure
)
self.importer.import_data()
#cls.s1 = sp.SamplePath(cls.importer)
#cls.traj = cls.s1.concatenated_samples
# print(len(cls.traj))
self.s1 = SamplePath(self.importer)
self.s1.build_trajectories()
self.s1.build_structure()
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = StructureScoreBasedEstimator(self.s1)
edges = se1.estimate_structure(
max_parents = None,
iterations_number = 100,
patience = 35,
tabu_length = 15,
tabu_rules_duration = 15,
optimizer = 'hill',
disable_multiprocessing=True
)
'calculate precision and recall'
n_missing_edges = 0
n_added_fake_edges = 0
n_added_fake_edges = len(edges.difference(true_edges))
n_missing_edges = len(true_edges.difference(edges))
n_true_positive = len(true_edges) - n_missing_edges
precision = n_true_positive / (n_true_positive + n_added_fake_edges)
recall = n_true_positive / (n_true_positive + n_missing_edges)
self.assertGreaterEqual(precision,0.75)
self.assertGreaterEqual(recall,0.75)
def test_structure_3(self):
with open("./PyCTBN/test_data/networks_and_trajectories_ternary_data_01_6_1.json") as f:
raw_data = json.load(f)
trajectory_list_raw= raw_data["samples"]
trajectory_list = [pd.DataFrame(sample) for sample in trajectory_list_raw]
variables= raw_data["variables"]
prior_net_structure = raw_data["dyn.str"]
self.importer = SampleImporter(
trajectory_list=trajectory_list,
variables=variables,
prior_net_structure=prior_net_structure
)
self.importer.import_data()
#cls.s1 = sp.SamplePath(cls.importer)
#cls.traj = cls.s1.concatenated_samples
# print(len(cls.traj))
self.s1 = SamplePath(self.importer)
self.s1.build_trajectories()
self.s1.build_structure()
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
known_edges = self.s1.structure.edges[0:2]
se1 = StructureScoreBasedEstimator(self.s1,known_edges=known_edges)
edges = se1.estimate_structure(
max_parents = 4,
iterations_number = 100,
patience = 35,
tabu_length = 15,
tabu_rules_duration = 15,
optimizer = 'hill',
disable_multiprocessing=True
)
'calculate precision and recall'
n_missing_edges = 0
n_added_fake_edges = 0
n_added_fake_edges = len(edges.difference(true_edges))
n_missing_edges = len(true_edges.difference(edges))
n_true_positive = len(true_edges) - n_missing_edges
precision = n_true_positive / (n_true_positive + n_added_fake_edges)
recall = n_true_positive / (n_true_positive + n_missing_edges)
self.assertGreaterEqual(precision,0.75)
self.assertGreaterEqual(recall,0.75)
if __name__ == '__main__':

@ -1,79 +0,0 @@
import glob
import math
import os
import unittest
import networkx as nx
import numpy as np
import psutil
from line_profiler import LineProfiler
import copy
from ...PyCTBN.utility.cache import Cache
from ...PyCTBN.structure_graph.sample_path import SamplePath
from ...PyCTBN.estimators.structure_score_based_estimator import StructureScoreBasedEstimator
from ...PyCTBN.utility.json_importer import JsonImporter
class TestStructureScoreBasedEstimator(unittest.TestCase):
@classmethod
def setUpClass(cls):
pass
def test_structure(self):
#cls.read_files = glob.glob(os.path.join('../../data', "*.json"))
self.importer = JsonImporter("./PyCTBN/test_data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
self.s1 = SamplePath(self.importer)
self.s1.build_trajectories()
self.s1.build_structure()
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = StructureScoreBasedEstimator(self.s1)
edges = se1.estimate_structure(
max_parents = None,
iterations_number = 100,
patience = 35,
tabu_length = 15,
tabu_rules_duration = 15,
optimizer = 'tabu',
disable_multiprocessing=False
)
self.importer = JsonImporter("./PyCTBN/test_data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
self.s1 = SamplePath(self.importer)
self.s1.build_trajectories()
self.s1.build_structure()
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = StructureScoreBasedEstimator(self.s1)
edges = se1.estimate_structure(
max_parents = None,
iterations_number = 100,
patience = 35,
tabu_length = 15,
tabu_rules_duration = 15,
optimizer = 'tabu',
disable_multiprocessing=True
)
self.assertEqual(edges, true_edges)
if __name__ == '__main__':
unittest.main()

@ -9,11 +9,14 @@ import numpy as np
import psutil
from line_profiler import LineProfiler
import copy
import json
import pandas as pd
from ...PyCTBN.structure_graph.sample_path import SamplePath
from ...PyCTBN.estimators.structure_score_based_estimator import StructureScoreBasedEstimator
from ...PyCTBN.utility.json_importer import JsonImporter
from ...PyCTBN.utility.sample_importer import SampleImporter
@ -38,16 +41,75 @@ class TestHillClimbingSearch(unittest.TestCase):
se1 = StructureScoreBasedEstimator(self.s1)
edges = se1.estimate_structure(
max_parents = None,
max_parents = 2,
iterations_number = 40,
patience = None,
optimizer = 'hill'
optimizer = 'hill',
disable_multiprocessing=True
)
self.assertEqual(edges, true_edges)
def test_structure_3(self):
with open("./PyCTBN/test_data/networks_and_trajectories_ternary_data_01_6_1.json") as f:
raw_data = json.load(f)
trajectory_list_raw= raw_data["samples"]
trajectory_list = [pd.DataFrame(sample) for sample in trajectory_list_raw]
variables= raw_data["variables"]
prior_net_structure = raw_data["dyn.str"]
self.importer = SampleImporter(
trajectory_list=trajectory_list,
variables=variables,
prior_net_structure=prior_net_structure
)
self.importer.import_data()
#cls.s1 = sp.SamplePath(cls.importer)
#cls.traj = cls.s1.concatenated_samples
# print(len(cls.traj))
self.s1 = SamplePath(self.importer)
self.s1.build_trajectories()
self.s1.build_structure()
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
known_edges = self.s1.structure.edges[0:2]
se1 = StructureScoreBasedEstimator(self.s1,known_edges=known_edges)
edges = se1.estimate_structure(
max_parents = 3,
iterations_number = 100,
patience = 40,
optimizer = 'hill',
disable_multiprocessing=True
)
'calculate precision and recall'
n_missing_edges = 0
n_added_fake_edges = 0
n_added_fake_edges = len(edges.difference(true_edges))
n_missing_edges = len(true_edges.difference(edges))
n_true_positive = len(true_edges) - n_missing_edges
precision = n_true_positive / (n_true_positive + n_added_fake_edges)
recall = n_true_positive / (n_true_positive + n_missing_edges)
self.assertGreaterEqual(precision,0.75)
self.assertGreaterEqual(recall,0.75)
if __name__ == '__main__':
unittest.main()

@ -13,14 +13,11 @@ from line_profiler import LineProfiler
import copy
import json
import utility.cache as ch
import structure_graph.sample_path as sp
import estimators.structure_score_based_estimator as se
import utility.json_importer as ji
import utility.sample_importer as si
from ...PyCTBN.utility.cache import Cache
from ...PyCTBN.structure_graph.sample_path import SamplePath
from ...PyCTBN.estimators.structure_score_based_estimator import StructureScoreBasedEstimator
from ...PyCTBN.utility.json_importer import JsonImporter
from ...PyCTBN.utility.sample_importer import SampleImporter
class TestTabuSearch(unittest.TestCase):
@ -40,7 +37,7 @@ class TestTabuSearch(unittest.TestCase):
prior_net_structure = pd.DataFrame(raw_data[0]["dyn.str"])
cls.importer = si.SampleImporter(
cls.importer = SampleImporter(
trajectory_list=trajectory_list,
variables=variables,
prior_net_structure=prior_net_structure
@ -52,7 +49,7 @@ class TestTabuSearch(unittest.TestCase):
#cls.traj = cls.s1.concatenated_samples
# print(len(cls.traj))
cls.s1 = sp.SamplePath(cls.importer)
cls.s1 = SamplePath(cls.importer)
cls.s1.build_trajectories()
cls.s1.build_structure()
#cls.s1.clear_memory()
@ -63,21 +60,79 @@ class TestTabuSearch(unittest.TestCase):
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
se1 = se.StructureScoreBasedEstimator(self.s1)
se1 = StructureScoreBasedEstimator(self.s1)
edges = se1.estimate_structure(
max_parents = None,
iterations_number = 100,
patience = 20,
tabu_length = 10,
tabu_rules_duration = 10,
optimizer = 'tabu',
disable_multiprocessing=False
disable_multiprocessing=True
)
self.assertEqual(edges, true_edges)
def test_structure_3(self):
with open("./PyCTBN/test_data/networks_and_trajectories_ternary_data_01_6_1.json") as f:
raw_data = json.load(f)
trajectory_list_raw= raw_data["samples"]
trajectory_list = [pd.DataFrame(sample) for sample in trajectory_list_raw]
variables= raw_data["variables"]
prior_net_structure = raw_data["dyn.str"]
self.importer = SampleImporter(
trajectory_list=trajectory_list,
variables=variables,
prior_net_structure=prior_net_structure
)
self.importer.import_data()
#cls.s1 = sp.SamplePath(cls.importer)
#cls.traj = cls.s1.concatenated_samples
# print(len(cls.traj))
self.s1 = SamplePath(self.importer)
self.s1.build_trajectories()
self.s1.build_structure()
true_edges = copy.deepcopy(self.s1.structure.edges)
true_edges = set(map(tuple, true_edges))
known_edges = self.s1.structure.edges[0:2]
se1 = StructureScoreBasedEstimator(self.s1,known_edges=known_edges)
edges = se1.estimate_structure(
max_parents = 4,
iterations_number = 100,
patience = 40,
tabu_length = 3,
tabu_rules_duration = 3,
optimizer = 'tabu',
disable_multiprocessing=True
)
'calculate precision and recall'
n_missing_edges = 0
n_added_fake_edges = 0
n_added_fake_edges = len(edges.difference(true_edges))
n_missing_edges = len(true_edges.difference(edges))
n_true_positive = len(true_edges) - n_missing_edges
precision = n_true_positive / (n_true_positive + n_added_fake_edges)
recall = n_true_positive / (n_true_positive + n_missing_edges)
self.assertGreaterEqual(precision,0.75)
self.assertGreaterEqual(recall,0.75)
if __name__ == '__main__':
unittest.main()

@ -54,7 +54,8 @@ class TestSamplePath(unittest.TestCase):
importer = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
importer.import_data(0)
s1 = SamplePath(importer)
random.shuffle(importer._sorter)
importer._sorter[0],importer._sorter[1]= importer._sorter[1],importer._sorter[0]
self.assertRaises(RuntimeError, s1.build_structure)
def test_build_saplepath_no_prior_net_structure(self):
@ -66,7 +67,17 @@ class TestSamplePath(unittest.TestCase):
s1.build_structure()
self.assertFalse(s1.structure.edges)
def test_buid_samplepath_no_variables(self):
importer = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
importer.import_data(0)
importer._df_variables = None
self.assertRaises(RuntimeError, SamplePath, importer)
def test_buid_samplepath_no_concatenated_samples(self):
importer = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
importer.import_data(0)
importer._concatenated_samples = None
self.assertRaises(RuntimeError, SamplePath, importer)
if __name__ == '__main__':
unittest.main()

@ -28,6 +28,15 @@ class TestStructure(unittest.TestCase):
for indx, var in enumerate(self.labels):
self.assertEqual(var, s1.get_node_id(indx))
def test_edges_operations(self):
s1 = Structure(self.labels, self.indxs, self.vals, self.edges, self.vars_numb)
self.assertTrue(s1.contains_edge(('X','Z')))
s1.add_edge(('Z','X'))
self.assertTrue(s1.contains_edge(('Z','X')))
s1.remove_edge(('Z','X'))
self.assertFalse(s1.contains_edge(('Z','X')))
def test_get_node_indx(self):
l2 = self.labels[:]
l2.remove('Y')
@ -71,6 +80,7 @@ class TestStructure(unittest.TestCase):
s1 = Structure(self.labels, self.indxs, self.vals, self.edges, self.vars_numb)
s2 = Structure(self.labels, self.indxs, self.vals, self.edges, self.vars_numb)
self.assertEqual(s1, s2)
self.assertNotEqual(s1,4)
def test_repr(self):
s1 = Structure(self.labels, self.indxs, self.vals, self.edges, self.vars_numb)

@ -2,6 +2,7 @@
import unittest
import numpy as np
import glob
import os
from ...PyCTBN.structure_graph.trajectory import Trajectory
from ...PyCTBN.utility.json_importer import JsonImporter
@ -10,7 +11,7 @@ class TestTrajectory(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.read_files = glob.glob(os.path.join('./test_data', "*.json"))
cls.read_files = glob.glob(os.path.join('./PyCTBN/test_data', "*.json"))
cls.importer = JsonImporter(cls.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
cls.importer.import_data(0)
@ -22,6 +23,8 @@ class TestTrajectory(unittest.TestCase):
self.assertTrue(np.array_equal(self.importer.concatenated_samples.iloc[:, 1: len(self.importer.sorter) + 1], t1.trajectory))
self.assertEqual(len(self.importer.sorter) + 1, t1._original_cols_number)
self.assertEqual(self.importer.concatenated_samples.iloc[:,1:].to_numpy().shape[0], t1.size())
print(t1)
if __name__ == '__main__':
unittest.main()

@ -152,7 +152,7 @@ class TestJsonImporter(unittest.TestCase):
def test_file_path(self):
j1 = JsonImporter("./PyCTBN/test_data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
self.assertEqual(j1.file_path, "./PyCTBN/data/networks_and_trajectories_binary_data_01_3.json")
self.assertEqual(j1.file_path, "./PyCTBN/test_data/networks_and_trajectories_binary_data_01_3.json")
def test_import_data(self):
j1 = JsonImporter("./PyCTBN/test_data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')

@ -33,13 +33,15 @@ class TestSampleImporter(unittest.TestCase):
prior_net_structure=self.prior_net_structure
)
sample_importer.import_data()
sample_importer.import_data(['X','Y','Z'])
s1 = SamplePath(sample_importer)
s1.build_trajectories()
s1.build_structure()
s1.clear_memory()
data_id= sample_importer.dataset_id()
self.assertEqual(data_id,"")
self.assertEqual(len(s1._importer._df_samples_list), 300)
self.assertIsInstance(s1._importer._df_samples_list,list)
self.assertIsInstance(s1._importer._df_samples_list[0],pd.DataFrame)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff