Skip to content

Feature Selection Methods

This module provides various classes for feature selection analysis.

Classes

CTest: support class for handling different feature selection methods. SelectionMethod: Abstract class.

CTest

Bases: Enum

CTest Enumerator.

Source code in causalflow/selection_methods/SelectionMethod.py
19
20
21
22
23
24
class CTest(Enum):
    """
    CTest Enumerator.

    Identifies a feature selection method; each member's value is the
    human-readable name of that method.
    """

    Corr = "Correlation"
    MI = "Mutual Information"
    TE = "Transfer Entropy"

SelectionMethod

Bases: ABC

SelectionMethod abstract class.

Source code in causalflow/selection_methods/SelectionMethod.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class SelectionMethod(ABC):
    """
    SelectionMethod abstract class.

    Stores the analysis configuration (data, significance threshold, lag range
    and result graph), offers helpers to build lagged source/target series and
    to record dependencies, and leaves `compute_dependencies` to subclasses.
    """

    def __init__(self, ctest):
        """
        Class constructor.

        Args:
            ctest (CTest): Feature Selection method's name.
        """
        self.ctest = ctest
        # The analysis settings below are filled in by `initialise`.
        self.data = None
        self.alpha = None
        self.min_lag = None
        self.max_lag = None
        self.result = None


    @property
    def name(self):
        """
        Return Selection Method name.

        Returns:
            (str): Selection Method name.
        """
        return self.ctest.value


    def initialise(self, data: "Data", alpha, min_lag, max_lag, graph):
        """
        Initialise the selection method.

        Args:
            data (Data): Data.
            alpha (float): significance threshold.
            min_lag (int): min lag time.
            max_lag (int): max lag time.
            graph (DAG): initial DAG (empty).
        """
        self.data = data
        self.alpha = alpha
        self.min_lag = min_lag
        self.max_lag = max_lag
        self.result = graph


    @abstractmethod
    def compute_dependencies(self) -> "DAG":
        """Abstract method: subclasses run their analysis here."""
        pass


    def _prepare_ts(self, target, lag, apply_lag = True, consider_autodep = True):
        """
        Prepare the dataframe to the analysis.

        When the lag is applied, the target loses its first `lag` samples and
        the sources lose their last `lag` samples, so that row i of the sources
        is paired with the target `lag` steps later.

        Args:
            target (str): name target var
            lag (int): lag time to apply
            apply_lag (bool, optional): True if you want to apply the lag, False otherwise. Defaults to True.
            consider_autodep (bool, optional): True if you want to consider autodependecy check. Defaults to True.

        Returns:
            tuple(DataFrame, DataFrame): source and target dataframe.
        """
        frame = self.data.d
        # Without the auto-dependency check the target column is not a source.
        sources = frame if consider_autodep else frame.loc[:, frame.columns != target]
        Y = frame[target]
        if not apply_lag:
            return sources, Y

        # Use an explicit stop index to drop the last `lag` rows: a plain
        # `[:-lag]` slice turns into `[:0]` (empty) when lag is 0.
        stop = max(len(frame) - lag, 0)
        return sources[:stop], Y[lag:]


    def _add_dependency(self, t, s, score, pval, lag):
        """
        Add dependency from source (s) to target (t) specifying the score, pval and the lag.

        The dependency is stored in `self.result` and then logged.

        Args:
            t (str): target feature name.
            s (str): source feature name.
            score (float): selection method score.
            pval (float): pval associated to the dependency.
            lag (int): lag time of the dependency.
        """
        self.result.add_source(t, s, score, pval, lag)

        CP.info(f"\tlink: ({s} -{lag}) -?> ({t})")
        CP.info(f"\t|val = {round(score, 3)} |pval = {round(pval, 3)}")

name property

Return Selection Method name.

Returns:

Type Description
str

Selection Method name.

__init__(ctest)

Class constructor.

Parameters:

Name Type Description Default
ctest CTest

Feature Selection method's name.

required
Source code in causalflow/selection_methods/SelectionMethod.py
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(self, ctest):
    """
    Class constructor.

    Stores the method identifier and leaves every analysis setting unset.

    Args:
        ctest (CTest): Feature Selection method's name.
    """
    self.ctest = ctest
    # Analysis settings start as None.
    self.data = None
    self.alpha = None
    self.min_lag = None
    self.max_lag = None
    self.result = None

compute_dependencies() abstractmethod

Abstract method.

Source code in causalflow/selection_methods/SelectionMethod.py
86
87
88
89
@abstractmethod
def compute_dependencies(self) -> DAG:
    """
    Abstract method.

    Annotated as returning a DAG; there is no implementation at this level.
    """
    pass

initialise(data, alpha, min_lag, max_lag, graph)

Initialise the selection method.

Parameters:

Name Type Description Default
data Data

Data.

required
alpha float

significance threshold.

required
min_lag int

min lag time.

required
max_lag int

max lag time.

required
graph DAG

initial DAG (empty).

required
Source code in causalflow/selection_methods/SelectionMethod.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def initialise(self, data: Data, alpha, min_lag, max_lag, graph):
    """
    Store the analysis configuration on the selection method.

    Args:
        data (Data): data to analyse.
        alpha (float): significance threshold.
        min_lag (int): minimum lag time.
        max_lag (int): maximum lag time.
        graph (DAG): initial (empty) DAG, kept as `self.result`.
    """
    self.data, self.alpha = data, alpha
    self.min_lag, self.max_lag = min_lag, max_lag
    self.result = graph

This module provides various classes for Correlation-based feature selection analysis.

Classes

Corr: Correlation class.

Corr

Bases: SelectionMethod

Feature selection method based on Correlation analysis.

Source code in causalflow/selection_methods/Corr.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Corr(SelectionMethod):
    """Correlation-based feature selection method."""

    def __init__(self):
        """Class constructor."""
        super().__init__(CTest.Corr)


    def compute_dependencies(self):
        """
        Select, for every lag and target, the sources whose p-value is below alpha.

        Each (lag, target) pair is scored with `f_regression`; every source
        passing the threshold is recorded through `_add_dependency`.

        Returns:
            The `self.result` object, updated with the selected dependencies.
        """
        CP.info("\n##")
        CP.info(f"## {self.name} analysis")
        CP.info("##")

        for lag in range(self.min_lag, self.max_lag + 1):
            for target in self.data.features:
                CP.info("\n## Target variable: " + target)

                sources, lagged_target = self._prepare_ts(target, lag)
                scores, pvals = f_regression(sources, lagged_target)

                # Keep only the sources that pass the significance threshold
                significant = pvals < self.alpha
                names = sources.columns[significant].tolist()
                kept_scores = scores[significant].tolist()
                kept_pvals = pvals[significant].tolist()

                for source, source_score, source_pval in zip(names, kept_scores, kept_pvals):
                    self._add_dependency(target, source, source_score, source_pval, lag)

        return self.result

__init__()

Constructor class.

Source code in causalflow/selection_methods/Corr.py
15
16
17
def __init__(self):
    """Class constructor: registers the method as CTest.Corr."""
    super().__init__(CTest.Corr)

compute_dependencies()

Compute list of dependencies for each target by correlation analysis.

Returns:

Type Description
dict

dictionary(TARGET: list SOURCES)

Source code in causalflow/selection_methods/Corr.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def compute_dependencies(self):
    """
    Select, for every lag and target, the sources whose p-value is below alpha.

    Each (lag, target) pair is scored with `f_regression`; every source
    passing the threshold is recorded through `_add_dependency`.

    Returns:
        The `self.result` object, updated with the selected dependencies.
    """
    CP.info("\n##")
    CP.info("## " + self.name + " analysis")
    CP.info("##")

    for lag in range(self.min_lag, self.max_lag + 1):
        for target in self.data.features:
            CP.info("\n## Target variable: " + target)

            sources, lagged_target = self._prepare_ts(target, lag)
            scores, pvals = f_regression(sources, lagged_target)

            # Keep only the sources that pass the significance threshold
            significant = pvals < self.alpha
            names = sources.columns[significant].tolist()
            kept_scores = scores[significant].tolist()
            kept_pvals = pvals[significant].tolist()

            for source, source_score, source_pval in zip(names, kept_scores, kept_pvals):
                self._add_dependency(target, source, source_score, source_pval, lag)

    return self.result

This module provides various classes for Partial Correlation-based feature selection analysis.

Classes

ParCorr: Partial Correlation class.

ParCorr

Bases: SelectionMethod

Feature selection method based on Partial Correlation analysis.

Source code in causalflow/selection_methods/ParCorr.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class ParCorr(SelectionMethod):
    """Feature selection method based on Partial Correlation analysis."""

    def __init__(self):
        """Class constructor."""
        # NOTE(review): registered as CTest.Corr, so `name` reports
        # "Correlation" for this method too — confirm this is intended.
        super().__init__(CTest.Corr)


    def get_residual(self, covar, target):
        """
        Calculate the residual of the target variable after conditioning on the covar variables.

        The target is regressed on the covariates by least squares and the
        fitted part is subtracted.

        Args:
            covar (np.array): conditioning variables.
            target (np.array): target variable.

        Returns:
            (np.array): residual.
        """
        # With nothing to condition on, the residual is the target itself.
        if np.asarray(covar).size == 0:
            return np.array(target, dtype=float)

        beta = np.linalg.lstsq(covar, target, rcond=None)[0]
        return target - np.dot(covar, beta)


    def partial_corr(self, X, Y, Z):
        """
        Calculate Partial correlation between X and Y conditioning on Z.

        Args:
            X (np.array): source candidate variable.
            Y (np.array): target variable.
            Z (np.array): conditioning variable.

        Returns:
            (float, float): partial correlation, p-value.
        """
        pcorr, pval = stats.pearsonr(self.get_residual(Z, X), self.get_residual(Z, Y))

        return pcorr, pval

    def compute_dependencies(self):
        """
        Compute list of dependencies for each target by partial correlation analysis.

        For each lag and target, candidates are added greedily: at every round
        the significant candidate with the largest absolute partial
        correlation is recorded, until no candidate passes the threshold.

        Returns:
            The `self.result` object, updated with the selected dependencies.
        """
        CP.info("\n##")
        CP.info("## " + self.name + " analysis")
        CP.info("##")

        n_samples = len(self.data.d)
        for lag in range(self.min_lag, self.max_lag + 1):
            # Explicit stop index: a `[:-lag]` slice would be empty for lag 0.
            stop = max(n_samples - lag, 0)
            for target in self.data.features:
                CP.info("\n## Target variable: " + target)
                # Work on a copy: removing candidates must not alter the shared
                # feature list, which the enclosing loop is iterating.
                candidates = list(self.data.features)

                Y = np.array(self.data.d[target][lag:])

                while candidates:
                    tmp_res = None
                    # NOTE(review): `_get_sources` is not defined in this class
                    # nor in the SelectionMethod listing — confirm where it
                    # comes from and that it returns column names.
                    covars = self._get_sources(target)
                    Z = np.array(self.data.d[covars][:stop])

                    for candidate in candidates:
                        X = np.array(self.data.d[candidate][:stop])
                        score, pval = self.partial_corr(X, Y, Z)
                        if pval < self.alpha and (tmp_res is None or abs(tmp_res[1]) < abs(score)):
                            tmp_res = (candidate, score, pval)

                    if tmp_res is None:
                        break

                    self._add_dependency(target, tmp_res[0], tmp_res[1], tmp_res[2], lag)
                    candidates.remove(tmp_res[0])
        return self.result

__init__()

Class constructor.

Source code in causalflow/selection_methods/ParCorr.py
16
17
18
def __init__(self):
    """Class constructor."""
    # NOTE(review): registered as CTest.Corr, the same identifier used by the
    # plain correlation method — confirm this is intended.
    super().__init__(CTest.Corr)

compute_dependencies()

Compute list of dependencies for each target by partial correlation analysis.

Returns:

Type Description
dict

dictionary(TARGET: list SOURCES).

Source code in causalflow/selection_methods/ParCorr.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def compute_dependencies(self):
    """
    Compute list of dependencies for each target by partial correlation analysis.

    For each lag and target, candidates are added greedily: at every round
    the significant candidate with the largest absolute partial correlation
    is recorded, until no candidate passes the threshold.

    Returns:
        The `self.result` object, updated with the selected dependencies.
    """
    CP.info("\n##")
    CP.info("## " + self.name + " analysis")
    CP.info("##")

    n_samples = len(self.data.d)
    for lag in range(self.min_lag, self.max_lag + 1):
        # Explicit stop index: a `[:-lag]` slice would be empty for lag 0.
        stop = max(n_samples - lag, 0)
        for target in self.data.features:
            CP.info("\n## Target variable: " + target)
            # Work on a copy: removing candidates must not alter the shared
            # feature list, which the enclosing loop is iterating.
            candidates = list(self.data.features)

            Y = np.array(self.data.d[target][lag:])

            while candidates:
                tmp_res = None
                # NOTE(review): confirm where `_get_sources` is defined and
                # that it returns column names.
                covars = self._get_sources(target)
                Z = np.array(self.data.d[covars][:stop])

                for candidate in candidates:
                    X = np.array(self.data.d[candidate][:stop])
                    score, pval = self.partial_corr(X, Y, Z)
                    if pval < self.alpha and (tmp_res is None or abs(tmp_res[1]) < abs(score)):
                        tmp_res = (candidate, score, pval)

                if tmp_res is None:
                    break

                self._add_dependency(target, tmp_res[0], tmp_res[1], tmp_res[2], lag)
                candidates.remove(tmp_res[0])
    return self.result

get_residual(covar, target)

Calculate the residual of the target variable after conditioning on the covar variables.

Parameters:

Name Type Description Default
covar np.array

conditioning variables.

required
target np.array

target variable.

required

Returns:

Type Description
np.array

residual.

Source code in causalflow/selection_methods/ParCorr.py
21
22
23
24
25
26
27
28
29
30
31
32
33
def get_residual(self, covar, target):
    """
    Calculate the residual of the target variable after conditioning on the covar variables.

    The target is regressed on the covariates by least squares and the fitted
    part is subtracted.

    Args:
        covar (np.array): conditioning variables.
        target (np.array): target variable.

    Returns:
        (np.array): residual.
    """
    # With nothing to condition on, the residual is the target itself.
    if np.asarray(covar).size == 0:
        return np.array(target, dtype=float)

    beta = np.linalg.lstsq(covar, target, rcond=None)[0]
    return target - np.dot(covar, beta)

partial_corr(X, Y, Z)

Calculate Partial correlation between X and Y conditioning on Z.

Parameters:

Name Type Description Default
X np.array

source candidate variable.

required
Y np.array

target variable.

required
Z np.array

conditioning variable.

required

Returns:

Type Description
(float, float)

partial correlation, p-value.

Source code in causalflow/selection_methods/ParCorr.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def partial_corr(self, X, Y, Z):
    """
    Compute the partial correlation between X and Y given Z.

    Both X and Y are replaced by their residuals with respect to Z (via
    `get_residual`), and the Pearson correlation of the residuals is returned.

    Args:
        X (np.array): source candidate variable.
        Y (np.array): target variable.
        Z (np.array): conditioning variable.

    Returns:
        (float, float): partial correlation, p-value.
    """
    residual_x = self.get_residual(Z, X)
    residual_y = self.get_residual(Z, Y)
    correlation, p_value = stats.pearsonr(residual_x, residual_y)
    return correlation, p_value

This module provides various classes for Mutual Information-based feature selection analysis.

Classes

MIestimator: support class for handling different Mutual Information estimators. MI: Mutual Information class.

MI

Bases: SelectionMethod

Feature selection method based on Mutual Information analysis.

Source code in causalflow/selection_methods/MI.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class MI(SelectionMethod):
    """Feature selection method based on Mutual Information analysis."""

    def __init__(self, estimator: MIestimator):
        """
        Class constructor.

        Args:
            estimator (MIestimator): Gaussian/Kraskov
        """
        super().__init__(CTest.MI)
        self.estimator = estimator

    @property
    def isOpenCLinstalled(self) -> bool:
        """
        Check whether the pyopencl pkg is installed.

        Returns:
            bool: True if pyopencl is installed.
        """
        try:
            importlib.import_module('pyopencl')
            return True
        except ImportError:
            return False

    def _select_estimator(self):
        """
        Select the MI estimator.

        Every feature is checked with the Shapiro-Wilk and Kolmogorov-Smirnov
        tests. The Gaussian estimator is chosen only if no feature is flagged
        as non-normal; otherwise a Kraskov estimator is chosen (the OpenCL
        variant when pyopencl can be imported).
        """
        CP.info("\n##")
        CP.info("## MI Estimator selection")
        CP.info("##")

        isGaussian = True

        for f in self.data.features:
            # Perform Shapiro-Wilk test
            shapiro_stat, shapiro_p_value = shapiro(self.data.d[f])
            # Perform Kolmogorov-Smirnov test
            # NOTE(review): kstest(..., 'norm') compares against the standard
            # normal N(0, 1); confirm the data is standardised upstream.
            ks_stat, ks_p_value = kstest(self.data.d[f], 'norm')

            # Print results
            CP.debug("\n")
            CP.debug(f"Feature '{f}':")
            CP.debug(f"\t- Shapiro-Wilk test: val={round(shapiro_stat, 2)}, p-val={round(shapiro_p_value, 2)}")
            CP.debug(f"\t- Kolmogorov-Smirnov test: val={round(ks_stat, 2)}, p-val={round(ks_p_value, 2)}")

            # A p-value below 0.05 in either test flags the feature as non-normal.
            # No early exit: the results are logged for every feature.
            if shapiro_p_value < 0.05 or ks_p_value < 0.05:
                CP.debug("\tNot normally distributed")
                isGaussian = False
            else:
                CP.debug("\tNormally distributed")

        if isGaussian:
            self.estimator = MIestimator.Gaussian
        else:
            self.estimator = MIestimator.OpenCLKraskov if self.isOpenCLinstalled else MIestimator.Kraskov
        CP.info("\n## MI Estimator: " + self.estimator.value)

    def compute_dependencies(self):
        """
        Compute list of dependencies for each target by mutual information analysis.

        Returns:
            (DAG): dependency dag
        """
        if self.estimator is MIestimator.Auto:
            self._select_estimator()

        with _suppress_stdout():
            # The dataframe lives in `self.data.d` (set by `initialise`).
            data = Data(self.data.d.values, dim_order='sp') # sp = samples(row) x processes(col)

            network_analysis = MultivariateMI()
            settings = {'cmi_estimator': self.estimator.value,
                        'max_lag_sources': self.max_lag,
                        'min_lag_sources': self.min_lag,
                        'alpha_max_stats': self.alpha,
                        'alpha_min_stats': self.alpha,
                        'alpha_omnibus': self.alpha,
                        'alpha_max_seq': self.alpha,
                        'verbose': False}
            results = network_analysis.analyse_network(settings=settings, data=data)

        # Results are keyed by column index; map them back to feature names.
        features = self.data.features
        for t in results._single_target.keys():
            target_res = results._single_target[t]
            sel_sources = [s[0] for s in target_res['selected_vars_sources']]
            if sel_sources:
                sel_sources_lag = [s[1] for s in target_res['selected_vars_sources']]
                sel_sources_score = target_res['selected_sources_mi']
                sel_sources_pval = target_res['selected_sources_pval']
                for s, score, pval, lag in zip(sel_sources, sel_sources_score, sel_sources_pval, sel_sources_lag):
                    self._add_dependency(features[t], features[s], score, pval, lag)

        return self.result

isOpenCLinstalled: bool property

Check whether the pyopencl pkg is installed.

Returns:

Name Type Description
bool bool

True if pyopencl is installed.

__init__(estimator)

Class constructor.

Parameters:

Name Type Description Default
estimator MIestimator

Gaussian/Kraskov

required
Source code in causalflow/selection_methods/MI.py
29
30
31
32
33
34
35
36
37
def __init__(self, estimator: MIestimator):
    """
    Class constructor.

    Registers the method as CTest.MI and stores the requested estimator.

    Args:
        estimator (MIestimator): Gaussian/Kraskov
    """
    super().__init__(CTest.MI)
    self.estimator = estimator

compute_dependencies()

Compute list of dependencies for each target by mutual information analysis.

Returns:

Type Description
DAG

dependency dag

Source code in causalflow/selection_methods/MI.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def compute_dependencies(self):
    """
    Compute list of dependencies for each target by mutual information analysis.

    Returns:
        (DAG): dependency dag
    """
    if self.estimator is MIestimator.Auto:
        self._select_estimator()

    with _suppress_stdout():
        # The dataframe lives in `self.data.d`, not directly on the method object.
        data = Data(self.data.d.values, dim_order='sp') # sp = samples(row) x processes(col)

        network_analysis = MultivariateMI()
        settings = {'cmi_estimator': self.estimator.value,
                    'max_lag_sources': self.max_lag,
                    'min_lag_sources': self.min_lag,
                    'alpha_max_stats': self.alpha,
                    'alpha_min_stats': self.alpha,
                    'alpha_omnibus': self.alpha,
                    'alpha_max_seq': self.alpha,
                    'verbose': False}
        results = network_analysis.analyse_network(settings=settings, data=data)

    # Results are keyed by column index; map them back to feature names.
    features = self.data.features
    for t in results._single_target.keys():
        target_res = results._single_target[t]
        sel_sources = [s[0] for s in target_res['selected_vars_sources']]
        if sel_sources:
            sel_sources_lag = [s[1] for s in target_res['selected_vars_sources']]
            sel_sources_score = target_res['selected_sources_mi']
            sel_sources_pval = target_res['selected_sources_pval']
            for s, score, pval, lag in zip(sel_sources, sel_sources_score, sel_sources_pval, sel_sources_lag):
                self._add_dependency(features[t], features[s], score, pval, lag)

    return self.result

MIestimator

Bases: Enum

MIestimator Enumerator.

Source code in causalflow/selection_methods/MI.py
17
18
19
20
21
22
23
class MIestimator(Enum):
    """
    MIestimator Enumerator.

    Apart from Auto, each member's value is the estimator name passed on as
    the 'cmi_estimator' analysis setting.
    """

    # Auto is resolved to one of the concrete estimators before the analysis.
    Auto = 'Auto'
    Gaussian = 'JidtGaussianCMI'
    Kraskov = 'JidtKraskovCMI'
    OpenCLKraskov = 'OpenCLKraskovCMI'

This module provides various classes for Transfer Entropy-based feature selection analysis.

Classes

TEestimator: support class for handling different Transfer Entropy estimators. TE: Transfer Entropy class.

TE

Bases: SelectionMethod

Feature selection method based on Transfer Entropy analysis.

Source code in causalflow/selection_methods/TE.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class TE(SelectionMethod):
    """Transfer Entropy-based feature selection method."""

    def __init__(self, estimator: TEestimator):
        """
        Class constructor.

        Args:
            estimator (TEestimator): estimator to use (Gaussian/Kraskov).
        """
        super().__init__(CTest.TE)
        self.estimator = estimator

    @property
    def isOpenCLinstalled(self) -> bool:
        """
        Tell whether the pyopencl package can be imported.

        Returns:
            bool: True if pyopencl is installed.
        """
        try:
            importlib.import_module('pyopencl')
        except ImportError:
            return False
        return True

    def _select_estimator(self):
        """
        Pick the TE estimator from the normality of the features.

        The Gaussian estimator is used only when no feature is flagged as
        non-normal; otherwise a Kraskov estimator is used (the OpenCL variant
        when pyopencl is available).
        """
        CP.info("\n##")
        CP.info("## TE Estimator selection")
        CP.info("##")

        all_gaussian = True

        for feature in self.data.features:
            series = self.data.d[feature]
            # Shapiro-Wilk first, then Kolmogorov-Smirnov
            sw_stat, sw_pval = shapiro(series)
            ks_stat, ks_pval = kstest(series, 'norm')

            # Report both tests
            CP.debug("\n")
            CP.debug(f"Feature '{feature}':")
            CP.debug(f"\t- Shapiro-Wilk test: val={round(sw_stat, 2)}, p-val={round(sw_pval, 2)}")
            CP.debug(f"\t- Kolmogorov-Smirnov test: val={round(ks_stat, 2)}, p-val={round(ks_pval, 2)}")

            # Either p-value below 0.05 flags the feature as non-normal; the
            # loop keeps going so every feature is reported.
            rejected = sw_pval < 0.05 or ks_pval < 0.05
            if rejected:
                CP.debug("\tNot normally distributed")
                all_gaussian = False
            else:
                CP.debug("\tNormally distributed")

        if all_gaussian:
            chosen = TEestimator.Gaussian
        elif self.isOpenCLinstalled:
            chosen = TEestimator.OpenCLKraskov
        else:
            chosen = TEestimator.Kraskov
        self.estimator = chosen
        CP.info("\n## TE Estimator: " + self.estimator.value)


    def compute_dependencies(self):
        """
        Find the dependencies of every target by transfer entropy analysis.

        For each target, the auto-dependency is checked with a bivariate MI
        analysis on two copies of the target series, and the cross-dependencies
        with a multivariate TE analysis on the whole dataset.

        Returns:
            (DAG): dependency dag.
        """
        if self.estimator is TEestimator.Auto:
            self._select_estimator()

        multi_network_analysis = MultivariateTE()
        bi_network_analysis = BivariateMI()
        cross_settings = {
            'cmi_estimator': self.estimator.value,
            'max_lag_sources': self.max_lag,
            'min_lag_sources': self.min_lag,
            'max_lag_target': self.max_lag,
            'min_lag_target': self.min_lag,
            'alpha_max_stats': self.alpha,
            'alpha_min_stats': self.alpha,
            'alpha_omnibus': self.alpha,
            'alpha_max_seq': self.alpha,
            'verbose': False,
        }
        # The auto-dependency analysis uses the same settings, except that its
        # minimum source lag is raised to 1 when min_lag is 0.
        autodep_settings = copy.deepcopy(cross_settings)
        if self.min_lag == 0:
            autodep_settings['min_lag_sources'] = 1

        CP.info("\n##")
        CP.info(f"## {self.name} analysis")
        CP.info("##")
        for target in self.data.features:
            CP.info("\n## Target variable: " + target)
            with _suppress_stdout():
                t = self.data.features.index(target)

                # Auto-dependency: the target series is used as both processes
                own_series = self.data.d.values[:, t]
                auto_data = Data(np.c_[own_series, own_series], dim_order='sp') # sp = samples(row) x processes(col)
                res_auto = bi_network_analysis.analyse_single_target(settings = autodep_settings, data = auto_data, target = 0, sources = 1)

                # Cross-dependencies: all the processes
                cross_data = Data(self.data.d.values, dim_order='sp') # sp = samples(row) x processes(col)
                res_cross = multi_network_analysis.analyse_single_target(settings = cross_settings, data = cross_data, target = t)

            target_name = self.data.features[t]

            # Record the auto-dependencies
            auto_res = res_auto._single_target[0]
            auto_lag = [var[1] for var in auto_res['selected_vars_sources']]
            auto_score = auto_res['selected_sources_mi']
            auto_pval = auto_res['selected_sources_pval']
            if auto_score is not None:
                for score, pval, lag in zip(auto_score, auto_pval, auto_lag):
                    self._add_dependency(target_name, target_name, score, pval, lag)

            # Record the cross-dependencies
            cross_res = res_cross._single_target[t]
            sel_sources = [var[0] for var in cross_res['selected_vars_sources']]
            if sel_sources:
                sel_lags = [var[1] for var in cross_res['selected_vars_sources']]
                sel_scores = cross_res['selected_sources_te']
                sel_pvals = cross_res['selected_sources_pval']
                for s, score, pval, lag in zip(sel_sources, sel_scores, sel_pvals, sel_lags):
                    self._add_dependency(target_name, self.data.features[s], score, pval, lag)

            if auto_score is None and not sel_sources:
                CP.info("\tno sources selected")

        return self.result

isOpenCLinstalled: bool property

Check whether the pyopencl pkg is installed.

Returns:

Name Type Description
bool bool

True if pyopencl is installed.

__init__(estimator)

Class constructor.

Parameters:

Name Type Description Default
estimator TEestimator

Gaussian/Kraskov.

required
Source code in causalflow/selection_methods/TE.py
33
34
35
36
37
38
39
40
41
def __init__(self, estimator: TEestimator):
    """
    Class constructor.

    Registers the method as CTest.TE and stores the requested estimator.

    Args:
        estimator (TEestimator): Gaussian/Kraskov.
    """
    super().__init__(CTest.TE)
    self.estimator = estimator

compute_dependencies()

Compute list of dependencies for each target by transfer entropy analysis.

Returns:

Type Description
DAG

dependency dag.

Source code in causalflow/selection_methods/TE.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def compute_dependencies(self):
    """
    Find the dependencies of every target by transfer entropy analysis.

    For each target, the auto-dependency is checked with a bivariate MI
    analysis on two copies of the target series, and the cross-dependencies
    with a multivariate TE analysis on the whole dataset.

    Returns:
        (DAG): dependency dag.
    """
    if self.estimator is TEestimator.Auto:
        self._select_estimator()

    multi_network_analysis = MultivariateTE()
    bi_network_analysis = BivariateMI()
    cross_settings = {
        'cmi_estimator': self.estimator.value,
        'max_lag_sources': self.max_lag,
        'min_lag_sources': self.min_lag,
        'max_lag_target': self.max_lag,
        'min_lag_target': self.min_lag,
        'alpha_max_stats': self.alpha,
        'alpha_min_stats': self.alpha,
        'alpha_omnibus': self.alpha,
        'alpha_max_seq': self.alpha,
        'verbose': False,
    }
    # The auto-dependency analysis uses the same settings, except that its
    # minimum source lag is raised to 1 when min_lag is 0.
    autodep_settings = copy.deepcopy(cross_settings)
    if self.min_lag == 0:
        autodep_settings['min_lag_sources'] = 1

    CP.info("\n##")
    CP.info("## " + self.name + " analysis")
    CP.info("##")
    for target in self.data.features:
        CP.info("\n## Target variable: " + target)
        with _suppress_stdout():
            t = self.data.features.index(target)

            # Auto-dependency: the target series is used as both processes
            own_series = self.data.d.values[:, t]
            auto_data = Data(np.c_[own_series, own_series], dim_order='sp') # sp = samples(row) x processes(col)
            res_auto = bi_network_analysis.analyse_single_target(settings = autodep_settings, data = auto_data, target = 0, sources = 1)

            # Cross-dependencies: all the processes
            cross_data = Data(self.data.d.values, dim_order='sp') # sp = samples(row) x processes(col)
            res_cross = multi_network_analysis.analyse_single_target(settings = cross_settings, data = cross_data, target = t)

        target_name = self.data.features[t]

        # Record the auto-dependencies
        auto_res = res_auto._single_target[0]
        auto_lag = [var[1] for var in auto_res['selected_vars_sources']]
        auto_score = auto_res['selected_sources_mi']
        auto_pval = auto_res['selected_sources_pval']
        if auto_score is not None:
            for score, pval, lag in zip(auto_score, auto_pval, auto_lag):
                self._add_dependency(target_name, target_name, score, pval, lag)

        # Record the cross-dependencies
        cross_res = res_cross._single_target[t]
        sel_sources = [var[0] for var in cross_res['selected_vars_sources']]
        if sel_sources:
            sel_lags = [var[1] for var in cross_res['selected_vars_sources']]
            sel_scores = cross_res['selected_sources_te']
            sel_pvals = cross_res['selected_sources_pval']
            for s, score, pval, lag in zip(sel_sources, sel_scores, sel_pvals, sel_lags):
                self._add_dependency(target_name, self.data.features[s], score, pval, lag)

        if auto_score is None and not sel_sources:
            CP.info("\tno sources selected")

    return self.result

TEestimator

Bases: Enum

TEestimator Enumerator.

Source code in causalflow/selection_methods/TE.py
21
22
23
24
25
26
27
class TEestimator(Enum):
    """
    TEestimator Enumerator.

    Apart from Auto, each member's value is the estimator name passed on as
    the 'cmi_estimator' analysis setting.
    """

    # Auto is resolved to one of the concrete estimators before the analysis.
    Auto = 'Auto'
    Gaussian = 'JidtGaussianCMI'
    Kraskov = 'JidtKraskovCMI'
    OpenCLKraskov = 'OpenCLKraskovCMI'