
Updated tests

master
Luca Moretti 4 years ago
parent 78923d4768
commit 08d29d3e65
  1. main_package/classes/estimators/fam_score_calculator.py (10 lines changed)
  2. main_package/classes/estimators/parameters_estimator.py (10 lines changed)
  3. main_package/classes/structure_graph/set_of_cims.py (143 lines changed)
  4. main_package/tests/structure_graph/test_cim.py (4 lines changed)
  5. main_package/tests/structure_graph/test_setofcims.py (54 lines changed)
  6. main_package/tests/utility/test_json_importer.py (166 lines changed)

@@ -70,7 +70,7 @@ class FamScoreCalculator:
         """
         'get cim length'
-        values = len(cim.state_residence_times)
+        values = len(cim._state_residence_times)
         'compute the marginal likelihood for the current cim'
         return np.sum([
@@ -98,7 +98,7 @@ class FamScoreCalculator:
             the marginal likelihood of the node when it assumes a specific value
         """
-        values = list(range(len(cim.state_residence_times)))
+        values = list(range(len(cim._state_residence_times)))
         'remove the index because of the x != x^ condition in the summation '
         values.remove(index)
@@ -162,13 +162,13 @@ class FamScoreCalculator:
         """
         'get cim length'
-        values=len(cim.state_residence_times)
+        values=len(cim._state_residence_times)
         'compute the marginal likelihood for the current cim'
         return np.sum([
             self.single_cim_xu_marginal_likelihood_q(
                 cim.state_transition_matrix[index, index],
-                cim.state_residence_times[index],
+                cim._state_residence_times[index],
                 tau_xu,
                 alpha_xu)
             for index in range(values)])
@@ -227,7 +227,7 @@ class FamScoreCalculator:
         #                alpha_xu,
         #                alpha_xxu))
         'calculate alpha_xxu as a uniform distribution'
-        alpha_xxu = alpha_xu /(len(cims[0].state_residence_times) - 1)
+        alpha_xxu = alpha_xu /(len(cims[0]._state_residence_times) - 1)
         return self.marginal_likelihood_q(cims,
                                           tau_xu,
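Every change in fam_score_calculator.py is the same mechanical rename: the residence-time vector of a ConditionalIntensityMatrix now lives in the underscore-prefixed _state_residence_times. A minimal sketch of the convention this commit adopts; the read-only property shown here is an illustrative assumption, not part of the diff (FamScoreCalculator reaches the private name directly):

import numpy as np

class ConditionalIntensityMatrix(object):
    # Sketch of the storage convention, assuming the real class's interface.
    def __init__(self, state_residence_times: np.ndarray, state_transition_matrix: np.ndarray):
        # Residence times move behind an underscore-prefixed name;
        # state_transition_matrix stays publicly accessible, as the diff shows.
        self._state_residence_times = state_residence_times
        self.state_transition_matrix = state_transition_matrix

    @property
    def state_residence_times(self) -> np.ndarray:
        # Hypothetical accessor, mirroring the properties SetOfCims gains below.
        return self._state_residence_times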

@@ -47,8 +47,8 @@ class ParametersEstimator:
             A setOfCims object filled with the computed CIMS
         """
         node_indx = self.net_graph.get_node_indx(node_id)
-        state_res_times = self.single_set_of_cims.state_residence_times
-        transition_matrices = self.single_set_of_cims.transition_matrices
+        state_res_times = self.single_set_of_cims._state_residence_times
+        transition_matrices = self.single_set_of_cims._transition_matrices
         trajectory = self.sample_path.trajectories.trajectory
         self.compute_state_res_time_for_node(node_indx, self.sample_path.trajectories.times,
                                              trajectory,
@@ -123,15 +123,15 @@ class ParametersEstimator:
                 self.sample_path.trajectories.trajectory,
                 self.net_graph.time_filtering[indx],
                 self.net_graph.time_scalar_indexing_strucure[indx],
-                aggr[1].state_residence_times)
+                aggr[1]._state_residence_times)
             #print(self.net_graph.transition_filtering[indx])
             #print(self.net_graph.transition_scalar_indexing_structure[indx])
             self.compute_state_transitions_for_a_node(self.net_graph.get_node_indx(aggr[0]),
                 self.sample_path.trajectories.complete_trajectory,
                 self.net_graph.transition_filtering[indx],
                 self.net_graph.transition_scalar_indexing_structure[indx],
-                aggr[1].transition_matrices)
-            aggr[1].build_cims(aggr[1].state_residence_times, aggr[1].transition_matrices)
+                aggr[1]._transition_matrices)
+            aggr[1].build_cims(aggr[1]._state_residence_times, aggr[1]._transition_matrices)
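Note the calling pattern these renames preserve: the SetOfCims arrays act as output buffers. Its constructor pre-allocates them as zeros (see the set_of_cims.py diff below), compute_state_res_time_for_node and compute_state_transitions_for_a_node accumulate into them in place, and build_cims then turns each row into a cim. A minimal sketch of that fill-then-build contract, with synthetic counts standing in for the real trajectory scan:

import numpy as np
from structure_graph.set_of_cims import SetOfCims

# Hypothetical node 'X' with one binary parent and three states.
p_combs = np.array([[0], [1]])
sofc = SetOfCims('X', [2], 3, p_combs)

# Stand-in for the estimator's in-place accumulation over the trajectories.
sofc._state_residence_times[:] = np.random.rand(2, 3) * 100
sofc._transition_matrices[:] = np.random.randint(1, 50, (2, 3, 3))

# One ConditionalIntensityMatrix per parent state combination.
sofc.build_cims(sofc._state_residence_times, sofc._transition_matrices)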

@@ -8,102 +8,91 @@ import numpy as np
 import structure_graph.conditional_intensity_matrix as cim


-class SetOfCims:
-    """
-    Aggregates all the CIMS of the node identified by the label node_id.
-    :node_id: the node label
-    :parents_states_number: the cardinalities of the parents
-    :node_states_number: the cardinality of the node
-    :p_combs: the relative p_comb structure
-    :state_residence_time: matrix containing all the state residence time vectors for the node
-    :transition_matrices: matrix containing all the transition matrices for the node
-    :actual_cims: the cims of the node
+class SetOfCims(object):
+    """Aggregates all the CIMS of the node identified by the label _node_id.
+
+    :param node_id: the node label
+    :type node_id: string
+    :param parents_states_number: the cardinalities of the parents
+    :type parents_states_number: List
+    :param node_states_number: the cardinality of the node
+    :type node_states_number: int
+    :param p_combs: the p_comb structure bound to this node
+    :type p_combs: numpy.ndArray
+    :_state_residence_times: matrix containing all the state residence time vectors for the node
+    :_transition_matrices: matrix containing all the transition matrices for the node
+    :_actual_cims: the cims of the node
     """

     def __init__(self, node_id: str, parents_states_number: typing.List, node_states_number: int, p_combs: np.ndarray):
-        self.node_id = node_id
-        self.parents_states_number = parents_states_number
-        self.node_states_number = node_states_number
-        self.actual_cims = []
-        self.state_residence_times = None
-        self.transition_matrices = None
-        self.p_combs = p_combs
-        self.build_times_and_transitions_structures()
-
-    def build_times_and_transitions_structures(self):
-        """
-        Initializes at the correct dimensions the state residence times matrix and the state transition matrices
-
-        Parameters:
-            void
-        Returns:
-            void
+        """Constructor Method
+        """
+        self._node_id = node_id
+        self._parents_states_number = parents_states_number
+        self._node_states_number = node_states_number
+        self._actual_cims = []
+        self._state_residence_times = None
+        self._transition_matrices = None
+        self._p_combs = p_combs
+        self.build_times_and_transitions_structures()
+
+    def build_times_and_transitions_structures(self) -> None:
+        """Initializes at the correct dimensions the state residence times matrix and the state transition matrices.
         """
-        if not self.parents_states_number:
-            self.state_residence_times = np.zeros((1, self.node_states_number), dtype=np.float)
-            self.transition_matrices = np.zeros((1, self.node_states_number, self.node_states_number), dtype=np.int)
+        if not self._parents_states_number:
+            self._state_residence_times = np.zeros((1, self._node_states_number), dtype=np.float)
+            self._transition_matrices = np.zeros((1, self._node_states_number, self._node_states_number), dtype=np.int)
         else:
-            self.state_residence_times = \
-                np.zeros((np.prod(self.parents_states_number), self.node_states_number), dtype=np.float)
-            self.transition_matrices = np.zeros([np.prod(self.parents_states_number), self.node_states_number,
-                                                 self.node_states_number], dtype=np.int)
+            self._state_residence_times = \
+                np.zeros((np.prod(self._parents_states_number), self._node_states_number), dtype=np.float)
+            self._transition_matrices = np.zeros([np.prod(self._parents_states_number), self._node_states_number,
+                                                  self._node_states_number], dtype=np.int)

-    def build_cims(self, state_res_times: typing.List, transition_matrices: typing.List):
-        """
-        Build the ConditionalIntensityMatrix object given the state residence times and transitions matrices.
-        Compute the cim coefficients.
-        Parameters:
-            state_res_times: the state residence times matrix
-            transition_matrices: the transition matrices
-        Returns:
-            void
+    def build_cims(self, state_res_times: np.ndarray, transition_matrices: np.ndarray) -> None:
+        """Build the ``ConditionalIntensityMatrix`` objects given the state residence times and transitions matrices.
+        Compute the cim coefficients. The class member ``_actual_cims`` will contain the computed cims.
+
+        :param state_res_times: the state residence times matrix
+        :type state_res_times: numpy.ndArray
+        :param transition_matrices: the transition matrices
+        :type transition_matrices: numpy.ndArray
         """
         for state_res_time_vector, transition_matrix in zip(state_res_times, transition_matrices):
             cim_to_add = cim.ConditionalIntensityMatrix(state_res_time_vector, transition_matrix)
             cim_to_add.compute_cim_coefficients()
-            self.actual_cims.append(cim_to_add)
-        self.actual_cims = np.array(self.actual_cims)
-        self.transition_matrices = None
-        self.state_residence_times = None
+            self._actual_cims.append(cim_to_add)
+        self._actual_cims = np.array(self._actual_cims)
+        self._transition_matrices = None
+        self._state_residence_times = None

     def filter_cims_with_mask(self, mask_arr: np.ndarray, comb: typing.List) -> np.ndarray:
-        """
-        Filter the cims contained in the array actual_cims given the boolean mask mask_arr and the index comb.
-        Parameters:
-            mask_arr: the boolean mask
-            comb: the indexes of the selected cims
-        Returns:
-            Array of ConditionalIntensityMatrix
+        """Filter the cims contained in the array ``_actual_cims`` given the boolean mask ``mask_arr`` and the index
+        ``comb``.
+
+        :param mask_arr: the boolean mask that indicates which parent to consider
+        :type mask_arr: numpy.array
+        :param comb: the state/s of the filtered parents
+        :type comb: numpy.array
+        :return: Array of ``ConditionalIntensityMatrix`` objects
+        :rtype: numpy.array
         """
         if mask_arr.size <= 1:
-            return self.actual_cims
+            return self._actual_cims
         else:
-            tmp_parents_comb_from_ids = np.argwhere(np.all(self.p_combs[:, mask_arr] == comb, axis=1)).ravel()
-            return self.actual_cims[tmp_parents_comb_from_ids]
+            flat_indxs = np.argwhere(np.all(self._p_combs[:, mask_arr] == comb, axis=1)).ravel()
+            return self._actual_cims[flat_indxs]

-    def get_cims(self):
-        return self.actual_cims
+    @property
+    def actual_cims(self) -> np.ndarray:
+        return self._actual_cims
+
+    @property
+    def p_combs(self) -> np.ndarray:
+        return self._p_combs

     def get_cims_number(self):
-        return len(self.actual_cims)
-
-    """
-    def get_cim(self, index):
-        flat_index = self.indexes_converter(index)
-        return self.actual_cims[flat_index]
-
-    def indexes_converter(self, indexes):
-        assert len(indexes) == len(self.parents_states_number)
-        vector_index = 0
-        if not indexes:
-            return vector_index
-        else:
-            for indx, value in enumerate(indexes):
-                vector_index = vector_index*self.parents_states_number[indx] + indexes[indx]
-            return vector_index
-    """
+        return len(self._actual_cims)
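After build_cims runs, the raw arrays are released and results are reachable only through the new read-only properties. Continuing the sketch above (same hypothetical sofc):

# build_cims consumed the raw arrays and dropped the references.
assert sofc._state_residence_times is None
assert sofc._transition_matrices is None

cims = sofc.actual_cims        # numpy array of ConditionalIntensityMatrix objects
combs = sofc.p_combs           # parent state combinations, row-aligned with cims
print(sofc.get_cims_number())  # 2, one cim per parent combination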

@@ -19,7 +19,7 @@ class TestConditionalIntensityMatrix(unittest.TestCase):
     def test_init(self):
         c1 = cim.ConditionalIntensityMatrix(self.state_res_times, self.state_transition_matrix)
-        self.assertTrue(np.array_equal(self.state_res_times, c1.state_residence_times))
+        self.assertTrue(np.array_equal(self.state_res_times, c1._state_residence_times))
         self.assertTrue(np.array_equal(self.state_transition_matrix, c1.state_transition_matrix))
         self.assertEqual(c1.cim.dtype, np.float)
         self.assertEqual(self.state_transition_matrix.shape, c1.cim.shape)
@@ -32,7 +32,7 @@ class TestConditionalIntensityMatrix(unittest.TestCase):
             for j in range(0, len(self.state_res_times)):
                 c2[i, j] = (c2[i, j] + 1) / (self.state_res_times[i] + 1)
         c1.compute_cim_coefficients()
-        for i in range(0, len(c1.state_residence_times)):
+        for i in range(0, len(c1._state_residence_times)):
             self.assertTrue(np.isclose(np.sum(c1.cim[i]), 0.0, 1e-02, 1e-01))
         for i in range(0, len(self.state_res_times)):
             for j in range(0, len(self.state_res_times)):

@@ -4,7 +4,9 @@ import unittest
 import numpy as np
 import itertools
-import structure_graph.set_of_cims as soci
+from structure_graph.set_of_cims import SetOfCims


 class TestSetOfCims(unittest.TestCase):
@@ -50,7 +52,7 @@ class TestSetOfCims(unittest.TestCase):
     def test_filter_cims_with_mask(self):
         p_combs = self.build_p_comb_structure_for_a_node(self.possible_cardinalities)
-        sofc1 = soci.SetOfCims('X', self.possible_cardinalities, 3, p_combs)
+        sofc1 = SetOfCims('X', self.possible_cardinalities, 3, p_combs)
         state_res_times_list = []
         transition_matrices_list = []
         for i in range(len(p_combs)):
@@ -59,56 +61,45 @@ class TestSetOfCims(unittest.TestCase):
             state_transition_matrix = np.random.randint(1, 10000, (3, 3))
             state_res_times_list.append(state_res_times)
             transition_matrices_list.append(state_transition_matrix)
-        sofc1.build_cims(state_res_times_list, transition_matrices_list)
+        sofc1.build_cims(np.array(state_res_times_list), np.array(transition_matrices_list))
         for length_of_mask in range(3):
             for mask in list(itertools.permutations([True, False], r=length_of_mask)):
                 m = np.array(mask)
                 for parent_value in range(self.possible_cardinalities[0]):
                     cims = sofc1.filter_cims_with_mask(m, [parent_value])
                     if length_of_mask == 0 or length_of_mask == 1:
-                        self.assertTrue(np.array_equal(sofc1.actual_cims, cims))
+                        self.assertTrue(np.array_equal(sofc1._actual_cims, cims))
                     else:
                         indxs = self.another_filtering_method(p_combs, m, [parent_value])
-                        self.assertTrue(np.array_equal(cims, sofc1.actual_cims[indxs]))
+                        self.assertTrue(np.array_equal(cims, sofc1._actual_cims[indxs]))

     def aux_test_build_cims(self, node_id, p_values, node_states, p_combs):
         state_res_times_list = []
         transition_matrices_list = []
-        so1 = soci.SetOfCims(node_id, p_values, node_states, p_combs)
+        so1 = SetOfCims(node_id, p_values, node_states, p_combs)
         for i in range(len(p_combs)):
             state_res_times = np.random.rand(1, node_states)[0]
             state_res_times = state_res_times * 1000
             state_transition_matrix = np.random.randint(1, 10000, (node_states, node_states))
             state_res_times_list.append(state_res_times)
             transition_matrices_list.append(state_transition_matrix)
-        so1.build_cims(state_res_times_list, transition_matrices_list)
+        so1.build_cims(np.array(state_res_times_list), np.array(transition_matrices_list))
         self.assertEqual(len(state_res_times_list), so1.get_cims_number())
-        self.assertIsInstance(so1.actual_cims, np.ndarray)
-        self.assertIsNone(so1.transition_matrices)
-        self.assertIsNone(so1.state_residence_times)
+        self.assertIsInstance(so1._actual_cims, np.ndarray)
+        self.assertIsNone(so1._transition_matrices)
+        self.assertIsNone(so1._state_residence_times)

     def aux_test_init(self, node_id, parents_states_number, node_states_number, p_combs):
-        sofcims = soci.SetOfCims(node_id, parents_states_number, node_states_number, p_combs)
-        self.assertEqual(sofcims.node_id, node_id)
-        self.assertTrue(np.array_equal(sofcims.p_combs, p_combs))
-        self.assertTrue(np.array_equal(sofcims.parents_states_number, parents_states_number))
-        self.assertEqual(sofcims.node_states_number, node_states_number)
-        self.assertFalse(sofcims.actual_cims)
-        self.assertEqual(sofcims.state_residence_times.shape[0], np.prod(np.array(parents_states_number)))
-        self.assertEqual(len(sofcims.state_residence_times[0]), node_states_number)
-        self.assertEqual(sofcims.transition_matrices.shape[0], np.prod(np.array(parents_states_number)))
-        self.assertEqual(len(sofcims.transition_matrices[0][0]), node_states_number)
-
-    def aux_test_indexes_converter(self, node_id, parents_states_number, node_states_number):
-        sofcims = soci.SetOfCims(node_id, parents_states_number, node_states_number)
-        if not parents_states_number:
-            self.assertEqual(sofcims.indexes_converter([]), 0)
-        else:
-            parents_possible_values = []
-            for cardi in parents_states_number:
-                parents_possible_values.extend(range(0, cardi))
-            for p in itertools.permutations(parents_possible_values, len(parents_states_number)):
-                self.assertEqual(sofcims.indexes_converter(list(p)), np.ravel_multi_index(list(p), parents_states_number))
+        sofcims = SetOfCims(node_id, parents_states_number, node_states_number, p_combs)
+        self.assertEqual(sofcims._node_id, node_id)
+        self.assertTrue(np.array_equal(sofcims._p_combs, p_combs))
+        self.assertTrue(np.array_equal(sofcims._parents_states_number, parents_states_number))
+        self.assertEqual(sofcims._node_states_number, node_states_number)
+        self.assertFalse(sofcims._actual_cims)
+        self.assertEqual(sofcims._state_residence_times.shape[0], np.prod(np.array(parents_states_number)))
+        self.assertEqual(len(sofcims._state_residence_times[0]), node_states_number)
+        self.assertEqual(sofcims._transition_matrices.shape[0], np.prod(np.array(parents_states_number)))
+        self.assertEqual(len(sofcims._transition_matrices[0][0]), node_states_number)

     def build_p_comb_structure_for_a_node(self, parents_values):
         """
@@ -140,5 +131,6 @@ class TestSetOfCims(unittest.TestCase):
             indxs.append(indx)
         return np.array(indxs)


 if __name__ == '__main__':
     unittest.main()
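test_filter_cims_with_mask above pins down the selection semantics: _p_combs enumerates the parent state combinations row-aligned with the cim array, so filtering reduces to matching rows on the masked columns. A compact sketch, assuming two binary parents built via a cartesian product as build_p_comb_structure_for_a_node presumably builds them:

import itertools
import numpy as np
from structure_graph.set_of_cims import SetOfCims

# p_combs: cartesian product of the parents' state values (two binary parents).
p_combs = np.array(list(itertools.product(range(2), range(2))))
sofc = SetOfCims('X', [2, 2], 3, p_combs)
sofc.build_cims(np.random.rand(4, 3) * 100, np.random.randint(1, 50, (4, 3, 3)))

# Keep the cims whose second parent (mask [False, True]) is in state 1:
# rows (0, 1) and (1, 1) of p_combs match.
selected = sofc.filter_cims_with_mask(np.array([False, True]), [1])
print(len(selected))  # 2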

@@ -5,7 +5,7 @@ import os
 import glob
 import numpy as np
 import pandas as pd
-import utility.json_importer as ji
+from utility.json_importer import JsonImporter
 import json
@@ -18,18 +18,20 @@ class TestJsonImporter(unittest.TestCase):
         cls.read_files = glob.glob(os.path.join('../../data', "*.json"))

     def test_init(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
-        self.assertEqual(j1.samples_label, 'samples')
-        self.assertEqual(j1.structure_label, 'dyn.str')
-        self.assertEqual(j1.variables_label, 'variables')
-        self.assertEqual(j1.time_key, 'Time')
-        self.assertEqual(j1.variables_key, 'Name')
-        self.assertEqual(j1.file_path, self.read_files[0])
-        self.assertFalse(j1.df_samples_list)
-        self.assertTrue(j1.variables.empty)
-        self.assertTrue(j1.structure.empty)
-        self.assertFalse(j1.concatenated_samples)
-        self.assertFalse(j1.sorter)
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        self.assertEqual(j1._samples_label, 'samples')
+        self.assertEqual(j1._structure_label, 'dyn.str')
+        self.assertEqual(j1._variables_label, 'variables')
+        self.assertEqual(j1._time_key, 'Time')
+        self.assertEqual(j1._variables_key, 'Name')
+        self.assertEqual(j1._file_path, "../../data/networks_and_trajectories_binary_data_01_3.json")
+        self.assertIsNone(j1._df_samples_list)
+        self.assertIsNone(j1.variables)
+        self.assertIsNone(j1.structure)
+        self.assertEqual(j1.concatenated_samples, [])
+        self.assertIsNone(j1.sorter)
+        self.assertIsNone(j1._array_indx)
+        self.assertIsInstance(j1._raw_data, list)

     def test_read_json_file_found(self):
         data_set = {"key1": [1, 2, 3], "key2": [4, 5, 6]}
@@ -37,68 +39,85 @@ class TestJsonImporter(unittest.TestCase):
             json.dump(data_set, f)
         path = os.getcwd()
         path = path + '/data.json'
-        j1 = ji.JsonImporter(path, '', '', '', '', '')
-        imported_data = j1.read_json_file()
-        self.assertTrue(self.ordered(data_set) == self.ordered(imported_data))
+        j1 = JsonImporter(path, '', '', '', '', '')
+        self.assertTrue(self.ordered(data_set) == self.ordered(j1._raw_data))
         os.remove('data.json')

     def test_read_json_file_not_found(self):
         path = os.getcwd()
         path = path + '/data.json'
-        j1 = ji.JsonImporter(path, '', '', '', '', '')
-        self.assertRaises(FileNotFoundError, j1.read_json_file)
+        self.assertRaises(FileNotFoundError, JsonImporter, path, '', '', '', '', '')

+    def test_build_sorter(self):
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        df_samples_list = j1.normalize_trajectories(j1._raw_data, 0, j1._samples_label)
+        sorter = j1.build_sorter(df_samples_list[0])
+        self.assertListEqual(sorter, list(df_samples_list[0].columns.values)[1:])

     def test_normalize_trajectories(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
-        raw_data = j1.read_json_file()
-        #print(raw_data)
-        j1.normalize_trajectories(raw_data, 0, j1.samples_label)
-        self.assertEqual(len(j1.df_samples_list), len(raw_data[0][j1.samples_label]))
-        self.assertEqual(list(j1.df_samples_list[0].columns.values)[1:], j1.sorter)
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        df_samples_list = j1.normalize_trajectories(j1._raw_data, 0, j1._samples_label)
+        self.assertEqual(len(df_samples_list), len(j1._raw_data[0][j1._samples_label]))

     def test_normalize_trajectories_wrong_indx(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
-        raw_data = j1.read_json_file()
-        self.assertRaises(IndexError, j1.normalize_trajectories, raw_data, 474, j1.samples_label)
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        self.assertRaises(IndexError, j1.normalize_trajectories, j1._raw_data, 474, j1._samples_label)

     def test_normalize_trajectories_wrong_key(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'sample', 'dyn.str', 'variables', 'Time', 'Name')
-        raw_data = j1.read_json_file()
-        self.assertRaises(KeyError, j1.normalize_trajectories, raw_data, 0, j1.samples_label)
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'sample', 'dyn.str', 'variables', 'Time', 'Name')
+        self.assertRaises(KeyError, j1.normalize_trajectories, j1._raw_data, 0, j1._samples_label)

     def test_compute_row_delta_single_samples_frame(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
-        raw_data = j1.read_json_file()
-        j1.normalize_trajectories(raw_data, 0, j1.samples_label)
-        sample_frame = j1.df_samples_list[0]
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        j1._array_indx = 0
+        j1._df_samples_list = j1.import_trajectories(j1._raw_data)
+        sample_frame = j1._df_samples_list[0]
         original_copy = sample_frame.copy()
         columns_header = list(sample_frame.columns.values)
         shifted_cols_header = [s + "S" for s in columns_header[1:]]
-        new_sample_frame = j1.compute_row_delta_sigle_samples_frame(sample_frame, j1.time_key, columns_header[1:],
+        new_sample_frame = j1.compute_row_delta_sigle_samples_frame(sample_frame, columns_header[1:],
                                                                     shifted_cols_header)
         self.assertEqual(len(list(sample_frame.columns.values)) + len(shifted_cols_header),
                          len(list(new_sample_frame.columns.values)))
         self.assertEqual(sample_frame.shape[0] - 1, new_sample_frame.shape[0])
         for indx, row in new_sample_frame.iterrows():
             self.assertAlmostEqual(row['Time'],
                                    original_copy.iloc[indx + 1]['Time'] - original_copy.iloc[indx]['Time'])
         for indx, row in new_sample_frame.iterrows():
             np.array_equal(np.array(row[columns_header[1:]], dtype=int),
                            np.array(original_copy.iloc[indx][columns_header[1:]], dtype=int))
             np.array_equal(np.array(row[shifted_cols_header], dtype=int),
                            np.array(original_copy.iloc[indx + 1][columns_header[1:]], dtype=int))

     def test_compute_row_delta_in_all_frames(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
-        raw_data = j1.read_json_file()
-        j1.import_trajectories(raw_data)
-        j1.compute_row_delta_in_all_samples_frames(j1.time_key)
-        self.assertEqual(list(j1.df_samples_list[0].columns.values), list(j1.concatenated_samples.columns.values))
-        self.assertEqual(list(j1.concatenated_samples.columns.values)[0], j1.time_key)
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        j1._array_indx = 0
+        j1._df_samples_list = j1.import_trajectories(j1._raw_data)
+        j1._sorter = j1.build_sorter(j1._df_samples_list[0])
+        j1.compute_row_delta_in_all_samples_frames(j1._df_samples_list)
+        self.assertEqual(list(j1._df_samples_list[0].columns.values),
+                         list(j1.concatenated_samples.columns.values)[:len(list(j1._df_samples_list[0].columns.values))])
+        self.assertEqual(list(j1.concatenated_samples.columns.values)[0], j1._time_key)

+    def test_compute_row_delta_in_all_frames_not_init_sorter(self):
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        j1._array_indx = 0
+        j1._df_samples_list = j1.import_trajectories(j1._raw_data)
+        self.assertRaises(RuntimeError, j1.compute_row_delta_in_all_samples_frames, j1._df_samples_list)

     def test_clear_data_frame_list(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
-        raw_data = j1.read_json_file()
-        j1.import_trajectories(raw_data)
-        j1.compute_row_delta_in_all_samples_frames(j1.time_key)
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        j1._array_indx = 0
+        j1._df_samples_list = j1.import_trajectories(j1._raw_data)
+        j1._sorter = j1.build_sorter(j1._df_samples_list[0])
+        j1.compute_row_delta_in_all_samples_frames(j1._df_samples_list)
         j1.clear_data_frame_list()
-        for df in j1.df_samples_list:
+        for df in j1._df_samples_list:
             self.assertTrue(df.empty)

     def test_clear_concatenated_frame(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
-        j1.import_data()
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        j1.import_data(0)
         j1.clear_concatenated_frame()
         self.assertTrue(j1.concatenated_samples.empty)
@@ -108,7 +127,7 @@ class TestJsonImporter(unittest.TestCase):
             json.dump(data_set, f)
         path = os.getcwd()
         path = path + '/data.json'
-        j1 = ji.JsonImporter(path, '', '', '', '', '')
+        j1 = JsonImporter(path, '', '', '', '', '')
         raw_data = j1.read_json_file()
         frame = pd.DataFrame(raw_data)
         col_list = j1.build_list_of_samples_array(frame)
@@ -120,37 +139,44 @@ class TestJsonImporter(unittest.TestCase):
         os.remove('data.json')

     def test_import_variables(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
         sorter = ['X', 'Y', 'Z']
-        raw_data = [{'variables':{"Name": ['Z', 'Y', 'X'], "value": [3, 3, 3]}}]
-        j1.import_variables(raw_data, sorter)
-        self.assertEqual(list(j1.variables[j1.variables_key]), sorter)
+        raw_data = [{'variables':{"Name": ['X', 'Y', 'Z'], "value": [3, 3, 3]}}]
+        j1._array_indx = 0
+        df_var = j1.import_variables(raw_data)
+        self.assertEqual(list(df_var[j1._variables_key]), sorter)

     def test_import_structure(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
         raw_data = [{"dyn.str":[{"From":"X","To":"Z"},{"From":"Y","To":"Z"},{"From":"Z","To":"Y"}]}]
-        j1.import_structure(raw_data)
-        #print(raw_data[0]['dyn.str'][0].items())
-        self.assertIsInstance(j1.structure, pd.DataFrame)
+        j1._array_indx = 0
+        df_struct = j1.import_structure(raw_data)
+        self.assertIsInstance(df_struct, pd.DataFrame)

     def test_import_sampled_cims(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
         raw_data = j1.read_json_file()
+        j1._array_indx = 0
+        j1._df_samples_list = j1.import_trajectories(raw_data)
+        j1._sorter = j1.build_sorter(j1._df_samples_list[0])
         cims = j1.import_sampled_cims(raw_data, 0, 'dyn.cims')
-        j1.import_variables(raw_data, ['X','Y','Z'])
-        self.assertEqual(list(cims.keys()), j1.variables['Name'].tolist())
+        self.assertEqual(list(cims.keys()), j1.sorter)

+    def test_dataset_id(self):
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        array_indx = 0
+        j1.import_data(array_indx)
+        self.assertEqual(array_indx, j1.dataset_id())

+    def test_file_path(self):
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        self.assertEqual(j1.file_path, "../../data/networks_and_trajectories_binary_data_01_3.json")

     def test_import_data(self):
-        j1 = ji.JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name')
-        j1.import_data()
-        #lp = LineProfiler()
-        #lp_wrapper = lp(j1.import_data)
-        #lp_wrapper()
-        #lp.print_stats()
-        #j1.import_data()
-        self.assertEqual(list(j1.variables[j1.variables_key]),
-                         list(j1.concatenated_samples.columns.values[1:len(j1.variables[j1.variables_key]) + 1]))
+        j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json", 'samples', 'dyn.str', 'variables', 'Time', 'Name')
+        j1.import_data(0)
+        self.assertEqual(list(j1.variables[j1._variables_key]),
+                         list(j1.concatenated_samples.columns.values[1:len(j1.variables[j1._variables_key]) + 1]))
         print(j1.variables)
         print(j1.structure)
         print(j1.concatenated_samples)
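Taken together, the updated tests pin down the importer's reworked lifecycle: the constructor now reads the file eagerly into _raw_data (so FileNotFoundError is raised at construction), and import_data takes the dataset index explicitly instead of assuming one. A minimal usage sketch under those assumptions:

from utility.json_importer import JsonImporter

# The constructor reads and parses the JSON file immediately.
j1 = JsonImporter("../../data/networks_and_trajectories_binary_data_01_3.json",
                  'samples', 'dyn.str', 'variables', 'Time', 'Name')

# Select the dataset inside the file by index and import everything.
j1.import_data(0)

print(j1.variables)             # variables DataFrame
print(j1.structure)             # structure DataFrame
print(j1.concatenated_samples)  # all trajectories with row deltas applied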