#!/usr/bin/env python3

# License: MIT License

"""Unit tests for ``StructureConstraintBasedEstimator``."""

import glob
import json
import math
import os
import timeit
import unittest

import networkx as nx
import numpy as np

from pyctbn.legacy.utility.cache import Cache
from pyctbn.legacy.structure_graph.sample_path import SamplePath
from pyctbn.legacy.estimators.structure_constraint_based_estimator import StructureConstraintBasedEstimator
from pyctbn.legacy.utility.json_importer import JsonImporter


class TestStructureEstimator(unittest.TestCase):
    """Exercise construction, helpers and result export of the estimator.

    All tests share one ``SamplePath`` built once in ``setUpClass`` from
    ``./tests/data/networks_and_trajectories_binary_data_01_3.json``.
    """

    @classmethod
    def setUpClass(cls):
        """Import the reference dataset and build the shared sample path."""
        cls.read_files = glob.glob(os.path.join('./tests/data', "*.json"))
        cls.importer = JsonImporter(
            './tests/data/networks_and_trajectories_binary_data_01_3.json',
            'samples', 'dyn.str', 'variables', 'Time', 'Name')
        cls.importer.import_data(0)
        cls.s1 = SamplePath(cls.importer)
        cls.s1.build_trajectories()
        cls.s1.build_structure()
        # Reference graph built from the edges of the imported structure.
        cls.real_net_structure = nx.DiGraph(cls.s1.structure.edges)

    def test_init(self):
        """The constructor stores its inputs and creates the graph and cache."""
        exp_alfa = 0.1
        chi_alfa = 0.1
        se1 = StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa)
        self.assertEqual(self.s1, se1._sample_path)
        self.assertTrue(np.array_equal(se1._nodes, np.array(self.s1.structure.nodes_labels)))
        self.assertTrue(np.array_equal(se1._nodes_indxs, self.s1.structure.nodes_indexes))
        self.assertTrue(np.array_equal(se1._nodes_vals, self.s1.structure.nodes_values))
        self.assertEqual(se1._exp_test_sign, exp_alfa)
        self.assertEqual(se1._chi_test_alfa, chi_alfa)
        self.assertIsInstance(se1._complete_graph, nx.DiGraph)
        self.assertIsInstance(se1._cache, Cache)

    def test_build_complete_graph(self):
        """The complete graph has every ordered pair of distinct nodes as an edge."""
        exp_alfa = 0.1
        chi_alfa = 0.1
        nodes_numb = len(self.s1.structure.nodes_labels)
        se1 = StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa)
        cg = se1.build_complete_graph(self.s1.structure.nodes_labels)
        self.assertEqual(len(cg.edges), nodes_numb * (nodes_numb - 1))
        for node in self.s1.structure.nodes_labels:
            # Work on a copy so the shared labels list is not mutated.
            no_self_loops = self.s1.structure.nodes_labels[:]
            no_self_loops.remove(node)
            for n2 in no_self_loops:
                self.assertIn((node, n2), cg.edges)

    def test_build_removable_edges_matrix(self):
        """Edges passed as known edges are flagged ``False`` in the removable matrix."""
        exp_alfa = 0.1
        chi_alfa = 0.1
        known_edges = self.s1.structure.edges[0:2]
        se1 = StructureConstraintBasedEstimator(self.s1, exp_alfa, chi_alfa, known_edges)
        for edge in known_edges:
            i = self.s1.structure.get_node_indx(edge[0])
            j = self.s1.structure.get_node_indx(edge[1])
            self.assertFalse(se1._removable_edges_matrix[i][j])

    def test_generate_possible_sub_sets_of_size(self):
        """Subsets of size ``b`` exclude the given node and number C(n - 1, b)."""
        labels = self.s1.structure.nodes_labels
        nodes_numb = len(labels)
        for node in labels:
            for b in range(nodes_numb):
                # Materialise once: the result may be a one-shot iterator.
                subsets = list(
                    StructureConstraintBasedEstimator.generate_possible_sub_sets_of_size(
                        labels, b, node))
                # Exact integer binomial coefficient; float division would
                # lose precision for large node counts.
                expected = math.factorial(nodes_numb - 1) // (
                    math.factorial(b) * math.factorial(nodes_numb - 1 - b))
                self.assertEqual(len(subsets), expected)
                for sset in subsets:
                    self.assertNotIn(node, sset)

    def test_time(self):
        """Time one ``ctpc_algorithm`` run and check the reference edges are kept."""
        known_edges = []
        se1 = StructureConstraintBasedEstimator(self.s1, 0.1, 0.1, known_edges, 25)
        exec_time = timeit.timeit(se1.ctpc_algorithm, number=1)
        print("Execution Time: ", exec_time)
        for ed in self.s1.structure.edges:
            self.assertIn(tuple(ed), se1._complete_graph.edges)

    def test_save_results(self):
        """Saved results reload into a graph with no edges missing from the estimate."""
        se1 = StructureConstraintBasedEstimator(self.s1, 0.1, 0.1)
        se1.ctpc_algorithm()
        file_name = './tests/estimators/test_save.json'
        try:
            se1.save_results(file_name)
            with open(file_name) as f:
                js_graph = json.load(f)
            result_graph = nx.json_graph.node_link_graph(js_graph)
            self.assertFalse(nx.difference(se1._complete_graph, result_graph).edges)
        finally:
            # Always clean up, even when an assertion above fails.
            if os.path.exists(file_name):
                os.remove(file_name)

    def test_adjacency_matrix(self):
        """The estimator's adjacency matrix equals that of the reference structure."""
        se1 = StructureConstraintBasedEstimator(self.s1, 0.1, 0.1)
        se1.ctpc_algorithm()
        adj_matrix = nx.adjacency_matrix(
            self.real_net_structure,
            self.s1.structure.nodes_labels).toarray().astype(bool)
        self.assertTrue(np.array_equal(adj_matrix, se1.adjacency_matrix()))

    def test_save_plot_estimated_graph(self):
        """Saving the plot of the estimated structure creates the target file."""
        se1 = StructureConstraintBasedEstimator(self.s1, 0.1, 0.1)
        se1.estimate_structure(disable_multiprocessing=True)
        file_name = './tests/estimators/test_plot.png'
        try:
            se1.save_plot_estimated_structure_graph(file_name)
            self.assertTrue(os.path.isfile(file_name))
        finally:
            # Always clean up, even when the plot step or assertion fails.
            if os.path.exists(file_name):
                os.remove(file_name)


if __name__ == '__main__':
    unittest.main()