1
0
Fork 0

Refactors in test classes

parallel_struct_est
philpMartin 4 years ago
parent 9ad0cac265
commit 462e29bb17
  1. 0
      main_package/__init__.py
  2. 1
      main_package/classes/conditional_intensity_matrix.py
  3. 10
      main_package/classes/json_importer.py
  4. 73
      main_package/classes/network_graph.py
  5. 35
      main_package/classes/parameters_estimator.py
  6. 16
      main_package/classes/sample_path.py
  7. 1
      main_package/classes/set_of_cims.py
  8. 31
      main_package/classes/structure.py
  9. 3
      main_package/tests/sample_path_test.py
  10. 16
      main_package/tests/test_json_importer.py
  11. 71
      main_package/tests/test_networkgraph.py
  12. 43
      main_package/tests/test_structure.py
  13. 2
      main_package/tests/test_trajectory.py

@ -22,6 +22,7 @@ class ConditionalIntensityMatrix:
def compute_cim_coefficients(self):
np.fill_diagonal(self.cim, self.cim.diagonal() * -1)
self.cim = ((self.cim.T + 1) / (self.state_residence_times + 1)).T
#np.fill_diagonal(self.state_transition_matrix, 0)
def __repr__(self):
return 'CIM:\n' + str(self.cim)

@ -102,10 +102,12 @@ class JsonImporter(AbstractImporter):
"""
for sample_indx, sample in enumerate(raw_data[indx][trajectories_key]):
self.df_samples_list.append(pd.DataFrame(sample))
self.sorter = list(self.df_samples_list[0].columns.values)[1:]
def compute_row_delta_sigle_samples_frame(self, sample_frame, time_header_label, columns_header, shifted_cols_header):
sample_frame[time_header_label] = sample_frame[time_header_label].diff().shift(-1)
shifted_cols = sample_frame[columns_header[1:]].shift(-1)
shifted_cols = sample_frame[columns_header].shift(-1).fillna(0).astype('int32')
#print(shifted_cols)
shifted_cols.columns = shifted_cols_header
sample_frame = sample_frame.assign(**shifted_cols)
sample_frame.drop(sample_frame.tail(1).index, inplace=True)
@ -113,11 +115,11 @@ class JsonImporter(AbstractImporter):
def compute_row_delta_in_all_samples_frames(self, time_header_label):
columns_header = list(self.df_samples_list[0].columns.values)
self.sorter = columns_header[1:]
shifted_cols_header = [s + "S" for s in columns_header[1:]]
#self.sorter = columns_header[1:]
shifted_cols_header = [s + "S" for s in self.sorter]
for indx, sample in enumerate(self.df_samples_list):
self.df_samples_list[indx] = self.compute_row_delta_sigle_samples_frame(sample,
time_header_label, columns_header, shifted_cols_header)
time_header_label, self.sorter, shifted_cols_header)
self._concatenated_samples = pd.concat(self.df_samples_list)
def build_list_of_samples_array(self, data_frame):

@ -21,6 +21,7 @@ class NetworkGraph():
self.graph = nx.DiGraph()
self._nodes_indexes = self.graph_struct.list_of_nodes_indexes()
self._nodes_labels = self.graph_struct.list_of_nodes_labels()
self.aggregated_info_about_nodes_parents = None
self._fancy_indexing = None
self._time_scalar_indexing_structure = []
self._transition_scalar_indexing_structure = []
@ -30,6 +31,7 @@ class NetworkGraph():
def init_graph(self):
self.add_nodes(self.graph_struct.list_of_nodes_labels())
self.add_edges(self.graph_struct.list_of_edges())
self.aggregated_info_about_nodes_parents = self.get_ord_set_of_par_of_all_nodes()
self._fancy_indexing = self.build_fancy_indexing_structure(0)
self.build_time_scalar_indexing_structure()
self.build_time_columns_filtering_structure()
@ -37,40 +39,57 @@ class NetworkGraph():
self.build_transition_columns_filtering_structure()
def add_nodes(self, list_of_nodes):
for indx, id in enumerate(list_of_nodes):
for id in list_of_nodes:
self.graph.add_node(id)
nx.set_node_attributes(self.graph, {id:indx}, 'indx')
nx.set_node_attributes(self.graph, {id:self.graph_struct.get_node_indx(id)}, 'indx')
def add_edges(self, list_of_edges):
self.graph.add_edges_from(list_of_edges)
def get_ordered_by_indx_set_of_parents(self, node):
#print(node)
ordered_set = {}
parents = self.get_parents_by_id(node)
#print(parents)
sorted_parents = [x for _, x in sorted(zip(self.graph_struct.list_of_nodes_labels(), parents))]
#print(sorted_parents)
#print(parents)
p_indxes= []
p_values = []
for n in parents:
indx = self._nodes_labels.index(n)
ordered_set[n] = indx
ordered_set = {k: v for k, v in sorted(ordered_set.items(), key=lambda item: item[1])}
return list(ordered_set.keys())
#indx = self.graph_struct.get_node_indx(n)
#print(indx)
#ordered_set[n] = indx
p_indxes.append(self.graph_struct.get_node_indx(n))
p_values.append(self.graph_struct.get_states_number(n))
ordered_set = (sorted_parents, p_indxes, p_values)
#print(ordered_set)
#ordered_set = {k: v for k, v in sorted(ordered_set.items(), key=lambda item: item[1])}
return ordered_set
def get_ord_set_of_par_of_all_nodes(self):
result = []
for node in self._nodes_labels:
result.append(self.get_ordered_by_indx_set_of_parents(node))
#print(result)
return result
def get_ordered_by_indx_parents_values(self, node):
"""def get_ordered_by_indx_parents_values(self, node):
parents_values = []
parents = self.get_ordered_by_indx_set_of_parents(node)
for n in parents:
parents_values.append(self.graph_struct.get_states_number(n))
return parents_values
return parents_values"""
def get_ordered_by_indx_parents_values_for_all_nodes(self):
result = []
"""result = []
for node in self._nodes_labels:
result.append(self.get_ordered_by_indx_parents_values(node))
return result
return result"""
pars_values = [i[2] for i in self.aggregated_info_about_nodes_parents]
return pars_values
def get_states_number_of_all_nodes_sorted(self):
states_number_list = []
@ -79,20 +98,30 @@ class NetworkGraph():
return states_number_list
def build_fancy_indexing_structure(self, start_indx):
list_of_parents_list = self.get_ord_set_of_par_of_all_nodes()
"""list_of_parents_list = self.get_ord_set_of_par_of_all_nodes()
#print(list_of_parents_list)
index_structure = []
for i, list_of_parents in enumerate(list_of_parents_list):
indexes_for_a_node = []
for j, node in enumerate(list_of_parents):
indexes_for_a_node.append(self.get_node_indx(node) + start_indx)
index_structure.append(np.array(indexes_for_a_node, dtype=np.int))
return index_structure
#print(index_structure)
return index_structure"""
if start_indx > 0:
pass
else:
fancy_indx = [i[1] for i in self.aggregated_info_about_nodes_parents]
return fancy_indx
def build_time_scalar_indexing_structure_for_a_node(self, node_id, parents_id):
#print(parents_id)
#print(node_id)
#print("Parents_id", parents_id)
T_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1].astype(np.int)])
#print(T_vector)
T_vector = np.append(T_vector, [self.graph_struct.variables_frame.iloc[x, 1] for x in parents_id])
#print("Here ", self.graph_struct.variables_frame.iloc[parents_id[0], 1])
T_vector = np.append(T_vector, [self.graph_struct.get_states_number_by_indx(x) for x in parents_id])
#print(T_vector)
T_vector = T_vector.cumprod().astype(np.int)
return T_vector
@ -100,9 +129,10 @@ class NetworkGraph():
def build_time_scalar_indexing_structure(self):
parents_indexes_list = self._fancy_indexing
for node_indx, p_indxs in enumerate(parents_indexes_list):
for node_indx, p_indxs in zip(self.graph_struct.list_of_nodes_indexes(), parents_indexes_list):
if p_indxs.size == 0:
self._time_scalar_indexing_structure.append(np.array([self.get_states_number_by_indx(node_indx)], dtype=np.int))
self._time_scalar_indexing_structure.append(np.array([self.get_states_number_by_indx(node_indx)],
dtype=np.int))
else:
self._time_scalar_indexing_structure.append(
self.build_time_scalar_indexing_structure_for_a_node(node_indx, p_indxs))
@ -110,7 +140,7 @@ class NetworkGraph():
def build_transition_scalar_indexing_structure_for_a_node(self, node_id, parents_id):
M_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1],
self.graph_struct.variables_frame.iloc[node_id, 1].astype(np.int)])
M_vector = np.append(M_vector, [self.graph_struct.variables_frame.iloc[x, 1] for x in parents_id])
M_vector = np.append(M_vector, [self.graph_struct.get_states_number_by_indx(x) for x in parents_id])
M_vector = M_vector.cumprod().astype(np.int)
return M_vector
@ -130,15 +160,18 @@ class NetworkGraph():
def build_transition_columns_filtering_structure(self):
parents_indexes_list = self._fancy_indexing
nodes_number = len(parents_indexes_list)
for node_indx, p_indxs in enumerate(parents_indexes_list):
nodes_number = self.graph_struct.total_variables_number
for node_indx, p_indxs in zip(self.graph_struct.list_of_nodes_indexes(), parents_indexes_list):
self._transition_filtering.append(np.array([node_indx + nodes_number, node_indx, *p_indxs], dtype=np.int))
def get_nodes(self):
return list(self.graph.nodes)
def get_edges(self):
return list(self.graph.edges)
def get_nodes_sorted_by_indx(self):
return self.graph_struct.list_of_nodes
return self.graph_struct.list_of_nodes_labels()
def get_parents_by_id(self, node_id):
return list(self.graph.predecessors(node_id))

@ -23,18 +23,25 @@ class ParametersEstimator:
def compute_parameters(self):
for node_indx, set_of_cims in enumerate(self.amalgamated_cims_struct.sets_of_cims):
self.compute_state_res_time_for_node(node_indx, self.sample_path.trajectories.times,
#print(self.net_graph.get_nodes())
#print(self.amalgamated_cims_struct.sets_of_cims)
#enumerate(zip(self.net_graph.get_nodes(), self.amalgamated_cims_struct.sets_of_cims))
for indx, aggr in enumerate(zip(self.net_graph.get_nodes(), self.amalgamated_cims_struct.sets_of_cims)):
#print(self.net_graph.time_filtering[indx])
#print(self.net_graph.time_scalar_indexing_strucure[indx])
self.compute_state_res_time_for_node(self.net_graph.get_node_indx(aggr[0]), self.sample_path.trajectories.times,
self.sample_path.trajectories.trajectory,
self.net_graph.time_filtering[node_indx],
self.net_graph.time_scalar_indexing_strucure[node_indx],
set_of_cims.state_residence_times)
self.compute_state_transitions_for_a_node(node_indx,
self.net_graph.time_filtering[indx],
self.net_graph.time_scalar_indexing_strucure[indx],
aggr[1].state_residence_times)
#print(self.net_graph.transition_filtering[indx])
#print(self.net_graph.transition_scalar_indexing_structure[indx])
self.compute_state_transitions_for_a_node(self.net_graph.get_node_indx(aggr[0]),
self.sample_path.trajectories.complete_trajectory,
self.net_graph.transition_filtering[node_indx],
self.net_graph.transition_scalar_indexing_structure[node_indx],
set_of_cims.transition_matrices)
set_of_cims.build_cims(set_of_cims.state_residence_times, set_of_cims.transition_matrices)
self.net_graph.transition_filtering[indx],
self.net_graph.transition_scalar_indexing_structure[indx],
aggr[1].transition_matrices)
aggr[1].build_cims(aggr[1].state_residence_times, aggr[1].transition_matrices)
@ -90,7 +97,7 @@ class ParametersEstimator:
# Simple Test #
os.getcwd()
"""os.getcwd()
os.chdir('..')
path = os.getcwd() + '/data'
@ -105,7 +112,7 @@ pe = ParametersEstimator(s1, g1)
pe.init_amalgamated_cims_struct()
lp = LineProfiler()
"""[[2999.2966 2749.2298 3301.5975]
[[2999.2966 2749.2298 3301.5975]
[3797.1737 3187.8345 2939.2009]
[3432.224 3062.5402 4530.9028]]
@ -140,7 +147,7 @@ print(pe.amalgamated_cims_struct.sets_of_cims[0].state_residence_times)
Raveled [14472 3552 10920 12230 25307 13077 9707 14408 24115 22918 6426 16492
10608 16072 5464 10746 11213 21959 23305 6816 16489 3792 19190 15398
13718 18243 31961]"""
13718 18243 31961]
lp_wrapper = lp(pe.compute_parameters)
lp_wrapper()
@ -148,5 +155,5 @@ lp_wrapper()
#for cond in variable.get_cims():
#print(cond.cim)
print(pe.amalgamated_cims_struct.get_cims_of_node(1,[2]))
lp.print_stats()
lp.print_stats()"""

@ -21,6 +21,7 @@ class SamplePath:
variables_label, time_key, variables_key)
self._trajectories = None
self._structure = None
self.total_variables_count = None
def build_trajectories(self):
self.importer.import_data()
@ -31,7 +32,9 @@ class SamplePath:
self.importer.clear_concatenated_frame()
def build_structure(self):
self._structure = st.Structure(self.importer.structure, self.importer.variables)
self.total_variables_count = len(self.importer.sorter)
self._structure = st.Structure(self.importer.structure, self.importer.variables,
self.total_variables_count)
@property
def trajectories(self):
@ -41,5 +44,16 @@ class SamplePath:
def structure(self):
return self._structure
def total_variables_count(self):
return self.total_variables_count
"""def build_possible_values_variables_structure(self):
possible_val_list = []
print(self.importer.variables)
for cardinality in self.importer.variables['Value']:
possible_val_list.append(list(range(0, cardinality)))
self.possible_variables_values = possible_val_list"""

@ -69,7 +69,6 @@ class SetOfCims:
def get_cims_number(self):
return len(self.actual_cims)
def indexes_converter(self, indexes): # Si aspetta array del tipo [2,2] dove
assert len(indexes) == len(self.parents_states_number)
vector_index = 0

@ -1,4 +1,4 @@
import numpy as np
class Structure:
"""
@ -9,17 +9,14 @@ class Structure:
rispetto alle colonne del dataset
"""
def __init__(self, structure, variables):
def __init__(self, structure, variables, total_variables_number):
self.structure_frame = structure
self.variables_frame = variables
self.total_variables_number = total_variables_number
self.name_label = variables.columns.values[0]
self.value_label = variables.columns.values[1]
def list_of_edges(self):
#edges_list = []
#for indx, row in self.structure_frame.iterrows():
#row_tuple = (row[0], row[1])
#edges_list.append(row_tuple)
records = self.structure_frame.to_records(index=False)
edges_list = list(records)
return edges_list
@ -28,23 +25,35 @@ class Structure:
return self.variables_frame[self.name_label].values.tolist()
def list_of_nodes_indexes(self):
nodes_indexes = []
for indx in self.list_of_nodes_labels():
nodes_indexes.append(indx)
return nodes_indexes
return list(self.variables_frame.index)
def get_node_id(self, node_indx):
return self.variables_frame[self.name_label][node_indx]
def get_node_indx(self, node_id):
return list(self.variables_frame[self.name_label]).index(node_id)
return self.variables_frame[self.name_label][self.variables_frame[self.name_label] == node_id].index[0]
def get_positional_node_indx(self, node_id):
return np.flatnonzero(self.variables_frame[self.name_label] == node_id)[0]
def get_states_number(self, node):
#print("node", node)
return self.variables_frame[self.value_label][self.get_node_indx(node)]
def get_states_number_by_indx(self, node_indx):
#print(self.value_label)
#print("Node indx", node_indx)
return self.variables_frame[self.value_label][node_indx]
def total_variables_number(self):
return self.total_variables_number
def __repr__(self):
return "Variables:\n" + str(self.variables_frame) + "\nEdges: \n" + str(self.structure_frame)
def __eq__(self, other):
"""Overrides the default implementation"""
if isinstance(other, Structure):
return self.structure_frame.equals(other.structure_frame) and \
self.variables_frame.equals(other.variables_frame)
return NotImplemented

@ -15,11 +15,10 @@ class TestSamplePath(unittest.TestCase):
self.assertIsNotNone(s1.structure)
self.assertIsInstance(s1.structure, st.Structure)
self.assertTrue(s1.importer.concatenated_samples.empty)
self.assertEqual(s1.total_variables_count, len(s1.importer.sorter))
print(s1.structure)
print(s1.trajectories)
if __name__ == '__main__':
unittest.main()

@ -7,7 +7,6 @@ import os
import json
class TestJsonImporter(unittest.TestCase):
def test_init(self):
@ -19,10 +18,11 @@ class TestJsonImporter(unittest.TestCase):
self.assertEqual(j1.time_key, 'Time')
self.assertEqual(j1.variables_key, 'Name')
self.assertEqual(j1.files_path, path)
self.assertTrue(not j1.df_samples_list)
self.assertFalse(j1.df_samples_list)
self.assertTrue(j1.variables.empty)
self.assertTrue(j1.structure.empty)
self.assertTrue(not j1.concatenated_samples)
self.assertFalse(j1.concatenated_samples)
self.assertFalse(j1.sorter)
def test_read_json_file_found(self):
data_set = {"key1": [1, 2, 3], "key2": [4, 5, 6]}
@ -36,7 +36,6 @@ class TestJsonImporter(unittest.TestCase):
def test_read_json_file_not_found(self):
path = os.getcwd()
#print(path)
j1 = ji.JsonImporter(path, '', '', '', '', '')
self.assertIsNone(j1.read_json_file())
@ -45,6 +44,7 @@ class TestJsonImporter(unittest.TestCase):
raw_data = j1.read_json_file()
j1.normalize_trajectories(raw_data, 0, j1.samples_label)
self.assertEqual(len(j1.df_samples_list), len(raw_data[0][j1.samples_label]))
self.assertEqual(list(j1.df_samples_list[0].columns.values)[1:], j1.sorter)
def test_normalize_trajectories_wrong_indx(self):
j1 = ji.JsonImporter('../data', 'samples', 'dyn.str', 'variables', 'Time', 'Name')
@ -63,7 +63,7 @@ class TestJsonImporter(unittest.TestCase):
sample_frame = j1.df_samples_list[0]
columns_header = list(sample_frame.columns.values)
shifted_cols_header = [s + "S" for s in columns_header[1:]]
new_sample_frame = j1.compute_row_delta_sigle_samples_frame(sample_frame, j1.time_key, columns_header,
new_sample_frame = j1.compute_row_delta_sigle_samples_frame(sample_frame, j1.time_key, columns_header[1:],
shifted_cols_header)
self.assertEqual(len(list(sample_frame.columns.values)) + len(shifted_cols_header),
len(list(new_sample_frame.columns.values)))
@ -103,8 +103,10 @@ class TestJsonImporter(unittest.TestCase):
def test_import_variables(self):
j1 = ji.JsonImporter('../data', 'samples', 'dyn.str', 'variables', 'Time', 'Name')
sorter = ['X', 'Y', 'Z']
raw_data = [{'variables':{"Name": ['Z', 'Y', 'X'], "value": [3, 3, 3]}}]
j1.import_variables(raw_data, ['X', 'Y', 'Z'])
j1.import_variables(raw_data, sorter)
self.assertEqual(list(j1.variables[j1.variables_key]), sorter)
def test_import_data(self):
j1 = ji.JsonImporter('../data', 'samples', 'dyn.str', 'variables', 'Time', 'Name')
@ -115,8 +117,6 @@ class TestJsonImporter(unittest.TestCase):
print(j1.structure)
print(j1.concatenated_samples)
def ordered(self, obj):
if isinstance(obj, dict):
return sorted((k, self.ordered(v)) for k, v in obj.items())

@ -0,0 +1,71 @@
import unittest
import networkx as nx
import sample_path as sp
import network_graph as ng
class TestNetworkGraph(unittest.TestCase):
def setUp(self):
self.s1 = sp.SamplePath('../data', 'samples', 'dyn.str', 'variables', 'Time', 'Name')
self.s1.build_trajectories()
self.s1.build_structure()
def test_init(self):
g1 = ng.NetworkGraph(self.s1.structure)
self.assertEqual(self.s1.structure, g1.graph_struct)
self.assertIsInstance(g1.graph, nx.DiGraph)
#TODO MANCANO TUTTI I TEST DI INIZIALIZZAZIONE DEI DATI PRIVATI della classe aggiungere le property necessarie
def test_add_nodes(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
for n1, n2 in zip(g1.get_nodes(), self.s1.structure.list_of_nodes_labels()):
self.assertEqual(n1, n2)
def test_add_edges(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_edges(self.s1.structure.list_of_edges())
for e in self.s1.structure.list_of_edges():
self.assertIn(tuple(e), g1.get_edges())
def test_get_ordered_by_indx_set_of_parents(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
g1.add_edges(self.s1.structure.list_of_edges())
sorted_par_list_aggregated_info = g1.get_ordered_by_indx_set_of_parents(g1.get_nodes()[2])
self.test_aggregated_par_list_data(g1,g1.get_nodes()[2], sorted_par_list_aggregated_info)
def test_aggregated_par_list_data(self, graph, node_id, sorted_par_list_aggregated_info):
for indx, element in enumerate(sorted_par_list_aggregated_info):
if indx == 0:
self.assertEqual(graph.get_parents_by_id(node_id), element)
for j in range(0, len(sorted_par_list_aggregated_info[0]) - 1):
self.assertLess(self.s1.structure.get_node_indx(sorted_par_list_aggregated_info[0][j]),
self.s1.structure.get_node_indx(sorted_par_list_aggregated_info[0][j + 1]))
elif indx == 1:
for node, node_indx in zip(sorted_par_list_aggregated_info[0], sorted_par_list_aggregated_info[1]):
self.assertEqual(graph.get_node_indx(node), node_indx)
else:
for node, node_val in zip(sorted_par_list_aggregated_info[0], sorted_par_list_aggregated_info[2]):
self.assertEqual(graph.graph_struct.get_states_number(node), node_val)
def test_get_ord_set_of_par_of_all_nodes(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
g1.add_edges(self.s1.structure.list_of_edges())
sorted_list_of_par_lists = g1.get_ord_set_of_par_of_all_nodes()
for node, par_list in zip(g1.get_nodes_sorted_by_indx(), sorted_list_of_par_lists):
self.test_aggregated_par_list_data(g1, node, par_list)
def test_get_ordered_by_indx_parents_values_for_all_nodes(self):
g1 = ng.NetworkGraph(self.s1.structure)
g1.add_nodes(self.s1.structure.list_of_nodes_labels())
g1.add_edges(self.s1.structure.list_of_edges())
g1.aggregated_info_about_nodes_parents = g1.get_ord_set_of_par_of_all_nodes()
print(g1.get_ordered_by_indx_parents_values_for_all_nodes())
if __name__ == '__main__':
unittest.main()

@ -1,55 +1,70 @@
import unittest
import pandas as pd
import structure as st
class TestStructure(unittest.TestCase):
def setUp(self):
self.structure_frame = pd.DataFrame([{"From":"X","To":"Z"},{"From":"Y","To":"Z"},{"From":"Z","To":"Y"}])
self.structure_frame = pd.DataFrame([{"From":"X","To":"Z"}, {"From":"X","To":"Y"},{"From":"Y","To":"X"},
{"From":"Y","To":"Z"},{"From":"Z","To":"Y"}, {"From":"Z","To":"X"} ])
self.variables_frame = pd.DataFrame([{"Name":"X","Value":3},{"Name":"Y","Value":3},{"Name":"Z","Value":3}])
def test_init(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
s1 = st.Structure(self.structure_frame, self.variables_frame, len(self.variables_frame.index))
self.assertTrue(self.structure_frame.equals(s1.structure_frame))
self.assertTrue(self.variables_frame.equals(s1.variables_frame))
self.assertEqual(self.variables_frame.columns.values[0], s1.name_label)
self.assertEqual(self.variables_frame.columns.values[1], s1.value_label)
#print(len(self.variables_frame.index))
self.assertEqual(len(self.variables_frame.index), s1.total_variables_number)
def test_list_of_edges(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
s1 = st.Structure(self.structure_frame, self.variables_frame, len(self.variables_frame.index))
records = self.structure_frame.to_records(index=False)
result = list(records)
for e1, e2 in zip(result, s1.list_of_edges()):
self.assertEqual(e1, e2)
def test_list_of_nodes_labels(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
s1 = st.Structure(self.structure_frame, self.variables_frame, len(self.variables_frame.index))
self.assertEqual(list(self.variables_frame['Name']), s1.list_of_nodes_labels())
def test_get_node_id(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
s1 = st.Structure(self.structure_frame, self.variables_frame, len(self.variables_frame.index))
for indx, var in enumerate(list(self.variables_frame['Name'])):
self.assertEqual(var, s1.get_node_id(indx))
def test_get_node_indx(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
for indx, var in enumerate(list(self.variables_frame['Name'])):
filtered_frame = self.variables_frame.drop(self.variables_frame[self.variables_frame['Name'] == 'Y'].index)
#print(filtered_frame)
s1 = st.Structure(self.structure_frame, filtered_frame, len(self.variables_frame.index))
for indx, var in zip(filtered_frame.index, filtered_frame['Name']):
self.assertEqual(indx, s1.get_node_indx(var))
def test_list_of_node_indxs(self):
filtered_frame = self.variables_frame.drop(self.variables_frame[self.variables_frame['Name'] == 'Y'].index)
# print(filtered_frame)
s1 = st.Structure(self.structure_frame, filtered_frame, len(self.variables_frame.index))
for indx1, indx2 in zip(filtered_frame.index, s1.list_of_nodes_indexes()):
self.assertEqual(indx1, indx2)
def test_get_positional_node_indx(self):
filtered_frame = self.variables_frame.drop(self.variables_frame[self.variables_frame['Name'] == 'Y'].index)
# print(filtered_frame)
s1 = st.Structure(self.structure_frame, filtered_frame, len(self.variables_frame.index))
for indx, var in enumerate(s1.list_of_nodes_labels()):
self.assertEqual(indx, s1.get_positional_node_indx(var))
def test_get_states_number(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
s1 = st.Structure(self.structure_frame, self.variables_frame, len(self.variables_frame.index))
for indx, row in self.variables_frame.iterrows():
self.assertEqual(row[1], s1.get_states_number(row[0]))
def test_get_states_numeber_by_indx(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
s1 = st.Structure(self.structure_frame, self.variables_frame, len(self.variables_frame.index))
for indx, row in self.variables_frame.iterrows():
self.assertEqual(row[1], s1.get_states_number_by_indx(indx))
def test_list_of_node_indxs(self):
pass
if __name__ == '__main__':
unittest.main()

@ -4,7 +4,7 @@ import numpy as np
import trajectory as tr
class TestTrajecotry(unittest.TestCase):
class TestTrajectory(unittest.TestCase):
def test_init(self):
cols_list = [np.array([1.2,1.3,.14]), np.arange(1,4), np.arange(4,7)]