Skip to content

Preprocessing

This module provides the Data class.

Classes

Data: public class for handling data used for the causal discovery.

Data

Data class manages the preprocessing of the data before the causal analysis.

Source code in causalflow/preprocessing/data.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class Data():
    """Data class manages the preprocessing of the data before the causal analysis."""

    def __init__(self, data, vars = None, fill_nan = True, stand = False, subsampling: "SubsamplingMethod" = None, show_subsampling = False):
        """
        Class constructor.

        The constructor's own steps never modify the caller's object: a
        DataFrame input is copied, and NaN filling, subsampling and
        standardization each build a new frame instead of editing in place.

        Args:
            data (str/DataFrame/np.array): path of a CSV file to load, a pandas.DataFrame or a numpy.ndarray.
            vars (list(str), optional): List containing variable names. If unset then,
                if data = (str/DataFrame) vars = data columns name elif data = np.array vars = [X_0 .. X_{N-1}]
                Defaults to None.
            fill_nan (bool, optional): If True, NaNs are forward-filled and then
                backward-filled (the latter covers leading NaNs). Defaults to True.
            stand (bool, optional): If True, every column is standardized with
                StandardScaler. Defaults to False.
            subsampling (SubsamplingMethod, optional): Subsampling method. If None not active. Defaults to None.
            show_subsampling (bool, optional): If True shows subsampling result. Defaults to False.

        Raises:
            TypeError: if data is not str - DataFrame - ndarray.
        """
        # Data handling
        if isinstance(data, np.ndarray):
            self.d = pd.DataFrame(data)
            if vars is None:
                self.d.columns = ['X_' + str(f) for f in range(len(self.d.columns))]
        elif isinstance(data, pd.DataFrame):
            # Copy so that renaming columns below cannot leak into the
            # caller's DataFrame.
            self.d = data.copy()
        elif isinstance(data, str):
            self.d = pd.read_csv(data)
        else:
            raise TypeError("data field not in the correct type\ndata must be one of the following type:\n- numpy.ndarray\n- pandas.DataFrame\n- .csv path")

        # Columns name handling
        if vars is not None:
            self.d.columns = list(vars)

        # Snapshot of names/shape before NaN filling, subsampling and scaling
        self.orig_features = self.features
        self.orig_pretty_features = self.pretty_features
        self.orig_N = self.N
        self.orig_T = self.T

        # Filling NaNs: forward fill, then backward fill for leading NaNs.
        # ffill()/bfill() replace the deprecated fillna(method=...) and return
        # new frames, so an ndarray-backed frame never writes through.
        if fill_nan and self.d.isnull().values.any():
            self.d = self.d.ffill().bfill()

        # Subsampling data
        if subsampling is not None:
            subsampler = Subsampler(self.d, ss_method = subsampling)
            self.d = pd.DataFrame(subsampler.subsample(), columns = self.features)
            if show_subsampling:
                subsampler.plot_subsampled_data()

        # Standardize data
        if stand:
            scaler = StandardScaler().fit(self.d)
            self.d = pd.DataFrame(scaler.transform(self.d), columns = self.features)

    @property
    def features(self):
        """
        Return list of features.

        Returns:
            list(str): list of feature names.
        """
        return list(self.d.columns)

    @property
    def pretty_features(self):
        """
        Return list of features wrapped in LaTeX math delimiters ($...$).

        Returns:
            list(str): list of feature names.
        """
        return [r'$' + str(v) + '$' for v in self.d.columns]

    @property
    def N(self):
        """
        Number of features.

        Returns:
            (int): number of features.
        """
        return len(self.d.columns)

    @property
    def T(self):
        """
        Dataframe length.

        Returns:
            (int): dataframe length.
        """
        return len(self.d)

    def shrink(self, selected_features):
        """
        Shrink dataframe d on the selected features.

        Args:
            selected_features (list(str)): list of variables.
        """
        self.d = self.d[selected_features]

    def plot_timeseries(self, savefig = None):
        """
        Plot timeseries data, one stacked subplot per feature.

        Args:
            savefig (str, optional): figure path. If given, the figure is saved
                there and then closed, so repeated calls do not accumulate
                open figures; otherwise it is displayed with plt.show().
        """
        gs = gridspec.GridSpec(self.N, 1)
        time = list(range(self.T))
        values = self.d.values

        fig = plt.figure()
        for i, label in enumerate(self.pretty_features):
            plt.subplot(gs[i, 0])
            plt.plot(time, values[:, i], color = 'tab:red')
            plt.ylabel(label)

        if savefig is not None:
            plt.savefig(savefig)
            plt.close(fig)
        else:
            plt.show()

    def save_csv(self, csvpath):
        """
        Save timeseries data into a CSV file.

        Args:
            csvpath (str): CSV path.
        """
        self.d.to_csv(csvpath, index=False)

N property

Number of features.

Returns:

Type Description
int

number of features.

T property

Dataframe length.

Returns:

Type Description
int

dataframe length.

features property

Return list of features.

Returns:

Name Type Description
list str

list of feature names.

pretty_features property

Return list of features with LATEX symbols.

Returns:

Name Type Description
list str

list of feature names.

__init__(data, vars=None, fill_nan=True, stand=False, subsampling=None, show_subsampling=False)

Class constructor.

Parameters:

Name Type Description Default
data str / DataFrame / np.array

It can be a string specifying the path of a CSV file to load, a pandas.DataFrame, or a numpy.ndarray.

required
vars list(str)

List containing variable names. If unset then, if data = (str/DataFrame) vars = data columns name elif data = np.array vars = [X_0 .. X_N] Defaults to None.

None
fill_nan bool

Fill NaNs bit. Defaults to True.

True
stand bool

Standardization bit. Defaults to False.

False
subsampling SubsamplingMethod

Subsampling method. If None not active. Defaults to None.

None
show_subsampling bool

If True shows subsampling result. Defaults to False.

False

Raises:

Type Description
TypeError

if data is not str - DataFrame - ndarray.

Source code in causalflow/preprocessing/data.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def __init__(self, data, vars = None, fill_nan = True, stand = False, subsampling : SubsamplingMethod = None, show_subsampling = False):
    """
    Class constructor.

    The constructor's own steps never modify the caller's object: a
    DataFrame input is copied, and NaN filling, subsampling and
    standardization each build a new frame instead of editing in place.

    Args:
        data (str/DataFrame/np.array): path of a CSV file to load, a pandas.DataFrame or a numpy.ndarray.
        vars (list(str), optional): List containing variable names. If unset then,
            if data = (str/DataFrame) vars = data columns name elif data = np.array vars = [X_0 .. X_{N-1}]
            Defaults to None.
        fill_nan (bool, optional): If True, NaNs are forward-filled and then
            backward-filled (the latter covers leading NaNs). Defaults to True.
        stand (bool, optional): If True, every column is standardized with
            StandardScaler. Defaults to False.
        subsampling (SubsamplingMethod, optional): Subsampling method. If None not active. Defaults to None.
        show_subsampling (bool, optional): If True shows subsampling result. Defaults to False.

    Raises:
        TypeError: if data is not str - DataFrame - ndarray.
    """
    # Data handling
    if isinstance(data, np.ndarray):
        self.d = pd.DataFrame(data)
        if vars is None:
            self.d.columns = ['X_' + str(f) for f in range(len(self.d.columns))]
    elif isinstance(data, pd.DataFrame):
        # Copy so that renaming columns below cannot leak into the
        # caller's DataFrame.
        self.d = data.copy()
    elif isinstance(data, str):
        self.d = pd.read_csv(data)
    else:
        raise TypeError("data field not in the correct type\ndata must be one of the following type:\n- numpy.ndarray\n- pandas.DataFrame\n- .csv path")

    # Columns name handling
    if vars is not None:
        self.d.columns = list(vars)

    # Snapshot of names/shape before NaN filling, subsampling and scaling
    self.orig_features = self.features
    self.orig_pretty_features = self.pretty_features
    self.orig_N = self.N
    self.orig_T = len(self.d)

    # Filling NaNs: forward fill, then backward fill for leading NaNs.
    # ffill()/bfill() replace the deprecated fillna(method=...) and return
    # new frames, so an ndarray-backed frame never writes through.
    if fill_nan and self.d.isnull().values.any():
        self.d = self.d.ffill().bfill()

    # Subsampling data
    if subsampling is not None:
        subsampler = Subsampler(self.d, ss_method = subsampling)
        self.d = pd.DataFrame(subsampler.subsample(), columns = self.features)
        if show_subsampling:
            subsampler.plot_subsampled_data()

    # Standardize data
    if stand:
        scaler = StandardScaler().fit(self.d)
        self.d = pd.DataFrame(scaler.transform(self.d), columns = self.features)

plot_timeseries(savefig=None)

Plot timeseries data.

Parameters:

Name Type Description Default
savefig str

figure path.

None
Source code in causalflow/preprocessing/data.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def plot_timeseries(self, savefig = None):
    """
    Plot timeseries data, one stacked subplot per feature.

    Args:
        savefig (str, optional): figure path. If given, the figure is saved
            there and then closed, so repeated calls do not accumulate open
            figures; otherwise it is displayed with plt.show().
    """
    gs = gridspec.GridSpec(self.N, 1)
    time = list(range(self.T))
    values = self.d.values

    fig = plt.figure()
    for i, label in enumerate(self.pretty_features):
        plt.subplot(gs[i, 0])
        plt.plot(time, values[:, i], color = 'tab:red')
        plt.ylabel(label)

    if savefig is not None:
        plt.savefig(savefig)
        plt.close(fig)
    else:
        plt.show()

save_csv(csvpath)

Save timeseries data into a CSV file.

Parameters:

Name Type Description Default
csvpath str

CSV path.

required
Source code in causalflow/preprocessing/data.py
153
154
155
156
157
158
159
160
def save_csv(self, csvpath):
    """
    Write the current dataframe to a CSV file.

    The row index is omitted, so the file holds only the feature columns.

    Args:
        csvpath (str): CSV path.
    """
    frame = self.d
    frame.to_csv(path_or_buf=csvpath, index=False)

shrink(selected_features)

Shrink dataframe d on the selected features.

Parameters:

Name Type Description Default
selected_features list(str)

list of variables.

required
Source code in causalflow/preprocessing/data.py
118
119
120
121
122
123
124
125
def shrink(self, selected_features):
    """
    Restrict the dataframe to the given columns.

    Only ``self.d`` is reassigned, to ``self.d[selected_features]``.

    Args:
        selected_features (list(str)): list of variables.
    """
    subset = self.d[selected_features]
    self.d = subset

This module provides the Subsampler class.

Classes

Subsampler: public class for subsampling.

Subsampler

Subsampler class.

It subsamples the data by using a subsampling method chosen among:
  • Static - subsamples data by taking one sample every `step` samples
  • WSDynamic - entropy-based method with dynamic window size computed by breakpoint analysis
  • WSFFTStatic - entropy-based method with fixed window size computed by FFT analysis
  • WSStatic - entropy-based method with predefined window size
Source code in causalflow/preprocessing/Subsampler.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class Subsampler():
    """
    Subsampler class.

    It subsamples the data by using a subsampling method chosen among:
        - Static - subsamples data by taking one sample every `step` samples
        - WSDynamic - entropy-based method with dynamic window size computed by breakpoint analysis
        - WSFFTStatic - entropy-based method with fixed window size computed by FFT analysis
        - WSStatic - entropy-based method with predefined window size
    """

    def __init__(self,
                 df: pd.DataFrame,
                 ss_method: "SubsamplingMethod"):
        """
        Class constructor.

        Args:
            df (pd.DataFrame): dataframe to subsample.
            ss_method (SubsamplingMethod): subsampling method; it receives
                ``df`` through its ``initialise`` method.
        """
        self.df = df
        self.ss_method = ss_method
        # Row indexes selected by the method; None until subsample() runs.
        self.result = None
        self.ss_method.initialise(df)

    def subsample(self):
        """
        Run the subsampling algorithm and return the subsampled ndarray.

        Returns:
            (ndarray): rows of ``df.values`` at the indexes returned by the method.
        """
        self.result = self.ss_method.run()
        return self.df.values[self.result, :]

    def plot_subsampled_data(self, dpi = 100, show = True):
        """
        Plot dataframe sub-sampled data.

        One subplot per column: the full series in red with the selected
        samples circled in blue.

        Args:
            dpi (int, optional): image dpi. Defaults to 100.
            show (bool, optional): if True it shows the figure and blocks the process. Defaults to True.

        Raises:
            RuntimeError: if subsample() has not been called yet.
        """
        if self.result is None:
            raise RuntimeError("subsample() must be called before plot_subsampled_data()")

        values = self.df.values
        gs = gridspec.GridSpec(self.df.shape[1], 1)
        time = np.arange(self.df.shape[0])

        pl.figure(dpi = dpi)
        for i, name in enumerate(self.df.columns.values):
            pl.subplot(gs[i, 0])
            pl.plot(time, values[:, i], color = 'tab:red')
            pl.scatter(time[self.result],
                       values[self.result, i],
                       s = 80,
                       facecolors = 'none',
                       edgecolors = 'b')
            pl.gca().set(ylabel = r'$' + str(name) + '$')
        if show:
            pl.show()

__init__(df, ss_method)

Class constructor.

Parameters:

Name Type Description Default
df pd.DataFrame

dataframe to subsample.

required
ss_method SubsamplingMethod

subsampling method.

required
Source code in causalflow/preprocessing/Subsampler.py
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, 
             df: pd.DataFrame, 
             ss_method: SubsamplingMethod):
    """
    Class constructor.

    Args:
        df (pd.DataFrame): dataframe to subsample.
        ss_method (SubsamplingMethod): subsampling method; it receives
            ``df`` through its ``initialise`` method.
    """
    self.df = df
    self.ss_method = ss_method
    # Row indexes selected by the method; None until subsample() runs, so
    # callers can tell "not run yet" apart from an empty selection.
    self.result = None
    self.ss_method.initialise(df)

plot_subsampled_data(dpi=100, show=True)

Plot dataframe sub-sampled data.

Parameters:

Name Type Description Default
dpi int

image dpi. Defaults to 100.

100
show bool

If True, shows the figure and blocks the process. Defaults to True.

True
Source code in causalflow/preprocessing/Subsampler.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def plot_subsampled_data(self, dpi = 100, show = True):
    """
    Plot dataframe sub-sampled data.

    One subplot per column: the full series in red with the selected
    samples circled in blue.

    Args:
        dpi (int, optional): image dpi. Defaults to 100.
        show (bool, optional): if True it shows the figure and blocks the process. Defaults to True.

    Raises:
        RuntimeError: if subsample() has not been run yet, so there is no
            selection to plot.
    """
    # ``result`` is only set by subsample(); fail with a clear message
    # instead of an AttributeError inside the plotting loop.
    if getattr(self, "result", None) is None:
        raise RuntimeError("subsample() must be called before plot_subsampled_data()")

    values = self.df.values
    gs = gridspec.GridSpec(self.df.shape[1], 1)
    time = np.arange(self.df.shape[0])

    pl.figure(dpi = dpi)
    for i, name in enumerate(self.df.columns.values):
        pl.subplot(gs[i, 0])
        pl.plot(time, values[:, i], color = 'tab:red')
        pl.scatter(time[self.result],
                   values[self.result, i],
                   s = 80,
                   facecolors = 'none',
                   edgecolors = 'b')
        pl.gca().set(ylabel = r'$' + str(name) + '$')
    if show:
        pl.show()

subsample()

Run the subsampling algorithm and return the subsampled ndarray.

Returns:

Type Description
ndarray

Subsampled dataframe value.

Source code in causalflow/preprocessing/Subsampler.py
40
41
42
43
44
45
46
47
48
def subsample(self):
    """
    Execute the configured method and return the kept rows.

    The selected indexes are also stored on ``self.result``.

    Returns:
        (ndarray): rows of ``df.values`` at the selected indexes.
    """
    picked = self.ss_method.run()
    self.result = picked
    return self.df.values[picked, :]

This module provides the EntropyBasedMethod class.

Classes

EntropyBasedMethod: EntropyBasedMethod abstract class.

EntropyBasedMethod

Bases: ABC

EntropyBasedMethod abstract class.

Source code in causalflow/preprocessing/subsampling_methods/EntropyBasedMethod.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class EntropyBasedMethod(ABC):
    """
    EntropyBasedMethod abstract class.

    Subclasses fill ``self.segments`` with ``(start, stop)`` row bounds in
    ``dataset_segmentation``. ``moving_window_analysis`` then builds one
    MovingWindow per segment, and ``extract_indexes`` maps each window's
    selected samples back to row positions of the whole dataset.
    ``self.df`` is not set here; presumably it comes from a co-base such as
    SubsamplingMethod.initialise (confirm for new subclasses).
    """

    def __init__(self, threshold):
        """
        Class constructor.

        Args:
            threshold (float): entropy threshold passed to each window's
                optimal_sampling().
        """
        self.windows = list()
        self.segments = list()
        self.threshold = threshold

    def create_rounded_copy(self):
        """
        Create a deep copy of the dataframe with values rounded to 1 decimal.

        Returns:
            (pd.DataFrame): rounded dataframe; ``self.df`` is left untouched.
        """
        return deepcopy(self.df).round(1)

    def __normalization(self):
        """
        Normalize each window entropy by the largest window entropy.

        Does nothing when there are no windows, or when the largest entropy
        is 0 (every window constant). Dividing in that case would raise or
        produce NaN.
        """
        if not self.windows:
            return
        max_e = max(mw.entropy for mw in self.windows)
        if max_e == 0:
            return
        for mw in self.windows:
            mw.entropy = mw.entropy / max_e

    def moving_window_analysis(self):
        """Compute the entropy and optimal sampling of one moving window per segment."""
        de = self.create_rounded_copy()

        for ll, rl in self.segments:
            # Build a moving window over rows [ll, rl) of the rounded data
            mw = MovingWindow(de.values[ll: rl])
            mw.get_entropy()
            mw.optimal_sampling(self.threshold)
            self.windows.append(mw)

        self.__normalization()

    def extract_indexes(self):
        """
        Extract the dataset row indexes selected by the subsampling procedure.

        Windows are assumed to be stored in order and to tile the dataset from
        row 0. Each window's local indexes are shifted by the total length of
        the windows before it.

        Returns:
            list(int): global row indexes of the selected samples.
        """
        sample_indexes = []
        offset = 0
        for mw in self.windows:
            sample_indexes.extend(si + offset for si in mw.opt_samples_index)
            # Running offset avoids re-summing all previous window lengths.
            offset += mw.T
        return sample_indexes

    @abstractmethod
    def dataset_segmentation(self):
        """Abstract method: fill ``self.segments`` with ``(start, stop)`` row bounds."""
        pass

__init__(threshold)

Class constructor.

Parameters:

Name Type Description Default
threshold float

entropy threshold.

required
Source code in causalflow/preprocessing/subsampling_methods/EntropyBasedMethod.py
16
17
18
19
20
21
22
23
24
25
def __init__(self, threshold):
    """
    Class constructor.

    Args:
        threshold (float): entropy threshold, later handed to each window's
            optimal_sampling().
    """
    self.threshold = threshold
    # Filled later: segments by dataset_segmentation(), windows by
    # moving_window_analysis().
    self.segments = []
    self.windows = []

__normalization()

Normalize entropy for each moving window.

Source code in causalflow/preprocessing/subsampling_methods/EntropyBasedMethod.py
40
41
42
43
44
def __normalization(self):
    """
    Normalize each window entropy by the largest window entropy.

    Does nothing when there are no windows (``max`` of an empty sequence
    would raise), or when the largest entropy is 0 because every window is
    constant (dividing would raise ZeroDivisionError or produce NaN).
    """
    if not self.windows:
        return
    max_e = max(mw.entropy for mw in self.windows)
    if max_e == 0:
        return
    for mw in self.windows:
        mw.entropy = mw.entropy / max_e

create_rounded_copy()

Create deepcopy of the dataframe but with rounded values.

Returns:

Type Description
pd.DataFrame

rounded dataframe.

Source code in causalflow/preprocessing/subsampling_methods/EntropyBasedMethod.py
28
29
30
31
32
33
34
35
36
37
def create_rounded_copy(self):
    """
    Return a deep copy of ``self.df`` rounded to one decimal place.

    The original dataframe is not modified.

    Returns:
        (pd.DataFrame): rounded dataframe.
    """
    return deepcopy(self.df).round(1)

dataset_segmentation() abstractmethod

Abstract method.

Source code in causalflow/preprocessing/subsampling_methods/EntropyBasedMethod.py
81
82
83
84
@abstractmethod
def dataset_segmentation(self):
    """
    Split the dataset into segments (abstract hook).

    Concrete subclasses are expected to fill ``self.segments`` with
    ``(start, stop)`` row bounds consumed by moving_window_analysis().
    """

extract_indexes()

Extract a list of indexes corresponding to the samples selected by the subsampling procedure.

Source code in causalflow/preprocessing/subsampling_methods/EntropyBasedMethod.py
71
72
73
74
75
76
77
78
def extract_indexes(self):
    """
    Extract the dataset row indexes selected by the subsampling procedure.

    Windows are assumed to be stored in order and to tile the dataset from
    row 0. Each window's local indexes are shifted by the total length of the
    windows before it.

    Returns:
        list(int): global row indexes of the selected samples.
    """
    sample_indexes = []
    offset = 0
    for mw in self.windows:
        sample_indexes.extend(si + offset for si in mw.opt_samples_index)
        # Running offset instead of re-summing all previous window lengths
        # on every iteration (was O(n^2) in the number of windows).
        offset += mw.T
    return sample_indexes

moving_window_analysis()

Compute dataframe entropy on moving windows.

Source code in causalflow/preprocessing/subsampling_methods/EntropyBasedMethod.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def moving_window_analysis(self):
    """
    Analyse one moving window per segment of the rounded dataframe.

    For every ``(start, stop)`` pair in ``self.segments`` a MovingWindow is
    built on those rows. Its entropy and optimal sampling are computed and
    the window is appended to ``self.windows``. Finally all window
    entropies are normalized.
    """
    rounded_values = self.create_rounded_copy().values
    for left, right in self.segments:
        window = MovingWindow(rounded_values[left: right])
        window.get_entropy()
        window.optimal_sampling(self.threshold)
        self.windows.append(window)
    self.__normalization()

This module provides subsampling methods for data preprocessing.

Classes

SSMode: an enumerator containing all the supported subsampling methods.
SubsamplingMethod: an abstract base class for implementing subsampling techniques.

SSMode

Bases: Enum

Enumerator containing all the supported subsampling methods.

Source code in causalflow/preprocessing/subsampling_methods/SubsamplingMethod.py
14
15
16
17
18
19
20
class SSMode(Enum):
    """
    Enumerator containing all the supported subsampling methods.

    Each member's value is a human-readable label for the method.
    """

    WSDynamic = 'Dynamic-size moving window'  # entropy-based; window sizes from breakpoint analysis
    WSStatic = 'Static-size moving window'  # entropy-based; predefined window size
    WSFFTStatic = 'FFT static-size moving window'  # entropy-based; fixed window size from FFT analysis
    Static = 'Static'  # one sample every `step` samples

SubsamplingMethod

Bases: ABC

SubsamplingMethod abstract class.

Source code in causalflow/preprocessing/subsampling_methods/SubsamplingMethod.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class SubsamplingMethod(ABC):
    """
    SubsamplingMethod abstract class.

    A concrete strategy is constructed with its ``SSMode``, receives the data
    via ``initialise`` and implements ``run`` to return the positions of the
    rows to keep.
    """

    def __init__(self, ssmode: SSMode):
        """
        Class constructor.

        Args:
            ssmode (SSMode): Subsampling method.
        """
        # No data is attached until initialise() is called.
        self.ssmode, self.df = ssmode, None

    def initialise(self, dataframe: pd.DataFrame):
        """
        Attach the dataframe that this method will subsample.

        Args:
            dataframe (pd.DataFrame): Pandas DataFrame to subsample.
        """
        self.df = dataframe

    @abstractmethod
    def run(self):
        """Run subsampler and return the positions of the rows to keep."""

__init__(ssmode)

Class constructor.

Parameters:

Name Type Description Default
ssmode SSMode

Subsampling method.

required
Source code in causalflow/preprocessing/subsampling_methods/SubsamplingMethod.py
26
27
28
29
30
31
32
33
34
def __init__(self, ssmode: SSMode):
    """
    Class constructor.

    Args:
        ssmode (SSMode): Subsampling method.
    """
    # Remember which strategy this is; the data arrives later via initialise().
    self.ssmode, self.df = ssmode, None

initialise(dataframe)

Initialise class by setting the dataframe to subsample.

Parameters:

Name Type Description Default
dataframe pd.DataFrame

Pandas DataFrame to subsample.

required
Source code in causalflow/preprocessing/subsampling_methods/SubsamplingMethod.py
37
38
39
40
41
42
43
44
def initialise(self, dataframe: pd.DataFrame):
    """
    Attach the dataframe that this method will subsample.

    Args:
        dataframe (pd.DataFrame): Pandas DataFrame to subsample.
    """
    self.df = dataframe

run() abstractmethod

Run subsampler.

Source code in causalflow/preprocessing/subsampling_methods/SubsamplingMethod.py
47
48
49
50
@abstractmethod
def run(self):
    """Run subsampler and return the positions of the rows to keep (abstract)."""

This module provides the MovingWindow class to facilitate the entropy-based subsampling methods.

Classes

MovingWindow: A class used by the entropy-based subsampling methods.

MovingWindow

Moving window class used by the entropy-based subsampling methods.

Source code in causalflow/preprocessing/subsampling_methods/moving_window.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class MovingWindow:
    """Moving window class used by the entropy-based subsampling methods."""

    def __init__(self, window):
        """
        Class constructor.

        Args:
            window (np.ndarray): 2-D array of window samples, one row per time
                step and one column per variable.
        """
        self.window = window
        self.T, self.dim = window.shape
        # Filled in by get_entropy() and optimal_sampling().
        self.entropy = None
        self.opt_size = None
        self.opt_samples_index = None

    def get_pdf(self):
        """
        Compute the empirical probability of each distinct row of the window.

        Returns:
            list(float): frequency of every distinct row, in order of first
                appearance; the values sum to 1.
        """
        from collections import Counter

        counts = Counter(tuple(row) for row in self.window)
        return [count / self.T for count in counts.values()]

    def get_entropy(self):
        """Compute the base-2 entropy of the distribution returned by get_pdf()."""
        self.entropy = entropy(self.get_pdf(), base = 2)

    def samples_selector(self, step) -> list:
        """
        Select sample to be taken from a moving window.

        Args:
            step (int): subsampling frequency.

        Returns:
            list: indexes 0, step, 2*step, ... smaller than the window length.
        """
        return list(range(0, self.T, step))

    def optimal_sampling(self, thres):
        """
        Find the optimal number of samples for this moving window.

        The window is subsampled with increasing steps ``T // n`` for ``n``
        going from ``T // 2`` down to 2. For each new step the relative
        entropy change ``|H_sub - H| / H`` is measured. Once it reaches
        ``thres``, the previous, denser selection is kept. Otherwise the
        sparsest selection tried is kept, or all samples when no step is
        tried (T < 4). ``self.entropy`` must already be set by get_entropy().

        Args:
            thres (float): stopping criteria threshold.
        """
        converged = False
        prev_step = 0
        sub_index = list(range(self.T))
        prev_sub_index = sub_index

        for n in range(self.T // 2, 1, -1):
            # Exact integer division (int(T / n) can round wrongly for huge T).
            step = self.T // n
            # Several n map to the same step; evaluate each step only once.
            if step == prev_step:
                continue
            prev_step = step
            prev_sub_index = sub_index
            sub_index = self.samples_selector(step)

            # With zero entropy the relative change is undefined and the
            # stopping test never applies, so skip the sub-window entropy.
            if self.entropy == 0:
                continue
            sub_window = MovingWindow(self.window[sub_index])
            sub_window.get_entropy()
            if abs(sub_window.entropy - self.entropy) / self.entropy >= thres:
                converged = True
                break

        chosen = prev_sub_index if converged else sub_index
        self.opt_samples_index = chosen
        self.opt_size = len(chosen)

__init__(window)

Class constructor.

Parameters:

Name Type Description Default
window np.ndarray

2-D array of window samples (one row per time step, one column per variable).

required
Source code in causalflow/preprocessing/subsampling_methods/moving_window.py
15
16
17
18
19
20
21
22
23
24
25
26
def __init__(self, window):
    """
    Class constructor.

    Args:
        window (np.ndarray): 2-D array of window samples, one row per time
            step and one column per variable.
    """
    self.window = window
    n_rows, n_cols = window.shape
    self.T = n_rows
    self.dim = n_cols
    # Filled in later by get_entropy() and optimal_sampling().
    self.entropy = self.opt_size = self.opt_samples_index = None

get_entropy()

Compute the entropy based on probability distribution function.

Source code in causalflow/preprocessing/subsampling_methods/moving_window.py
44
45
46
def get_entropy(self):
    """Store in ``self.entropy`` the base-2 entropy of the distribution from get_pdf()."""
    distribution = self.get_pdf()
    self.entropy = entropy(distribution, base=2)

get_pdf()

Compute the probability distribution function from an array of data.

Source code in causalflow/preprocessing/subsampling_methods/moving_window.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def get_pdf(self):
    """
    Compute the empirical probability of each distinct row of the window.

    Returns:
        list(float): frequency of every distinct row, in order of first
            appearance; the values sum to 1.
    """
    from collections import Counter

    # Counter replaces the hand-rolled counting dict; insertion order is kept.
    counts = Counter(tuple(row) for row in self.window)
    return [count / self.T for count in counts.values()]

optimal_sampling(thres)

Find the optimal number of sample for a particular moving window.

Parameters:

Name Type Description Default
thres float

stopping criteria threshold.

required
Source code in causalflow/preprocessing/subsampling_methods/moving_window.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def optimal_sampling(self, thres):
    """
    Find the optimal number of samples for this moving window.

    The window is subsampled with increasing steps ``T // n`` for ``n`` going
    from ``T // 2`` down to 2. For each new step the relative entropy change
    ``|H_sub - H| / H`` is measured. Once it reaches ``thres``, the previous,
    denser selection is kept. Otherwise the sparsest selection tried is kept,
    or all samples when no step is tried (T < 4). ``self.entropy`` must
    already be set by get_entropy().

    Args:
        thres (float): stopping criteria threshold.
    """
    converged = False
    prev_step = 0
    sub_index = list(range(self.T))
    prev_sub_index = sub_index

    for n in range(self.T // 2, 1, -1):
        # Exact integer division (int(T / n) can round wrongly for huge T).
        step = self.T // n
        # Several n map to the same step; evaluate each step only once.
        if step == prev_step:
            continue
        prev_step = step
        prev_sub_index = sub_index
        sub_index = self.samples_selector(step)

        # With zero entropy the relative change is undefined and the stopping
        # test never applies, so skip computing the sub-window entropy.
        if self.entropy == 0:
            continue
        sub_window = MovingWindow(self.window[sub_index])
        sub_window.get_entropy()
        if abs(sub_window.entropy - self.entropy) / self.entropy >= thres:
            converged = True
            break

    chosen = prev_sub_index if converged else sub_index
    self.opt_samples_index = chosen
    self.opt_size = len(chosen)

samples_selector(step)

Select sample to be taken from a moving window.

Parameters:

Name Type Description Default
step int

subsampling frequency.

required

Returns:

Name Type Description
list list

list of indexes corresponding to the sample to be taken.

Source code in causalflow/preprocessing/subsampling_methods/moving_window.py
49
50
51
52
53
54
55
56
57
58
59
def samples_selector(self, step) -> list:
    """
    Select sample to be taken from a moving window.

    Args:
        step (int): subsampling frequency.

    Returns:
        list: indexes 0, step, 2*step, ... smaller than the window length.
    """
    return list(range(0, self.T, step))

This module provides the Static class.

Classes

Static: subsamples data by taking one sample every `step` samples.

Static

Bases: SubsamplingMethod

Subsample data by taking one sample every `step` samples.

Source code in causalflow/preprocessing/subsampling_methods/Static.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Static(SubsamplingMethod):
    """Subsample data by taking one sample every ``step`` samples."""

    def __init__(self, step):
        """
        Class constructor.

        Args:
            step (int): integer subsampling step (must be >= 1).

        Raises:
            ValueError: if step is None or smaller than 1.
        """
        super().__init__(SSMode.Static)
        if step is None:
            raise ValueError("step not specified")
        # A zero step would only fail later inside range(), and a negative
        # one would silently select no rows at all; reject both up front.
        if step < 1:
            raise ValueError("step must be a positive integer")
        self.step = step

    def run(self):
        """
        Run subsampler.

        Returns:
            range: row positions 0, step, 2*step, ... of the dataframe.
        """
        return range(0, len(self.df.values), self.step)

__init__(step)

Class constructor.

Parameters:

Name Type Description Default
step int

integer subsampling step.

required

Raises:

Type Description
ValueError

if step == None.

Source code in causalflow/preprocessing/subsampling_methods/Static.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def __init__(self, step):
    """
    Class constructor.

    Args:
        step (int): integer subsampling step; must be >= 1.

    Raises:
        ValueError: if step is None or step < 1.
    """
    super().__init__(SSMode.Static)
    if step is None:
        raise ValueError("step not specified")
    # A non-positive step would make run() either fail (step == 0) or
    # silently select no samples at all (step < 0): reject it up front.
    if step < 1:
        raise ValueError(f"step must be a positive integer, got {step}")
    self.step = step

run()

Run subsampler.

Source code in causalflow/preprocessing/subsampling_methods/Static.py
28
29
30
def run(self):
    """
    Run subsampler.

    Returns:
        (range): indexes 0, step, 2*step, ... of the rows kept from self.df.
    """
    n_rows = len(self.df.values)
    return range(0, n_rows, self.step)

This module provides the WSDynamic class.

Classes

WSDynamic: Subsampling method with dynamic window size based on entropy analysis.

WSDynamic

Bases: SubsamplingMethod, EntropyBasedMethod

Subsampling method with dynamic window size based on entropy analysis.

Source code in causalflow/preprocessing/subsampling_methods/WSDynamic.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class WSDynamic(SubsamplingMethod, EntropyBasedMethod):
    """Subsampling method with dynamic window size based on entropy analysis.

    Window boundaries are obtained from an rpt.Pelt breakpoint search, then the
    entropy-based moving-window analysis decides which samples to keep.
    """

    def __init__(self, window_min_size, entropy_threshold):
        """
        Build a dynamic-window subsampler.

        Args:
            window_min_size (int): minimum window size, passed to the
                breakpoint search as its minimum segment length.
            entropy_threshold (float): entropy threshold.

        Raises:
            ValueError: if window_min_size is None.
        """
        SubsamplingMethod.__init__(self, SSMode.WSDynamic)
        EntropyBasedMethod.__init__(self, entropy_threshold)
        if window_min_size is None:
            raise ValueError("window_type = DYNAMIC but window_min_size not specified")
        # No single window size applies to this method, hence ws stays None.
        self.wms, self.ws = window_min_size, None


    def dataset_segmentation(self):
        """
        Segment dataset based on breakpoint analysis and a min window size.

        Runs rpt.Pelt (model "l2", penalty 10, min_size self.wms) on a rounded
        copy of the data and turns consecutive boundaries, starting from 0,
        into (start, end) segments stored in self.segments.
        """
        rounded_data = self.create_rounded_copy()
        detector = rpt.Pelt(model = "l2", min_size = self.wms).fit(rounded_data)
        breakpoints = detector.predict(pen = 10)
        # First segment starts at row 0; each later one starts where the previous ended.
        self.segments = [(0, breakpoints[0])]
        self.segments.extend(zip(breakpoints[:-1], breakpoints[1:]))


    def run(self):
        """
        Run subsampler.

        Pipeline: breakpoint-based segmentation, entropy moving-window
        analysis, then extraction of the retained sample indexes.

        Returns:
            (list[int]): indexes of the remaining samples.
        """
        self.dataset_segmentation()
        self.moving_window_analysis()
        return self.extract_indexes()

__init__(window_min_size, entropy_threshold)

Class constructor.

Parameters:

Name Type Description Default
window_min_size int

minimum window size.

required
entropy_threshold float

entropy threshold.

required

Raises:

Type Description
ValueError

if window_min_size == None.

Source code in causalflow/preprocessing/subsampling_methods/WSDynamic.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(self, window_min_size, entropy_threshold):
    """
    Build a dynamic-window subsampler.

    Args:
        window_min_size (int): minimum window size, passed to the breakpoint
            search as its minimum segment length.
        entropy_threshold (float): entropy threshold.

    Raises:
        ValueError: if window_min_size is None.
    """
    SubsamplingMethod.__init__(self, SSMode.WSDynamic)
    EntropyBasedMethod.__init__(self, entropy_threshold)
    if window_min_size is None:
        raise ValueError("window_type = DYNAMIC but window_min_size not specified")
    # No single window size applies to this method, hence ws stays None.
    self.wms, self.ws = window_min_size, None

dataset_segmentation()

Segment dataset based on breakpoint analysis and a min window size.

Source code in causalflow/preprocessing/subsampling_methods/WSDynamic.py
35
36
37
38
39
40
41
def dataset_segmentation(self):
    """
    Segment dataset based on breakpoint analysis and a min window size.

    Runs rpt.Pelt (model "l2", penalty 10, min_size self.wms) on a rounded
    copy of the data and turns consecutive boundaries, starting from 0,
    into (start, end) segments stored in self.segments.
    """
    rounded_data = self.create_rounded_copy()
    detector = rpt.Pelt(model = "l2", min_size = self.wms).fit(rounded_data)
    breakpoints = detector.predict(pen = 10)
    # First segment starts at row 0; each later one starts where the previous ended.
    self.segments = [(0, breakpoints[0])]
    self.segments.extend(zip(breakpoints[:-1], breakpoints[1:]))

run()

Run subsampler.

Returns:

Type Description
list[int]

indexes of the remaining samples.

Source code in causalflow/preprocessing/subsampling_methods/WSDynamic.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def run(self):
    """
    Run subsampler.

    Pipeline: breakpoint-based segmentation, entropy moving-window analysis,
    then extraction of the retained sample indexes.

    Returns:
        (list[int]): indexes of the remaining samples.
    """
    self.dataset_segmentation()
    self.moving_window_analysis()
    return self.extract_indexes()

This module provides the WSFFTStatic class.

Classes

WSFFTStatic: Subsampling method with static window size based on Fourier analysis.

WSFFTStatic

Bases: SubsamplingMethod, EntropyBasedMethod

Subsampling method with static window size based on Fourier analysis.

Source code in causalflow/preprocessing/subsampling_methods/WSFFTStatic.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class WSFFTStatic(SubsamplingMethod, EntropyBasedMethod):
    """Subsampling method with static window size based on Fourier analysis."""

    def __init__(self, sampling_time, entropy_threshold):
        """
        Class constructor.

        Args:
            sampling_time (float): timeseries sampling time.
            entropy_threshold (float): entropy threshold.
        """
        SubsamplingMethod.__init__(self, SSMode.WSFFTStatic)
        EntropyBasedMethod.__init__(self, entropy_threshold)
        self.sampling_time = sampling_time


    def __fourier_window(self):
        """
        Compute window size based on Fourier analysis performed on dataframe.

        For each column of self.df the highest peak of the magnitude spectrum
        is located; half of its period, converted into a number of samples
        (rounded up), is that column's candidate window. The smallest
        candidate is returned. Columns whose spectrum has no peak impose no
        constraint and are skipped.

        Returns:
            (int): window size

        Raises:
            ValueError: if no column of self.df has a spectral peak.
        """
        N, dim = self.df.shape
        xf = rfftfreq(N, self.sampling_time)
        w_array = list()
        for i in range(0, dim):
            yf = np.abs(rfft(self.df.values[:, i]))

            peak_indices, _ = scipy.signal.find_peaks(yf)
            if len(peak_indices) == 0:
                # No dominant frequency (e.g. an all-zero column): np.argmax on
                # an empty selection would fail, so this column is ignored.
                continue
            highest_peak_index = peak_indices[np.argmax(yf[peak_indices])]
            # Half period of the dominant frequency, in time units of
            # sampling_time, converted into samples.
            w_array.append(ceil(1 / (2 * xf[highest_peak_index]) / self.sampling_time))
        if not w_array:
            raise ValueError("unable to compute window size: no spectral peak found in any variable")
        return min(w_array)


    def dataset_segmentation(self):
        """
        Segments dataset with a fixed window size.

        The rows of self.df are split into consecutive, non-overlapping
        (start, end) pairs of length self.ws; the last segment is shorter when
        the number of rows is not a multiple of self.ws. An empty dataset
        yields no segments.

        Raises:
            ValueError: if self.ws < 1.
        """
        if self.ws < 1:
            raise ValueError(f"window size must be a positive integer, got {self.ws}")
        n_samples = len(self.df.values)
        self.segments = [(start, min(start + self.ws, n_samples))
                         for start in range(0, n_samples, self.ws)]


    def run(self):
        """
        Run subsampler.

        Returns:
            (list[int]): indexes of the remaining samples.
        """
        # define window size
        self.ws = self.__fourier_window()

        # build list of segment
        self.dataset_segmentation()

        # compute entropy moving window
        self.moving_window_analysis()

        # extracting subsampling procedure results
        idxs = self.extract_indexes()

        return idxs

__fourier_window()

Compute window size based on Fourier analysis performed on dataframe.

Returns:

Type Description
int

window size

Source code in causalflow/preprocessing/subsampling_methods/WSFFTStatic.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __fourier_window(self):
    """
    Compute window size based on Fourier analysis performed on dataframe.

    For each column of self.df the highest peak of the magnitude spectrum is
    located; half of its period, converted into a number of samples (rounded
    up), is that column's candidate window. The smallest candidate is
    returned. Columns whose spectrum has no peak impose no constraint and are
    skipped.

    Returns:
        (int): window size

    Raises:
        ValueError: if no column of self.df has a spectral peak.
    """
    N, dim = self.df.shape
    xf = rfftfreq(N, self.sampling_time)
    w_array = list()
    for i in range(0, dim):
        yf = np.abs(rfft(self.df.values[:, i]))

        peak_indices, _ = scipy.signal.find_peaks(yf)
        if len(peak_indices) == 0:
            # No dominant frequency (e.g. an all-zero column): np.argmax on an
            # empty selection would fail, so this column is ignored.
            continue
        highest_peak_index = peak_indices[np.argmax(yf[peak_indices])]
        # Half period of the dominant frequency, in time units of
        # sampling_time, converted into samples.
        w_array.append(ceil(1 / (2 * xf[highest_peak_index]) / self.sampling_time))
    if not w_array:
        raise ValueError("unable to compute window size: no spectral peak found in any variable")
    return min(w_array)

__init__(sampling_time, entropy_threshold)

Class constructor.

Parameters:

Name Type Description Default
sampling_time float

timeseries sampling time.

required
entropy_threshold float

entropy threshold.

required
Source code in causalflow/preprocessing/subsampling_methods/WSFFTStatic.py
19
20
21
22
23
24
25
26
27
28
29
def __init__(self, sampling_time, entropy_threshold):
    """
    Build a Fourier-based static-window subsampler.

    Args:
        sampling_time (float): sampling period of the timeseries; it is used
            to turn the dominant frequency into a window length in samples.
        entropy_threshold (float): entropy threshold handed to EntropyBasedMethod.
    """
    SubsamplingMethod.__init__(self, SSMode.WSFFTStatic)
    EntropyBasedMethod.__init__(self, entropy_threshold)
    # The window size itself is only computed later, inside run().
    self.sampling_time = sampling_time

dataset_segmentation()

Segments dataset with a fixed window size.

Source code in causalflow/preprocessing/subsampling_methods/WSFFTStatic.py
55
56
57
58
59
60
61
def dataset_segmentation(self):
    """
    Segments dataset with a fixed window size.

    The rows of self.df are split into consecutive, non-overlapping
    (start, end) pairs of length self.ws; the last segment is shorter when the
    number of rows is not a multiple of self.ws. An empty dataset yields no
    segments.

    Raises:
        ValueError: if self.ws < 1.
    """
    if self.ws < 1:
        raise ValueError(f"window size must be a positive integer, got {self.ws}")
    n_samples = len(self.df.values)
    self.segments = [(start, min(start + self.ws, n_samples))
                     for start in range(0, n_samples, self.ws)]

run()

Run subsampler.

Returns:

Type Description
list[int]

indexes of the remaining samples.

Source code in causalflow/preprocessing/subsampling_methods/WSFFTStatic.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def run(self):
    """
    Run subsampler.

    The window size is first derived from the Fourier analysis. The dataset
    is then cut into fixed-size segments and analysed with the entropy moving
    window, and finally the surviving sample indexes are collected.

    Returns:
        (list[int]): indexes of the remaining samples.
    """
    self.ws = self.__fourier_window()
    self.dataset_segmentation()
    self.moving_window_analysis()
    return self.extract_indexes()

This module provides the WSStatic class.

Classes

WSStatic: Entropy based subsampling method with static window size.

WSStatic

Bases: SubsamplingMethod, EntropyBasedMethod

Entropy based subsampling method with static window size.

Source code in causalflow/preprocessing/subsampling_methods/WSStatic.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class WSStatic(SubsamplingMethod, EntropyBasedMethod):
    """Entropy based subsampling method with static window size."""

    def __init__(self, window_size, entropy_threshold):
        """
        Class constructor.

        Args:
            window_size (int): fixed window size, in samples.
            entropy_threshold (float): entropy threshold.

        Raises:
            ValueError: if window_size == None.
        """
        # This class uses a fixed window, so it registers the WSStatic mode
        # (it used to register SSMode.WSDynamic by mistake).
        SubsamplingMethod.__init__(self, SSMode.WSStatic)
        EntropyBasedMethod.__init__(self, entropy_threshold)
        if window_size is None:
            raise ValueError("window_type = STATIC but window_size not specified")
        self.ws = window_size


    def dataset_segmentation(self):
        """
        Segment dataset with a fixed window size.

        The rows of self.df are split into consecutive, non-overlapping
        (start, end) pairs of length self.ws; the last segment is shorter when
        the number of rows is not a multiple of self.ws. An empty dataset
        yields no segments.

        Raises:
            ValueError: if self.ws < 1.
        """
        if self.ws < 1:
            raise ValueError(f"window size must be a positive integer, got {self.ws}")
        n_samples = len(self.df.values)
        self.segments = [(start, min(start + self.ws, n_samples))
                         for start in range(0, n_samples, self.ws)]


    def run(self):
        """
        Run subsampler.

        Returns:
            (list[int]): indexes of the remaining samples.
        """
        # build list of segment
        self.dataset_segmentation()

        # compute entropy moving window
        self.moving_window_analysis()

        # extracting subsampling procedure results
        idxs = self.extract_indexes()

        return idxs

__init__(window_size, entropy_threshold)

Class constructor.

Parameters:

Name Type Description Default
window_size int

fixed window size, in samples.

required
entropy_threshold float

entropy threshold.

required

Raises:

Type Description
ValueError

if window_size == None.

Source code in causalflow/preprocessing/subsampling_methods/WSStatic.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def __init__(self, window_size, entropy_threshold):
    """
    Class constructor.

    Args:
        window_size (int): fixed window size, in samples.
        entropy_threshold (float): entropy threshold.

    Raises:
        ValueError: if window_size == None.
    """
    # This class uses a fixed window, so it registers the WSStatic mode
    # (it used to register SSMode.WSDynamic by mistake).
    SubsamplingMethod.__init__(self, SSMode.WSStatic)
    EntropyBasedMethod.__init__(self, entropy_threshold)
    if window_size is None:
        raise ValueError("window_type = STATIC but window_size not specified")
    self.ws = window_size

dataset_segmentation()

Segment dataset with a fixed window size.

Source code in causalflow/preprocessing/subsampling_methods/WSStatic.py
33
34
35
36
37
38
39
def dataset_segmentation(self):
    """
    Segment dataset with a fixed window size.

    The rows of self.df are split into consecutive, non-overlapping
    (start, end) pairs of length self.ws; the last segment is shorter when the
    number of rows is not a multiple of self.ws. An empty dataset yields no
    segments.

    Raises:
        ValueError: if self.ws < 1.
    """
    if self.ws < 1:
        raise ValueError(f"window size must be a positive integer, got {self.ws}")
    n_samples = len(self.df.values)
    self.segments = [(start, min(start + self.ws, n_samples))
                     for start in range(0, n_samples, self.ws)]

run()

Run subsampler.

Returns:

Type Description
list[int]

indexes of the remaining samples.

Source code in causalflow/preprocessing/subsampling_methods/WSStatic.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def run(self):
    """
    Run subsampler.

    Pipeline: fixed-size segmentation, entropy moving-window analysis, then
    extraction of the retained sample indexes.

    Returns:
        (list[int]): indexes of the remaining samples.
    """
    self.dataset_segmentation()
    self.moving_window_analysis()
    return self.extract_indexes()