diff --git a/.gitignore b/.gitignore index 7b62f12..01cdb07 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,9 @@ __pycache__ -.vscode \ No newline at end of file +*.csv +*.json +.idea +*.pyc +.coverage +./data/ +./venv/ +./PyCTBN/.scannerwork diff --git a/CTBN_project_dominio.pdf b/CTBN_project_dominio.pdf deleted file mode 100644 index 6bceea1..0000000 Binary files a/CTBN_project_dominio.pdf and /dev/null differ diff --git a/basic_main.py b/basic_main.py deleted file mode 100644 index 17cc15a..0000000 --- a/basic_main.py +++ /dev/null @@ -1,41 +0,0 @@ - -import os -import glob - - -from PyCTBN.json_importer import JsonImporter -from PyCTBN.sample_path import SamplePath -from PyCTBN.network_graph import NetworkGraph -from PyCTBN.parameters_estimator import ParametersEstimator - - -def main(): - read_files = glob.glob(os.path.join('./data', "*.json")) #Take all json files in this dir - #import data - importer = JsonImporter(read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name') - importer.import_data(0) - #Create a SamplePath Obj - s1 = SamplePath(importer) - #Build The trajectries and the structural infos - s1.build_trajectories() - s1.build_structure() - print(s1.structure.edges) - print(s1.structure.nodes_values) - #From The Structure Object build the Graph - g = NetworkGraph(s1.structure) - #Select a node you want to estimate the parameters - node = g.nodes[2] - print("Node", node) - #Init the _graph specifically for THIS node - g.fast_init(node) - #Use SamplePath and Grpah to create a ParametersEstimator Object - p1 = ParametersEstimator(s1.trajectories, g) - #Init the peEst specifically for THIS node - p1.fast_init(node) - #Compute the parameters - sofc1 = p1.compute_parameters_for_node(node) - #The est CIMS are inside the resultant SetOfCIms Obj - print(sofc1.actual_cims) - -if __name__ == "__main__": - main() diff --git a/main_package/.scannerwork/.sonar_lock b/main_package/.scannerwork/.sonar_lock deleted file mode 100644 index e69de29..0000000 diff --git a/main_package/.scannerwork/report-task.txt b/main_package/.scannerwork/report-task.txt deleted file mode 100644 index 5394593..0000000 --- a/main_package/.scannerwork/report-task.txt +++ /dev/null @@ -1,6 +0,0 @@ -projectKey=Ctbn_Project -serverUrl=http://localhost:9000 -serverVersion=8.4.1.35646 -dashboardUrl=http://localhost:9000/dashboard?id=Ctbn_Project -ceTaskId=AXPs4gCNB9mzoAo2hiLI -ceTaskUrl=http://localhost:9000/api/ce/task?id=AXPs4gCNB9mzoAo2hiLI diff --git a/main_package/PyCTBN/__init__.py b/main_package/PyCTBN/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/main_package/PyCTBN/abstract_importer.py b/main_package/PyCTBN/abstract_importer.py deleted file mode 100644 index 283841a..0000000 --- a/main_package/PyCTBN/abstract_importer.py +++ /dev/null @@ -1,149 +0,0 @@ - -from abc import ABC, abstractmethod -import pandas as pd -import typing - - -class AbstractImporter(ABC): - """Abstract class that exposes all the necessary methods to process the trajectories and the net structure. 
-
-    :param file_path: the file path
-    :type file_path: str
-    :_concatenated_samples: Dataframe containing the concatenation of all the processed trajectories
-    :_df_structure: Dataframe containing the structure of the network (edges)
-    :_df_variables: Dataframe containing the nodes cardinalities
-    :_sorter: A list containing the columns header (excluding the time column) of the `_concatenated_samples`
-    """
-
-    def __init__(self, file_path: str):
-        """Constructor
-        """
-        self._file_path = file_path
-        self._df_variables = None
-        self._df_structure = None
-        self._concatenated_samples = None
-        self._sorter = None
-        super().__init__()
-
-    @abstractmethod
-    def import_data(self) -> None:
-        """Imports all the trajectories, variables cardinalities, and net edges.
-
-        .. warning::
-            The class members ``_df_variables`` and ``_df_structure`` HAVE to be properly constructed
-            as Pandas Dataframes with the following structure:
-            Header of _df_structure = [From_Node | To_Node]
-            Header of _df_variables = [Variable_Label | Variable_Cardinality]
-        .. note::
-            See :class:``JsonImporter`` for an example of implementation of this method.
-        """
-        pass
-
-    @abstractmethod
-    def build_sorter(self, sample_frame: pd.DataFrame) -> typing.List:
-        """Initializes the ``_sorter`` class member from a trajectory dataframe, extracting the header of the frame
-        and keeping ONLY the variables' symbolic labels, cutting out the time label in the header.
-
-        :param sample_frame: the dataframe from which to extract the header
-        :type sample_frame: pandas.DataFrame
-        :return: A list containing the processed header.
-        :rtype: List
-        """
-        pass
-
-    def compute_row_delta_sigle_samples_frame(self, sample_frame: pd.DataFrame,
-                                              columns_header: typing.List, shifted_cols_header: typing.List) \
-            -> pd.DataFrame:
-        """Computes the difference between each value present in the time column.
-        Copies and shifts by one position up all the values present in the remaining columns.
-
-        :param sample_frame: the trajectory to be processed
-        :type sample_frame: pandas.Dataframe
-        :param columns_header: the original header of sample_frame
-        :type columns_header: List
-        :param shifted_cols_header: a copy of columns_header with changed names of the contents
-        :type shifted_cols_header: List
-        :return: The processed dataframe
-        :rtype: pandas.Dataframe
-
-        .. warning::
-            the Dataframe ``sample_frame`` has to follow the column structure of this header:
-            Header of sample_frame = [Time | Variable values]
-        """
-        sample_frame.iloc[:, 0] = sample_frame.iloc[:, 0].diff().shift(-1)
-        shifted_cols = sample_frame[columns_header].shift(-1).fillna(0).astype('int32')
-        shifted_cols.columns = shifted_cols_header
-        sample_frame = sample_frame.assign(**shifted_cols)
-        sample_frame.drop(sample_frame.tail(1).index, inplace=True)
-        return sample_frame
-
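# A hedged, self-contained sketch (toy values, not from the repository's data) of what
# compute_row_delta_sigle_samples_frame above does to a single trajectory:
#
#   import pandas as pd
#   sample = pd.DataFrame({'Time': [0.0, 0.8, 2.1], 'X': [0, 1, 1], 'Y': [1, 1, 0]})
#   sample.iloc[:, 0] = sample.iloc[:, 0].diff().shift(-1)    # Time column -> per-row deltas
#   shifted = sample[['X', 'Y']].shift(-1).fillna(0).astype('int32')
#   shifted.columns = ['XS', 'YS']                            # "S" suffix marks next-state columns
#   sample = sample.assign(**shifted)
#   sample.drop(sample.tail(1).index, inplace=True)           # the last row has no successor
#   # resulting rows: [0.8, 0, 1, 1, 1] and [1.3, 1, 1, 1, 0]  (Time, X, Y, XS, YS)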
-    def compute_row_delta_in_all_samples_frames(self, df_samples_list: typing.List) -> None:
-        """Calls the method ``compute_row_delta_sigle_samples_frame`` on every dataframe present in the list
-        ``df_samples_list``.
-        Concatenates the results in the dataframe ``_concatenated_samples``
-
-        :param df_samples_list: the list of dataframes to be processed and concatenated
-        :type df_samples_list: List
-
-        .. warning::
-            The Dataframe sample_frame has to follow the column structure of this header:
-            Header of sample_frame = [Time | Variable values]
-            The class member self._sorter HAS to be properly INITIALIZED (See class members definition doc)
-        .. note::
-            After the call of this method the class member ``_concatenated_samples`` will contain all processed
-            and merged trajectories
-        """
-        if not self._sorter:
-            raise RuntimeError("The class member self._sorter has to be INITIALIZED!")
-        shifted_cols_header = [s + "S" for s in self._sorter]
-        compute_row_delta = self.compute_row_delta_sigle_samples_frame
-        proc_samples_list = [compute_row_delta(sample, self._sorter, shifted_cols_header)
-                             for sample in df_samples_list]
-        self._concatenated_samples = pd.concat(proc_samples_list)
-        complete_header = self._sorter[:]
-        complete_header.insert(0,'Time')
-        complete_header.extend(shifted_cols_header)
-        self._concatenated_samples = self._concatenated_samples[complete_header]
-
-    def build_list_of_samples_array(self, data_frame: pd.DataFrame) -> typing.List:
-        """Builds a List containing the columns of data_frame, each converted to a numpy array.
-
-        :param data_frame: the dataframe from which the columns have to be extracted and converted
-        :type data_frame: pandas.Dataframe
-        :return: the resulting list of numpy arrays
-        :rtype: List
-        """
-        columns_list = [data_frame[column].to_numpy() for column in data_frame]
-        return columns_list
-
-    def clear_concatenated_frame(self) -> None:
-        """Removes all values in the dataframe concatenated_samples.
-        """
-        self._concatenated_samples = self._concatenated_samples.iloc[0:0]
-
-    @abstractmethod
-    def dataset_id(self) -> object:
-        """If the original dataset contains multiple datasets, this method returns a unique id to identify the current
-        dataset
-        """
-        pass
-
-    @property
-    def concatenated_samples(self) -> pd.DataFrame:
-        return self._concatenated_samples
-
-    @property
-    def variables(self) -> pd.DataFrame:
-        return self._df_variables
-
-    @property
-    def structure(self) -> pd.DataFrame:
-        return self._df_structure
-
-    @property
-    def sorter(self) -> typing.List:
-        return self._sorter
-
-    @property
-    def file_path(self) -> str:
-        return self._file_path
diff --git a/main_package/PyCTBN/cache.py b/main_package/PyCTBN/cache.py
deleted file mode 100644
index 592fcf4..0000000
--- a/main_package/PyCTBN/cache.py
+++ /dev/null
@@ -1,54 +0,0 @@
-
-import typing
-#import set_of_cims as sofc
-from .set_of_cims import SetOfCims
-
-
-class Cache:
-    """This class acts as a cache of ``SetOfCims`` objects for a node.
-
-    :_list_of_sets_of_parents: a list of ``Sets`` objects of the parents to which the cim in cache at the SAME
-        index is related
-    :_actual_cache: a list of SetOfCims objects
-    """
-
-    def __init__(self):
-        """Constructor Method
-        """
-        self._list_of_sets_of_parents = []
-        self._actual_cache = []
-
-    def find(self, parents_comb: typing.Set) -> SetOfCims:
-        """
-        Tries to find in cache, given the symbolic parents combination ``parents_comb``, the ``SetOfCims``
-        related to that ``parents_comb``.
-
-        :param parents_comb: the parents related to that ``SetOfCims``
-        :type parents_comb: Set
-        :return: A ``SetOfCims`` object if the ``parents_comb`` index is found in ``_list_of_sets_of_parents``.
-            None otherwise.
-        :rtype: SetOfCims
-        """
-        try:
-            result = self._actual_cache[self._list_of_sets_of_parents.index(parents_comb)]
-            return result
-        except ValueError:
-            return None
-
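# A hedged usage sketch of the cache round-trip (``socim`` stands for any already-built
# SetOfCims instance; ``put`` and ``clear`` are defined just below):
#
#   cache = Cache()
#   parents = frozenset({'X', 'Y'})
#   if cache.find(parents) is None:
#       cache.put(parents, socim)
#   assert cache.find(parents) is socim
#   cache.clear()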
-    def put(self, parents_comb: typing.Set, socim: SetOfCims) -> None:
-        """Places in cache the ``SetOfCims`` object, and the related symbolic index ``parents_comb`` in
-        ``_list_of_sets_of_parents``.
-
-        :param parents_comb: the symbolic set index
-        :type parents_comb: Set
-        :param socim: the related SetOfCims object
-        :type socim: SetOfCims
-        """
-        self._list_of_sets_of_parents.append(parents_comb)
-        self._actual_cache.append(socim)
-
-    def clear(self) -> None:
-        """Clears the contents both of ``_actual_cache`` and ``_list_of_sets_of_parents``.
-        """
-        del self._list_of_sets_of_parents[:]
-        del self._actual_cache[:]
\ No newline at end of file
diff --git a/main_package/PyCTBN/conditional_intensity_matrix.py b/main_package/PyCTBN/conditional_intensity_matrix.py
deleted file mode 100644
index 96a3ae2..0000000
--- a/main_package/PyCTBN/conditional_intensity_matrix.py
+++ /dev/null
@@ -1,42 +0,0 @@
-import numpy as np
-
-
-class ConditionalIntensityMatrix:
-    """Abstracts the Conditional Intensity Matrix of a node as an aggregation of the state residence times vector,
-    the state transition matrix and the actual CIM matrix.
-
-    :param state_residence_times: state residence times vector
-    :type state_residence_times: numpy.array
-    :param state_transition_matrix: the transitions count matrix
-    :type state_transition_matrix: numpy.ndArray
-    :_cim: the actual cim of the node
-    """
-    def __init__(self, state_residence_times: np.array, state_transition_matrix: np.array):
-        """Constructor Method
-        """
-        self._state_residence_times = state_residence_times
-        self._state_transition_matrix = state_transition_matrix
-        self._cim = self.state_transition_matrix.astype(np.float64)
-
-    def compute_cim_coefficients(self) -> None:
-        """Computes the coefficients of the matrix _cim by using the following equality q_xx' = M[x, x'] / T[x].
-        The class member ``_cim`` will contain the computed cim
-        """
-        np.fill_diagonal(self._cim, self._cim.diagonal() * -1)
-        self._cim = ((self._cim.T + 1) / (self._state_residence_times + 1)).T
-
-    @property
-    def state_residence_times(self) -> np.ndarray:
-        return self._state_residence_times
-
-    @property
-    def state_transition_matrix(self) -> np.ndarray:
-        return self._state_transition_matrix
-
-    @property
-    def cim(self) -> np.ndarray:
-        return self._cim
-
-    def __repr__(self):
-        return 'CIM:\n' + str(self.cim)
-
diff --git a/main_package/PyCTBN/deprecated/sets_of_cims_container.py b/main_package/PyCTBN/deprecated/sets_of_cims_container.py
deleted file mode 100644
index cf1cc82..0000000
--- a/main_package/PyCTBN/deprecated/sets_of_cims_container.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import set_of_cims as socim
-
-
-class SetsOfCimsContainer:
-    """
-    Aggregates a set of SetOfCims objects
-    """
-    def __init__(self, list_of_keys, states_number_per_node, list_of_parents_states_number, p_combs_list):
-        self.sets_of_cims = None
-        self.init_cims_structure(list_of_keys, states_number_per_node, list_of_parents_states_number, p_combs_list)
-        #self.states_per_variable = states_number
-
-    def init_cims_structure(self, keys, states_number_per_node, list_of_parents_states_number, p_combs_list):
-        """for indx, key in enumerate(keys):
-            self.sets_of_cims.append(
-                socim.SetOfCims(key, list_of_parents_states_number[indx], states_number_per_node[indx]))"""
-        self.sets_of_cims = [socim.SetOfCims(pair[1], list_of_parents_states_number[pair[0]], states_number_per_node[pair[0]], p_combs_list[pair[0]])
-                             for pair in enumerate(keys)]
-
-    def get_set_of_cims(self, node_indx):
-        return self.sets_of_cims[node_indx]
-
-    def get_cims_of_node(self, node_indx, cim_indx):
-        return self.sets_of_cims[node_indx].get_cim(cim_indx)
-
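As a sanity check on compute_cim_coefficients above, this self-contained sketch (toy counts and times, not taken from the repository's data) reproduces its arithmetic for a binary node:

    import numpy as np

    M = np.array([[3, 3], [2, 2]])   # transition counts; the diagonal holds the total exits from each state
    T = np.array([1.5, 0.5])         # state residence times
    cim = M.astype(np.float64)
    np.fill_diagonal(cim, cim.diagonal() * -1)
    cim = ((cim.T + 1) / (T + 1)).T  # row x divided by T[x] + 1, i.e. q_xx' ~ M[x, x'] / T[x]
    print(cim)                       # [[-0.8, 1.6], [2.0, -0.67]]

Note the +1 terms: the implementation smooths both counts and times, so the printed values only approximate M[x, x'] / T[x].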
diff --git a/main_package/PyCTBN/json_importer.py b/main_package/PyCTBN/json_importer.py
deleted file mode 100644
index 283057d..0000000
--- a/main_package/PyCTBN/json_importer.py
+++ /dev/null
@@ -1,169 +0,0 @@
-
-import json
-import typing
-import pandas as pd
-
-#import abstract_importer as ai
-from .abstract_importer import AbstractImporter
-
-
-class JsonImporter(AbstractImporter):
-    """Implements the abstract methods of AbstractImporter and adds all the necessary methods to process and prepare
-    the data in json extension, with the following structure:
-    [0]
-    |_ dyn.cims
-    |_ dyn.str
-    |_ samples
-    |_ variables
-    :param file_path: the path of the file that contains the data to be imported
-    :type file_path: string
-    :param samples_label: the reference key for the samples in the trajectories
-    :type samples_label: string
-    :param structure_label: the reference key for the structure of the network data
-    :type structure_label: string
-    :param variables_label: the reference key for the cardinalities of the nodes data
-    :type variables_label: string
-    :param time_key: the key used to identify the timestamps in each trajectory
-    :type time_key: string
-    :param variables_key: the key used to identify the names of the variables in the net
-    :type variables_key: string
-    :param array_indx: the index of the outer JsonArray from which to extract the data
-    :type array_indx: int
-    :_df_samples_list: a Dataframe list in which every dataframe contains a trajectory
-    """
-
-    def __init__(self, file_path: str, samples_label: str, structure_label: str, variables_label: str, time_key: str,
-                 variables_key: str, array_indx: int):
-        """Constructor method
-        """
-        self._samples_label = samples_label
-        self._structure_label = structure_label
-        self._variables_label = variables_label
-        self._time_key = time_key
-        self._variables_key = variables_key
-        self._df_samples_list = None
-        self._array_indx = array_indx
-        super(JsonImporter, self).__init__(file_path)
-
-    def import_data(self) -> None:
-        """Implements the abstract method of :class:`AbstractImporter`
-        """
-        raw_data = self.read_json_file()
-        self._df_samples_list = self.import_trajectories(raw_data)
-        self._sorter = self.build_sorter(self._df_samples_list[0])
-        self.compute_row_delta_in_all_samples_frames(self._df_samples_list)
-        self.clear_data_frame_list()
-        self._df_structure = self.import_structure(raw_data)
-        self._df_variables = self.import_variables(raw_data)
-
-    def import_trajectories(self, raw_data: typing.List) -> typing.List:
-        """Imports the trajectories from the list of dicts ``raw_data``.
-
-        :param raw_data: List of Dicts
-        :type raw_data: List
-        :return: List of dataframes containing all the trajectories
-        :rtype: List
-        """
-        return self.normalize_trajectories(raw_data, self._array_indx, self._samples_label)
-
-    def import_structure(self, raw_data: typing.List) -> pd.DataFrame:
-        """Imports in a dataframe the data in the list raw_data at the key ``_structure_label``
-
-        :param raw_data: List of Dicts
-        :type raw_data: List
-        :return: Dataframe containing the starting node and ending node of every arc of the network
-        :rtype: pandas.Dataframe
-        """
-        return self.one_level_normalizing(raw_data, self._array_indx, self._structure_label)
-
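# A hedged illustration of the JSON layout assumed by the import_* methods here
# (keys follow the class docstring; the concrete values are made up):
#
#   raw_data = [{'variables': [{'Name': 'X', 'Value': 3}, {'Name': 'Y', 'Value': 2}],
#                'dyn.str': [{'From': 'X', 'To': 'Y'}]}]
#   pd.DataFrame(raw_data[0]['dyn.str'])   # what one_level_normalizing returns: one row per arc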
-    def import_variables(self, raw_data: typing.List) -> pd.DataFrame:
-        """Imports the data in ``raw_data`` at the key ``_variables_label``.
-
-        :param raw_data: List of Dicts
-        :type raw_data: List
-        :return: Dataframe containing the variables' symbolic labels and their cardinalities
-        :rtype: pandas.Dataframe
-        """
-        return self.one_level_normalizing(raw_data, self._array_indx, self._variables_label)
-
-    def read_json_file(self) -> typing.List:
-        """Reads the JSON file in the path self.filePath
-
-        :return: The contents of the json file
-        :rtype: List
-        """
-        with open(self._file_path) as f:
-            data = json.load(f)
-            return data
-
-    def one_level_normalizing(self, raw_data: typing.List, indx: int, key: str) -> pd.DataFrame:
-        """Extracts the one-level nested data in the list ``raw_data`` at the index ``indx`` at the key ``key``.
-
-        :param raw_data: List of Dicts
-        :type raw_data: List
-        :param indx: The index of the array from which the data have to be extracted
-        :type indx: int
-        :param key: the key for the Dicts from which to extract data
-        :type key: string
-        :return: A normalized dataframe
-        :rtype: pandas.Dataframe
-        """
-        return pd.DataFrame(raw_data[indx][key])
-
-    def normalize_trajectories(self, raw_data: typing.List, indx: int, trajectories_key: str) -> typing.List:
-        """
-        Extracts the trajectories in ``raw_data`` at the index ``indx`` at the key ``trajectories_key``.
-
-        :param raw_data: List of Dicts
-        :type raw_data: List
-        :param indx: The index of the array from which the data have to be extracted
-        :type indx: int
-        :param trajectories_key: the key of the trajectories objects
-        :type trajectories_key: string
-        :return: A list of dataframes containing the trajectories
-        :rtype: List
-        """
-        dataframe = pd.DataFrame
-        smps = raw_data[indx][trajectories_key]
-        df_samples_list = [dataframe(sample) for sample in smps]
-        return df_samples_list
-
-    def build_sorter(self, sample_frame: pd.DataFrame) -> typing.List:
-        """Implements the abstract method build_sorter of the :class:`AbstractImporter` for this dataset
-        """
-        columns_header = list(sample_frame.columns.values)
-        columns_header.remove(self._time_key)
-        return columns_header
-
-    def clear_data_frame_list(self) -> None:
-        """Removes all values present in the dataframes in the list ``_df_samples_list``.
-        """
-        for indx in range(len(self._df_samples_list)):
-            self._df_samples_list[indx] = self._df_samples_list[indx].iloc[0:0]
-
-    def dataset_id(self) -> object:
-        return self._array_indx
-
-    def import_sampled_cims(self, raw_data: typing.List, indx: int, cims_key: str) -> typing.Dict:
-        """Imports the synthetic CIMS in the dataset in a dictionary, using variable labels
-        as keys for the set of CIMS of a particular node.
- - :param raw_data: List of Dicts - :type raw_data: List - :param indx: The index of the array from which the data have to be extracted - :type indx: int - :param cims_key: the key where the json object cims are placed - :type cims_key: string - :return: a dictionary containing the sampled CIMS for all the variables in the net - :rtype: Dictionary - """ - cims_for_all_vars = {} - for var in raw_data[indx][cims_key]: - sampled_cims_list = [] - cims_for_all_vars[var] = sampled_cims_list - for p_comb in raw_data[indx][cims_key][var]: - cims_for_all_vars[var].append(pd.DataFrame(raw_data[indx][cims_key][var][p_comb]).to_numpy()) - return cims_for_all_vars - - - diff --git a/main_package/PyCTBN/network_graph.py b/main_package/PyCTBN/network_graph.py deleted file mode 100644 index 6ac6812..0000000 --- a/main_package/PyCTBN/network_graph.py +++ /dev/null @@ -1,297 +0,0 @@ - -import typing -import networkx as nx -import numpy as np - -#import structure as st -from .structure import Structure - - -class NetworkGraph: - """Abstracts the infos contained in the Structure class in the form of a directed graph. - Has the task of creating all the necessary filtering and indexing structures for parameters estimation - - :param graph_struct: the ``Structure`` object from which infos about the net will be extracted - :type graph_struct: Structure - :_graph: directed graph - :_aggregated_info_about_nodes_parents: a structure that contains all the necessary infos - about every parents of the node of which all the indexing and filtering structures will be constructed. - :_time_scalar_indexing_structure: the indexing structure for state res time estimation - :_transition_scalar_indexing_structure: the indexing structure for transition computation - :_time_filtering: the columns filtering structure used in the computation of the state res times - :_transition_filtering: the columns filtering structure used in the computation of the transition - from one state to another - :_p_combs_structure: all the possible parents states combination for the node of interest - """ - - def __init__(self, graph_struct: Structure): - """Constructor Method - """ - self._graph_struct = graph_struct - self._graph = nx.DiGraph() - self._aggregated_info_about_nodes_parents = None - self._time_scalar_indexing_structure = None - self._transition_scalar_indexing_structure = None - self._time_filtering = None - self._transition_filtering = None - self._p_combs_structure = None - - def fast_init(self, node_id: str) -> None: - """Initializes all the necessary structures for parameters estimation of the node identified by the label - node_id - - :param node_id: the label of the node - :type node_id: string - """ - self.add_nodes(self._graph_struct.nodes_labels) - self.add_edges(self._graph_struct.edges) - self._aggregated_info_about_nodes_parents = self.get_ordered_by_indx_set_of_parents(node_id) - p_indxs = self._aggregated_info_about_nodes_parents[1] - p_vals = self._aggregated_info_about_nodes_parents[2] - self._time_scalar_indexing_structure = self.build_time_scalar_indexing_structure_for_a_node(node_id, - p_vals) - self._transition_scalar_indexing_structure = self.build_transition_scalar_indexing_structure_for_a_node(node_id, - p_vals) - node_indx = self.get_node_indx(node_id) - self._time_filtering = self.build_time_columns_filtering_for_a_node(node_indx, p_indxs) - self._transition_filtering = self.build_transition_filtering_for_a_node(node_indx, p_indxs) - self._p_combs_structure = self.build_p_comb_structure_for_a_node(p_vals) - - 
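# A hedged orientation sketch (toy net X -> Z with all cardinalities 3 and indexes
# X=0, Z=2; values assumed, not taken from the repository's test data) of what
# fast_init('Z') leaves behind:
#
#   g.time_filtering                 -> array([2, 0])          (Z's column, then its parent's)
#   g.time_scalar_indexing_strucure  -> array([3, 9])          (cumprod of [card(Z), card(X)])
#   g.p_combs                        -> array([[0], [1], [2]]) (all states of the parent X)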
def add_nodes(self, list_of_nodes: typing.List) -> None:
-        """Adds the nodes contained in the list ``list_of_nodes`` to the ``_graph``.
-        Sets all the properties that identify a node (index, positional index, cardinality)
-
-        :param list_of_nodes: the nodes to add to ``_graph``
-        :type list_of_nodes: List
-        """
-        nodes_indxs = self._graph_struct.nodes_indexes
-        nodes_vals = self._graph_struct.nodes_values
-        pos = 0
-        for id, node_indx, node_val in zip(list_of_nodes, nodes_indxs, nodes_vals):
-            self._graph.add_node(id, indx=node_indx, val=node_val, pos_indx=pos)
-            pos += 1
-
-    def add_edges(self, list_of_edges: typing.List) -> None:
-        """Adds the edges contained in the list ``list_of_edges`` to the ``_graph``.
-
-        :param list_of_edges: the list of tuples containing the edges
-        :type list_of_edges: List
-        """
-        self._graph.add_edges_from(list_of_edges)
-
-    def get_ordered_by_indx_set_of_parents(self, node: str) -> typing.Tuple:
-        """Builds the aggregated structure that holds all the info relative to the parent set of the node, namely
-        (parents_labels, parents_indexes, parents_cardinalities).
-
-        :param node: the label of the node
-        :type node: string
-        :return: a tuple containing all the parent set info
-        :rtype: Tuple
-        """
-        parents = self.get_parents_by_id(node)
-        nodes = self._graph_struct.nodes_labels
-        d = {v: i for i, v in enumerate(nodes)}
-        sorted_parents = sorted(parents, key=lambda v: d[v])
-        get_node_indx = self.get_node_indx
-        p_indxes = [get_node_indx(node) for node in sorted_parents]
-        p_values = [self.get_states_number(node) for node in sorted_parents]
-        return (sorted_parents, p_indxes, p_values)
-
-    def build_time_scalar_indexing_structure_for_a_node(self, node_id: str, parents_vals: typing.List) -> np.ndarray:
-        """Builds an indexing structure for the computation of state residence times values.
-
-        :param node_id: the node label
-        :type node_id: string
-        :param parents_vals: the cardinalities of the node's parents
-        :type parents_vals: List
-        :return: The time indexing structure
-        :rtype: numpy.ndArray
-        """
-        T_vector = np.array([self.get_states_number(node_id)])
-        T_vector = np.append(T_vector, parents_vals)
-        T_vector = T_vector.cumprod().astype(np.int)
-        return T_vector
-
-    def build_transition_scalar_indexing_structure_for_a_node(self, node_id: str, parents_vals: typing.List) \
-            -> np.ndarray:
-        """Builds an indexing structure for the computation of state transitions values.
-
-        :param node_id: the node label
-        :type node_id: string
-        :param parents_vals: the cardinalities of the node's parents
-        :type parents_vals: List
-        :return: The transition indexing structure
-        :rtype: numpy.ndArray
-        """
-        node_states_number = self.get_states_number(node_id)
-        M_vector = np.array([node_states_number,
-                             node_states_number])
-        M_vector = np.append(M_vector, parents_vals)
-        M_vector = M_vector.cumprod().astype(np.int)
-        return M_vector
-
-    def build_time_columns_filtering_for_a_node(self, node_indx: int, p_indxs: typing.List) -> np.ndarray:
-        """
-        Builds the necessary structure to filter the desired columns indicated by ``node_indx`` and ``p_indxs``
-        in the dataset.
-        This structure will be used in the computation of the state res times.
- :param node_indx: the index of the node - :type node_indx: int - :param p_indxs: the indexes of the node's parents - :type p_indxs: List - :return: The filtering structure for times estimation - :rtype: numpy.ndArray - """ - return np.append(np.array([node_indx], dtype=np.int), p_indxs).astype(np.int) - - def build_transition_filtering_for_a_node(self, node_indx: int, p_indxs: typing.List) -> np.ndarray: - """Builds the necessary structure to filter the desired columns indicated by ``node_indx`` and ``p_indxs`` - in the dataset. - This structure will be used in the computation of the state transitions values. - :param node_indx: the index of the node - :type node_indx: int - :param p_indxs: the indexes of the node's parents - :type p_indxs: List - :return: The filtering structure for transitions estimation - :rtype: numpy.ndArray - """ - nodes_number = self._graph_struct.total_variables_number - return np.array([node_indx + nodes_number, node_indx, *p_indxs], dtype=np.int) - - def build_p_comb_structure_for_a_node(self, parents_values: typing.List) -> np.ndarray: - """ - Builds the combinatorial structure that contains the combinations of all the values contained in - ``parents_values``. - - :param parents_values: the cardinalities of the nodes - :type parents_values: List - :return: A numpy matrix containing a grid of the combinations - :rtype: numpy.ndArray - """ - tmp = [] - for val in parents_values: - tmp.append([x for x in range(val)]) - if len(parents_values) > 0: - parents_comb = np.array(np.meshgrid(*tmp)).T.reshape(-1, len(parents_values)) - if len(parents_values) > 1: - tmp_comb = parents_comb[:, 1].copy() - parents_comb[:, 1] = parents_comb[:, 0].copy() - parents_comb[:, 0] = tmp_comb - else: - parents_comb = np.array([[]], dtype=np.int) - return parents_comb - - def get_parents_by_id(self, node_id) -> typing.List: - """Returns a list of labels of the parents of the node ``node_id`` - - :param node_id: the node label - :type node_id: string - :return: a List of labels of the parents - :rtype: List - """ - return list(self._graph.predecessors(node_id)) - - def get_states_number(self, node_id) -> int: - return self._graph.nodes[node_id]['val'] - - def get_node_indx(self, node_id) -> int: - return nx.get_node_attributes(self._graph, 'indx')[node_id] - - def get_positional_node_indx(self, node_id) -> int: - return self._graph.nodes[node_id]['pos_indx'] - - @property - def nodes(self) -> typing.List: - return self._graph_struct.nodes_labels - - @property - def edges(self) -> typing.List: - return list(self._graph.edges) - - @property - def nodes_indexes(self) -> np.ndarray: - return self._graph_struct.nodes_indexes - - @property - def nodes_values(self) -> np.ndarray: - return self._graph_struct.nodes_values - - @property - def time_scalar_indexing_strucure(self) -> np.ndarray: - return self._time_scalar_indexing_structure - - @property - def time_filtering(self) -> np.ndarray: - return self._time_filtering - - @property - def transition_scalar_indexing_structure(self) -> np.ndarray: - return self._transition_scalar_indexing_structure - - @property - def transition_filtering(self) -> np.ndarray: - return self._transition_filtering - - @property - def p_combs(self) -> np.ndarray: - return self._p_combs_structure - - """ - ##############These Methods are actually unused but could become useful in the near future################ - - def init_graph(self): - self.add_nodes(self._nodes_labels) - self.add_edges(self._graph_struct.edges) - self._aggregated_info_about_nodes_parents = 
self.get_ord_set_of_par_of_all_nodes() - self._fancy_indexing = self.build_fancy_indexing_structure(0) - self.build_scalar_indexing_structures() - self.build_time_columns_filtering_structure() - self.build_transition_columns_filtering_structure() - self._p_combs_structure = self.build_p_combs_structure() - - def build_time_columns_filtering_structure(self): - nodes_indxs = self._nodes_indexes - self._time_filtering = [np.append(np.array([node_indx], dtype=np.int), p_indxs).astype(np.int) - for node_indx, p_indxs in zip(nodes_indxs, self._fancy_indexing)] - - def build_transition_columns_filtering_structure(self): - nodes_number = self._graph_struct.total_variables_number - nodes_indxs = self._nodes_indexes - self._transition_filtering = [np.array([node_indx + nodes_number, node_indx, *p_indxs], dtype=np.int) - for node_indx, p_indxs in zip(nodes_indxs, - self._fancy_indexing)] - - def build_scalar_indexing_structures(self): - parents_values_for_all_nodes = self.get_ordered_by_indx_parents_values_for_all_nodes() - build_transition_scalar_indexing_structure_for_a_node = \ - self.build_transition_scalar_indexing_structure_for_a_node - build_time_scalar_indexing_structure_for_a_node = self.build_time_scalar_indexing_structure_for_a_node - aggr = [(build_transition_scalar_indexing_structure_for_a_node(node_id, p_vals), - build_time_scalar_indexing_structure_for_a_node(node_id, p_vals)) - for node_id, p_vals in - zip(self._nodes_labels, - parents_values_for_all_nodes)] - self._transition_scalar_indexing_structure = [i[0] for i in aggr] - self._time_scalar_indexing_structure = [i[1] for i in aggr] - - def build_p_combs_structure(self): - parents_values_for_all_nodes = self.get_ordered_by_indx_parents_values_for_all_nodes() - p_combs_struct = [self.build_p_comb_structure_for_a_node(p_vals) for p_vals in parents_values_for_all_nodes] - return p_combs_struct - - def get_ord_set_of_par_of_all_nodes(self): - get_ordered_by_indx_set_of_parents = self.get_ordered_by_indx_set_of_parents - result = [get_ordered_by_indx_set_of_parents(node) for node in self._nodes_labels] - return result - - def get_ordered_by_indx_parents_values_for_all_nodes(self): - pars_values = [i[2] for i in self._aggregated_info_about_nodes_parents] - return pars_values - - def build_fancy_indexing_structure(self, start_indx): - if start_indx > 0: - pass - else: - fancy_indx = [i[1] for i in self._aggregated_info_about_nodes_parents] - return fancy_indx - """ \ No newline at end of file diff --git a/main_package/PyCTBN/original_ctpc_algorithm.py b/main_package/PyCTBN/original_ctpc_algorithm.py deleted file mode 100644 index 45e539a..0000000 --- a/main_package/PyCTBN/original_ctpc_algorithm.py +++ /dev/null @@ -1,495 +0,0 @@ -import glob -import json -import os -from itertools import combinations -import typing - -import numpy as np -import pandas as pd -from line_profiler import LineProfiler -from scipy.stats import chi2 as chi2_dist -from scipy.stats import f as f_dist -from tqdm import tqdm - -from .abstract_importer import AbstractImporter - - -class OriginalCTPCAlgorithm(AbstractImporter): - """ - Implements the abstracts methods of AbstractImporter and adds all the necessary methods to process and prepare the data in json ext. 
-    with the following structure:
-    [0]
-    |_ dyn.cims
-    |_ dyn.str
-    |_ samples
-    |_ variables
-    :_file_path: the path of the file that contains the data to be imported
-    :_samples_label: the reference key for the samples in the trajectories
-    :_structure_label: the reference key for the structure of the network data
-    :_variables_label: the reference key for the cardinalities of the nodes data
-    :_time_key: the key used to identify the timestamps in each trajectory
-    :_variables_key: the key used to identify the names of the variables in the net
-    :_df_samples_list: a Dataframe list in which every dataframe contains a trajectory
-    """
-
-    def dataset_id(self) -> object:
-        pass
-
-    def __init__(self, file_path: str, samples_label: str, structure_label: str, variables_label: str, time_key: str,
-                 variables_key: str, array_indx: int):
-        """
-        Parameters:
-        file_path: the path of the file that contains the data to be imported
-        :_samples_label: the reference key for the samples in the trajectories
-        :_structure_label: the reference key for the structure of the network data
-        :_variables_label: the reference key for the cardinalities of the nodes data
-        :_time_key: the key used to identify the timestamps in each trajectory
-        :_variables_key: the key used to identify the names of the variables in the net
-        """
-        self.samples_label = samples_label
-        self.structure_label = structure_label
-        self.variables_label = variables_label
-        self.time_key = time_key
-        self.variables_key = variables_key
-        self.df_samples_list = None
-        self.trajectories = None
-        self._array_indx = array_indx
-        self.matrix = None
-        super(OriginalCTPCAlgorithm, self).__init__(file_path)
-
-    def import_data(self):
-        """
-        Imports and prepares all the data needed for subsequent processing.
-        Parameters:
-        :void
-        Returns:
-        :void
-        """
-        raw_data = self.read_json_file()
-        self.df_samples_list = self.import_trajectories(raw_data)
-        self._sorter = self.build_sorter(self.df_samples_list[0])
-        #self.compute_row_delta_in_all_samples_frames(self._df_samples_list)
-        #self.clear_data_frame_list()
-        self._df_structure = self.import_structure(raw_data)
-        self._df_variables = self.import_variables(raw_data, self._sorter)
-
-
-    def import_trajectories(self, raw_data: typing.List):
-        """
-        Imports the trajectories in the list of dicts raw_data.
-        Parameters:
-        :raw_data: List of Dicts
-        Returns:
-        :List of dataframes containing all the trajectories
-        """
-        return self.normalize_trajectories(raw_data, self._array_indx, self.samples_label)
-
-    def import_structure(self, raw_data: typing.List) -> pd.DataFrame:
-        """
-        Imports in a dataframe the data in the list raw_data at the key _structure_label
-
-        Parameters:
-        :raw_data: the data
-        Returns:
-        :Dataframe containing the starting node and ending node of every arc of the network
-        """
-        return self.one_level_normalizing(raw_data, self._array_indx, self.structure_label)
-
-    def import_variables(self, raw_data: typing.List, sorter: typing.List) -> pd.DataFrame:
-        """
-        Imports the data in raw_data at the key _variables_label.
-        Sorts the rows of the dataframe df_variables using the list sorter.
-
-        Parameters:
-        :raw_data: the data
-        :sorter: the header of the dataset containing only the variables' symbolic labels
-        Returns:
-        :Dataframe containing the variables' symbolic labels and their cardinalities
-        """
-        return self.one_level_normalizing(raw_data, self._array_indx, self.variables_label)
-        #TODO Using as a precondition that the ordering of the frame _df_variables equals the one in
-        #TODO self._sorter, this code is useless
-        """self._df_variables[self._variables_key] = self._df_variables[self._variables_key].astype("category")
-        self._df_variables[self._variables_key] = self._df_variables[self._variables_key].cat.set_categories(sorter)
-        self._df_variables = self._df_variables.sort_values([self._variables_key])
-        self._df_variables.reset_index(inplace=True)
-        self._df_variables.drop('index', axis=1, inplace=True)
-        #print("Var Frame", self._df_variables)
-        """
-
-    def read_json_file(self) -> typing.List:
-        """
-        Reads the JSON file in the path self.filePath
-
-        Parameters:
-        :void
-        Returns:
-        :data: the contents of the json file
-
-        """
-        with open(self._file_path) as f:
-            data = json.load(f)
-            return data
-
-    def one_level_normalizing(self, raw_data: typing.List, indx: int, key: str) -> pd.DataFrame:
-        """
-        Extracts the one-level nested data in the list raw_data at the index indx at the key key
-
-        Parameters:
-        :raw_data: List of Dicts
-        :indx: The index of the array from which the data have to be extracted
-        :key: the key for the Dicts from which to extract data
-        Returns:
-        :a normalized dataframe:
-
-        """
-        return pd.DataFrame(raw_data[indx][key])
-
-    def normalize_trajectories(self, raw_data: typing.List, indx: int, trajectories_key: str):
-        """
-        Extracts the trajectories in raw_data at the index indx at the key trajectories_key.
-
-        Parameters:
-        :raw_data: the data
-        :indx: the index of the array from which to extract data
-        :trajectories_key: the key of the trajectories objects
-        Returns:
-        :A list of dataframes containing the trajectories
-        """
-        dataframe = pd.DataFrame
-        smps = raw_data[indx][trajectories_key]
-        df_samples_list = [dataframe(sample) for sample in smps]
-        return df_samples_list
-        #columns_header = list(self._df_samples_list[0].columns.values)
-        #columns_header.remove(self._time_key)
-        #self._sorter = columns_header
-
-    def build_sorter(self, sample_frame: pd.DataFrame) -> typing.List:
-        """
-        Implements the abstract method build_sorter for this dataset
-        """
-        columns_header = list(sample_frame.columns.values)
-        columns_header.remove(self.time_key)
-        return columns_header
-
-    def clear_data_frame_list(self):
-        """
-        Removes all values present in the dataframes in the list _df_samples_list
-        Parameters:
-        :void
-        Returns:
-        :void
-        """
-        for indx in range(len(self.df_samples_list)):
-            self.df_samples_list[indx] = self.df_samples_list[indx].iloc[0:0]
-
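# A hedged preview (toy numbers) of what prepare_trajectories, defined right below,
# stores in self.trajectories for a single 3-row trajectory over variables X and Y:
#
#   import numpy as np
#   import pandas as pd
#   traj = pd.DataFrame({'Time': [0.0, 0.8, 2.1], 'X': [0, 1, 1], 'Y': [1, 1, 0]})
#   tmp = traj.to_numpy()
#   out = np.zeros((tmp.shape[0] - 1, tmp.shape[1] * 2))
#   out[:, 0:3] = tmp[:-1]                             # current Time, X, Y
#   out[:, 3] = np.diff(tmp[:, 0])                     # residence time of each row
#   out[:, 4:] = np.roll(tmp[:, 1:], -1, axis=0)[:-1]  # next states of X, Y
#   # out == [[0.0, 0., 1., 0.8, 1., 1.],
#   #         [0.8, 1., 1., 1.3, 1., 0.]]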
-    def prepare_trajectories(self, trajectories, variables):
-        """
-        Reshapes the trajectories to make the CIM computation phase more efficient
-
-        Parameters
-        -------------
-        trajectories: [pandas.DataFrame]
-            An array of pandas dataframes containing all the trajectories. Each frame has a
-            timestamp column (always the first one) and n columns, one for each variable
-            in the net.
-        variables: pandas.DataFrame
-            Pandas dataframe containing two columns: the variable name and the variable
-            cardinality
-        """
-        dimensions = np.array([x.shape[0] - 1 for x in trajectories], dtype=np.int)
-        ret_array = np.zeros([dimensions.sum(), trajectories[0].shape[1] * 2])
-        cum_dim = np.zeros(len(trajectories) + 1, dtype=np.int)
-        cum_dim[1:] = dimensions.cumsum()
-        dimensions.cumsum()
-        for it in range(len(trajectories)):
-            tmp = trajectories[it].to_numpy()
-            dim = tmp.shape[1]
-            ret_array[cum_dim[it]:cum_dim[it + 1], 0:dim] = tmp[:-1]
-            ret_array[cum_dim[it]:cum_dim[it + 1], dim] = np.diff(tmp[:, 0])
-            ret_array[cum_dim[it]:cum_dim[it + 1], dim + 1:] = np.roll(tmp[:, 1:], -1, axis=0)[:-1]
-        self.trajectories = ret_array
-        #self.variables = variables
-
-    @staticmethod
-    def _compute_cim(trajectories, child_id, parents_id, T_vector, M_vector, parents_comb, M, T):
-        """Internal function to compute the CIMs
-
-        Parameters:
-        -----------
-        trajectories: np.array
-            Array containing the trajectories. (self.trajectories)
-
-        child_id: int
-            Index of the node whose CIMs have to be computed
-
-        parents_id: [int]
-            Array of the indexes of the parents of the node child_id
-
-        T_vector: np.array
-            Numpy array used to index the array T
-
-        M_vector: np.array
-            Numpy array used to index the array M
-
-        parents_comb: [(int)]
-            Array of tuples containing all the possible combinations of the parents of child_id
-
-        M: np.array
-            Numpy array containing the sufficient statistic M
-
-        T: np.array
-            Numpy array containing the sufficient statistic T
-
-        Returns:
-        ---------
-        CIM: np.array
-            Numpy array containing the CIMs
-        """
-        #print(T)
-        diag_indices = np.array([x * M.shape[1] + x % M.shape[1] for x in range(M.shape[0] * M.shape[1])],
-                                dtype=np.int64)
-        #print(diag_indices)
-        T_filter = np.array([child_id, *parents_id], dtype=np.int) + 1
-        #print("TFilter",T_filter)
-        #print("TVector", T_vector)
-        #print("Trajectories", trajectories)
-        #print("Actual TVect",T_vector / T_vector[0])
-        #print("Masked COlumns", trajectories[:, T_filter])  # the non-shifted value columns
-        #print("Masked Multiplied COlumns",trajectories[:, T_filter] * (T_vector / T_vector[0]) )
-        #print("Summing",np.sum(trajectories[:, T_filter] * (T_vector / T_vector[0]), axis=1))
-        #print("Deltas",trajectories[:, int(trajectories.shape[1] / 2)])  # the delta times
-        assert np.sum(trajectories[:, T_filter] * (T_vector / T_vector[0]), axis=1).size == \
-               trajectories[:, int(trajectories.shape[1] / 2)].size
-        #print(T_vector[-1])
-        T[:] = np.bincount(np.sum(trajectories[:, T_filter] * T_vector / T_vector[0], axis=1).astype(np.int), \
-                           trajectories[:, int(trajectories.shape[1] / 2)], minlength=T_vector[-1]).reshape(-1,
-                                                                                                            T.shape[1])
-        #print("Shape", T.shape[1])
-        #print(np.bincount(np.sum(trajectories[:, T_filter] * T_vector / T_vector[0], axis=1).astype(np.int), \
-        #trajectories[:, int(trajectories.shape[1] / 2)], minlength=T_vector[-1]))
-        ###### Transitions #######
-
-        #print("Shifted Node column", trajectories[:, int(trajectories.shape[1] / 2) + 1 + child_id].astype(np.int))
-        #print("Step 2", trajectories[:, int(trajectories.shape[1] / 2) + 1 + child_id].astype(np.int) >= 0)
-        trj_tmp = trajectories[trajectories[:, int(trajectories.shape[1] / 2) + 1 + child_id].astype(np.int) >= 0]
-        #print("Trj Temp", trj_tmp)
-
-
-        M_filter = np.array([child_id, child_id, *parents_id], dtype=np.int) + 1
-        #print("MFilter", M_filter)
-        M_filter[0] += int(trj_tmp.shape[1] / 2)
-        #print("MFilter", M_filter)
-        #print("MVector", M_vector)
-        #print("Division", M_vector / M_vector[0])
-        #print("Masked Traj temp", (trj_tmp[:, M_filter]))
-        #print("Masked Multiplied Traj temp", trj_tmp[:, M_filter] * M_vector / M_vector[0])
-        #print("Summing", np.sum(trj_tmp[:, M_filter] * M_vector / M_vector[0], axis=1))
-        #print(M.shape[2])
-
-        M[:] = np.bincount(np.sum(trj_tmp[:, M_filter] * M_vector / M_vector[0], axis=1).astype(np.int), \
-                           minlength=M_vector[-1]).reshape(-1, M.shape[1], M.shape[2])
-        #print("M!!!!!!!", M)
-        M_raveled = M.ravel()
-        #print("Raveled", M_raveled)
-        M_raveled[diag_indices] = 0
-        M_raveled[diag_indices] = np.sum(M, axis=2).ravel()
-        #print("Raveled", M_raveled)
-        q = (M.ravel()[diag_indices].reshape(-1, M.shape[1]) + 1) / (T + 1)
-        theta = (M + 1) / (M.ravel()[diag_indices].reshape(-1, M.shape[2], 1) + 1)
-        negate_main_diag = np.ones((M.shape[1], M.shape[2]))
-        np.fill_diagonal(negate_main_diag, -1)
-        theta = np.multiply(theta, negate_main_diag)
-        return theta * q.reshape(-1, M.shape[2], 1)
-
-    def compute_cim(self, child_id, parents_id):
-        """Method used to compute the CIMs of a node given its parents
-
-        Parameters:
-        -----------
-        child_id: int
-            Index of the node whose CIMs have to be computed
-
-        parents_id: [int]
-            Array of the indexes of the parents of the node child_id
-
-        Return:
-        ----------
-        Returns a tuple containing:
-        parents_comb: [(int)]
-            Array of tuples containing all the possible combinations of the parents of child_id
-
-        M: np.array
-            Numpy array containing the sufficient statistic M
-
-        T: np.array
-            Numpy array containing the sufficient statistic T
-
-        CIM: np.array
-            Numpy array containing the CIMs
-
-        """
-        tmp = []
-        child_id = int(child_id)
-        parents_id = np.array(parents_id, dtype=np.int)
-        parents_id.sort()
-        #print("Parents id",parents_id)
-        #breakpoint()
-        for idx in parents_id:
-            tmp.append([x for x in range(self.variables.loc[idx, "Value"])])
-        #print("TIMP", tmp)
-        if len(parents_id) > 0:
-            parents_comb = np.array(np.meshgrid(*tmp)).T.reshape(-1, len(parents_id))
-            #print(np.argsort(parents_comb))
-            #print("PArents COmb", parents_comb)
-            if len(parents_id) > 1:
-                tmp_comb = parents_comb[:, 1].copy()
-                #print(tmp_comb)
-                parents_comb[:, 1] = parents_comb[:, 0].copy()
-                parents_comb[:, 0] = tmp_comb
-        else:
-            parents_comb = np.array([[]], dtype=np.int)
-        #print("PARENTS COMB ", parents_comb)
-        M = np.zeros([max(1, parents_comb.shape[0]), \
-                      self.variables.loc[child_id, "Value"], \
-                      self.variables.loc[child_id, "Value"]], dtype=np.int)
-        #print(M)
-
-        T = np.zeros([max(1, parents_comb.shape[0]), \
-                      self.variables.loc[child_id, "Value"]], dtype=np.float)
-        #print(T)
-        #print("T Vector")
-        #print(child_id)
-        T_vector = np.array([self.variables.iloc[child_id, 1].astype(np.int)])
-        #print(T_vector)
-        #for x in parents_id:
-            #print(self.variables.iloc[x, 1])
-        T_vector = np.append(T_vector, [self.variables.iloc[x, 1] for x in parents_id])
-        #print(T_vector)
-        T_vector = T_vector.cumprod().astype(np.int)
-        #print(T_vector)
-
-        #print("M Vector")
-        M_vector = np.array([self.variables.iloc[child_id, 1], self.variables.iloc[child_id, 1].astype(np.int)])
-        #print(M_vector)
-        M_vector = np.append(M_vector, [self.variables.iloc[x, 1] for x in parents_id])
-
-        #for x in parents_id:
-            #print(self.variables.iloc[x, 1])
-        M_vector = M_vector.cumprod().astype(np.int)
-        #print("MVECTOR", M_vector)
-
-        CIM = self._compute_cim(self.trajectories, child_id, parents_id, T_vector, M_vector, parents_comb, M, T)
-        return parents_comb, M, T, CIM
-
-    def independence_test(self,
to_var, from_var, sep_set, alpha_exp, alpha_chi2, thumb_threshold): - #print("To var", to_var) - #print("From var", from_var) - #print("sep set", sep_set) - parents = np.array(sep_set) - parents = np.append(parents, from_var) - parents.sort() - #print("PARENTS", parents) - parents_no_from_mask = parents != from_var - #print("Parents Comb NO Mask", parents_no_from_mask) - - - parents_comb_from, M_from, T_from, CIM_from = self.compute_cim(to_var, parents) - #print("Parents Comb From", parents_comb_from) - - #print("C2:", CIM_from) - - if self.variables.loc[to_var, "Value"] > 2: - df = (self.variables.loc[to_var, "Value"] - 1) ** 2 - df = df * (self.variables.loc[from_var, "Value"]) - for v in sep_set: - df = df * (self.variables.loc[v, "Value"]) - - if np.all(np.sum(np.diagonal(M_from, axis1=1, axis2=2), axis=1) / df < thumb_threshold): - return False - #print("Before CHi quantile", self.variables.loc[to_var, "Value"] - 1) - chi_2_quantile = chi2_dist.ppf(1 - alpha_chi2, self.variables.loc[to_var, "Value"] - 1) - #print("Chi Quantile", chi_2_quantile) - - parents_comb, M, T, CIM = self.compute_cim(to_var, parents[parents_no_from_mask]) - - #print("C1", CIM) - - - for comb_id in range(parents_comb.shape[0]): - # Bad code, inefficient - #print("COMB ID", comb_id) - - if parents.shape[0] > 1: - #print("STEP 0", parents_comb_from[:, parents_no_from_mask]) - #print("STEP 1", np.all(parents_comb_from[:, parents_no_from_mask] == parents_comb[comb_id], axis=1)) - #print("STEP 2", np.argwhere( - #np.all(parents_comb_from[:, parents_no_from_mask] == parents_comb[comb_id], axis=1)).ravel()) - tmp_parents_comb_from_ids = np.argwhere( - np.all(parents_comb_from[:, parents_no_from_mask] == parents_comb[comb_id], axis=1)).ravel() - else: - tmp_parents_comb_from_ids = np.array([x for x in range(parents_comb_from.shape[0])]) - - #print("TMP PAR COMB IDSSSS:", tmp_parents_comb_from_ids) - for comb_from_id in tmp_parents_comb_from_ids: - #print("COMB ID FROM", comb_from_id) - diag = np.diag(CIM[comb_id]) - diag_from = np.diag(CIM_from[comb_from_id]) - #print("Diag C2", diag_from) - #print("Diag C1", diag) - r1 = np.diag(M[comb_id]) - r2 = np.diag(M_from[comb_from_id]) - stats = diag_from / diag - #print("Exponential Test", stats, r1, r2) - for id_diag in range(diag.shape[0]): - if stats[id_diag] < f_dist.ppf(alpha_exp / 2, r1[id_diag], r2[id_diag]) or \ - stats[id_diag] > f_dist.ppf(1 - alpha_exp / 2, r1[id_diag], r2[id_diag]): - return False - - if diag.shape[0] > 2: - - # https://www.itl.nist.gov/div898/software/dataplot/refman1/auxillar/chi2samp.htm - K_from = np.sqrt(M[comb_id].diagonal() / M_from[comb_from_id].diagonal()) - K = np.sqrt(M_from[comb_from_id].diagonal() / M[comb_id].diagonal()) - #print("K From", K_from) - #print("K ", K) - - M_no_diag = M[comb_id][~np.eye(diag.shape[0], dtype=np.bool)].reshape(diag.shape[0], -1) - M_from_no_diag = M_from[comb_from_id][~np.eye(diag.shape[0], dtype=np.bool)].reshape(diag.shape[0], - -1) - #print("M No Diag", M_no_diag) - #print("M From No Diag", M_from_no_diag) - chi_stats = np.sum((np.power((M_no_diag.T * K).T - (M_from_no_diag.T * K_from).T, 2) \ - / (M_no_diag + M_from_no_diag)), axis=1) - #print("Chi stats", chi_stats) - #print("Chi Quantile", chi_2_quantile) - if np.any(chi_stats > chi_2_quantile): - return False - - return True - - def cb_structure_algo(self, alpha_exp=0.1, alpha_chi2=0.1, thumb_threshold=25): - adj_matrix = np.ones((self.variables.shape[0], self.variables.shape[0]), dtype=np.bool) - np.fill_diagonal(adj_matrix, False) - for to_var in 
tqdm(range(self.variables.shape[0])): - n = 0 - tested_variables = np.argwhere(adj_matrix[:, to_var]).ravel() - while n < tested_variables.shape[0]: - for from_var in tested_variables: - if from_var not in tested_variables: - continue - if n >= tested_variables.shape[0]: - break - sep_set_vars = tested_variables[tested_variables != from_var] - for comb in combinations(sep_set_vars, n): - if self.independence_test(to_var, from_var, comb, alpha_exp, alpha_chi2, thumb_threshold): - #print("######REMOVING EDGE #############", from_var, to_var) - adj_matrix[from_var, to_var] = False - tested_variables = np.argwhere(adj_matrix[:, to_var]).ravel() - break - n += 1 - #print("MATRIZ:", adj_matrix) - self.matrix = adj_matrix - - diff --git a/main_package/PyCTBN/parameters_estimator.py b/main_package/PyCTBN/parameters_estimator.py deleted file mode 100644 index d1c0ddb..0000000 --- a/main_package/PyCTBN/parameters_estimator.py +++ /dev/null @@ -1,143 +0,0 @@ - -import numpy as np - -#import network_graph as ng -#import trajectory as tr -#import set_of_cims as sofc -from .trajectory import Trajectory -from .set_of_cims import SetOfCims -from .network_graph import NetworkGraph - - - -class ParametersEstimator: - """Has the task of computing the cims of particular node given the trajectories and the net structure - in the graph ``_net_graph``. - - :param trajectories: the trajectories - :type trajectories: Trajectory - :param net_graph: the net structure - :type net_graph: NetworkGraph - :_single_set_of_cims: the set of cims object that will hold the cims of the node - """ - - def __init__(self, trajectories: Trajectory, net_graph: NetworkGraph): - """Constructor Method - """ - self._trajectories = trajectories - self._net_graph = net_graph - self._single_set_of_cims = None - - def fast_init(self, node_id: str) -> None: - """Initializes all the necessary structures for the parameters estimation for the node ``node_id``. - - :param node_id: the node label - :type node_id: string - """ - p_vals = self._net_graph._aggregated_info_about_nodes_parents[2] - node_states_number = self._net_graph.get_states_number(node_id) - self._single_set_of_cims = SetOfCims(node_id, p_vals, node_states_number, self._net_graph.p_combs) - - def compute_parameters_for_node(self, node_id: str) -> SetOfCims: - """Compute the CIMS of the node identified by the label ``node_id``. 
-
-        :param node_id: the node label
-        :type node_id: string
-        :return: A SetOfCims object filled with the computed CIMS
-        :rtype: SetOfCims
-        """
-        node_indx = self._net_graph.get_node_indx(node_id)
-        state_res_times = self._single_set_of_cims._state_residence_times
-        transition_matrices = self._single_set_of_cims._transition_matrices
-        self.compute_state_res_time_for_node(node_indx, self._trajectories.times,
-                                             self._trajectories.trajectory,
-                                             self._net_graph.time_filtering,
-                                             self._net_graph.time_scalar_indexing_strucure,
-                                             state_res_times)
-        self.compute_state_transitions_for_a_node(node_indx,
-                                                  self._trajectories.complete_trajectory,
-                                                  self._net_graph.transition_filtering,
-                                                  self._net_graph.transition_scalar_indexing_structure,
-                                                  transition_matrices)
-        self._single_set_of_cims.build_cims(state_res_times, transition_matrices)
-        return self._single_set_of_cims
-
-    def compute_state_res_time_for_node(self, node_indx: int, times: np.ndarray, trajectory: np.ndarray,
-                                        cols_filter: np.ndarray, scalar_indexes_struct: np.ndarray,
-                                        T: np.ndarray) -> None:
-        """Computes the state residence times for a node and fills the matrix ``T`` with the results
-
-        :param node_indx: the index of the node
-        :type node_indx: int
-        :param times: the times deltas vector
-        :type times: numpy.array
-        :param trajectory: the trajectory
-        :type trajectory: numpy.ndArray
-        :param cols_filter: the columns filtering structure
-        :type cols_filter: numpy.array
-        :param scalar_indexes_struct: the indexing structure
-        :type scalar_indexes_struct: numpy.array
-        :param T: the state residence times vectors
-        :type T: numpy.ndArray
-        """
-        T[:] = np.bincount(np.sum(trajectory[:, cols_filter] * scalar_indexes_struct / scalar_indexes_struct[0], axis=1)
-                           .astype(np.int), \
-                           times,
-                           minlength=scalar_indexes_struct[-1]).reshape(-1, T.shape[1])
-
-    def compute_state_transitions_for_a_node(self, node_indx: int, trajectory: np.ndarray, cols_filter: np.ndarray,
-                                             scalar_indexing: np.ndarray, M: np.ndarray):
-        """Computes the state transitions for a node and fills the matrices ``M`` with the results.
-
-        :param node_indx: the index of the node
-        :type node_indx: int
-        :param trajectory: the trajectory
-        :type trajectory: numpy.ndArray
-        :param cols_filter: the columns filtering structure
-        :type cols_filter: numpy.array
-        :param scalar_indexing: the indexing structure
-        :type scalar_indexing: numpy.array
-        :param M: the state transitions matrices
-        :type M: numpy.ndArray
-        """
-        diag_indices = np.array([x * M.shape[1] + x % M.shape[1] for x in range(M.shape[0] * M.shape[1])],
-                                dtype=np.int64)
-        trj_tmp = trajectory[trajectory[:, int(trajectory.shape[1] / 2) + node_indx].astype(np.int) >= 0]
-        M[:] = np.bincount(np.sum(trj_tmp[:, cols_filter] * scalar_indexing / scalar_indexing[0], axis=1).astype(np.int),
-                           minlength=scalar_indexing[-1]).reshape(-1, M.shape[1], M.shape[2])
-        M_raveled = M.ravel()
-        M_raveled[diag_indices] = 0
-        M_raveled[diag_indices] = np.sum(M, axis=2).ravel()
-
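# A hedged, runnable illustration (toy values, not from the library's tests) of the
# bincount/scalar-indexing trick shared by the two methods above, for a binary node
# with one binary parent:
#
#   import numpy as np
#   trajectory = np.array([[0, 0], [1, 0], [1, 1], [0, 1]])  # columns: [node state, parent state]
#   times = np.array([0.5, 1.0, 0.2, 0.7])                   # residence deltas per row
#   cols_filter = np.array([0, 1])                           # node column first, then the parents
#   scalar_idx = np.array([2, 4])                            # cumprod of the cardinalities
#   T = np.zeros((2, 2))
#   T[:] = np.bincount(
#       np.sum(trajectory[:, cols_filter] * scalar_idx / scalar_idx[0], axis=1).astype(int),
#       times, minlength=scalar_idx[-1]).reshape(-1, T.shape[1])
#   # T == [[0.5, 1.0], [0.7, 0.2]] -> T[p, s] = time spent in state s under parent state p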
-    """
-    ##############These Methods are actually unused but could become useful in the near future################
-
-    def init_sets_cims_container(self):
-        self.sets_of_cims_struct = acims.SetsOfCimsContainer(self._net_graph.nodes,
-                                                             self._net_graph.nodes_values,
-                                                             self._net_graph.
-                                                             get_ordered_by_indx_parents_values_for_all_nodes(),
-                                                             self._net_graph.p_combs)
-
-    def compute_parameters(self):
-        for indx, aggr in enumerate(zip(self._net_graph.nodes, self.sets_of_cims_struct.sets_of_cims)):
-            self.compute_state_res_time_for_node(self._net_graph.get_node_indx(aggr[0]), self.sample_path.trajectories.times,
-                                                 self.sample_path.trajectories.trajectory,
-                                                 self._net_graph.time_filtering[indx],
-                                                 self._net_graph.time_scalar_indexing_strucure[indx],
-                                                 aggr[1]._state_residence_times)
-            self.compute_state_transitions_for_a_node(self._net_graph.get_node_indx(aggr[0]),
-                                                      self.sample_path.trajectories.complete_trajectory,
-                                                      self._net_graph.transition_filtering[indx],
-                                                      self._net_graph.transition_scalar_indexing_structure[indx],
-                                                      aggr[1]._transition_matrices)
-            aggr[1].build_cims(aggr[1]._state_residence_times, aggr[1]._transition_matrices)
-    """
-
-
-
-
-
-
-
-
diff --git a/main_package/PyCTBN/sample_path.py b/main_package/PyCTBN/sample_path.py
deleted file mode 100644
index 1d6c6c0..0000000
--- a/main_package/PyCTBN/sample_path.py
+++ /dev/null
@@ -1,69 +0,0 @@
-
-#import abstract_importer as imp
-#import structure as st
-#import trajectory as tr
-from .abstract_importer import AbstractImporter
-from .structure import Structure
-from .trajectory import Trajectory
-
-
-class SamplePath:
-    """Aggregates all the information about the trajectories, the real structure of the sampled net and variables
-    cardinalities. Has the task of creating the objects ``Trajectory`` and ``Structure`` that will
-    contain the mentioned data.
-
-    :param importer: the Importer object that will import and process data
-    :type importer: AbstractImporter
-    :_trajectories: the ``Trajectory`` object that will contain all the concatenated trajectories
-    :_structure: the ``Structure`` object that will contain all the structural info about the net
-    :_total_variables_count: the number of variables in the net
-    """
-    def __init__(self, importer: AbstractImporter):
-        """Constructor Method
-        """
-        self._importer = importer
-        self._trajectories = None
-        self._structure = None
-        self._total_variables_count = None
-        self._importer.import_data()
-
-    def build_trajectories(self) -> None:
-        """Builds the Trajectory object that will contain all the trajectories.
-        Clears all the unused dataframes in the ``_importer`` Object
-        """
-        self._trajectories = \
-            Trajectory(self._importer.build_list_of_samples_array(self._importer.concatenated_samples),
-                       len(self._importer.sorter) + 1)
-        self._importer.clear_concatenated_frame()
-
-    def build_structure(self) -> None:
-        """
-        Builds the ``Structure`` object that aggregates all the info about the net.
diff --git a/main_package/PyCTBN/sample_path.py b/main_package/PyCTBN/sample_path.py
deleted file mode 100644
index 1d6c6c0..0000000
--- a/main_package/PyCTBN/sample_path.py
+++ /dev/null
@@ -1,69 +0,0 @@
-
-#import abstract_importer as imp
-#import structure as st
-#import trajectory as tr
-from .abstract_importer import AbstractImporter
-from .structure import Structure
-from .trajectory import Trajectory
-
-
-class SamplePath:
-    """Aggregates all the information about the trajectories, the real structure of the sampled net and the
-    variables cardinalities. Has the task of creating the objects ``Trajectory`` and ``Structure`` that will
-    contain the mentioned data.
-
-    :param importer: the Importer object that will import and process the data
-    :type importer: AbstractImporter
-    :_trajectories: the ``Trajectory`` object that will contain all the concatenated trajectories
-    :_structure: the ``Structure`` object that will contain all the structural info about the net
-    :_total_variables_count: the number of variables in the net
-    """
-    def __init__(self, importer: AbstractImporter):
-        """Constructor Method
-        """
-        self._importer = importer
-        self._trajectories = None
-        self._structure = None
-        self._total_variables_count = None
-        self._importer.import_data()
-
-    def build_trajectories(self) -> None:
-        """Builds the ``Trajectory`` object that will contain all the trajectories.
-        Clears all the unused dataframes in the ``_importer`` object.
-        """
-        self._trajectories = \
-            Trajectory(self._importer.build_list_of_samples_array(self._importer.concatenated_samples),
-                       len(self._importer.sorter) + 1)
-        self._importer.clear_concatenated_frame()
-
-    def build_structure(self) -> None:
-        """Builds the ``Structure`` object that aggregates all the info about the net.
-        """
-        if self._importer.sorter != self._importer.variables.iloc[:, 0].to_list():
-            raise RuntimeError("The dataset columns order has to match the order of the labels in the variables frame!")
-
-        self._total_variables_count = len(self._importer.sorter)
-        labels = self._importer.variables.iloc[:, 0].to_list()
-        indxs = self._importer.variables.index.to_numpy()
-        vals = self._importer.variables.iloc[:, 1].to_numpy()
-        edges = list(self._importer.structure.to_records(index=False))
-        self._structure = Structure(labels, indxs, vals, edges,
-                                    self._total_variables_count)
-
-    @property
-    def trajectories(self) -> Trajectory:
-        return self._trajectories
-
-    @property
-    def structure(self) -> Structure:
-        return self._structure
-
-    @property
-    def total_variables_count(self):
-        return self._total_variables_count
-
-
-
diff --git a/main_package/PyCTBN/set_of_cims.py b/main_package/PyCTBN/set_of_cims.py
deleted file mode 100644
index bbbd746..0000000
--- a/main_package/PyCTBN/set_of_cims.py
+++ /dev/null
@@ -1,94 +0,0 @@
-import typing
-import numpy as np
-
-#import conditional_intensity_matrix as cim
-from .conditional_intensity_matrix import ConditionalIntensityMatrix
-
-
-class SetOfCims:
-    """Aggregates all the CIMs of the node identified by the label _node_id.
-
-    :param node_id: the node label
-    :type node_id: string
-    :param parents_states_number: the cardinalities of the parents
-    :type parents_states_number: List
-    :param node_states_number: the cardinality of the node
-    :type node_states_number: int
-    :param p_combs: the p_comb structure bound to this node
-    :type p_combs: numpy.ndarray
-    :_state_residence_times: matrix containing all the state residence time vectors for the node
-    :_transition_matrices: matrix containing all the transition matrices for the node
-    :_actual_cims: the cims of the node
-    """
-
-    def __init__(self, node_id: str, parents_states_number: typing.List, node_states_number: int, p_combs: np.ndarray):
-        """Constructor Method
-        """
-        self._node_id = node_id
-        self._parents_states_number = parents_states_number
-        self._node_states_number = node_states_number
-        self._actual_cims = []
-        self._state_residence_times = None
-        self._transition_matrices = None
-        self._p_combs = p_combs
-        self.build_times_and_transitions_structures()
-
-    def build_times_and_transitions_structures(self) -> None:
-        """Initializes at the correct dimensions the state residence times matrix and the state transition matrices.
-        """
-        if not self._parents_states_number:
-            self._state_residence_times = np.zeros((1, self._node_states_number), dtype=np.float)
-            self._transition_matrices = np.zeros((1, self._node_states_number, self._node_states_number), dtype=np.int)
-        else:
-            self._state_residence_times = \
-                np.zeros((np.prod(self._parents_states_number), self._node_states_number), dtype=np.float)
-            self._transition_matrices = np.zeros([np.prod(self._parents_states_number), self._node_states_number,
-                                                  self._node_states_number], dtype=np.int)
-
-    def build_cims(self, state_res_times: np.ndarray, transition_matrices: np.ndarray) -> None:
-        """Build the ``ConditionalIntensityMatrix`` objects given the state residence times and transition matrices.
-        Compute the cim coefficients. The class member ``_actual_cims`` will contain the computed cims.
-
-        :param state_res_times: the state residence times matrix
-        :type state_res_times: numpy.ndarray
-        :param transition_matrices: the transition matrices
-        :type transition_matrices: numpy.ndarray
-        """
-        for state_res_time_vector, transition_matrix in zip(state_res_times, transition_matrices):
-            cim_to_add = ConditionalIntensityMatrix(state_res_time_vector, transition_matrix)
-            cim_to_add.compute_cim_coefficients()
-            self._actual_cims.append(cim_to_add)
-        self._actual_cims = np.array(self._actual_cims)
-        self._transition_matrices = None
-        self._state_residence_times = None
-
-    def filter_cims_with_mask(self, mask_arr: np.ndarray, comb: typing.List) -> np.ndarray:
-        """Filter the cims contained in the array ``_actual_cims`` given the boolean mask ``mask_arr`` and the index
-        ``comb``.
-
-        :param mask_arr: the boolean mask that indicates which parents to consider
-        :type mask_arr: numpy.array
-        :param comb: the state/s of the filtered parents
-        :type comb: numpy.array
-        :return: Array of ``ConditionalIntensityMatrix`` objects
-        :rtype: numpy.array
-        """
-        if mask_arr.size <= 1:
-            return self._actual_cims
-        else:
-            flat_indxs = np.argwhere(np.all(self._p_combs[:, mask_arr] == comb, axis=1)).ravel()
-            return self._actual_cims[flat_indxs]
-
-    @property
-    def actual_cims(self) -> np.ndarray:
-        return self._actual_cims
-
-    @property
-    def p_combs(self) -> np.ndarray:
-        return self._p_combs
-
-    def get_cims_number(self):
-        return len(self._actual_cims)
-
-
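The mask-based selection in filter_cims_with_mask is easier to follow on concrete values; a small sketch with an invented p_combs for two binary parents:

import numpy as np

# p_combs enumerates the joint parent states; each row is one combination.
p_combs = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
mask_arr = np.array([True, False])  # keep only the first parent...
comb = [1]                          # ...and ask for its state to be 1

# The same selection filter_cims_with_mask applies to _actual_cims:
flat_indxs = np.argwhere(np.all(p_combs[:, mask_arr] == comb, axis=1)).ravel()
print(flat_indxs)  # -> [2 3]: the CIMs whose first-parent state is 1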
diff --git a/main_package/PyCTBN/simple_cvs_importer.py b/main_package/PyCTBN/simple_cvs_importer.py
deleted file mode 100644
index 2f04fb6..0000000
--- a/main_package/PyCTBN/simple_cvs_importer.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import pandas as pd
-import glob
-import os
-
-import typing
-
-#import abstract_importer as ai
-#import sample_path as sp
-from .abstract_importer import AbstractImporter
-from .sample_path import SamplePath
-
-
-class CSVImporter(AbstractImporter):
-
-    def __init__(self, file_path):
-        self._df_samples_list = None
-        super(CSVImporter, self).__init__(file_path)
-
-    def import_data(self):
-        self.read_csv_file()
-        self._sorter = self.build_sorter(self._df_samples_list[0])
-        self.import_variables()
-        self.import_structure()
-        self.compute_row_delta_in_all_samples_frames(self._df_samples_list)
-
-    def read_csv_file(self):
-        df = pd.read_csv(self._file_path)
-        df.drop(df.columns[[0]], axis=1, inplace=True)
-        self._df_samples_list = [df]
-
-    def import_variables(self):
-        values_list = [3 for var in self._sorter]
-        # initialize dict of lists
-        data = {'Name': self._sorter, 'Value': values_list}
-        # Create the pandas DataFrame
-        self._df_variables = pd.DataFrame(data)
-
-    def build_sorter(self, sample_frame: pd.DataFrame) -> typing.List:
-        return list(sample_frame.columns)[1:]
-
-    def import_structure(self):
-        data = {'From': ['X', 'Y', 'Z'], 'To': ['Z', 'Z', 'Y']}
-        self._df_structure = pd.DataFrame(data)
-
-    def dataset_id(self) -> object:
-        pass
-
-
-def main():
-    read_files = glob.glob(os.path.join('../data', "*.csv"))
-    print(read_files[0])
-    csvimp = CSVImporter(read_files[0])
-    s1 = SamplePath(csvimp)
-    s1.build_trajectories()
-    s1.build_structure()
-    print(s1.structure)
-    print(s1.trajectories)
-
-
-if __name__ == "__main__":
-    main()
-
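A note on the input this demo importer expects, inferred from read_csv_file and build_sorter (treat the exact layout as an assumption): the first CSV column is a throwaway index, the second is the time stamp, and the remaining columns are the variables. A quick check:

from io import StringIO
import pandas as pd

# Hypothetical input in the assumed layout: index, Time, then the variables.
csv_text = StringIO(",Time,X,Y,Z\n0,0.0,0,1,0\n1,0.4,1,1,0\n2,0.9,1,0,2\n")
df = pd.read_csv(csv_text)
df.drop(df.columns[[0]], axis=1, inplace=True)  # what read_csv_file does
print(list(df.columns)[1:])  # what build_sorter returns -> ['X', 'Y', 'Z']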
diff --git a/main_package/PyCTBN/structure.py b/main_package/PyCTBN/structure.py
deleted file mode 100644
index c2a5692..0000000
--- a/main_package/PyCTBN/structure.py
+++ /dev/null
@@ -1,98 +0,0 @@
-import typing as ty
-import numpy as np
-
-
-class Structure:
-    """Contains all the info about the network structure (nodes labels, nodes cardinalities, edges, indexes).
-
-    :param nodes_labels_list: the symbolic names of the variables
-    :type nodes_labels_list: List
-    :param nodes_indexes_arr: the indexes of the nodes
-    :type nodes_indexes_arr: numpy.ndarray
-    :param nodes_vals_arr: the cardinalities of the nodes
-    :type nodes_vals_arr: numpy.ndarray
-    :param edges_list: the edges of the network
-    :type edges_list: List
-    :param total_variables_number: the total number of variables in the net
-    :type total_variables_number: int
-    """
-
-    def __init__(self, nodes_labels_list: ty.List, nodes_indexes_arr: np.ndarray, nodes_vals_arr: np.ndarray,
-                 edges_list: ty.List, total_variables_number: int):
-        """Constructor Method
-        """
-        self._nodes_labels_list = nodes_labels_list
-        self._nodes_indexes_arr = nodes_indexes_arr
-        self._nodes_vals_arr = nodes_vals_arr
-        self._edges_list = edges_list
-        self._total_variables_number = total_variables_number
-
-    @property
-    def edges(self) -> ty.List:
-        return self._edges_list
-
-    @property
-    def nodes_labels(self) -> ty.List:
-        return self._nodes_labels_list
-
-    @property
-    def nodes_indexes(self) -> np.ndarray:
-        return self._nodes_indexes_arr
-
-    @property
-    def nodes_values(self) -> np.ndarray:
-        return self._nodes_vals_arr
-
-    @property
-    def total_variables_number(self) -> int:
-        return self._total_variables_number
-
-    def get_node_id(self, node_indx: int) -> str:
-        """Given the node index ``node_indx`` returns the node label.
-
-        :param node_indx: the node index
-        :type node_indx: int
-        :return: the node label
-        :rtype: string
-        """
-        return self._nodes_labels_list[node_indx]
-
-    def get_node_indx(self, node_id: str) -> int:
-        """Given the node label ``node_id`` returns the node index.
-
-        :param node_id: the node label
-        :type node_id: string
-        :return: the node index
-        :rtype: int
-        """
-        pos_indx = self._nodes_labels_list.index(node_id)
-        return self._nodes_indexes_arr[pos_indx]
-
-    def get_positional_node_indx(self, node_id: str) -> int:
-        return self._nodes_labels_list.index(node_id)
-
-    def get_states_number(self, node: str) -> int:
-        """Given the node label ``node`` returns the cardinality of the node.
-
-        :param node: the node label
-        :type node: string
-        :return: the node cardinality
-        :rtype: int
-        """
-        pos_indx = self._nodes_labels_list.index(node)
-        return self._nodes_vals_arr[pos_indx]
-
-    def __repr__(self):
-        return "Variables:\n" + str(self._nodes_labels_list) + "\nValues:\n" + str(self._nodes_vals_arr) + \
-               "\nEdges: \n" + str(self._edges_list)
-
-    def __eq__(self, other):
-        """Overrides the default implementation"""
-        if isinstance(other, Structure):
-            return set(self._nodes_labels_list) == set(other._nodes_labels_list) and \
-                   np.array_equal(self._nodes_vals_arr, other._nodes_vals_arr) and \
-                   np.array_equal(self._nodes_indexes_arr, other._nodes_indexes_arr) and \
-                   self._edges_list == other._edges_list
-
-        return NotImplemented
-
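A hand-built example (toy labels and cardinalities; the import path assumes the package layout used in this repo) of the data layout Structure expects, mirroring what SamplePath.build_structure extracts from the importer's frames:

import numpy as np
from PyCTBN.structure import Structure  # adjust if the package layout differs

labels = ['X', 'Y', 'Z']              # variables frame, first column
indxs = np.array([0, 1, 2])           # variables frame index
vals = np.array([3, 3, 3])            # cardinalities, second column
edges = [('X', 'Z'), ('Y', 'Z')]      # records of the structure frame
s = Structure(labels, indxs, vals, edges, len(labels))
print(s.get_node_indx('Z'))       # -> 2
print(s.get_states_number('Y'))   # -> 3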
diff --git a/main_package/PyCTBN/structure_estimator.py b/main_package/PyCTBN/structure_estimator.py
deleted file mode 100644
index 284e94d..0000000
--- a/main_package/PyCTBN/structure_estimator.py
+++ /dev/null
@@ -1,240 +0,0 @@
-
-
-from tqdm import tqdm
-import itertools
-import json
-import typing
-import networkx as nx
-import numpy as np
-from networkx.readwrite import json_graph
-from scipy.stats import chi2 as chi2_dist
-from scipy.stats import f as f_dist
-
-#import cache as ch
-#import conditional_intensity_matrix as condim
-#import network_graph as ng
-#import parameters_estimator as pe
-#import sample_path as sp
-#import structure as st
-from .cache import Cache
-from .conditional_intensity_matrix import ConditionalIntensityMatrix
-from .network_graph import NetworkGraph
-from .parameters_estimator import ParametersEstimator
-from .sample_path import SamplePath
-from .structure import Structure
-
-
-class StructureEstimator:
-    """Has the task of estimating the network structure given the trajectories in ``_sample_path``.
-
-    :param sample_path: the _sample_path object containing the trajectories and the real structure
-    :type sample_path: SamplePath
-    :param exp_test_alfa: the significance level for the exponential Hp test
-    :type exp_test_alfa: float
-    :param chi_test_alfa: the significance level for the chi Hp test
-    :type chi_test_alfa: float
-    :_nodes: the nodes labels
-    :_nodes_vals: the nodes cardinalities
-    :_nodes_indxs: the nodes indexes
-    :_complete_graph: the complete directed graph built using the nodes labels in ``_nodes``
-    :_cache: the Cache object
-    """
-
-    def __init__(self, sample_path: SamplePath, exp_test_alfa: float, chi_test_alfa: float):
-        """Constructor Method
-        """
-        self._sample_path = sample_path
-        self._nodes = np.array(self._sample_path.structure.nodes_labels)
-        self._nodes_vals = self._sample_path.structure.nodes_values
-        self._nodes_indxs = self._sample_path.structure.nodes_indexes
-        self._complete_graph = self.build_complete_graph(self._sample_path.structure.nodes_labels)
-        self._exp_test_sign = exp_test_alfa
-        self._chi_test_alfa = chi_test_alfa
-        self._cache = Cache()
-
-    def build_complete_graph(self, node_ids: typing.List) -> nx.DiGraph:
-        """Builds a complete directed graph (no self loops) given the nodes labels in the list ``node_ids``.
-
-        :param node_ids: the list of nodes labels
-        :type node_ids: List
-        :return: a complete DiGraph object
-        :rtype: networkx.DiGraph
-        """
-        complete_graph = nx.DiGraph()
-        complete_graph.add_nodes_from(node_ids)
-        complete_graph.add_edges_from(itertools.permutations(node_ids, 2))
-        return complete_graph
-
-    def complete_test(self, test_parent: str, test_child: str, parent_set: typing.List, child_states_numb: int,
-                      tot_vars_count: int) -> bool:
-        """Performs a complete independence test on the directed graphs G1 = {test_child U parent_set} and
-        G2 = {G1 U test_parent} (the test parent added as an additional parent of the test child).
-        Generates all the necessary structures and data to perform the tests.
-
-        :param test_parent: the node label of the test parent
-        :type test_parent: string
-        :param test_child: the node label of the child
-        :type test_child: string
-        :param parent_set: the common parent set
-        :type parent_set: List
-        :param child_states_numb: the cardinality of the ``test_child``
-        :type child_states_numb: int
-        :param tot_vars_count: the total number of variables in the net
-        :type tot_vars_count: int
-        :return: True iff ``test_child`` and ``test_parent`` are independent given the separating set
-            ``parent_set``, False otherwise
-        :rtype: bool
-        """
-        p_set = parent_set[:]
-        complete_info = parent_set[:]
-        complete_info.append(test_child)
-
-        parents = np.array(parent_set)
-        parents = np.append(parents, test_parent)
-        sorted_parents = self._nodes[np.isin(self._nodes, parents)]
-        cims_filter = sorted_parents != test_parent
-        sofc1 = self._cache.find(set(p_set))
-
-        if not sofc1:
-            bool_mask1 = np.isin(self._nodes, complete_info)
-            l1 = list(self._nodes[bool_mask1])
-            indxs1 = self._nodes_indxs[bool_mask1]
-            vals1 = self._nodes_vals[bool_mask1]
-            eds1 = list(itertools.product(parent_set, test_child))
-            s1 = Structure(l1, indxs1, vals1, eds1, tot_vars_count)
-            g1 = NetworkGraph(s1)
-            g1.fast_init(test_child)
-            p1 = ParametersEstimator(self._sample_path.trajectories, g1)
-            p1.fast_init(test_child)
-            sofc1 = p1.compute_parameters_for_node(test_child)
-            self._cache.put(set(p_set), sofc1)
-        sofc2 = None
-        p_set.insert(0, test_parent)
-        if p_set:
-            sofc2 = self._cache.find(set(p_set))
-            if not sofc2:
-                complete_info.append(test_parent)
-                bool_mask2 = np.isin(self._nodes, complete_info)
-                l2 = list(self._nodes[bool_mask2])
-                indxs2 = self._nodes_indxs[bool_mask2]
-                vals2 = self._nodes_vals[bool_mask2]
-                eds2 = list(itertools.product(p_set, test_child))
-                s2 = Structure(l2, indxs2, vals2, eds2, tot_vars_count)
-                g2 = NetworkGraph(s2)
-                g2.fast_init(test_child)
-                p2 = ParametersEstimator(self._sample_path.trajectories, g2)
-                p2.fast_init(test_child)
-                sofc2 = p2.compute_parameters_for_node(test_child)
-                self._cache.put(set(p_set), sofc2)
-        for cim1, p_comb in zip(sofc1.actual_cims, sofc1.p_combs):
-            cond_cims = sofc2.filter_cims_with_mask(cims_filter, p_comb)
-            for cim2 in cond_cims:
-                if not self.independence_test(child_states_numb, cim1, cim2):
-                    return False
-        return True
-
-    def independence_test(self, child_states_numb: int, cim1: ConditionalIntensityMatrix,
-                          cim2: ConditionalIntensityMatrix) -> bool:
-        """Compute the actual independence test using two cims.
-        The exponential test is performed first; if its null hypothesis is not rejected, the chi-square test
-        is performed as well.
-
-        :param child_states_numb: the cardinality of the test child
-        :type child_states_numb: int
-        :param cim1: a cim belonging to the graph without the test parent
-        :type cim1: ConditionalIntensityMatrix
-        :param cim2: a cim belonging to the graph with the test parent
-        :type cim2: ConditionalIntensityMatrix
-        :return: True iff both tests do NOT reject the null hypothesis of independence, False otherwise
-        :rtype: bool
-        """
-        M1 = cim1.state_transition_matrix
-        M2 = cim2.state_transition_matrix
-        r1s = M1.diagonal()
-        r2s = M2.diagonal()
-        C1 = cim1.cim
-        C2 = cim2.cim
-        F_stats = C2.diagonal() / C1.diagonal()
-        exp_alfa = self._exp_test_sign
-        for val in range(0, child_states_numb):
-            if F_stats[val] < f_dist.ppf(exp_alfa / 2, r1s[val], r2s[val]) or \
-                    F_stats[val] > f_dist.ppf(1 - exp_alfa / 2, r1s[val], r2s[val]):
-                return False
-        M1_no_diag = M1[~np.eye(M1.shape[0], dtype=bool)].reshape(M1.shape[0], -1)
-        M2_no_diag = M2[~np.eye(M2.shape[0], dtype=bool)].reshape(M2.shape[0], -1)
-        chi_2_quantile = chi2_dist.ppf(1 - self._chi_test_alfa, child_states_numb - 1)
-        Ks = np.sqrt(r1s / r2s)
-        Ls = np.sqrt(r2s / r1s)
-        for val in range(0, child_states_numb):
-            Chi = np.sum(np.power(Ks[val] * M2_no_diag[val] - Ls[val] * M1_no_diag[val], 2) /
-                         (M1_no_diag[val] + M2_no_diag[val]))
-            if Chi > chi_2_quantile:
-                return False
-        return True
-
-    def one_iteration_of_CTPC_algorithm(self, var_id: str, tot_vars_count: int) -> None:
-        """Performs an iteration of the CTPC algorithm using the node ``var_id`` as ``test_child``.
-
-        :param var_id: the node label of the test child
-        :type var_id: string
-        :param tot_vars_count: the number of nodes in the net
-        :type tot_vars_count: int
-        """
-        #print("##################TESTING VAR################", var_id)
-        u = list(self._complete_graph.predecessors(var_id))
-        child_states_numb = self._sample_path.structure.get_states_number(var_id)
-        b = 0
-        while b < len(u):
-            parent_indx = 0
-            while parent_indx < len(u):
-                removed = False
-                S = self.generate_possible_sub_sets_of_size(u, b, u[parent_indx])
-                test_parent = u[parent_indx]
-                for parents_set in S:
-                    if self.complete_test(test_parent, var_id, parents_set, child_states_numb, tot_vars_count):
-                        self._complete_graph.remove_edge(test_parent, var_id)
-                        u.remove(test_parent)
-                        removed = True
-                        break
-                if not removed:
-                    parent_indx += 1
-            b += 1
-        self._cache.clear()
-
-    def generate_possible_sub_sets_of_size(self, u: typing.List, size: int, parent_label: str) -> \
-            typing.Iterator:
-        """Creates an iterator over all possible subsets of the list ``u`` of size ``size``
-        that do not contain the node identified by ``parent_label``.
-
-        :param u: the list of nodes
-        :type u: List
-        :param size: the size of the subsets
-        :type size: int
-        :param parent_label: the node to exclude in the subsets generation
-        :type parent_label: string
-        :return: an Iterator object over the possible subsets, each returned as a list
-        :rtype: Iterator
-        """
-        list_without_test_parent = u[:]
-        list_without_test_parent.remove(parent_label)
-        return map(list, itertools.combinations(list_without_test_parent, size))
-
-    def ctpc_algorithm(self) -> None:
-        """Runs the CTPC algorithm over the entire net.
-        """
-        ctpc_algo = self.one_iteration_of_CTPC_algorithm
-        total_vars_numb = self._sample_path.total_variables_count
-        [ctpc_algo(n, total_vars_numb) for n in tqdm(self._nodes)]
-
-    def save_results(self) -> None:
-        """Save the estimated Structure to a .json file in the path where the data are loaded from.
-        The file is named after the input dataset, with the prefix ``results_`` added to the file name.
-        """
-        res = json_graph.node_link_data(self._complete_graph)
-        name = self._sample_path._importer.file_path.rsplit('/', 1)[-1] + str(self._sample_path._importer.dataset_id())
-        name = 'results_' + name
-        with open(name, 'w') as f:
-            json.dump(res, f)
-
-
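The statistics in independence_test can be exercised in isolation; a self-contained sketch with toy counts (all values invented), using the same scipy quantiles. The M diagonals play the role of the F test's degrees of freedom, and the chi-square step runs over the off-diagonal counts of one row:

import numpy as np
from scipy.stats import chi2 as chi2_dist
from scipy.stats import f as f_dist

alfa = 0.1
# Exponential (F) step for one child state: compare the rate ratio between
# the two conditioned CIMs against the two-sided F quantiles.
r1, r2 = 120, 95         # M1[x, x], M2[x, x]: total exits from state x
F_stat = 1.08            # C2[x, x] / C1[x, x]
exp_ok = f_dist.ppf(alfa / 2, r1, r2) <= F_stat <= f_dist.ppf(1 - alfa / 2, r1, r2)

# Chi-square step on the off-diagonal transition counts of the same row
# (row length = child_states_numb - 1, matching the degrees of freedom).
M1_row = np.array([40.0, 80.0])
M2_row = np.array([30.0, 65.0])
K, L = np.sqrt(r1 / r2), np.sqrt(r2 / r1)
chi = np.sum((K * M2_row - L * M1_row) ** 2 / (M1_row + M2_row))
chi_ok = chi <= chi2_dist.ppf(1 - alfa, len(M1_row))
print(exp_ok, chi_ok)  # both True -> the null of independence is not rejected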
diff --git a/main_package/PyCTBN/trajectory.py b/main_package/PyCTBN/trajectory.py
deleted file mode 100644
index ae3ff93..0000000
--- a/main_package/PyCTBN/trajectory.py
+++ /dev/null
@@ -1,46 +0,0 @@
-
-import numpy as np
-import typing
-
-
-class Trajectory:
-    """Abstracts the info about a complete set of trajectories, represented as a numpy array of doubles
-    (the time deltas) and a numpy matrix of ints (the changes of states).
-
-    :param list_of_columns: the list containing the times array and values matrix
-    :type list_of_columns: List
-    :param original_cols_number: total number of cols in the data
-    :type original_cols_number: int
-    :_actual_trajectory: the trajectory containing also the duplicated/shifted values
-    :_times: the array containing the time deltas
-    """
-
-    def __init__(self, list_of_columns: typing.List, original_cols_number: int):
-        """Constructor Method
-        """
-        if type(list_of_columns[0][0]) != np.float64:
-            raise TypeError('The first array in the list has to be Times')
-        self._original_cols_number = original_cols_number
-        self._actual_trajectory = np.array(list_of_columns[1:], dtype=np.int).T
-        self._times = np.array(list_of_columns[0], dtype=np.float)
-
-    @property
-    def trajectory(self) -> np.ndarray:
-        return self._actual_trajectory[:, :self._original_cols_number]
-
-    @property
-    def complete_trajectory(self) -> np.ndarray:
-        return self._actual_trajectory
-
-    @property
-    def times(self):
-        return self._times
-
-    def size(self):
-        return self._actual_trajectory.shape[0]
-
-    def __repr__(self):
-        return "Complete Trajectory Rows: " + str(self.size()) + "\n" + self.complete_trajectory.__repr__() + \
-               "\nTimes Rows:" + str(self.times.size) + "\n" + self.times.__repr__()
-
-
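The column convention Trajectory relies on, shown on toy numbers (the same shape the deleted test_trajectory below exercises; the import path assumes this repo's package layout): the first array holds the float64 time deltas, the remaining arrays the original and shifted state columns.

import numpy as np
from PyCTBN.trajectory import Trajectory  # adjust if the package layout differs

cols = [np.array([0.5, 0.4, 0.1]),   # time deltas, must be float64 and come first
        np.arange(1, 4),             # an original state column
        np.arange(4, 7)]             # its shifted copy
t = Trajectory(cols, len(cols) - 2)  # original_cols_number = 1
print(t.trajectory.ravel())          # -> [1 2 3], shifted columns excluded
print(t.complete_trajectory.shape)   # -> (3, 2), shifted columns included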
diff --git a/main_package/tests/__init__.py b/main_package/tests/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/main_package/tests/coverage.xml b/main_package/tests/coverage.xml
deleted file mode 100644
index cef006f..0000000
--- a/main_package/tests/coverage.xml
+++ /dev/null
@@ -1,963 +0,0 @@
[963 deleted lines of XML coverage report omitted: the tag content was lost in extraction and only the diff markers survived]
diff --git a/main_package/tests/test_cache.py b/main_package/tests/test_cache.py
deleted file mode 100644
index 7cb6706..0000000
--- a/main_package/tests/test_cache.py
+++ /dev/null
@@ -1,57 +0,0 @@
-
-import unittest
-import numpy as np
-
-from ..PyCTBN.cache import Cache
-from ..PyCTBN.set_of_cims import SetOfCims
-
-
-class TestCache(unittest.TestCase):
-
-    def test_init(self):
-        c1 = Cache()
-        self.assertFalse(c1._list_of_sets_of_parents)
-        self.assertFalse(c1._actual_cache)
-
-    def test_put(self):
-        c1 = Cache()
-        pset1 = {'X', 'Y'}
-        sofc1 = SetOfCims('Z', [], 3, np.array([]))
-        c1.put(pset1, sofc1)
-        self.assertEqual(1, len(c1._actual_cache))
-        self.assertEqual(1, len(c1._list_of_sets_of_parents))
-        self.assertEqual(sofc1, c1._actual_cache[0])
-        pset2 = {'X'}
-        sofc2 = SetOfCims('Z', [], 3, np.array([]))
-        c1.put(pset2, sofc2)
-        self.assertEqual(2, len(c1._actual_cache))
-        self.assertEqual(2, len(c1._list_of_sets_of_parents))
-        self.assertEqual(sofc2, c1._actual_cache[1])
-
-    def test_find(self):
-        c1 = Cache()
-        pset1 = {'X', 'Y'}
-        sofc1 = SetOfCims('Z', [], 3, np.array([]))
-        c1.put(pset1, sofc1)
-        self.assertEqual(1, len(c1._actual_cache))
-        self.assertEqual(1, len(c1._list_of_sets_of_parents))
-        self.assertIsInstance(c1.find(pset1), SetOfCims)
-        self.assertEqual(sofc1, c1.find(pset1))
-        self.assertIsInstance(c1.find({'Y', 'X'}), SetOfCims)
-        self.assertEqual(sofc1, c1.find({'Y', 'X'}))
-        self.assertIsNone(c1.find({'X'}))
-
-    def test_clear(self):
-        c1 = Cache()
-        pset1 = {'X', 'Y'}
-        sofc1 = SetOfCims('Z', [], 3, np.array([]))
-        c1.put(pset1, sofc1)
-        self.assertEqual(1, len(c1._actual_cache))
-        self.assertEqual(1, len(c1._list_of_sets_of_parents))
-        c1.clear()
-        self.assertFalse(c1._list_of_sets_of_parents)
-        self.assertFalse(c1._actual_cache)
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/main_package/tests/test_cim.py b/main_package/tests/test_cim.py
deleted file mode 100644
index e3228db..0000000
--- a/main_package/tests/test_cim.py
+++ /dev/null
@@ -1,46 +0,0 @@
-
-import unittest
-import numpy as np
-
-from ..PyCTBN.conditional_intensity_matrix import ConditionalIntensityMatrix
-
-
-class TestConditionalIntensityMatrix(unittest.TestCase):
-
-    @classmethod
-    def setUpClass(cls) -> None:
-        cls.state_res_times = np.random.rand(1, 3)[0]
-        cls.state_res_times = cls.state_res_times * 1000
-        cls.state_transition_matrix = np.random.randint(1, 10000, (3, 3))
-        for i in range(0, len(cls.state_res_times)):
-            cls.state_transition_matrix[i, i] = 0
-            cls.state_transition_matrix[i, i] = np.sum(cls.state_transition_matrix[i])
-
-    def test_init(self):
-        c1 = ConditionalIntensityMatrix(self.state_res_times, self.state_transition_matrix)
-        self.assertTrue(np.array_equal(self.state_res_times, c1.state_residence_times))
-        self.assertTrue(np.array_equal(self.state_transition_matrix,
c1.state_transition_matrix)) - self.assertEqual(c1.cim.dtype, np.float) - self.assertEqual(self.state_transition_matrix.shape, c1.cim.shape) - - def test_compute_cim_coefficients(self): - c1 = ConditionalIntensityMatrix(self.state_res_times, self.state_transition_matrix) - c2 = self.state_transition_matrix.astype(np.float) - np.fill_diagonal(c2, c2.diagonal() * -1) - for i in range(0, len(self.state_res_times)): - for j in range(0, len(self.state_res_times)): - c2[i, j] = (c2[i, j] + 1) / (self.state_res_times[i] + 1) - c1.compute_cim_coefficients() - for i in range(0, len(c1.state_residence_times)): - self.assertTrue(np.isclose(np.sum(c1.cim[i]), 0.0, 1e-02, 1e-01)) - for i in range(0, len(self.state_res_times)): - for j in range(0, len(self.state_res_times)): - self.assertTrue(np.isclose(c1.cim[i, j], c2[i, j], 1e-02, 1e-01)) - - def test_repr(self): - c1 = ConditionalIntensityMatrix(self.state_res_times, self.state_transition_matrix) - print(c1) - - -if __name__ == '__main__': - unittest.main() diff --git a/main_package/tests/test_json_importer.py b/main_package/tests/test_json_importer.py deleted file mode 100644 index f806ebd..0000000 --- a/main_package/tests/test_json_importer.py +++ /dev/null @@ -1,175 +0,0 @@ - -import unittest -import os -import glob -import numpy as np -import pandas as pd -import json - -from ..PyCTBN.json_importer import JsonImporter - - -class TestJsonImporter(unittest.TestCase): - - @classmethod - def setUpClass(cls) -> None: - cls.read_files = glob.glob(os.path.join('./data', "*.json")) - #print(os.path.join('../data')) - - def test_init(self): - j1 = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - self.assertEqual(j1._samples_label, 'samples') - self.assertEqual(j1._structure_label, 'dyn.str') - self.assertEqual(j1._variables_label, 'variables') - self.assertEqual(j1._time_key, 'Time') - self.assertEqual(j1._variables_key, 'Name') - self.assertEqual(j1._file_path, self.read_files[0]) - self.assertIsNone(j1._df_samples_list) - self.assertIsNone(j1.variables) - self.assertIsNone(j1.structure) - self.assertIsNone(j1.concatenated_samples) - self.assertIsNone(j1.sorter) - - def test_read_json_file_found(self): - data_set = {"key1": [1, 2, 3], "key2": [4, 5, 6]} - with open('data.json', 'w') as f: - json.dump(data_set, f) - path = os.getcwd() - path = path + '/data.json' - j1 = JsonImporter(path, '', '', '', '', '', 0) - imported_data = j1.read_json_file() - self.assertTrue(self.ordered(data_set) == self.ordered(imported_data)) - os.remove('data.json') - - def test_read_json_file_not_found(self): - path = os.getcwd() - path = path + '/data.json' - j1 = JsonImporter(path, '', '', '', '', '', 0) - self.assertRaises(FileNotFoundError, j1.read_json_file) - - def test_normalize_trajectories(self): - j1 = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - raw_data = j1.read_json_file() - #print(raw_data) - df_samples_list = j1.normalize_trajectories(raw_data, 0, j1._samples_label) - self.assertEqual(len(df_samples_list), len(raw_data[0][j1._samples_label])) - #self.assertEqual(list(j1._df_samples_list[0].columns.values)[1:], j1.sorter) - - def test_normalize_trajectories_wrong_indx(self): - j1 = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - raw_data = j1.read_json_file() - self.assertRaises(IndexError, j1.normalize_trajectories, raw_data, 474, j1._samples_label) - - def test_normalize_trajectories_wrong_key(self): - j1 = 
JsonImporter(self.read_files[0], 'sample', 'dyn.str', 'variables', 'Time', 'Name', 0) - raw_data = j1.read_json_file() - self.assertRaises(KeyError, j1.normalize_trajectories, raw_data, 0, j1._samples_label) - - def test_compute_row_delta_single_samples_frame(self): - j1 = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - raw_data = j1.read_json_file() - j1._df_samples_list = j1.import_trajectories(raw_data) - sample_frame = j1._df_samples_list[0] - original_copy = sample_frame.copy() - columns_header = list(sample_frame.columns.values) - shifted_cols_header = [s + "S" for s in columns_header[1:]] - new_sample_frame = j1.compute_row_delta_sigle_samples_frame(sample_frame, columns_header[1:], - shifted_cols_header) - self.assertEqual(len(list(sample_frame.columns.values)) + len(shifted_cols_header), - len(list(new_sample_frame.columns.values))) - self.assertEqual(sample_frame.shape[0] - 1, new_sample_frame.shape[0]) - for indx, row in new_sample_frame.iterrows(): - self.assertAlmostEqual(row['Time'], - original_copy.iloc[indx + 1]['Time'] - original_copy.iloc[indx]['Time']) - for indx, row in new_sample_frame.iterrows(): - np.array_equal(np.array(row[columns_header[1:]],dtype=int), - np.array(original_copy.iloc[indx][columns_header[1:]],dtype=int)) - np.array_equal(np.array(row[shifted_cols_header], dtype=int), - np.array(original_copy.iloc[indx + 1][columns_header[1:]], dtype=int)) - - def test_compute_row_delta_in_all_frames(self): - j1 = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - raw_data = j1.read_json_file() - j1._df_samples_list = j1.import_trajectories(raw_data) - j1._sorter = j1.build_sorter(j1._df_samples_list[0]) - j1.compute_row_delta_in_all_samples_frames(j1._df_samples_list) - self.assertEqual(list(j1._df_samples_list[0].columns.values), - list(j1.concatenated_samples.columns.values)[:len(list(j1._df_samples_list[0].columns.values))]) - self.assertEqual(list(j1.concatenated_samples.columns.values)[0], j1._time_key) - - def test_clear_data_frame_list(self): - j1 = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - raw_data = j1.read_json_file() - j1._df_samples_list = j1.import_trajectories(raw_data) - j1._sorter = j1.build_sorter(j1._df_samples_list[0]) - j1.compute_row_delta_in_all_samples_frames(j1._df_samples_list) - j1.clear_data_frame_list() - for df in j1._df_samples_list: - self.assertTrue(df.empty) - - def test_clear_concatenated_frame(self): - j1 = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - j1.import_data() - j1.clear_concatenated_frame() - self.assertTrue(j1.concatenated_samples.empty) - - def test_build_list_of_samples_array(self): - data_set = {"key1": [1, 2, 3], "key2": [4.1, 5.2, 6.3]} - with open('data.json', 'w') as f: - json.dump(data_set, f) - path = os.getcwd() - path = path + '/data.json' - j1 = JsonImporter(path, '', '', '', '', '', 0) - raw_data = j1.read_json_file() - frame = pd.DataFrame(raw_data) - col_list = j1.build_list_of_samples_array(frame) - forced_list = [] - for key in data_set: - forced_list.append(np.array(data_set[key])) - for a1, a2 in zip(col_list, forced_list): - self.assertTrue(np.array_equal(a1, a2)) - os.remove('data.json') - - def test_import_variables(self): - j1 = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - sorter = ['X', 'Y', 'Z'] - raw_data = [{'variables':{"Name": ['X', 'Y', 'Z'], "value": [3, 3, 3]}}] - df_var = 
j1.import_variables(raw_data) - self.assertEqual(list(df_var[j1._variables_key]), sorter) - - def test_import_structure(self): - j1 = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - raw_data = [{"dyn.str":[{"From":"X","To":"Z"},{"From":"Y","To":"Z"},{"From":"Z","To":"Y"}]}] - df_struct = j1.import_structure(raw_data) - #print(raw_data[0]['dyn.str'][0].items()) - self.assertIsInstance(df_struct, pd.DataFrame) - - def test_import_sampled_cims(self): - j1 = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - raw_data = j1.read_json_file() - j1._df_samples_list = j1.import_trajectories(raw_data) - j1._sorter = j1.build_sorter(j1._df_samples_list[0]) - cims = j1.import_sampled_cims(raw_data, 0, 'dyn.cims') - #j1.import_variables(raw_data, j1.sorter) - self.assertEqual(list(cims.keys()), j1.sorter) - - def test_import_data(self): - j1 = JsonImporter(self.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 2) - j1.import_data() - self.assertEqual(list(j1.variables[j1._variables_key]), - list(j1.concatenated_samples.columns.values[1:len(j1.variables[j1._variables_key]) + 1])) - print(j1.variables) - print(j1.structure) - print(j1.concatenated_samples) - - def ordered(self, obj): - if isinstance(obj, dict): - return sorted((k, self.ordered(v)) for k, v in obj.items()) - if isinstance(obj, list): - return sorted(self.ordered(x) for x in obj) - else: - return obj - - -if __name__ == '__main__': - unittest.main() diff --git a/main_package/tests/test_networkgraph.py b/main_package/tests/test_networkgraph.py deleted file mode 100644 index 6bb819b..0000000 --- a/main_package/tests/test_networkgraph.py +++ /dev/null @@ -1,187 +0,0 @@ - -import unittest -import glob -import os -import networkx as nx -import numpy as np -import itertools - -from ..PyCTBN.sample_path import SamplePath -from ..PyCTBN.network_graph import NetworkGraph -from ..PyCTBN.json_importer import JsonImporter - - -class TestNetworkGraph(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.read_files = glob.glob(os.path.join('./data', "*.json")) - cls.importer = JsonImporter(cls.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - cls.s1 = SamplePath(cls.importer) - cls.s1.build_trajectories() - cls.s1.build_structure() - - def test_init(self): - g1 = NetworkGraph(self.s1.structure) - self.assertEqual(self.s1.structure, g1._graph_struct) - self.assertIsInstance(g1._graph, nx.DiGraph) - self.assertIsNone(g1.time_scalar_indexing_strucure) - self.assertIsNone(g1.transition_scalar_indexing_structure) - self.assertIsNone(g1.transition_filtering) - self.assertIsNone(g1.p_combs) - - def test_add_nodes(self): - g1 = NetworkGraph(self.s1.structure) - g1.add_nodes(self.s1.structure.nodes_labels) - for n1, n2 in zip(g1.nodes, self.s1.structure.nodes_labels): - self.assertEqual(n1, n2) - - def test_add_edges(self): - g1 = NetworkGraph(self.s1.structure) - g1.add_edges(self.s1.structure.edges) - for e in self.s1.structure.edges: - self.assertIn(tuple(e), g1.edges) - - def test_fast_init(self): - g1 = NetworkGraph(self.s1.structure) - for node in self.s1.structure.nodes_labels: - g1.fast_init(node) - self.assertIsNotNone(g1._graph.nodes) - self.assertIsNotNone(g1._graph.edges) - self.assertIsInstance(g1._time_scalar_indexing_structure, np.ndarray) - self.assertIsInstance(g1._transition_scalar_indexing_structure, np.ndarray) - self.assertIsInstance(g1._time_filtering, np.ndarray) - self.assertIsInstance(g1._transition_filtering, 
np.ndarray) - self.assertIsInstance(g1._p_combs_structure, np.ndarray) - self.assertIsInstance(g1._aggregated_info_about_nodes_parents, tuple) - - def test_get_ordered_by_indx_set_of_parents(self): - g1 = NetworkGraph(self.s1.structure) - g1.add_nodes(self.s1.structure.nodes_labels) - g1.add_edges(self.s1.structure.edges) - for node in self.s1.structure.nodes_labels: - aggr_info = g1.get_ordered_by_indx_set_of_parents(node) - for indx in range(len(aggr_info[0]) - 1 ): - self.assertLess(g1.get_node_indx(aggr_info[0][indx]), g1.get_node_indx(aggr_info[0][indx + 1])) - for par, par_indx in zip(aggr_info[0], aggr_info[1]): - self.assertEqual(g1.get_node_indx(par), par_indx) - for par, par_val in zip(aggr_info[0], aggr_info[2]): - self.assertEqual(g1._graph_struct.get_states_number(par), par_val) - - def test_build_time_scalar_indexing_structure_for_a_node(self): - g1 = NetworkGraph(self.s1.structure) - g1.add_nodes(self.s1.structure.nodes_labels) - g1.add_edges(self.s1.structure.edges) - for node in self.s1.structure.nodes_labels: - aggr_info = g1.get_ordered_by_indx_set_of_parents(node) - self.aux_build_time_scalar_indexing_structure_for_a_node(g1, node, aggr_info[1], - aggr_info[0], aggr_info[2]) - - def aux_build_time_scalar_indexing_structure_for_a_node(self, graph, node_id, parents_indxs, parents_labels, parents_vals): - time_scalar_indexing = graph.build_time_scalar_indexing_structure_for_a_node(node_id, parents_vals) - self.assertEqual(len(time_scalar_indexing), len(parents_indxs) + 1) - merged_list = parents_labels[:] - merged_list.insert(0, node_id) - vals_list = [] - for node in merged_list: - vals_list.append(graph.get_states_number(node)) - t_vec = np.array(vals_list) - t_vec = t_vec.cumprod() - self.assertTrue(np.array_equal(time_scalar_indexing, t_vec)) - - def test_build_transition_scalar_indexing_structure_for_a_node(self): - g1 = NetworkGraph(self.s1.structure) - g1.add_nodes(self.s1.structure.nodes_labels) - g1.add_edges(self.s1.structure.edges) - for node in self.s1.structure.nodes_labels: - aggr_info = g1.get_ordered_by_indx_set_of_parents(node) - self.aux_build_transition_scalar_indexing_structure_for_a_node(g1, node, aggr_info[1], - aggr_info[0], aggr_info[2]) - - def aux_build_transition_scalar_indexing_structure_for_a_node(self, graph, node_id, parents_indxs, parents_labels, - parents_values): - transition_scalar_indexing = graph.build_transition_scalar_indexing_structure_for_a_node(node_id, - parents_values) - self.assertEqual(len(transition_scalar_indexing), len(parents_indxs) + 2) - merged_list = parents_labels[:] - merged_list.insert(0, node_id) - merged_list.insert(0, node_id) - vals_list = [] - for node_id in merged_list: - vals_list.append(graph.get_states_number(node_id)) - m_vec = np.array([vals_list]) - m_vec = m_vec.cumprod() - self.assertTrue(np.array_equal(transition_scalar_indexing, m_vec)) - - def test_build_time_columns_filtering_structure_for_a_node(self): - g1 = NetworkGraph(self.s1.structure) - g1.add_nodes(self.s1.structure.nodes_labels) - g1.add_edges(self.s1.structure.edges) - for node in self.s1.structure.nodes_labels: - aggr_info = g1.get_ordered_by_indx_set_of_parents(node) - self.aux_build_time_columns_filtering_structure_for_a_node(g1, node, aggr_info[1]) - - def aux_build_time_columns_filtering_structure_for_a_node(self, graph, node_id, p_indxs): - graph.build_time_columns_filtering_for_a_node(graph.get_node_indx(node_id), p_indxs) - single_filter = [] - single_filter.append(graph.get_node_indx(node_id)) - single_filter.extend(p_indxs) - 
self.assertTrue(np.array_equal(graph.build_time_columns_filtering_for_a_node(graph.get_node_indx(node_id), - p_indxs),np.array(single_filter))) - def test_build_transition_columns_filtering_structure(self): - g1 = NetworkGraph(self.s1.structure) - g1.add_nodes(self.s1.structure.nodes_labels) - g1.add_edges(self.s1.structure.edges) - for node in self.s1.structure.nodes_labels: - aggr_info = g1.get_ordered_by_indx_set_of_parents(node) - self.aux_build_time_columns_filtering_structure_for_a_node(g1, node, aggr_info[1]) - - def aux_build_transition_columns_filtering_structure(self, graph, node_id, p_indxs): - single_filter = [] - single_filter.append(graph.get_node_indx(node_id) + graph._graph_struct.total_variables_number) - single_filter.append(graph.get_node_indx(node_id)) - single_filter.extend(p_indxs) - self.assertTrue(np.array_equal(graph.build_transition_filtering_for_a_node(graph.get_node_indx(node_id), - - p_indxs), np.array(single_filter))) - def test_build_p_combs_structure(self): - g1 = NetworkGraph(self.s1.structure) - g1.add_nodes(self.s1.structure.nodes_labels) - g1.add_edges(self.s1.structure.edges) - for node in self.s1.structure.nodes_labels: - aggr_info = g1.get_ordered_by_indx_set_of_parents(node) - self.aux_build_p_combs_structure(g1, aggr_info[2]) - - def aux_build_p_combs_structure(self, graph, p_vals): - p_combs = graph.build_p_comb_structure_for_a_node(p_vals) - p_possible_vals = [] - for val in p_vals: - vals = [v for v in range(val)] - p_possible_vals.extend(vals) - comb_struct = set(itertools.product(p_possible_vals,repeat=len(p_vals))) - for comb in comb_struct: - self.assertIn(np.array(comb), p_combs) - - def test_get_parents_by_id(self): - g1 = NetworkGraph(self.s1.structure) - g1.add_nodes(self.s1.structure.nodes_labels) - g1.add_edges(self.s1.structure.edges) - for node in g1.nodes: - self.assertListEqual(g1.get_parents_by_id(node), list(g1._graph.predecessors(node))) - - def test_get_states_number(self): - g1 = NetworkGraph(self.s1.structure) - g1.add_nodes(self.s1.structure.nodes_labels) - g1.add_edges(self.s1.structure.edges) - for node, val in zip(g1.nodes, g1.nodes_values): - self.assertEqual(val, g1.get_states_number(node)) - - def test_get_node_indx(self): - g1 = NetworkGraph(self.s1.structure) - g1.add_nodes(self.s1.structure.nodes_labels) - g1.add_edges(self.s1.structure.edges) - for node, indx in zip(g1.nodes, g1.nodes_indexes): - self.assertEqual(indx, g1.get_node_indx(node)) - - -if __name__ == '__main__': - unittest.main() diff --git a/main_package/tests/test_parameters_estimator.py b/main_package/tests/test_parameters_estimator.py deleted file mode 100644 index 9314f84..0000000 --- a/main_package/tests/test_parameters_estimator.py +++ /dev/null @@ -1,67 +0,0 @@ - -import unittest -import numpy as np -import glob -import os - -from ..PyCTBN.network_graph import NetworkGraph -from ..PyCTBN.sample_path import SamplePath -from ..PyCTBN.set_of_cims import SetOfCims -from ..PyCTBN.parameters_estimator import ParametersEstimator -from ..PyCTBN.json_importer import JsonImporter - - -class TestParametersEstimatior(unittest.TestCase): - - @classmethod - def setUpClass(cls) -> None: - cls.read_files = glob.glob(os.path.join('./data', "*.json")) - cls.array_indx = 0 - cls.importer = JsonImporter(cls.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', - cls.array_indx) - cls.s1 = SamplePath(cls.importer) - cls.s1.build_trajectories() - cls.s1.build_structure() - print(cls.s1.structure.edges) - print(cls.s1.structure.nodes_values) - - def 
test_fast_init(self): - for node in self.s1.structure.nodes_labels: - g = NetworkGraph(self.s1.structure) - g.fast_init(node) - p1 = ParametersEstimator(self.s1.trajectories, g) - self.assertEqual(p1._trajectories, self.s1.trajectories) - self.assertEqual(p1._net_graph, g) - self.assertIsNone(p1._single_set_of_cims) - p1.fast_init(node) - self.assertIsInstance(p1._single_set_of_cims, SetOfCims) - - def test_compute_parameters_for_node(self): - for indx, node in enumerate(self.s1.structure.nodes_labels): - print(node) - g = NetworkGraph(self.s1.structure) - g.fast_init(node) - p1 = ParametersEstimator(self.s1.trajectories, g) - p1.fast_init(node) - sofc1 = p1.compute_parameters_for_node(node) - sampled_cims = self.aux_import_sampled_cims('dyn.cims') - sc = list(sampled_cims.values()) - self.equality_of_cims_of_node(sc[indx], sofc1._actual_cims) - - def equality_of_cims_of_node(self, sampled_cims, estimated_cims): - self.assertEqual(len(sampled_cims), len(estimated_cims)) - for c1, c2 in zip(sampled_cims, estimated_cims): - self.cim_equality_test(c1, c2.cim) - - def cim_equality_test(self, cim1, cim2): - for r1, r2 in zip(cim1, cim2): - self.assertTrue(np.all(np.isclose(r1, r2, 1e-01, 1e-01) == True)) - - def aux_import_sampled_cims(self, cims_label): - i1 = JsonImporter(self.read_files[0], '', '', '', '', '', self.array_indx) - raw_data = i1.read_json_file() - return i1.import_sampled_cims(raw_data, self.array_indx, cims_label) - - -if __name__ == '__main__': - unittest.main() diff --git a/main_package/tests/test_sample_path.py b/main_package/tests/test_sample_path.py deleted file mode 100644 index e2f10b1..0000000 --- a/main_package/tests/test_sample_path.py +++ /dev/null @@ -1,39 +0,0 @@ - -import unittest -import glob -import os - -from ..PyCTBN.json_importer import JsonImporter -from ..PyCTBN.sample_path import SamplePath -from ..PyCTBN.trajectory import Trajectory -from ..PyCTBN.structure import Structure - - -class TestSamplePath(unittest.TestCase): - - @classmethod - def setUpClass(cls) -> None: - cls.read_files = glob.glob(os.path.join('./data', "*.json")) - cls.importer = JsonImporter(cls.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - - def test_init(self): - s1 = SamplePath(self.importer) - self.assertIsNone(s1.trajectories) - self.assertIsNone(s1.structure) - self.assertFalse(s1._importer.concatenated_samples.empty) - self.assertIsNone(s1._total_variables_count) - - def test_build_trajectories(self): - s1 = SamplePath(self.importer) - s1.build_trajectories() - self.assertIsInstance(s1.trajectories, Trajectory) - - def test_build_structure(self): - s1 = SamplePath(self.importer) - s1.build_structure() - self.assertIsInstance(s1.structure, Structure) - self.assertEqual(s1._total_variables_count, len(s1._importer.sorter)) - - -if __name__ == '__main__': - unittest.main() diff --git a/main_package/tests/test_setofcims.py b/main_package/tests/test_setofcims.py deleted file mode 100644 index daed9f0..0000000 --- a/main_package/tests/test_setofcims.py +++ /dev/null @@ -1,133 +0,0 @@ - -import unittest -import numpy as np -import itertools - -from ..PyCTBN.set_of_cims import SetOfCims - - -class TestSetOfCims(unittest.TestCase): - - @classmethod - def setUpClass(cls) -> None: - cls.node_id = 'X' - cls.possible_cardinalities = [2, 3] - cls.possible_states = [[0,1], [0, 1, 2]] - cls.node_states_number = range(2, 4) - - def test_init(self): - # empty parent set - for sn in self.node_states_number: - p_combs = self.build_p_comb_structure_for_a_node([]) - 
self.aux_test_init(self.node_id, [], sn, p_combs) - # one parent - for sn in self.node_states_number: - for p in itertools.product(self.possible_cardinalities, repeat=1): - p_combs = self.build_p_comb_structure_for_a_node(list(p)) - self.aux_test_init(self.node_id, list(p), sn, p_combs) - #two parents - for sn in self.node_states_number: - for p in itertools.product(self.possible_cardinalities, repeat=2): - p_combs = self.build_p_comb_structure_for_a_node(list(p)) - self.aux_test_init(self.node_id, list(p), sn, p_combs) - - def test_build_cims(self): - # empty parent set - for sn in self.node_states_number: - p_combs = self.build_p_comb_structure_for_a_node([]) - self.aux_test_build_cims(self.node_id, [], sn, p_combs) - # one parent - for sn in self.node_states_number: - for p in itertools.product(self.possible_cardinalities, repeat=1): - p_combs = self.build_p_comb_structure_for_a_node(list(p)) - self.aux_test_build_cims(self.node_id, list(p), sn, p_combs) - #two parents - for sn in self.node_states_number: - for p in itertools.product(self.possible_cardinalities, repeat=2): - p_combs = self.build_p_comb_structure_for_a_node(list(p)) - self.aux_test_build_cims(self.node_id, list(p), sn, p_combs) - - def test_filter_cims_with_mask(self): - p_combs = self.build_p_comb_structure_for_a_node(self.possible_cardinalities) - sofc1 = SetOfCims('X', self.possible_cardinalities, 3, p_combs) - state_res_times_list = [] - transition_matrices_list = [] - for i in range(len(p_combs)): - state_res_times = np.random.rand(1, 3)[0] - state_res_times = state_res_times * 1000 - state_transition_matrix = np.random.randint(1, 10000, (3, 3)) - state_res_times_list.append(state_res_times) - transition_matrices_list.append(state_transition_matrix) - sofc1.build_cims(np.array(state_res_times_list), np.array(transition_matrices_list)) - for length_of_mask in range(3): - for mask in list(itertools.permutations([True, False],r=length_of_mask)): - m = np.array(mask) - for parent_value in range(self.possible_cardinalities[0]): - cims = sofc1.filter_cims_with_mask(m, [parent_value]) - if length_of_mask == 0 or length_of_mask == 1: - self.assertTrue(np.array_equal(sofc1._actual_cims, cims)) - else: - indxs = self.another_filtering_method(p_combs, m, [parent_value]) - self.assertTrue(np.array_equal(cims, sofc1._actual_cims[indxs])) - - def aux_test_build_cims(self, node_id, p_values, node_states, p_combs): - state_res_times_list = [] - transition_matrices_list = [] - so1 = SetOfCims(node_id, p_values, node_states, p_combs) - for i in range(len(p_combs)): - state_res_times = np.random.rand(1, node_states)[0] - state_res_times = state_res_times * 1000 - state_transition_matrix = np.random.randint(1, 10000, (node_states, node_states)) - state_res_times_list.append(state_res_times) - transition_matrices_list.append(state_transition_matrix) - so1.build_cims(np.array(state_res_times_list), np.array(transition_matrices_list)) - self.assertEqual(len(state_res_times_list), so1.get_cims_number()) - self.assertIsInstance(so1._actual_cims, np.ndarray) - self.assertIsNone(so1._transition_matrices) - self.assertIsNone(so1._state_residence_times) - - def aux_test_init(self, node_id, parents_states_number, node_states_number, p_combs): - sofcims = SetOfCims(node_id, parents_states_number, node_states_number, p_combs) - self.assertEqual(sofcims._node_id, node_id) - self.assertTrue(np.array_equal(sofcims._p_combs, p_combs)) - self.assertTrue(np.array_equal(sofcims._parents_states_number, parents_states_number)) - 
self.assertEqual(sofcims._node_states_number, node_states_number) - self.assertFalse(sofcims._actual_cims) - self.assertEqual(sofcims._state_residence_times.shape[0], np.prod(np.array(parents_states_number))) - self.assertEqual(len(sofcims._state_residence_times[0]), node_states_number) - self.assertEqual(sofcims._transition_matrices.shape[0], np.prod(np.array(parents_states_number))) - self.assertEqual(len(sofcims._transition_matrices[0][0]), node_states_number) - - def build_p_comb_structure_for_a_node(self, parents_values): - """ - Builds the combinatory structure that contains the combinations of all the values contained in parents_values. - - Parameters: - parents_values: the cardinalities of the nodes - Returns: - a numpy matrix containing a grid of the combinations - """ - tmp = [] - for val in parents_values: - tmp.append([x for x in range(val)]) - if len(parents_values) > 0: - parents_comb = np.array(np.meshgrid(*tmp)).T.reshape(-1, len(parents_values)) - if len(parents_values) > 1: - tmp_comb = parents_comb[:, 1].copy() - parents_comb[:, 1] = parents_comb[:, 0].copy() - parents_comb[:, 0] = tmp_comb - else: - parents_comb = np.array([[]], dtype=np.int) - return parents_comb - - def another_filtering_method(self,p_combs, mask, parent_value): - masked_combs = p_combs[:, mask] - indxs = [] - for indx, val in enumerate(masked_combs): - if val == parent_value: - indxs.append(indx) - return np.array(indxs) - - -if __name__ == '__main__': - unittest.main() diff --git a/main_package/tests/test_structure.py b/main_package/tests/test_structure.py deleted file mode 100644 index b6b4493..0000000 --- a/main_package/tests/test_structure.py +++ /dev/null @@ -1,81 +0,0 @@ - -import unittest -import numpy as np -from ..PyCTBN.structure import Structure - - -class TestStructure(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.labels = ['X','Y','Z'] - cls.indxs = np.array([0,1,2]) - cls.vals = np.array([3,3,3]) - cls.edges = [('X','Z'),('Y','Z'), ('Z','Y')] - cls.vars_numb = len(cls.labels) - - def test_init(self): - s1 = Structure(self.labels, self.indxs, self.vals, self.edges, self.vars_numb) - self.assertListEqual(self.labels,s1.nodes_labels) - self.assertIsInstance(s1.nodes_indexes, np.ndarray) - self.assertTrue(np.array_equal(self.indxs, s1.nodes_indexes)) - self.assertIsInstance(s1.nodes_values, np.ndarray) - self.assertTrue(np.array_equal(self.vals, s1.nodes_values)) - self.assertListEqual(self.edges, s1.edges) - self.assertEqual(self.vars_numb, s1.total_variables_number) - - def test_get_node_id(self): - s1 = Structure(self.labels, self.indxs, self.vals, self.edges, self.vars_numb) - for indx, var in enumerate(self.labels): - self.assertEqual(var, s1.get_node_id(indx)) - - def test_get_node_indx(self): - l2 = self.labels[:] - l2.remove('Y') - i2 = self.indxs.copy() - np.delete(i2, 1) - v2 = self.vals.copy() - np.delete(v2, 1) - e2 = [('X','Z')] - n2 = self.vars_numb - 1 - s1 = Structure(l2, i2, v2, e2, n2) - for indx, var in zip(i2, l2): - self.assertEqual(indx, s1.get_node_indx(var)) - - def test_get_positional_node_indx(self): - l2 = self.labels[:] - l2.remove('Y') - i2 = self.indxs.copy() - np.delete(i2, 1) - v2 = self.vals.copy() - np.delete(v2, 1) - e2 = [('X', 'Z')] - n2 = self.vars_numb - 1 - s1 = Structure(l2, i2, v2, e2, n2) - for indx, var in enumerate(s1.nodes_labels): - self.assertEqual(indx, s1.get_positional_node_indx(var)) - - def test_get_states_number(self): - l2 = self.labels[:] - l2.remove('Y') - i2 = self.indxs.copy() - np.delete(i2, 1) - v2 = 
self.vals.copy() - np.delete(v2, 1) - e2 = [('X', 'Z')] - n2 = self.vars_numb - 1 - s1 = Structure(l2, i2, v2, e2, n2) - for val, node in zip(v2, l2): - self.assertEqual(val, s1.get_states_number(node)) - - def test_equality(self): - s1 = Structure(self.labels, self.indxs, self.vals, self.edges, self.vars_numb) - s2 = Structure(self.labels, self.indxs, self.vals, self.edges, self.vars_numb) - self.assertEqual(s1, s2) - - def test_repr(self): - s1 = Structure(self.labels, self.indxs, self.vals, self.edges, self.vars_numb) - print(s1) - - -if __name__ == '__main__': - unittest.main() diff --git a/main_package/tests/test_structure_estimator.py b/main_package/tests/test_structure_estimator.py deleted file mode 100644 index ca21cab..0000000 --- a/main_package/tests/test_structure_estimator.py +++ /dev/null @@ -1,103 +0,0 @@ - -import glob -import math -import os -import unittest - -import networkx as nx -import numpy as np -import psutil -from line_profiler import LineProfiler -import timeit - -from ..PyCTBN.cache import Cache -from ..PyCTBN.sample_path import SamplePath -from ..PyCTBN.structure_estimator import StructureEstimator -from ..PyCTBN.json_importer import JsonImporter - - -class TestStructureEstimator(unittest.TestCase): - - @classmethod - def setUpClass(cls): - cls.read_files = glob.glob(os.path.join('./data', "*.json")) - cls.importer = JsonImporter(cls.read_files[0], 'samples', 'dyn.str', 'variables', 'Time', 'Name', 0) - cls.s1 = SamplePath(cls.importer) - cls.s1.build_trajectories() - cls.s1.build_structure() - - def test_init(self): - exp_alfa = 0.1 - chi_alfa = 0.1 - se1 = StructureEstimator(self.s1, exp_alfa, chi_alfa) - self.assertEqual(self.s1, se1._sample_path) - self.assertTrue(np.array_equal(se1._nodes, np.array(self.s1.structure.nodes_labels))) - self.assertTrue(np.array_equal(se1._nodes_indxs, self.s1.structure.nodes_indexes)) - self.assertTrue(np.array_equal(se1._nodes_vals, self.s1.structure.nodes_values)) - self.assertEqual(se1._exp_test_sign, exp_alfa) - self.assertEqual(se1._chi_test_alfa, chi_alfa) - self.assertIsInstance(se1._complete_graph, nx.DiGraph) - self.assertIsInstance(se1._cache, Cache) - - def test_build_complete_graph(self): - exp_alfa = 0.1 - chi_alfa = 0.1 - nodes_numb = len(self.s1.structure.nodes_labels) - se1 = StructureEstimator(self.s1, exp_alfa, chi_alfa) - cg = se1.build_complete_graph(self.s1.structure.nodes_labels) - self.assertEqual(len(cg.edges), nodes_numb*(nodes_numb - 1)) - for node in self.s1.structure.nodes_labels: - no_self_loops = self.s1.structure.nodes_labels[:] - no_self_loops.remove(node) - for n2 in no_self_loops: - self.assertIn((node, n2), cg.edges) - - def test_generate_possible_sub_sets_of_size(self): - exp_alfa = 0.1 - chi_alfa = 0.1 - nodes_numb = len(self.s1.structure.nodes_labels) - se1 = StructureEstimator(self.s1, exp_alfa, chi_alfa) - - for node in self.s1.structure.nodes_labels: - for b in range(nodes_numb): - sets = se1.generate_possible_sub_sets_of_size(self.s1.structure.nodes_labels, b, node) - sets2 = se1.generate_possible_sub_sets_of_size(self.s1.structure.nodes_labels, b, node) - self.assertEqual(len(list(sets)), math.floor(math.factorial(nodes_numb - 1) / - (math.factorial(b)*math.factorial(nodes_numb -1 - b)))) - for sset in sets2: - self.assertFalse(node in sset) - - def test_time(self): - se1 = StructureEstimator(self.s1, 0.1, 0.1) - lp = LineProfiler() - #lp.add_function(se1.complete_test) - #lp.add_function(se1.one_iteration_of_CTPC_algorithm) - #lp.add_function(se1.independence_test) - lp_wrapper = 
lp(se1.ctpc_algorithm) - lp_wrapper() - lp.print_stats() - #print("Last time", lp.dump_stats()) - #print("Exec Time", timeit.timeit(se1.ctpc_algorithm, number=1)) - print(se1._complete_graph.edges) - print(self.s1.structure.edges) - for ed in self.s1.structure.edges: - self.assertIn(tuple(ed), se1._complete_graph.edges) - tuples_edges = [tuple(rec) for rec in self.s1.structure.edges] - spurious_edges = [] - for ed in se1._complete_graph.edges: - if not(ed in tuples_edges): - spurious_edges.append(ed) - print("Spurious Edges:",spurious_edges) - print("Adj Matrix:", nx.adj_matrix(se1._complete_graph).toarray().astype(bool)) - #se1.save_results() - - def test_memory(self): - se1 = StructureEstimator(self.s1, 0.1, 0.1) - se1.ctpc_algorithm() - current_process = psutil.Process(os.getpid()) - mem = current_process.memory_info().rss - print("Average Memory Usage in MB:", mem / 10**6) - - -if __name__ == '__main__': - unittest.main() diff --git a/main_package/tests/test_trajectory.py b/main_package/tests/test_trajectory.py deleted file mode 100644 index f826632..0000000 --- a/main_package/tests/test_trajectory.py +++ /dev/null @@ -1,46 +0,0 @@ - -import unittest -import numpy as np - -from ..PyCTBN.trajectory import Trajectory - - -class TestTrajectory(unittest.TestCase): - - def test_init(self): - cols_list = [np.array([1.2,1.3,.14]), np.arange(1,4), np.arange(4,7)] - t1 = Trajectory(cols_list, len(cols_list) - 2) - self.assertTrue(np.array_equal(cols_list[0], t1.times)) - self.assertTrue(np.array_equal(np.ravel(t1.complete_trajectory[:, : 1]), cols_list[1])) - self.assertTrue(np.array_equal(np.ravel(t1.complete_trajectory[:, 1: 2]), cols_list[2])) - self.assertEqual(len(cols_list) - 1, t1.complete_trajectory.shape[1]) - self.assertEqual(t1.size(), t1.times.size) - - def test_init_first_array_not_float_type(self): - cols_list = [np.arange(1, 4), np.arange(4, 7), np.array([1.2, 1.3, .14])] - self.assertRaises(TypeError, Trajectory, cols_list, len(cols_list)) - - def test_complete_trajectory(self): - cols_list = [np.array([1.2, 1.3, .14]), np.arange(1, 4), np.arange(4, 7)] - t1 = Trajectory(cols_list, len(cols_list) - 2) - complete = np.column_stack((cols_list[1], cols_list[2])) - self.assertTrue(np.array_equal(t1.complete_trajectory, complete)) - - def test_trajectory(self): - cols_list = [np.array([1.2, 1.3, .14]), np.arange(1, 4), np.arange(4, 7)] - t1 = Trajectory(cols_list, len(cols_list) - 2) - self.assertTrue(np.array_equal(cols_list[1], t1.trajectory.ravel())) - - def test_times(self): - cols_list = [np.array([1.2, 1.3, .14]), np.arange(1, 4), np.arange(4, 7)] - t1 = Trajectory(cols_list, len(cols_list) - 2) - self.assertTrue(np.array_equal(cols_list[0], t1.times)) - - def test_repr(self): - cols_list = [np.array([1.2, 1.3, .14]), np.arange(1, 4), np.arange(4, 7)] - t1 = Trajectory(cols_list, len(cols_list) - 2) - print(t1) - - -if __name__ == '__main__': - unittest.main()