1
0
Fork 0

Add SamplePath and Structure tests

parallel_struct_est
philpMartin 4 years ago
parent d86e3af695
commit 9ad0cac265
  1. 2
      main_package/classes/json_importer.py
  2. 2
      main_package/classes/network_graph.py
  3. 23
      main_package/classes/sample_path.py
  4. 13
      main_package/classes/structure.py
  5. 6
      main_package/classes/trajectory.py
  6. 25
      main_package/tests/sample_path_test.py
  7. 55
      main_package/tests/test_structure.py
  8. 1
      main_package/tests/test_trajectory.py

@ -134,7 +134,7 @@ class JsonImporter(AbstractImporter):
columns_list.append(data_frame[column].to_numpy())
return columns_list
def clear_data_frames(self):
def clear_concatenated_frame(self):
"""
Rimuove tutti i valori contenuti nei data_frames presenti in df_samples_list
Parameters:

@ -50,7 +50,7 @@ class NetworkGraph():
for n in parents:
indx = self._nodes_labels.index(n)
ordered_set[n] = indx
{k: v for k, v in sorted(ordered_set.items(), key=lambda item: item[1])}
ordered_set = {k: v for k, v in sorted(ordered_set.items(), key=lambda item: item[1])}
return list(ordered_set.keys())
def get_ord_set_of_par_of_all_nodes(self):

@ -1,6 +1,4 @@
import pandas as pd
import numpy as np
import os
import json_importer as imp
import trajectory as tr
import structure as st
@ -18,8 +16,9 @@ class SamplePath:
:structure: oggetto Structure
"""
def __init__(self, files_path):
self.importer = imp.JsonImporter(files_path)
def __init__(self, files_path, samples_label, structure_label, variables_label, time_key, variables_key):
self.importer = imp.JsonImporter(files_path, samples_label, structure_label,
variables_label, time_key, variables_key)
self._trajectories = None
self._structure = None
@ -29,7 +28,7 @@ class SamplePath:
tr.Trajectory(self.importer.build_list_of_samples_array(self.importer.concatenated_samples),
len(self.importer.sorter) + 1)
#self.trajectories.append(trajectory)
self.importer.clear_data_frames()
self.importer.clear_concatenated_frame()
def build_structure(self):
self._structure = st.Structure(self.importer.structure, self.importer.variables)
@ -42,17 +41,5 @@ class SamplePath:
def structure(self):
return self._structure
"""os.getcwd()
os.chdir('..')
path = os.getcwd() + '/data'
os.getcwd()
os.chdir('..')
path = os.getcwd() + '/data'
s1 = SamplePath(path)
s1.build_trajectories()
s1.build_structure()
print(s1.trajectories[0].get_complete_trajectory())"""

@ -16,10 +16,12 @@ class Structure:
self.value_label = variables.columns.values[1]
def list_of_edges(self):
edges_list = []
for indx, row in self.structure_frame.iterrows():
row_tuple = (row.From, row.To)
edges_list.append(row_tuple)
#edges_list = []
#for indx, row in self.structure_frame.iterrows():
#row_tuple = (row[0], row[1])
#edges_list.append(row_tuple)
records = self.structure_frame.to_records(index=False)
edges_list = list(records)
return edges_list
def list_of_nodes_labels(self):
@ -43,3 +45,6 @@ class Structure:
def get_states_number_by_indx(self, node_indx):
#print(self.value_label)
return self.variables_frame[self.value_label][node_indx]
def __repr__(self):
return "Variables:\n" + str(self.variables_frame) + "\nEdges: \n" + str(self.structure_frame)

@ -35,6 +35,10 @@ class Trajectory:
return self._times
def size(self):
return self.actual_trajectory.shape[0]
return self._actual_trajectory.shape[0]
def __repr__(self):
return "Complete Trajectory Rows: " + str(self.size()) + "\n" + self.complete_trajectory.__repr__() + \
"\nTimes Rows:" + str(self.times.size) + "\n" + self.times.__repr__()

@ -0,0 +1,25 @@
import unittest
import sample_path as sp
import trajectory as tr
import structure as st
class TestSamplePath(unittest.TestCase):
def test_init(self):
s1 = sp.SamplePath('../data', 'samples', 'dyn.str', 'variables', 'Time', 'Name')
s1.build_trajectories()
self.assertIsNotNone(s1.trajectories)
self.assertIsInstance(s1.trajectories, tr.Trajectory)
s1.build_structure()
self.assertIsNotNone(s1.structure)
self.assertIsInstance(s1.structure, st.Structure)
self.assertTrue(s1.importer.concatenated_samples.empty)
print(s1.structure)
print(s1.trajectories)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,55 @@
import unittest
import pandas as pd
import structure as st
class TestStructure(unittest.TestCase):
def setUp(self):
self.structure_frame = pd.DataFrame([{"From":"X","To":"Z"},{"From":"Y","To":"Z"},{"From":"Z","To":"Y"}])
self.variables_frame = pd.DataFrame([{"Name":"X","Value":3},{"Name":"Y","Value":3},{"Name":"Z","Value":3}])
def test_init(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
self.assertTrue(self.structure_frame.equals(s1.structure_frame))
self.assertTrue(self.variables_frame.equals(s1.variables_frame))
self.assertEqual(self.variables_frame.columns.values[0], s1.name_label)
self.assertEqual(self.variables_frame.columns.values[1], s1.value_label)
def test_list_of_edges(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
records = self.structure_frame.to_records(index=False)
result = list(records)
for e1, e2 in zip(result, s1.list_of_edges()):
self.assertEqual(e1, e2)
def test_list_of_nodes_labels(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
self.assertEqual(list(self.variables_frame['Name']), s1.list_of_nodes_labels())
def test_get_node_id(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
for indx, var in enumerate(list(self.variables_frame['Name'])):
self.assertEqual(var, s1.get_node_id(indx))
def test_get_node_indx(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
for indx, var in enumerate(list(self.variables_frame['Name'])):
self.assertEqual(indx, s1.get_node_indx(var))
def test_get_states_number(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
for indx, row in self.variables_frame.iterrows():
self.assertEqual(row[1], s1.get_states_number(row[0]))
def test_get_states_numeber_by_indx(self):
s1 = st.Structure(self.structure_frame, self.variables_frame)
for indx, row in self.variables_frame.iterrows():
self.assertEqual(row[1], s1.get_states_number_by_indx(indx))
def test_list_of_node_indxs(self):
pass
if __name__ == '__main__':
unittest.main()

@ -13,6 +13,7 @@ class TestTrajecotry(unittest.TestCase):
self.assertTrue(np.array_equal(np.ravel(t1.complete_trajectory[:, : 1]), cols_list[1]))
self.assertTrue(np.array_equal(np.ravel(t1.complete_trajectory[:, 1: 2]), cols_list[2]))
self.assertEqual(len(cols_list) - 1, t1.complete_trajectory.shape[1])
self.assertEqual(t1.size(), t1.times.size)
def test_init_first_array_not_float_type(self):
cols_list = [np.arange(1, 4), np.arange(4, 7), np.array([1.2, 1.3, .14])]