1
0
Fork 0

Refactors on JsonImporterClass

parallel_struct_est
philpMartin 4 years ago
parent cd678b6a5b
commit c60001426a
  1. 87
      main_package/classes/json_importer.py
  2. 102
      main_package/classes/network_graph.py
  3. 97
      main_package/classes/parameters_estimator.py
  4. 22
      main_package/classes/sample_path.py
  5. 2
      main_package/classes/set_of_cims.py
  6. 22
      main_package/classes/structure.py
  7. 31
      main_package/classes/trajectory.py

@@ -3,6 +3,7 @@ import glob
import pandas as pd
import json
from abstract_importer import AbstractImporter
from line_profiler import LineProfiler
class JsonImporter(AbstractImporter):
@@ -23,26 +24,29 @@ class JsonImporter(AbstractImporter):
def __init__(self, files_path):
self.df_samples_list = []
self.df_structure = pd.DataFrame()
self.df_variables = pd.DataFrame()
self.concatenated_samples = None
self._df_structure = pd.DataFrame()
self._df_variables = pd.DataFrame()
self._concatenated_samples = None
super(JsonImporter, self).__init__(files_path)
def import_data(self):
raw_data = self.read_json_file()
self.import_trajectories(raw_data)
self.compute_row_delta_in_all_samples_frames()
self.compute_row_delta_in_all_samples_frames('Time')
self.import_structure(raw_data)
self.import_variables(raw_data)
#Le variabili DEVONO essere ordinate come le Colonne del dataset
assert list(self._df_variables['Name']) == \
(list(self._concatenated_samples.columns.values[1:len(self.variables['Name']) + 1]))
def import_trajectories(self, raw_data):
self.normalize_trajectories(raw_data, 0, 'samples')
def import_structure(self, raw_data):
self.df_structure = self.one_level_normalizing(raw_data, 0, 'dyn.str')
self._df_structure = self.one_level_normalizing(raw_data, 0, 'dyn.str')
def import_variables(self, raw_data):
self.df_variables = self.one_level_normalizing(raw_data, 0, 'variables')
self._df_variables = self.one_level_normalizing(raw_data, 0, 'variables')
def read_json_file(self):
"""
@@ -89,34 +93,27 @@ class JsonImporter(AbstractImporter):
void
"""
for sample_indx, sample in enumerate(raw_data[indx][trajectories_key]):
self.df_samples_list.append(pd.json_normalize(raw_data[indx][trajectories_key][sample_indx]))
#print(sample_indx, self.df_samples_list[sample_indx])
def compute_row_delta_sigle_samples_frame(self, sample_frame):
columns_header = list(sample_frame.columns.values)
#print(columns_header)
for col_name in columns_header:
if col_name == 'Time':
sample_frame[col_name + 'Delta'] = sample_frame[col_name].diff()
else:
sample_frame[col_name + 'Delta'] = sample_frame[col_name]
sample_frame['Time'] = sample_frame['TimeDelta']
del sample_frame['TimeDelta']
sample_frame['Time'] = sample_frame['Time'].shift(-1)
columns_header = list(sample_frame.columns.values)
#print(columns_header[4:]) #TODO rimuovere dipendeza diretta da 'Time' e 4
for column in columns_header[4:]:
sample_frame[column] = sample_frame[column].shift(-1)
#sample_frame[column] = sample_frame[column].fillna(0)"""
self.df_samples_list.append(pd.DataFrame(sample))
def compute_row_delta_sigle_samples_frame(self, sample_frame, time_header_label, columns_header, shifted_cols_header):
sample_frame[time_header_label] = sample_frame[time_header_label].diff().shift(-1)
shifted_cols = sample_frame[columns_header[1:]].shift(-1)
shifted_cols.columns = shifted_cols_header
sample_frame = sample_frame.assign(**shifted_cols)
sample_frame.drop(sample_frame.tail(1).index, inplace=True)
#print("After Time Delta",sample_frame)
return sample_frame
def compute_row_delta_in_all_samples_frames(self):
def compute_row_delta_in_all_samples_frames(self, time_header_label):
columns_header = list(self.df_samples_list[0].columns.values)
shifted_cols_header = [s + "S" for s in columns_header[1:]]
for indx, sample in enumerate(self.df_samples_list):
#print(indx)
#print(self.df_samples_list[299])
self.compute_row_delta_sigle_samples_frame(sample)
self.concatenated_samples = pd.concat(self.df_samples_list)
self.df_samples_list[indx] = self.compute_row_delta_sigle_samples_frame(sample,
time_header_label, columns_header, shifted_cols_header)
#print(self.df_samples_list[indx])
self._concatenated_samples = pd.concat(self.df_samples_list)
#print("Concatenated", self._concatenated_samples)
for indx in range(len(self.df_samples_list)): # Le singole traj non servono più
self.df_samples_list[indx] = self.df_samples_list[indx].iloc[0:0]
def build_list_of_samples_array(self, data_frame):
"""
@@ -140,17 +137,31 @@ class JsonImporter(AbstractImporter):
Returns:
void
"""
for indx in range(len(self.df_samples_list)):
self.df_samples_list[indx] = self.df_samples_list[indx].iloc[0:0]
self.concatenated_samples = self.concatenated_samples.iloc[0:0]
self._concatenated_samples = self._concatenated_samples.iloc[0:0]
@property
def concatenated_samples(self):
return self._concatenated_samples
@property
def variables(self):
return self._df_variables
@property
def structure(self):
return self._df_structure
"""ij = JsonImporter("../data")
#raw_data = ij.read_json_file()
lp = LineProfiler()
lp_wrapper = lp(ij.import_data)
lp_wrapper()
lp.print_stats()
ij.import_data()
#print(ij.df_samples_list[7])
print(ij.df_structure)
print(ij.df_variables)
print(ij.concatenated_samples)
#print((ij.build_list_of_samples_array(0)[1].size))
#ij.compute_row_delta_in_all_samples_frames()
#print(ij.df_samples_list[0])"""
print(ij.concatenated_samples)"""

@@ -19,16 +19,20 @@ class NetworkGraph():
def __init__(self, graph_struct):
self.graph_struct = graph_struct
self.graph = nx.DiGraph()
self.scalar_indexing_structure = []
self.transition_scalar_indexing_structure = []
self.filtering_structure = []
self.transition_filtering = []
self._nodes_indexes = self.graph_struct.list_of_nodes_indexes()
self._nodes_labels = self.graph_struct.list_of_nodes_labels()
self._fancy_indexing = None
self._time_scalar_indexing_structure = []
self._transition_scalar_indexing_structure = []
self._time_filtering = []
self._transition_filtering = []
def init_graph(self):
self.add_nodes(self.graph_struct.list_of_nodes())
self.add_nodes(self.graph_struct.list_of_nodes_labels())
self.add_edges(self.graph_struct.list_of_edges())
self.build_scalar_indexing_structure()
self.build_columns_filtering_structure()
self._fancy_indexing = self.build_fancy_indexing_structure(0)
self.build_time_scalar_indexing_structure()
self.build_time_columns_filtering_structure()
self.build_transition_scalar_indexing_structure()
self.build_transition_columns_filtering_structure()
@@ -44,34 +48,33 @@ class NetworkGraph():
ordered_set = {}
parents = self.get_parents_by_id(node)
for n in parents:
indx = self.graph_struct.get_node_indx(n)
indx = self._nodes_labels.index(n)
ordered_set[n] = indx
{k: v for k, v in sorted(ordered_set.items(), key=lambda item: item[1])}
return list(ordered_set.keys())
def get_ord_set_of_par_of_all_nodes(self):
result = []
for node in self.get_nodes():
for node in self._nodes_labels:
result.append(self.get_ordered_by_indx_set_of_parents(node))
return result
def get_ordered_by_indx_parents_values(self, node):
parents_values = []
parents = self.get_parents_by_id(node)
parents.sort() #Assumo che la structure rifletta l'ordine delle colonne del dataset
parents = self.get_ordered_by_indx_set_of_parents(node)
for n in parents:
parents_values.append(self.graph_struct.get_states_number(n))
return parents_values
def get_ordered_by_indx_parents_values_for_all_nodes(self):
result = []
for node in self.get_nodes(): #TODO bisogna essere sicuri che l'ordine sia coerente con quello del dataset serve un metodo get_nodes_sort_by_indx
for node in self._nodes_labels:
result.append(self.get_ordered_by_indx_parents_values(node))
return result
def get_states_number_of_all_nodes_sorted(self):
states_number_list = []
for node in self.get_nodes(): #TODO SERVE UN get_nodes_ordered!!!!!!
for node in self._nodes_labels:
states_number_list.append(self.get_states_number(node))
return states_number_list
@@ -85,24 +88,24 @@ class NetworkGraph():
index_structure.append(np.array(indexes_for_a_node, dtype=np.int))
return index_structure
def build_scalar_indexing_structure_for_a_node(self, node_id, parents_id):
print(parents_id)
def build_time_scalar_indexing_structure_for_a_node(self, node_id, parents_id):
#print(parents_id)
T_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1].astype(np.int)])
print(T_vector)
#print(T_vector)
T_vector = np.append(T_vector, [self.graph_struct.variables_frame.iloc[x, 1] for x in parents_id])
print(T_vector)
#print(T_vector)
T_vector = T_vector.cumprod().astype(np.int)
return T_vector
print(T_vector)
#print(T_vector)
def build_scalar_indexing_structure(self):
parents_indexes_list = self.build_fancy_indexing_structure(0)
def build_time_scalar_indexing_structure(self):
parents_indexes_list = self._fancy_indexing
for node_indx, p_indxs in enumerate(parents_indexes_list):
if p_indxs.size == 0:
self.scalar_indexing_structure.append(np.array([self.get_states_number_by_indx(node_indx)], dtype=np.int))
self._time_scalar_indexing_structure.append(np.array([self.get_states_number_by_indx(node_indx)], dtype=np.int))
else:
self.scalar_indexing_structure.append(
self.build_scalar_indexing_structure_for_a_node(node_indx, p_indxs))
self._time_scalar_indexing_structure.append(
self.build_time_scalar_indexing_structure_for_a_node(node_indx, p_indxs))
def build_transition_scalar_indexing_structure_for_a_node(self, node_id, parents_id):
M_vector = np.array([self.graph_struct.variables_frame.iloc[node_id, 1],
@@ -112,32 +115,24 @@ class NetworkGraph():
return M_vector
def build_transition_scalar_indexing_structure(self):
parents_indexes_list = self.build_fancy_indexing_structure(0)
parents_indexes_list = self._fancy_indexing
for node_indx, p_indxs in enumerate(parents_indexes_list):
"""if p_indxs.size == 0:
self.scalar_indexing_structure.append(
np.array([self.get_states_number_by_indx(node_indx)], dtype=np.int))
else:"""
self.transition_scalar_indexing_structure.append(
self._transition_scalar_indexing_structure.append(
self.build_transition_scalar_indexing_structure_for_a_node(node_indx, p_indxs))
def build_columns_filtering_structure(self):
parents_indexes_list = self.build_fancy_indexing_structure(0)
def build_time_columns_filtering_structure(self):
parents_indexes_list = self._fancy_indexing
for node_indx, p_indxs in enumerate(parents_indexes_list):
if p_indxs.size == 0:
self.filtering_structure.append(np.append(p_indxs, np.array([node_indx], dtype=np.int)))
self._time_filtering.append(np.append(p_indxs, np.array([node_indx], dtype=np.int)))
else:
self.filtering_structure.append(np.append(np.array([node_indx], dtype=np.int), p_indxs))
self._time_filtering.append(np.append(np.array([node_indx], dtype=np.int), p_indxs))
def build_transition_columns_filtering_structure(self):
parents_indexes_list = self.build_fancy_indexing_structure(0)
parents_indexes_list = self._fancy_indexing
nodes_number = len(parents_indexes_list)
for node_indx, p_indxs in enumerate(parents_indexes_list):
#if p_indxs.size == 0:
#self.filtering_structure.append(np.append(p_indxs, np.array([node_indx], dtype=np.int)))
#else:
self.transition_filtering.append(np.array([node_indx + nodes_number, node_indx, *p_indxs], dtype=np.int))
self._transition_filtering.append(np.array([node_indx + nodes_number, node_indx, *p_indxs], dtype=np.int))
def get_nodes(self):
return list(self.graph.nodes)
@@ -160,6 +155,21 @@ class NetworkGraph():
def get_node_indx(self, node_id):
return nx.get_node_attributes(self.graph, 'indx')[node_id]
@property
def time_scalar_indexing_strucure(self):
return self._time_scalar_indexing_structure
@property
def time_filtering(self):
return self._time_filtering
@property
def transition_scalar_indexing_structure(self):
return self._transition_scalar_indexing_structure
@property
def transition_filtering(self):
return self._transition_filtering
@@ -175,14 +185,12 @@ s1.build_structure()
g1 = NetworkGraph(s1.structure)
g1.init_graph()
print(g1.graph.number_of_nodes())
print(g1.graph.number_of_edges())
print(nx.get_node_attributes(g1.graph, 'indx')['X'])
for node in g1.get_parents_by_id('Z'):
# print(g1.get_node_by_index(node))
print(node)
print(g1.get_ordered_by_indx_parents_values_for_all_nodes())
print(g1.transition_scalar_indexing_structure)
print(g1.transition_filtering)
print(g1.time_scalar_indexing_strucure)
print(g1.time_filering)
#print(g1.build_fancy_indexing_structure(0))
#print(g1.get_states_number_of_all_nodes_sorted())
g1.build_scalar_indexing_structure()

@@ -14,10 +14,10 @@ class ParametersEstimator:
def __init__(self, sample_path, net_graph):
self.sample_path = sample_path
self.net_graph = net_graph
self.scalar_indexes_converter = self.net_graph.scalar_indexing_structure
self.columns_filtering_structure = self.net_graph.filtering_structure
self.transition_scalar_index_converter = self.net_graph.transition_scalar_indexing_structure
self.transition_filtering = self.net_graph.transition_filtering
#self.scalar_indexes_converter = self.net_graph.
#self.columns_filtering_structure = self.net_graph.filtering_structure
#self.transition_scalar_index_converter = self.net_graph.transition_scalar_indexing_structure
#self.transition_filtering = self.net_graph.transition_filtering
self.amalgamated_cims_struct = None
def init_amalgamated_cims_struct(self):
@@ -190,22 +190,22 @@ class ParametersEstimator:
def compute_parameters(self):
for node_indx, set_of_cims in enumerate(self.amalgamated_cims_struct.sets_of_cims):
self.compute_state_res_time_for_node(node_indx, self.sample_path.trajectories[0].get_times(),
self.sample_path.trajectories[0].get_trajectory(),
self.columns_filtering_structure[node_indx],
self.scalar_indexes_converter[node_indx],
self.compute_state_res_time_for_node(node_indx, self.sample_path.trajectories.times,
self.sample_path.trajectories.trajectory,
self.net_graph.time_filtering[node_indx],
self.net_graph.time_scalar_indexing_strucure[node_indx],
set_of_cims.state_residence_times)
self.compute_state_transitions_for_a_node(node_indx,
self.sample_path.trajectories[0].get_complete_trajectory(),
self.transition_filtering[node_indx],
self.transition_scalar_index_converter[node_indx],
self.sample_path.trajectories.complete_trajectory,
self.net_graph.transition_filtering[node_indx],
self.net_graph.transition_scalar_indexing_structure[node_indx],
set_of_cims.transition_matrices)
set_of_cims.build_cims(set_of_cims.state_residence_times, set_of_cims.transition_matrices)
def compute_state_res_time_for_node(self, node_indx, times, trajectory, cols_filter, scalar_indexes_struct, T):
#print(times)
#print(times.size)
#print(trajectory)
#print(cols_filter)
#print(scalar_indexes_struct)
@@ -270,80 +270,7 @@ g1.init_graph()
pe = ParametersEstimator(s1, g1)
pe.init_amalgamated_cims_struct()
#print(pe.amalgamated_cims_struct.get_set_of_cims(0).get_cims_number())
#print(pe.amalgamated_cims_struct.get_set_of_cims(1).get_cims_number())
#print(pe.amalgamated_cims_struct.get_set_of_cims(2).get_cims_number())
#print(np.shape(s1.trajectories[0].transitions)[0])
#print(pe.columns_filtering_structure)
#print(pe.scalar_indexes_converter)
#print(pe.amalgamated_cims_struct.sets_of_cims[1].state_residence_times)
#print(pe.amalgamated_cims_struct.sets_of_cims[2].state_residence_times)
#print(pe.amalgamated_cims_struct.sets_of_cims[2].transition_matrices)
#print(pe.amalgamated_cims_struct.sets_of_cims[1].transition_matrices)
#print(pe.amalgamated_cims_struct.sets_of_cims[0].transition_matrices)
#pe.compute_state_transitions_for_all_nodes()
lp = LineProfiler()
"""pe.compute_state_residence_time_for_all_nodes()
#pe.parameters_estimation_for_variable(0, pe.sample_path.trajectories[0].get_trajectory()[:, 0],
# pe.sample_path.trajectories[0].get_trajectory()[:, 1], [])
#pe.parameters_estimation_single_trajectory(pe.sample_path.trajectories[0].get_trajectory())
#pe.parameters_estimation()
#lp.add_function(pe.compute_sufficient_statistics_for_row) # add additional function to profile
#lp_wrapper = lp(pe.parameters_estimation_single_trajectory)
#lp_wrapper = lp(pe.parameters_estimation)
#lp_wrapper(pe.sample_path.trajectories[0].get_trajectory())
#lp.print_stats()
#lp_wrapper = lp(pe.parameters_estimation_for_variable)
#lp_wrapper(2, pe.sample_path.trajectories[0].get_times(),
#pe.sample_path.trajectories[0].get_trajectory()[:, 2],
#pe.sample_path.trajectories[0].get_trajectory()[:, [0,1]])
lp_wrapper = lp(pe.parameters_estimation_for_variable_single_parent)
lp_wrapper(1, pe.sample_path.trajectories[0].get_times(),
pe.sample_path.trajectories[0].get_trajectory()[:, 1],
pe.sample_path.trajectories[0].get_trajectory()[:, 2])
lp.print_stats()
#print( pe.sample_path.trajectories[0].get_trajectory()[:, [1,2]])
for matrix in pe.amalgamated_cims_struct.get_set_of_cims(1).actual_cims:
print(matrix.state_residence_times)
print(matrix.state_transition_matrix)
matrix.compute_cim_coefficients()
print(matrix.cim)"""
"""lp_wrapper = lp(pe.parameters_estimation_for_variable_no_parent_in_place)
lp_wrapper(0, pe.sample_path.trajectories[0].get_times(), pe.sample_path.trajectories[0].transitions[:, 0],
pe.sample_path.trajectories[0].get_trajectory()[:, 0] )
lp.print_stats()
lp_wrapper = lp(pe.parameters_estimation_for_variable_single_parent)
lp_wrapper(1, pe.sample_path.trajectories[0].get_times(), pe.sample_path.trajectories[0].transitions[:, 1],
pe.sample_path.trajectories[0].get_trajectory()[:,1], pe.sample_path.trajectories[0].get_trajectory()[:,2] )
lp.print_stats()
lp_wrapper = lp(pe.parameters_estimation_for_variable_multiple_parents)
lp_wrapper(2, pe.sample_path.trajectories[0].get_times(), pe.sample_path.trajectories[0].transitions[:, 2],
pe.sample_path.trajectories[0].get_trajectory()[:,2], pe.sample_path.trajectories[0].get_trajectory()[:, [0,1]] )
lp.print_stats()"""
"""lp_wrapper = lp(pe.parameters_estimation_for_variable_single_parent_in_place)
lp_wrapper(1, pe.sample_path.trajectories[0].get_times(), pe.sample_path.trajectories[0].transitions[:, 1],
pe.sample_path.trajectories[0].get_trajectory()[:,1], pe.sample_path.trajectories[0].get_trajectory()[:,2], (3,3,3) )
lp.print_stats()"""
"""lp_wrapper = lp(pe.compute_sufficient_statistics_for_trajectory)
lp_wrapper(pe.sample_path.trajectories[0].get_times(), pe.sample_path.trajectories[0].actual_trajectory,
pe.sample_path.trajectories[0].transitions, 3)
lp.print_stats()
lp_wrapper = lp(pe.compute_state_res_time_for_node)
lp_wrapper(0, pe.sample_path.trajectories[0].get_times(),
pe.sample_path.trajectories[0].actual_trajectory, [0], [3], np.zeros([3,3], dtype=np.float))
lp.print_stats()
#pe.compute_state_res_time_for_node(0, pe.sample_path.trajectories[0].get_times(),
#pe.sample_path.trajectories[0].actual_trajectory, [0], [3], np.zeros([3,3], dtype=np.float))"""
"""[[2999.2966 2749.2298 3301.5975]
[3797.1737 3187.8345 2939.2009]

@@ -19,23 +19,27 @@ class SamplePath:
"""
def __init__(self, files_path):
print()
self.importer = imp.JsonImporter(files_path)
self.trajectories = []
self.structure = None
self._trajectories = None
self._structure = None
def build_trajectories(self):
self.importer.import_data()
#for traj_data_frame in self.importer.df_samples_list:
trajectory = tr.Trajectory(self.importer.build_list_of_samples_array(self.importer.concatenated_samples))
self.trajectories.append(trajectory)
self._trajectories = tr.Trajectory(self.importer.build_list_of_samples_array(self.importer.concatenated_samples))
#self.trajectories.append(trajectory)
self.importer.clear_data_frames()
def build_structure(self):
self.structure = st.Structure(self.importer.df_structure, self.importer.df_variables)
self._structure = st.Structure(self.importer.structure, self.importer.variables)
@property
def trajectories(self):
return self._trajectories
@property
def structure(self):
return self._structure
def get_number_trajectories(self):
return len(self.trajectories)
"""os.getcwd()

@@ -88,6 +88,8 @@ class SetOfCims:
cim_to_add.compute_cim_coefficients()
#print(cim_to_add)
self.actual_cims.append(cim_to_add)
self.transition_matrices = None
self.state_residence_times = None
def get_cims(self):
return self.actual_cims

@@ -12,6 +12,9 @@ class Structure:
def __init__(self, structure, variables):
self.structure_frame = structure
self.variables_frame = variables
#self._nodes_indexes = self.list_of_nodes_indexes()
self.name_label = variables.columns.values[0]
self.value_label = variables.columns.values[1]
def list_of_edges(self):
edges_list = []
@@ -20,17 +23,24 @@ class Structure:
edges_list.append(row_tuple)
return edges_list
def list_of_nodes(self):
return self.variables_frame['Name'].values.tolist() #TODO rimuovere dipendenza diretta dalla key 'Name'
def list_of_nodes_labels(self):
return self.variables_frame[self.name_label].values.tolist()
def list_of_nodes_indexes(self):
nodes_indexes = []
for indx in self.list_of_nodes_labels():
nodes_indexes.append(indx)
return nodes_indexes
def get_node_id(self, node_indx):
return self.variables_frame['Name'][node_indx]
return self.variables_frame[self.name_label][node_indx]
def get_node_indx(self, node_id):
return list(self.variables_frame['Name']).index(node_id)
return list(self.variables_frame[self.name_label]).index(node_id)
def get_states_number(self, node):
return self.variables_frame['Value'][self.get_node_indx(node)]
return self.variables_frame[self.value_label][self.get_node_indx(node)]
def get_states_number_by_indx(self, node_indx):
return self.variables_frame['Value'][node_indx]
#print(self.value_label)
return self.variables_frame[self.value_label][node_indx]

@@ -1,8 +1,8 @@
import pandas as pd
import numpy as np
class Trajectory():
class Trajectory:
"""
Rappresenta una traiettoria come un numpy_array contenente n-ple (indx, T_k,S_i,.....,Sj)
Offre i metodi utili alla computazione sulla struttura stessa.
@@ -15,17 +15,22 @@ class Trajectory():
"""
def __init__(self, list_of_columns):
self.actual_trajectory = np.array(list_of_columns[1:], dtype=np.int).T
self.times = np.array(list_of_columns[0], dtype=np.float)
def get_trajectory(self):
return self.actual_trajectory[:,:4]
def get_complete_trajectory(self):
return self.actual_trajectory
def get_times(self):
return self.times
print(list_of_columns)
self._actual_trajectory = np.array(list_of_columns[1:], dtype=np.int).T
self._times = np.array(list_of_columns[0], dtype=np.float)
print(self._times)
@property
def trajectory(self):
return self._actual_trajectory[:, :4]
@property
def complete_trajectory(self):
return self._actual_trajectory
@property
def times(self):
return self._times
def size(self):
return self.actual_trajectory.shape[0]