
Feature Selection Methods

SelectionMethod

Bases: ABC

SelectionMethod abstract class

Source code in fpcmci/selection_methods/SelectionMethod.py
class SelectionMethod(ABC):
    """
    SelectionMethod abstract class
    """
    def __init__(self, ctest):
        self.ctest = ctest
        self.data = None
        self.alpha = None
        self.min_lag = None
        self.max_lag = None
        self.result = None


    @property
    def name(self):
        """
        Returns Selection Method name

        Returns:
            (str): Selection Method name
        """
        return self.ctest.value


    def initialise(self, data: Data, alpha, min_lag, max_lag, graph):
        """
        Initialises the selection method

        Args:
            data (Data): time-series data
            alpha (float): significance threshold
            min_lag (int): min lag time
            max_lag (int): max lag time
            graph (DAG): initial dependency graph in which the results are stored
        """
        self.data = data
        self.alpha = alpha
        self.min_lag = min_lag
        self.max_lag = max_lag
        self.result = graph


    @abstractmethod
    def compute_dependencies(self) -> DAG:
        """
        abstract method
        """
        pass


    def _prepare_ts(self, target, lag, apply_lag = True, consider_autodep = True):
        """
        Prepares the dataframe for the analysis

        Args:
            target (str): name of the target variable
            lag (int): lag time to apply
            apply_lag (bool, optional): True to apply the lag, False otherwise. Defaults to True.
            consider_autodep (bool, optional): True to keep the target's own past among the candidate sources, False to exclude it. Defaults to True.

        Returns:
            tuple(DataFrame, DataFrame): source and target dataframe
        """
        if not consider_autodep:
            if apply_lag:
                Y = self.data.d[target][lag:]
                X = self.data.d.loc[:, self.data.d.columns != target][:-lag]
            else:
                Y = self.data.d[target]
                X = self.data.d.loc[:, self.data.d.columns != target]
        else:
            if apply_lag:
                Y = self.data.d[target][lag:]
                X = self.data.d[:-lag]
            else:
                Y = self.data.d[target]
                X = self.data.d
        return X, Y


    def _add_dependecy(self, t, s, score, pval, lag):
        """
        Adds a found dependency from the source (s) to the target (t), specifying
        the score, the p-value and the lag

        Args:
            t (str): target feature name
            s (str): source feature name
            score (float): selection method score
            pval (float): p-value associated with the dependency
            lag (int): lag time of the dependency
        """
        self.result.add_source(t, s, score, pval, lag)

        str_s = "(" + s + " -" + str(lag) + ")"
        str_t = "(" + t + ")"

        CP.info("\tlink: " + str_s + " -?> " + str_t)
        CP.info("\t|val = " + str(round(score,3)) + " |pval = " + str(str(round(pval,3))))

name property

Returns Selection Method name

Returns:
    str: Selection Method name

compute_dependencies() abstractmethod

abstract method

Source code in fpcmci/selection_methods/SelectionMethod.py
@abstractmethod
def compute_dependencies(self) -> DAG:
    """
    abstract method
    """
    pass

initialise(data, alpha, min_lag, max_lag, graph)

Initialises the selection method

Parameters:
    data (Data): time-series data. Required.
    alpha (float): significance threshold. Required.
    min_lag (int): min lag time. Required.
    max_lag (int): max lag time. Required.
    graph (DAG): initial dependency graph in which the results are stored. Required.
Source code in fpcmci/selection_methods/SelectionMethod.py
def initialise(self, data: Data, alpha, min_lag, max_lag, graph):
    """
    Initialises the selection method

    Args:
        data (Data): time-series data
        alpha (float): significance threshold
        min_lag (int): min lag time
        max_lag (int): max lag time
        graph (DAG): initial dependency graph in which the results are stored
    """
    self.data = data
    self.alpha = alpha
    self.min_lag = min_lag
    self.max_lag = max_lag
    self.result = graph
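
A minimal usage sketch, assuming an already-built `my_data` (a `Data` instance) and `graph` (a `DAG` instance); both names are placeholders, and `Corr` is the concrete method documented next:

sel = Corr()
sel.initialise(my_data, alpha=0.05, min_lag=1, max_lag=2, graph=graph)
dag = sel.compute_dependencies()   # returns the updated DAG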

Corr

Bases: SelectionMethod

Feature selection method based on Correlation analysis

Source code in fpcmci/selection_methods/Corr.py
class Corr(SelectionMethod):
    """
    Feature selection method based on Correlation analysis
    """
    def __init__(self):
        """
        Corr class constructor
        """
        super().__init__(CTest.Corr)


    def compute_dependencies(self):
        """
        compute list of dependencies for each target by correlation analysis

        Returns:
            (DAG): dependency graph
        """
        CP.info("\n##")
        CP.info("## " + self.name + " analysis")
        CP.info("##")

        for lag in range(self.min_lag, self.max_lag + 1):
            for target in self.data.features:
                CP.info("\n## Target variable: " + target)

                X, Y = self._prepare_ts(target, lag)
                scores, pval = f_regression(X, Y)

                # Filter on pvalue
                f = pval < self.alpha

                # Result of the selection
                sel_sources, sel_sources_score, sel_sources_pval = X.columns[f].tolist(), scores[f].tolist(), pval[f].tolist()

                for s, score, pval in zip(sel_sources, sel_sources_score, sel_sources_pval):
                    self._add_dependecy(target, s, score, pval, lag)

        return self.result
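
To see what a single (target, lag) step of compute_dependencies does, here is a self-contained sketch of the same F-test filtering on toy data (variable names are illustrative; the slicing mirrors _prepare_ts with its defaults):

import numpy as np
import pandas as pd
from sklearn.feature_selection import f_regression

rng = np.random.default_rng(0)
d = pd.DataFrame({'a': rng.normal(size=200), 'b': rng.normal(size=200)})
d['c'] = 0.8 * d['a'].shift(1) + 0.1 * rng.normal(size=200)  # c depends on a at lag 1
d = d.dropna()

lag, alpha, target = 1, 0.05, 'c'
Y = d[target][lag:]   # target series shifted forward by `lag`
X = d[:-lag]          # all series (incl. the target's own past) minus the last `lag` rows
scores, pvals = f_regression(X, Y)
print(X.columns[pvals < alpha].tolist())  # expected to contain 'a'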

__init__()

Corr class constructor

Source code in fpcmci/selection_methods/Corr.py
def __init__(self):
    """
    Corr class constructor
    """
    super().__init__(CTest.Corr)

compute_dependencies()

compute list of dependencies for each target by correlation analysis

Returns:
    DAG: dependency graph

Source code in fpcmci/selection_methods/Corr.py
def compute_dependencies(self):
    """
    compute list of dependencies for each target by correlation analysis

    Returns:
        (DAG): dependency graph
    """
    CP.info("\n##")
    CP.info("## " + self.name + " analysis")
    CP.info("##")

    for lag in range(self.min_lag, self.max_lag + 1):
        for target in self.data.features:
            CP.info("\n## Target variable: " + target)

            X, Y = self._prepare_ts(target, lag)
            scores, pval = f_regression(X, Y)

            # Filter on pvalue
            f = pval < self.alpha

            # Result of the selection
            sel_sources, sel_sources_score, sel_sources_pval = X.columns[f].tolist(), scores[f].tolist(), pval[f].tolist()

            for s, score, pval in zip(sel_sources, sel_sources_score, sel_sources_pval):
                self._add_dependecy(target, s, score, pval, lag)

    return self.result

ParCorr

Bases: SelectionMethod

Feature selection method based on Partial Correlation analysis

Source code in fpcmci/selection_methods/ParCorr.py
class ParCorr(SelectionMethod):
    """
    Feature selection method based on Partial Correlation analysis
    """
    def __init__(self):
        """
        ParCorr class constructor
        """
        super().__init__(CTest.Corr)


    def get_residual(self, covar, target):
        """
        Calculate the residual of the target variable obtained by conditioning on the covar variables

        Args:
            covar (np.array): conditioning variables
            target (np.array): target variable

        Returns:
            (np.array): residual
        """
        beta = np.linalg.lstsq(covar, target, rcond=None)[0]
        return target - np.dot(covar, beta)


    def partial_corr(self, X, Y, Z):
        """
        Calculate Partial correlation between X and Y conditioning on Z

        Args:
            X (np.array): source candidate variable
            Y (np.array): target variable
            Z (np.array): conditioning variable

        Returns:
            (float, float): partial correlation, p-value
        """

        pcorr, pval = stats.pearsonr(self.get_residual(Z, X), self.get_residual(Z, Y))

        return pcorr, pval

    def compute_dependencies(self):
        """
        compute list of dependencies for each target by partial correlation analysis

        Returns:
            (DAG): dependency graph
        """
        CP.info("\n##")
        CP.info("## " + self.name + " analysis")
        CP.info("##")

        for lag in range(self.min_lag, self.max_lag + 1):
            for target in self.data.features:
                CP.info("\n## Target variable: " + target)
                candidates = list(self.data.features)   # copy, so removing selected candidates does not mutate self.data.features

                Y = np.array(self.data.d[target][lag:])

                while candidates:
                    tmp_res = None
                    covars = self._get_sources(target)
                    Z = np.array(self.data.d[covars][:-lag])

                    for candidate in candidates:
                        X = np.array(self.data.d[candidate][:-lag])
                        score, pval = self.partial_corr(X, Y, Z)
                        if pval < self.alpha and (tmp_res is None or abs(tmp_res[1]) < abs(score)):
                            tmp_res = (candidate, score, pval)

                    if tmp_res is not None: 
                        self._add_dependecy(target, tmp_res[0], tmp_res[1], tmp_res[2], lag)
                        candidates.remove(tmp_res[0])
                    else:
                        break
        return self.result
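
The two helpers implement the classic residualisation identity: the partial correlation of X and Y given Z equals the Pearson correlation of their least-squares residuals on Z. A self-contained numerical check (toy data, illustrative names):

import numpy as np
from scipy import stats

rng = np.random.default_rng(1)
n = 500
Z = rng.normal(size=(n, 1))
X = 2.0 * Z[:, 0] + rng.normal(size=n)    # X driven by Z
Y = -1.5 * Z[:, 0] + rng.normal(size=n)   # Y driven by Z only

def residual(covar, target):
    # least-squares residual of `target` after regressing it on `covar` (no intercept, as above)
    beta = np.linalg.lstsq(covar, target, rcond=None)[0]
    return target - covar @ beta

print(stats.pearsonr(X, Y)[0])                            # strongly negative: Z confounds X and Y
print(stats.pearsonr(residual(Z, X), residual(Z, Y))[0])  # near zero: no direct X-Y link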

__init__()

ParCorr class constructor

Source code in fpcmci/selection_methods/ParCorr.py
def __init__(self):
    """
    ParCorr class constructor
    """
    super().__init__(CTest.Corr)

compute_dependencies()

compute list of dependencies for each target by partial correlation analysis

Returns:
    DAG: dependency graph

Source code in fpcmci/selection_methods/ParCorr.py
def compute_dependencies(self):
    """
    compute list of dependencies for each target by partial correlation analysis

    Returns:
        (DAG): dependency graph
    """
    CP.info("\n##")
    CP.info("## " + self.name + " analysis")
    CP.info("##")

    for lag in range(self.min_lag, self.max_lag + 1):
        for target in self.data.features:
            CP.info("\n## Target variable: " + target)
            candidates = list(self.data.features)   # copy, so removing selected candidates does not mutate self.data.features

            Y = np.array(self.data.d[target][lag:])

            while candidates:
                tmp_res = None
                covars = self._get_sources(target)
                Z = np.array(self.data.d[covars][:-lag])

                for candidate in candidates:
                    X = np.array(self.data.d[candidate][:-lag])
                    score, pval = self.partial_corr(X, Y, Z)
                    if pval < self.alpha and (tmp_res is None or abs(tmp_res[1]) < abs(score)):
                        tmp_res = (candidate, score, pval)

                if tmp_res is not None: 
                    self._add_dependecy(target, tmp_res[0], tmp_res[1], tmp_res[2], lag)
                    candidates.remove(tmp_res[0])
                else:
                    break
    return self.result

get_residual(covar, target)

Calculate the residual of the target variable obtained by conditioning on the covar variables

Parameters:
    covar (np.array): conditioning variables. Required.
    target (np.array): target variable. Required.

Returns:
    np.array: residual

Source code in fpcmci/selection_methods/ParCorr.py
def get_residual(self, covar, target):
    """
    Calculate the residual of the target variable obtained by conditioning on the covar variables

    Args:
        covar (np.array): conditioning variables
        target (np.array): target variable

    Returns:
        (np.array): residual
    """
    beta = np.linalg.lstsq(covar, target, rcond=None)[0]
    return target - np.dot(covar, beta)

partial_corr(X, Y, Z)

Calculate Partial correlation between X and Y conditioning on Z

Parameters:
    X (np.array): source candidate variable. Required.
    Y (np.array): target variable. Required.
    Z (np.array): conditioning variable. Required.

Returns:
    (float, float): partial correlation, p-value

Source code in fpcmci/selection_methods/ParCorr.py
def partial_corr(self, X, Y, Z):
    """
    Calculate Partial correlation between X and Y conditioning on Z

    Args:
        X (np.array): source candidate variable
        Y (np.array): target variable
        Z (np.array): conditioning variable

    Returns:
        (float, float): partial correlation, p-value
    """

    pcorr, pval = stats.pearsonr(self.get_residual(Z, X), self.get_residual(Z, Y))

    return pcorr, pval

MI

Bases: SelectionMethod

Feature selection method based on Mutual Information analysis

Source code in fpcmci/selection_methods/MI.py
class MI(SelectionMethod):
    """
    Feature selection method based on Mutual Information analysis
    """
    def __init__(self, estimator: MIestimator):
        """
        MI class constructor

        Args:
            estimator (MIestimator): Gaussian/Kraskov
        """
        super().__init__(CTest.MI)
        self.estimator = estimator

    def compute_dependencies(self):
        """
        compute list of dependencies for each target by mutual information analysis

        Returns:
            (DAG): dependency graph
        """
        with _suppress_stdout():
            data = Data(self.data.d.values, dim_order='sp') # sp = samples(row) x processes(col)

            network_analysis = MultivariateMI()
            settings = {'cmi_estimator': self.estimator.value,
                        'max_lag_sources': self.max_lag,
                        'min_lag_sources': self.min_lag,
                        'alpha_max_stats': self.alpha,
                        'alpha_min_stats': self.alpha,
                        'alpha_omnibus': self.alpha,
                        'alpha_max_seq': self.alpha,
                        'verbose': False}
            results = network_analysis.analyse_network(settings=settings, data=data)

        for t in results._single_target.keys():
            sel_sources = [s[0] for s in results._single_target[t]['selected_vars_sources']]
            if sel_sources:
                sel_sources_lag = [s[1] for s in results._single_target[t]['selected_vars_sources']]
                sel_sources_score = results._single_target[t]['selected_sources_mi']
                sel_sources_pval = results._single_target[t]['selected_sources_pval']
                for s, score, pval, lag in zip(sel_sources, sel_sources_score, sel_sources_pval, sel_sources_lag):
                    self._add_dependecy(self.data.features[t], self.data.features[s], score, pval, lag)

        return self.result
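
For reference, a minimal sketch of the underlying IDTxl call that this class wraps (toy data; 'JidtGaussianCMI' is one valid estimator string, but check the IDTxl documentation for your installation):

import numpy as np
from idtxl.data import Data
from idtxl.multivariate_mi import MultivariateMI

arr = np.random.randn(500, 3)        # samples x processes
data = Data(arr, dim_order='sp')     # sp = samples(rows) x processes(cols)

settings = {'cmi_estimator': 'JidtGaussianCMI',
            'min_lag_sources': 1,
            'max_lag_sources': 2}
results = MultivariateMI().analyse_network(settings=settings, data=data)
# The wrapper above then reads results._single_target to extract, per target,
# the selected sources, their lags, MI scores and p-values.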

__init__(estimator)

MI class constructor

Parameters:
    estimator (MIestimator): Gaussian/Kraskov. Required.
Source code in fpcmci/selection_methods/MI.py
def __init__(self, estimator: MIestimator):
    """
    MI class constructor

    Args:
        estimator (MIestimator): Gaussian/Kraskov
    """
    super().__init__(CTest.MI)
    self.estimator = estimator

compute_dependencies()

compute list of dependencies for each target by mutual information analysis

Returns:
    DAG: dependency graph

Source code in fpcmci/selection_methods/MI.py
def compute_dependencies(self):
    """
    compute list of dependencies for each target by mutual information analysis

    Returns:
        (DAG): dependency graph
    """
    with _suppress_stdout():
        data = Data(self.data.d.values, dim_order='sp') # sp = samples(row) x processes(col)

        network_analysis = MultivariateMI()
        settings = {'cmi_estimator': self.estimator.value,
                    'max_lag_sources': self.max_lag,
                    'min_lag_sources': self.min_lag,
                    'alpha_max_stats': self.alpha,
                    'alpha_min_stats': self.alpha,
                    'alpha_omnibus': self.alpha,
                    'alpha_max_seq': self.alpha,
                    'verbose': False}
        results = network_analysis.analyse_network(settings=settings, data=data)

    for t in results._single_target.keys():
        sel_sources = [s[0] for s in results._single_target[t]['selected_vars_sources']]
        if sel_sources:
            sel_sources_lag = [s[1] for s in results._single_target[t]['selected_vars_sources']]
            sel_sources_score = results._single_target[t]['selected_sources_mi']
            sel_sources_pval = results._single_target[t]['selected_sources_pval']
            for s, score, pval, lag in zip(sel_sources, sel_sources_score, sel_sources_pval, sel_sources_lag):
                self._add_dependecy(self.data.features[t], self.data.features[s], score, pval, lag)

    return self.result

TE

Bases: SelectionMethod

Feature selection method based on Transfer Entropy analysis

Source code in fpcmci/selection_methods/TE.py
class TE(SelectionMethod):
    """
    Feature selection method based on Transfer Entropy analysis
    """
    def __init__(self, estimator: TEestimator):
        """
        TE class constructor

        Args:
            estimator (TEestimator): Gaussian/Kraskov
        """
        super().__init__(CTest.TE)
        self.estimator = estimator


    def compute_dependencies(self):
        """
        compute list of dependencies for each target by transfer entropy analysis

        Returns:
            (DAG): dependency graph
        """
        multi_network_analysis = MultivariateTE()
        bi_network_analysis = BivariateMI()
        settings = {'cmi_estimator': self.estimator.value,
                    'max_lag_sources': self.max_lag,
                    'min_lag_sources': self.min_lag,
                    'max_lag_target': self.max_lag,
                    'min_lag_target': self.min_lag,
                    'alpha_max_stats': self.alpha,
                    'alpha_min_stats': self.alpha,
                    'alpha_omnibus': self.alpha,
                    'alpha_max_seq': self.alpha,
                    'verbose': False}

        CP.info("\n##")
        CP.info("## " + self.name + " analysis")
        CP.info("##")
        for target in self.data.features:
            CP.info("\n## Target variable: " + target)
            with _suppress_stdout():
                t = self.data.features.index(target)

                # Check auto-dependency
                tmp_d = np.c_[self.data.d.values[:, t], self.data.d.values[:, t]]
                data = Data(tmp_d, dim_order='sp') # sp = samples(row) x processes(col)
                res_auto = bi_network_analysis.analyse_single_target(settings = settings, data = data, target = 0, sources = 1)

                # Check cross-dependencies
                data = Data(self.data.d.values, dim_order='sp') # sp = samples(row) x processes(col)
                res_cross = multi_network_analysis.analyse_single_target(settings = settings, data = data, target = t)

            # Auto-dependency handling
            auto_lag = [s[1] for s in res_auto._single_target[0]['selected_vars_sources']]
            auto_score = res_auto._single_target[0]['selected_sources_mi']
            auto_pval = res_auto._single_target[0]['selected_sources_pval']
            if auto_score is not None:
                for score, pval, lag in zip(auto_score, auto_pval, auto_lag):
                    self._add_dependecy(self.data.features[t], self.data.features[t], score, pval, lag)

            # Cross-dependencies handling    
            sel_sources = [s[0] for s in res_cross._single_target[t]['selected_vars_sources']]
            if sel_sources:
                sel_sources_lag = [s[1] for s in res_cross._single_target[t]['selected_vars_sources']]
                sel_sources_score = res_cross._single_target[t]['selected_sources_te']
                sel_sources_pval = res_cross._single_target[t]['selected_sources_pval']
                for s, score, pval, lag in zip(sel_sources, sel_sources_score, sel_sources_pval, sel_sources_lag):
                    self._add_dependecy(self.data.features[t], self.data.features[s], score, pval, lag)

            if auto_score is None and not sel_sources:
                CP.info("\tno sources selected")

        return self.result
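
Note the auto-dependency trick above: the code checks self-links separately by pairing the target with an identical copy of itself and running BivariateMI with the copy as the only source, so lagged self-dependencies can be scored like any other source. A minimal sketch of that data construction (toy array, illustrative):

import numpy as np

x = np.random.randn(200)
tmp_d = np.c_[x, x]   # column 0: target, column 1: identical copy acting as the only source
print(tmp_d.shape)    # (200, 2) -- fed to Data(tmp_d, dim_order='sp'), then
                      # analyse_single_target(..., target=0, sources=1) as in the code above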

__init__(estimator)

TE class constructor

Parameters:
    estimator (TEestimator): Gaussian/Kraskov. Required.
Source code in fpcmci/selection_methods/TE.py
def __init__(self, estimator: TEestimator):
    """
    TE class constructor

    Args:
        estimator (TEestimator): Gaussian/Kraskov
    """
    super().__init__(CTest.TE)
    self.estimator = estimator

compute_dependencies()

compute list of dependencies for each target by transfer entropy analysis

Returns:
    DAG: dependency graph

Source code in fpcmci/selection_methods/TE.py
def compute_dependencies(self):
    """
    compute list of dependencies for each target by transfer entropy analysis

    Returns:
        (DAG): dependency graph
    """
    multi_network_analysis = MultivariateTE()
    bi_network_analysis = BivariateMI()
    settings = {'cmi_estimator': self.estimator.value,
                'max_lag_sources': self.max_lag,
                'min_lag_sources': self.min_lag,
                'max_lag_target': self.max_lag,
                'min_lag_target': self.min_lag,
                'alpha_max_stats': self.alpha,
                'alpha_min_stats': self.alpha,
                'alpha_omnibus': self.alpha,
                'alpha_max_seq': self.alpha,
                'verbose': False}

    CP.info("\n##")
    CP.info("## " + self.name + " analysis")
    CP.info("##")
    for target in self.data.features:
        CP.info("\n## Target variable: " + target)
        with _suppress_stdout():
            t = self.data.features.index(target)

            # Check auto-dependency
            tmp_d = np.c_[self.data.d.values[:, t], self.data.d.values[:, t]]
            data = Data(tmp_d, dim_order='sp') # sp = samples(row) x processes(col)
            res_auto = bi_network_analysis.analyse_single_target(settings = settings, data = data, target = 0, sources = 1)

            # Check cross-dependencies
            data = Data(self.data.d.values, dim_order='sp') # sp = samples(row) x processes(col)
            res_cross = multi_network_analysis.analyse_single_target(settings = settings, data = data, target = t)

        # Auto-dependency handling
        auto_lag = [s[1] for s in res_auto._single_target[0]['selected_vars_sources']]
        auto_score = res_auto._single_target[0]['selected_sources_mi']
        auto_pval = res_auto._single_target[0]['selected_sources_pval']
        if auto_score is not None:
            for score, pval, lag in zip(auto_score, auto_pval, auto_lag):
                self._add_dependecy(self.data.features[t], self.data.features[t], score, pval, lag)

        # Cross-dependencies handling    
        sel_sources = [s[0] for s in res_cross._single_target[t]['selected_vars_sources']]
        if sel_sources:
            sel_sources_lag = [s[1] for s in res_cross._single_target[t]['selected_vars_sources']]
            sel_sources_score = res_cross._single_target[t]['selected_sources_te']
            sel_sources_pval = res_cross._single_target[t]['selected_sources_pval']
            for s, score, pval, lag in zip(sel_sources, sel_sources_score, sel_sources_pval, sel_sources_lag):
                self._add_dependecy(self.data.features[t], self.data.features[s], score, pval, lag)

        if auto_score is None and not sel_sources:
            CP.info("\tno sources selected")

    return self.result
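
A minimal end-to-end usage sketch (`TEestimator.Gaussian` follows the constructor docstring's "Gaussian/Kraskov" and is an assumption; `my_data` and `graph` are placeholders, as in the SelectionMethod example above):

te = TE(TEestimator.Gaussian)
te.initialise(my_data, alpha=0.05, min_lag=1, max_lag=2, graph=graph)
dag = te.compute_dependencies()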