Old engine for Continuous Time Bayesian Networks. Superseded by reCTBN. 🐍
https://github.com/madlabunimib/PyCTBN
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
144 lines
4.6 KiB
144 lines
4.6 KiB
4 years ago
|
|
||
|
# License: MIT License
|
||
|
|
||
4 years ago
|
import sys
|
||
4 years ago
|
sys.path.append("../../PyCTBN/")
|
||
4 years ago
|
import glob
|
||
|
import math
|
||
|
import os
|
||
|
import unittest
|
||
|
|
||
|
import networkx as nx
|
||
|
import numpy as np
|
||
4 years ago
|
import pandas as pd
|
||
4 years ago
|
import psutil
|
||
4 years ago
|
|
||
4 years ago
|
import copy
|
||
4 years ago
|
import json
|
||
4 years ago
|
|
||
3 years ago
|
from pyctbn.legacy.utility.cache import Cache
|
||
|
from pyctbn.legacy.structure_graph.sample_path import SamplePath
|
||
|
from pyctbn.legacy.estimators.structure_score_based_estimator import StructureScoreBasedEstimator
|
||
|
from pyctbn.legacy.utility.json_importer import JsonImporter
|
||
|
from pyctbn.legacy.utility.sample_importer import SampleImporter
|
||
4 years ago
|
|
||
|
|
||
4 years ago
|
class TestTabuSearch(unittest.TestCase):
    """Structure-learning tests for ``StructureScoreBasedEstimator`` with ``optimizer='tabu'``.

    Each test builds a ``SamplePath`` from a JSON fixture, asks the estimator
    for a set of edges and compares it with the edges of the structure that
    ``SamplePath.build_structure()`` produced from the same fixture.

    The fixture paths start with ``./tests/data``, so they are resolved
    against the current working directory (presumably the repository root --
    confirm against how the suite is launched).
    """

    @staticmethod
    def _read_json(path):
        """Return the parsed content of the JSON file at ``path``.

        The encoding is given explicitly so that decoding does not depend on
        the platform's default locale.
        """
        with open(path, encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _build_sample_path(samples, variables, prior_net_structure):
        """Import the given data and return ``(importer, sample_path)``.

        Args:
            samples: iterable of raw trajectories; each one is wrapped in a
                ``pd.DataFrame`` before being handed to ``SampleImporter``.
            variables: passed to ``SampleImporter`` unchanged.
            prior_net_structure: passed to ``SampleImporter`` unchanged.

        Returns:
            The ``SampleImporter`` after ``import_data()`` and the
            ``SamplePath`` after ``build_trajectories()`` and
            ``build_structure()``.
        """
        importer = SampleImporter(
            trajectory_list=[pd.DataFrame(sample) for sample in samples],
            variables=variables,
            prior_net_structure=prior_net_structure,
        )
        importer.import_data()

        sample_path = SamplePath(importer)
        sample_path.build_trajectories()
        sample_path.build_structure()
        return importer, sample_path

    @staticmethod
    def _edge_set(sample_path):
        """Return a copy of ``sample_path.structure.edges`` as a set of tuples."""
        return set(map(tuple, copy.deepcopy(sample_path.structure.edges)))

    @classmethod
    def setUpClass(cls):
        """Build the shared fixture from the first record of the binary data file.

        Sets ``cls.importer`` and ``cls.s1`` (the ``SamplePath``).
        """
        raw_data = cls._read_json(
            "./tests/data/networks_and_trajectories_binary_data_01_3.json"
        )
        # This file holds a list of records; only the first one is used.
        record = raw_data[0]

        cls.importer, cls.s1 = cls._build_sample_path(
            record["samples"],
            pd.DataFrame(record["variables"]),
            pd.DataFrame(record["dyn.str"]),
        )

    def test_structure(self):
        """The estimated edge set must equal the fixture's edge set exactly."""
        true_edges = self._edge_set(self.s1)

        se1 = StructureScoreBasedEstimator(self.s1)
        edges = se1.estimate_structure(
            max_parents=None,
            iterations_number=100,
            patience=20,
            optimizer='tabu',
            disable_multiprocessing=True,
        )

        self.assertEqual(edges, true_edges)

    def test_structure_3(self):
        """With two known edges supplied, precision and recall must both be >= 0.75.

        Uses its own fixture (the ternary data file), kept in local variables
        so the class-level ``importer`` / ``s1`` are not shadowed.
        """
        # Unlike the binary file, this one is indexed directly by key.
        raw_data = self._read_json(
            "./tests/data/networks_and_trajectories_ternary_data_01_6_1.json"
        )
        _, sample_path = self._build_sample_path(
            raw_data["samples"],
            raw_data["variables"],
            raw_data["dyn.str"],
        )

        true_edges = self._edge_set(sample_path)

        # The first two edges of the structure are handed to the estimator as known.
        known_edges = sample_path.structure.edges[0:2]

        se1 = StructureScoreBasedEstimator(sample_path, known_edges=known_edges)
        edges = se1.estimate_structure(
            max_parents=4,
            iterations_number=100,
            patience=40,
            tabu_length=3,
            tabu_rules_duration=3,
            optimizer='tabu',
            disable_multiprocessing=True,
        )

        # Calculate precision and recall.
        n_added_fake_edges = len(edges.difference(true_edges))
        n_missing_edges = len(true_edges.difference(edges))
        n_true_positive = len(true_edges) - n_missing_edges

        # Both denominators below are >= n_true_positive, so this single check
        # turns a would-be ZeroDivisionError into an ordinary test failure.
        self.assertGreater(
            n_true_positive, 0, "no true edge was recovered by the estimator"
        )

        precision = n_true_positive / (n_true_positive + n_added_fake_edges)
        recall = n_true_positive / (n_true_positive + n_missing_edges)

        self.assertGreaterEqual(precision, 0.75)
        self.assertGreaterEqual(recall, 0.75)
|
||
4 years ago
|
|
||
|
# When this file is executed as a script, hand control to unittest's runner.
if __name__ == "__main__":
    unittest.main()
|
||
|
|