import glob
import math
import os
import unittest

import networkx as nx
import numpy as np
import psutil
from line_profiler import LineProfiler

import cache as ch
import json_importer as ji
import sample_path as sp
import structure_estimator as se


class TestStructureEstimator(unittest.TestCase):
    """Tests for ``structure_estimator.StructureEstimator``.

    All tests share one ``SamplePath`` built in ``setUpClass`` from the first
    JSON file that ``glob`` finds under ``../data``.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Build the shared sample path (trajectories and structure) once."""
        cls.read_files = glob.glob(os.path.join('../data', "*.json"))
        if not cls.read_files:
            # Fail with an explicit message instead of a bare IndexError below.
            raise FileNotFoundError(
                "No '*.json' data files found in '../data'; "
                "cannot build the sample path for the tests.")
        # NOTE: glob returns matches in arbitrary order, so which file is used
        # is not guaranteed when several JSON files are present.
        cls.importer = ji.JsonImporter(cls.read_files[0], 'samples', 'dyn.str',
                                       'variables', 'Time', 'Name')
        cls.s1 = sp.SamplePath(cls.importer)
        cls.s1.build_trajectories()
        cls.s1.build_structure()

    def test_init(self):
        """The constructor stores the sample path, node data and test levels."""
        exp_alfa = 0.1
        chi_alfa = 0.1
        se1 = se.StructureEstimator(self.s1, exp_alfa, chi_alfa)
        self.assertEqual(self.s1, se1.sample_path)
        self.assertTrue(np.array_equal(
            se1.nodes, np.array(self.s1.structure.nodes_labels)))
        self.assertTrue(np.array_equal(
            se1.nodes_indxs, self.s1.structure.nodes_indexes))
        self.assertTrue(np.array_equal(
            se1.nodes_vals, self.s1.structure.nodes_values))
        self.assertEqual(se1.exp_test_sign, exp_alfa)
        self.assertEqual(se1.chi_test_alfa, chi_alfa)
        self.assertIsInstance(se1.complete_graph, nx.DiGraph)
        self.assertIsInstance(se1.cache, ch.Cache)

    def test_build_complete_graph(self):
        """The built graph has every ordered pair of distinct nodes as an edge."""
        exp_alfa = 0.1
        chi_alfa = 0.1
        labels = self.s1.structure.nodes_labels
        nodes_numb = len(labels)
        se1 = se.StructureEstimator(self.s1, exp_alfa, chi_alfa)
        cg = se1.build_complete_graph(labels)
        # n nodes without self loops -> n * (n - 1) directed edges.
        self.assertEqual(len(cg.edges), nodes_numb * (nodes_numb - 1))
        for node in labels:
            # Work on a copy so the shared structure's label list is untouched.
            no_self_loops = labels[:]
            no_self_loops.remove(node)
            for n2 in no_self_loops:
                self.assertIn((node, n2), cg.edges)

    def test_generate_possible_sub_sets_of_size(self):
        """Subsets of size ``b`` exclude the given node and number C(n-1, b)."""
        exp_alfa = 0.1
        chi_alfa = 0.1
        labels = self.s1.structure.nodes_labels
        nodes_numb = len(labels)
        se1 = se.StructureEstimator(self.s1, exp_alfa, chi_alfa)
        for node in labels:
            for b in range(nodes_numb):
                # Materialise once: the result may be a one-shot iterator, and
                # both the count and the content checks need its elements.
                subsets = list(se1.generate_possible_sub_sets_of_size(
                    labels, b, node))
                # Binomial coefficient C(n - 1, b) computed with exact integer
                # arithmetic; float division would lose precision (or
                # overflow) for large node counts.
                expected = math.factorial(nodes_numb - 1) // (
                    math.factorial(b) * math.factorial(nodes_numb - 1 - b))
                self.assertEqual(len(subsets), expected)
                for sset in subsets:
                    self.assertNotIn(node, sset)

    def test_time(self):
        """Run ``ctpc_algorithm`` under the line profiler and check the edges.

        Prints the profiler statistics, asserts that every edge of the sample
        path's structure is present in the estimator's ``complete_graph`` after
        the run, prints the remaining (spurious) edges and saves the results.
        """
        se1 = se.StructureEstimator(self.s1, 0.1, 0.1)
        lp = LineProfiler()
        lp.add_function(se1.complete_test)
        lp.add_function(se1.one_iteration_of_CTPC_algorithm)
        lp.add_function(se1.independence_test)
        lp_wrapper = lp(se1.ctpc_algorithm)
        lp_wrapper()
        lp.print_stats()
        print(se1.complete_graph.edges)
        print(self.s1.structure.edges)
        tuples_edges = [tuple(rec) for rec in self.s1.structure.edges]
        for ed in tuples_edges:
            self.assertIn(ed, se1.complete_graph.edges)
        # Set lookup instead of scanning the edge list for every graph edge.
        known_edges = set(tuples_edges)
        spurious_edges = [ed for ed in se1.complete_graph.edges
                          if ed not in known_edges]
        print("Spurious Edges:", spurious_edges)
        se1.save_results()

    def test_memory(self):
        """Run ``ctpc_algorithm`` and print the process memory footprint."""
        se1 = se.StructureEstimator(self.s1, 0.1, 0.1)
        se1.ctpc_algorithm()
        current_process = psutil.Process(os.getpid())
        # Single RSS reading taken after the run (bytes), despite the label
        # below calling it an average.
        mem = current_process.memory_info().rss
        print("Average Memory Usage in MB:", mem / 10**6)


if __name__ == '__main__':
    unittest.main()