Skip to content

Causal Discovery

This module provides the CausalDiscoveryMethod class.

Classes

CausalDiscoveryMethod: abstract class used by all the causal discovery algorithms.

CausalDiscoveryMethod

Bases: ABC

CausalDiscoveryMethod class.

CausalDiscoveryMethod is an abstract causal discovery method for large-scale time series datasets.

Source code in causalflow/causal_discovery/CausalDiscoveryMethod.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class CausalDiscoveryMethod(ABC):
    """
    CausalDiscoveryMethod class.

    Abstract base shared by the causal discovery methods for large-scale
    time series datasets. Subclasses implement ``run``; the estimated causal
    model lives in ``self.CM`` and can be persisted with ``save`` and
    restored with ``load``.
    """

    def __init__(self, 
                 data: Data, 
                 min_lag, max_lag, 
                 verbosity: CPLevel, 
                 alpha = 0.05, 
                 resfolder = None,
                 neglect_only_autodep = False,
                 clean_cls = True):
        """
        Class constructor.

        Args:
            data (Data): data to analyse.
            min_lag (int): minimum time lag.
            max_lag (int): maximum time lag.
            verbosity (CPLevel): verbosity level.
            alpha (float, optional): significance level. Defaults to 0.05.
            resfolder (string, optional): result folder to create. Defaults to None.
                When given, ``sys.stdout`` is redirected to a ``Logger`` writing
                to the log path derived from this folder.
            neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
            clean_cls (bool): Clean console bit. Defaults to True.
        """
        self.data = data
        self.alpha = alpha
        self.min_lag, self.max_lag = min_lag, max_lag
        # start from an empty causal model over all the features
        self.CM = DAG(data.features, min_lag, max_lag, neglect_only_autodep)
        self.neglect_only_autodep = neglect_only_autodep

        self.resfolder = resfolder
        if resfolder is None:
            self.respath = self.dag_path = self.ts_dag_path = None
        else:
            logpath, self.respath, self.dag_path, self.ts_dag_path = utils.get_selectorpath(resfolder)  
            self.logger = Logger(logpath, clean_cls)
            # from here on, print() output goes through the logger
            sys.stdout = self.logger

        CP.set_verbosity(verbosity)


    @abstractmethod
    def run(self) -> DAG:
        """
        Run causal discovery method.

        Returns:
            DAG: causal model.
        """


    def load(self, res_path):
        """
        Load a previously estimated result.

        Restores ``CM``, ``alpha``, ``dag_path`` and ``ts_dag_path`` from a
        pickle produced by ``save``. Unpickling can execute arbitrary code, so
        only load files from a trusted source.

        Args:
            res_path (str): pickle file path.
        """
        with open(res_path, 'rb') as f:
            stored = pickle.load(f)
        self.CM = stored['causal_model']
        self.alpha = stored['alpha']
        self.dag_path = stored['dag_path']
        self.ts_dag_path = stored['ts_dag_path']


    def save(self):
        """
        Save causal discovery result as pickle file if resfolder is set.

        Does nothing when ``respath`` is None; warns (without writing) when
        the causal model is falsy.
        """
        if self.respath is None:
            return
        if not self.CM:
            CP.warning("Causal model impossible to save")
            return
        payload = {
            'causal_model': copy.deepcopy(self.CM),
            'alpha': self.alpha,
            'dag_path': self.dag_path,
            'ts_dag_path': self.ts_dag_path,
        }
        with open(self.respath, 'wb') as resfile:
            pickle.dump(payload, resfile)

__init__(data, min_lag, max_lag, verbosity, alpha=0.05, resfolder=None, neglect_only_autodep=False, clean_cls=True)

Class constructor.

Parameters:

Name Type Description Default
data Data

data to analyse.

required
min_lag int

minimum time lag.

required
max_lag int

maximum time lag.

required
verbosity CPLevel

verbosity level.

required
alpha float

significance level. Defaults to 0.05.

0.05
resfolder string

result folder to create. Defaults to None.

None
neglect_only_autodep bool

Bit for neglecting variables with only autodependency. Defaults to False.

False
clean_cls bool

Clean console bit. Defaults to True.

True
Source code in causalflow/causal_discovery/CausalDiscoveryMethod.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def __init__(self, 
             data: Data, 
             min_lag, max_lag, 
             verbosity: CPLevel, 
             alpha = 0.05, 
             resfolder = None,
             neglect_only_autodep = False,
             clean_cls = True):
    """
    Class constructor.

    Args:
        data (Data): data to analyse.
        min_lag (int): minimum time lag.
        max_lag (int): maximum time lag.
        verbosity (CPLevel): verbosity level.
        alpha (float, optional): significance level. Defaults to 0.05.
        resfolder (string, optional): result folder to create. Defaults to None.
            When given, ``sys.stdout`` is redirected to a ``Logger``.
        neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
        clean_cls (bool): Clean console bit. Defaults to True.
    """
    self.data = data
    self.alpha = alpha
    self.min_lag, self.max_lag = min_lag, max_lag
    # start from an empty causal model over all the features
    self.CM = DAG(data.features, min_lag, max_lag, neglect_only_autodep)
    self.neglect_only_autodep = neglect_only_autodep

    self.resfolder = resfolder
    if resfolder is None:
        self.respath = self.dag_path = self.ts_dag_path = None
    else:
        logpath, self.respath, self.dag_path, self.ts_dag_path = utils.get_selectorpath(resfolder)  
        self.logger = Logger(logpath, clean_cls)
        # from here on, print() output goes through the logger
        sys.stdout = self.logger

    CP.set_verbosity(verbosity)

load(res_path)

Load a previously estimated result.

Parameters:

Name Type Description Default
res_path str

pickle file path.

required
Source code in causalflow/causal_discovery/CausalDiscoveryMethod.py
78
79
80
81
82
83
84
85
86
87
88
89
90
def load(self, res_path):
    """
    Load a previously estimated result.

    Restores ``CM``, ``alpha``, ``dag_path`` and ``ts_dag_path`` from a pickle
    produced by ``save``. Unpickling can execute arbitrary code, so only load
    files from a trusted source.

    Args:
        res_path (str): pickle file path.
    """
    with open(res_path, 'rb') as f:
        stored = pickle.load(f)
    self.CM = stored['causal_model']
    self.alpha = stored['alpha']
    self.dag_path = stored['dag_path']
    self.ts_dag_path = stored['ts_dag_path']

run() abstractmethod

Run causal discovery method.

Returns:

Name Type Description
DAG DAG

causal model.

Source code in causalflow/causal_discovery/CausalDiscoveryMethod.py
67
68
69
70
71
72
73
74
75
@abstractmethod
def run(self) -> DAG:
    """
    Run causal discovery method.

    Abstract hook: concrete methods must override it and return the
    estimated causal model.

    Returns:
        DAG: causal model.
    """

save()

Save causal discovery result as pickle file if resfolder is set.

Source code in causalflow/causal_discovery/CausalDiscoveryMethod.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def save(self):
    """
    Save causal discovery result as pickle file if resfolder is set.

    Does nothing when ``respath`` is None; warns (without writing) when the
    causal model is falsy. Otherwise pickles a dict with the keys
    ``causal_model`` (a deep copy of ``CM``), ``alpha``, ``dag_path`` and
    ``ts_dag_path``.
    """
    if self.respath is None:
        return
    if not self.CM:
        CP.warning("Causal model impossible to save")
        return
    payload = {
        'causal_model': copy.deepcopy(self.CM),
        'alpha': self.alpha,
        'dag_path': self.dag_path,
        'ts_dag_path': self.ts_dag_path,
    }
    with open(self.respath, 'wb') as resfile:
        pickle.dump(payload, resfile)

This module provides the DYNOTEARS class.

Classes

DYNOTEARS: class containing the DYNOTEARS causal discovery algorithm.

DYNOTEARS

Bases: CausalDiscoveryMethod

DYNOTEARS causal discovery method.

Source code in causalflow/causal_discovery/baseline/DYNOTEARS.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class DYNOTEARS(CausalDiscoveryMethod):
    """DYNOTEARS causal discovery method."""

    def __init__(self, 
                 data, 
                 min_lag,
                 max_lag, 
                 verbosity, 
                 alpha = 0.05, 
                 resfolder = None,
                 neglect_only_autodep = False,
                 clean_cls = True):
        """
        Class constructor.

        Args:
            data (Data): data to analyse.
            min_lag (int): minimum time lag.
            max_lag (int): maximum time lag.
            verbosity (CPLevel): verbosity level.
            alpha (float, optional): significance level; stored on the instance
                but not used by the DYNOTEARS run. Defaults to 0.05.
            resfolder (string, optional): result folder to create. Defaults to None.
            neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
            clean_cls (bool): Clean console bit. Defaults to True.
        """
        super().__init__(data, min_lag, max_lag, verbosity, alpha, resfolder, neglect_only_autodep, clean_cls)

    @staticmethod
    def _split_node(node):
        """
        Split a structure-model node label ``{var}_lag{l}`` into ``(var, l)``.

        The split happens on the LAST ``_lag`` so variable names that contain
        "lag" (e.g. ``flag``) are parsed correctly.

        Raises:
            ValueError: if the label does not contain ``_lag``.
        """
        name, sep, lag = str(node).rpartition("_lag")
        if not sep:
            raise ValueError(f"Unexpected DYNOTEARS node label: {node!r}")
        return name, int(lag)

    def run(self) -> DAG:
        """
        Run DYNOTEARS algorithm.

        Each learned edge ``source_lag{k} -> target_lag{j}`` is recorded for
        ``target`` as ``(source, weight, j - k)`` (i.e. a non-positive lag).
        Duplicate ``(source, lag)`` pairs for the same target are kept once
        (first occurrence).

        Returns:
            DAG: causal discovery result.
        """
        features = self.data.features
        graph_dict = {name: [] for name in features}
        seen = {name: set() for name in features}
        # node labels carry the stringified column name; map it back to the feature
        # (assumes data.d columns match data.features, as the other methods do)
        feature_by_label = {str(name): name for name in features}

        sm = from_pandas_dynamic(self.data.d, p=self.max_lag)

        for c, e in sm.edges:
            w = sm.adj[c][e]["weight"]
            c_label, tc = self._split_node(c)
            e_label, te = self._split_node(e)
            source = feature_by_label[c_label]
            target = feature_by_label[e_label]
            lag = te - tc
            if (source, lag) not in seen[target]:
                seen[target].add((source, lag))
                graph_dict[target].append((source, w, lag))

        self.CM = self._to_DAG(graph_dict)

        if self.resfolder is not None: self.logger.close()
        return self.CM


    def _to_DAG(self, graph):
        """
        Re-elaborate the result in a DAG.

        Links whose absolute lag falls outside ``[min_lag, max_lag]`` are
        dropped; the absolute edge weight is used as score and the p-value is
        set to 0 (DYNOTEARS does not produce one).

        Args:
            graph (dict): target -> list of ``(source, weight, lag)`` tuples.

        Returns:
            (DAG): result re-elaborated.
        """
        tmp_dag = DAG(self.data.features, self.min_lag, self.max_lag, self.neglect_only_autodep)
        tmp_dag.sys_context = dict()
        for target, sources in graph.items():
            for source, weight, lag in sources:
                if self.min_lag <= abs(lag) <= self.max_lag:
                    tmp_dag.add_source(target, source, abs(weight), 0, lag)
        return tmp_dag

__init__(data, min_lag, max_lag, verbosity, alpha=0.05, resfolder=None, neglect_only_autodep=False, clean_cls=True)

Class constructor.

Parameters:

Name Type Description Default
data Data

data to analyse.

required
min_lag int

minimum time lag.

required
max_lag int

maximum time lag.

required
verbosity CPLevel

verbosity level.

required
alpha float

Significance level; stored on the instance but not used by the DYNOTEARS run. Defaults to 0.05.

0.05
resfolder string

result folder to create. Defaults to None.

None
neglect_only_autodep bool

Bit for neglecting variables with only autodependency. Defaults to False.

False
clean_cls bool

Clean console bit. Defaults to True.

True
Source code in causalflow/causal_discovery/baseline/DYNOTEARS.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(self, 
             data, 
             min_lag,
             max_lag, 
             verbosity, 
             alpha = 0.05, 
             resfolder = None,
             neglect_only_autodep = False,
             clean_cls = True):
    """
    Class constructor.

    All arguments are forwarded unchanged to the base causal discovery
    method constructor.

    Args:
        data (Data): data to analyse.
        min_lag (int): minimum time lag.
        max_lag (int): maximum time lag.
        verbosity (CPLevel): verbosity level.
        alpha (float, optional): significance level; stored on the instance
            but not used by the DYNOTEARS run. Defaults to 0.05.
        resfolder (string, optional): result folder to create. Defaults to None.
        neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
        clean_cls (bool): Clean console bit. Defaults to True.
    """
    super().__init__(data=data,
                     min_lag=min_lag,
                     max_lag=max_lag,
                     verbosity=verbosity,
                     alpha=alpha,
                     resfolder=resfolder,
                     neglect_only_autodep=neglect_only_autodep,
                     clean_cls=clean_cls)

run()

Run DYNOTEARS algorithm.

Returns:

Name Type Description
DAG DAG

causal discovery result.

Source code in causalflow/causal_discovery/baseline/DYNOTEARS.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def run(self) -> DAG:
    """
    Run DYNOTEARS algorithm.

    Each learned edge ``source_lag{k} -> target_lag{j}`` is recorded for
    ``target`` as ``(source, weight, j - k)`` (i.e. a non-positive lag).
    Duplicate ``(source, lag)`` pairs for the same target are kept once
    (first occurrence).

    Returns:
        DAG: causal discovery result.
    """
    def split_node(node):
        # labels are expected to look like "{var}_lag{l}"; split on the LAST
        # "_lag" so variable names containing "lag" (e.g. "flag") still parse
        name, sep, lag = str(node).rpartition("_lag")
        if not sep:
            raise ValueError(f"Unexpected DYNOTEARS node label: {node!r}")
        return name, int(lag)

    features = self.data.features
    graph_dict = {name: [] for name in features}
    seen = {name: set() for name in features}
    # node labels carry the stringified column name; map it back to the feature
    # (assumes data.d columns match data.features)
    feature_by_label = {str(name): name for name in features}

    sm = from_pandas_dynamic(self.data.d, p=self.max_lag)

    for c, e in sm.edges:
        w = sm.adj[c][e]["weight"]
        c_label, tc = split_node(c)
        e_label, te = split_node(e)
        source = feature_by_label[c_label]
        target = feature_by_label[e_label]
        lag = te - tc
        if (source, lag) not in seen[target]:
            seen[target].add((source, lag))
            graph_dict[target].append((source, w, lag))

    self.CM = self._to_DAG(graph_dict)

    if self.resfolder is not None: self.logger.close()
    return self.CM

This module provides the LPCMCI class.

Classes

LPCMCI: class containing the LPCMCI causal discovery algorithm.

LPCMCI

Bases: CausalDiscoveryMethod

LPCMCI causal discovery method.

Source code in causalflow/causal_discovery/baseline/LPCMCI.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class LPCMCI(CausalDiscoveryMethod):
    """LPCMCI causal discovery method."""

    def __init__(self, 
                 data: Data,
                 min_lag, max_lag, 
                 val_condtest: CondIndTest, 
                 verbosity: CPLevel,
                 alpha = 0.05, 
                 resfolder = None, 
                 neglect_only_autodep = False,
                 clean_cls = True):
        """
        Class constructor.

        Args:
            data (Data): data to analyse.
            min_lag (int): minimum time lag.
            max_lag (int): maximum time lag.
            val_condtest (CondIndTest): validation method.
            verbosity (CPLevel): verbosity level.
            alpha (float, optional): LPCMCI significance level (passed to
                ``run_lpcmci`` as ``pc_alpha``). Defaults to 0.05.
            resfolder (string, optional): result folder to create. Defaults to None.
            neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
            clean_cls (bool): Clean console bit. Defaults to True.
        """
        super().__init__(data, min_lag, max_lag, verbosity, alpha, resfolder, neglect_only_autodep, clean_cls)

        # build tigramite dataset: every value is cast to float
        vector = np.vectorize(float)
        d = vector(data.d)

        # init lpcmci
        self.lpcmci = lpcmci(dataframe = pp.DataFrame(data = d, var_names = data.features),
                             cond_ind_test = val_condtest,
                             verbosity = verbosity.value)


    def run(self, link_assumptions = None) -> DAG:
        """
        Run causal discovery algorithm.

        Args:
            link_assumptions (dict, optional): prior knowledge on causal model links. Defaults to None.

        Returns:
            (DAG): estimated causal model.
        """
        CP.info('\n')
        CP.info(DASH)
        CP.info("Running Causal Discovery Algorithm")
        self.result = self.lpcmci.run_lpcmci(link_assumptions = link_assumptions,
                                             tau_max = self.max_lag,
                                             tau_min = self.min_lag,
                                             pc_alpha = self.alpha)

        self.CM = self._to_DAG()

        if self.resfolder is not None: self.logger.close()
        return self.CM


    def _to_DAG(self):
        """
        Re-elaborate the LPCMCI result into a DAG.

        ``self.result['graph'][s][t, lag]`` holds the link type from variable
        ``s`` (at lag ``lag``) to variable ``t``. Directed, HalfUncertain,
        Bidirected and Uncertain links are added to the DAG; other link types
        are ignored. Bidirected and Uncertain links at lag 0 are listed by
        tigramite in both directions and are added only once (first
        orientation met in the scan).

        Returns:
            (DAG): lpcmci result re-elaborated.
        """
        var_names = self.data.features
        # honour neglect_only_autodep like the base class and DYNOTEARS do
        tmp_dag = DAG(var_names, self.min_lag, self.max_lag, self.neglect_only_autodep)
        tmp_dag.sys_context = dict()

        graph = self.result['graph']
        val_matrix = self.result['val_matrix']
        p_matrix = self.result['p_matrix']
        symmetric_types = (LinkType.Bidirected.value, LinkType.Uncertain.value)
        accepted_types = symmetric_types + (LinkType.Directed.value, LinkType.HalfUncertain.value)

        N, lags = graph[0].shape
        for s in range(len(graph)):
            for t in range(N):
                for lag in range(lags):
                    arrowtype = graph[s][t, lag]
                    if arrowtype not in accepted_types:
                        continue
                    if arrowtype in symmetric_types and lag == 0:
                        # the same contemporaneous link also appears as graph[t][s, 0]:
                        # skip it if that mirror orientation was already added
                        mirror_sources = tmp_dag.g[var_names[s]].sources
                        mirror_key = (var_names[t], 0)
                        if mirror_key in mirror_sources and mirror_sources[mirror_key][TYPE] == arrowtype:
                            continue
                    tmp_dag.add_source(var_names[t],
                                       var_names[s],
                                       val_matrix[s][t, lag],
                                       p_matrix[s][t, lag],
                                       lag,
                                       arrowtype)
        return tmp_dag

__init__(data, min_lag, max_lag, val_condtest, verbosity, alpha=0.05, resfolder=None, neglect_only_autodep=False, clean_cls=True)

Class constructor.

Parameters:

Name Type Description Default
data Data

data to analyse.

required
min_lag int

minimum time lag.

required
max_lag int

maximum time lag.

required
val_condtest CondIndTest

validation method.

required
verbosity CPLevel

verbosity level.

required
alpha float

LPCMCI significance level (passed to run_lpcmci as pc_alpha). Defaults to 0.05.

0.05
resfolder string

result folder to create. Defaults to None.

None
neglect_only_autodep bool

Bit for neglecting variables with only autodependency. Defaults to False.

False
clean_cls bool

Clean console bit. Defaults to True.

True
Source code in causalflow/causal_discovery/baseline/LPCMCI.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(self, 
             data: Data,
             min_lag, max_lag, 
             val_condtest: CondIndTest, 
             verbosity: CPLevel,
             alpha = 0.05, 
             resfolder = None, 
             neglect_only_autodep = False,
             clean_cls = True):
    """
    Class constructor.

    Initialises the base method, then wraps the data in a tigramite
    dataframe and builds the LPCMCI runner around it.

    Args:
        data (Data): data to analyse.
        min_lag (int): minimum time lag.
        max_lag (int): maximum time lag.
        val_condtest (CondIndTest): validation method.
        verbosity (CPLevel): verbosity level.
        alpha (float, optional): LPCMCI significance level (passed to
            ``run_lpcmci`` as ``pc_alpha``). Defaults to 0.05.
        resfolder (string, optional): result folder to create. Defaults to None.
        neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
        clean_cls (bool): Clean console bit. Defaults to True.
    """
    super().__init__(data=data,
                     min_lag=min_lag,
                     max_lag=max_lag,
                     verbosity=verbosity,
                     alpha=alpha,
                     resfolder=resfolder,
                     neglect_only_autodep=neglect_only_autodep,
                     clean_cls=clean_cls)

    # tigramite works on numbers: cast every cell to float
    numeric_values = np.vectorize(float)(data.d)
    tigramite_df = pp.DataFrame(data = numeric_values, var_names = data.features)

    self.lpcmci = lpcmci(dataframe = tigramite_df,
                         cond_ind_test = val_condtest,
                         verbosity = verbosity.value)

run(link_assumptions=None)

Run causal discovery algorithm.

Parameters:

Name Type Description Default
link_assumptions dict

prior knowledge on causal model links. Defaults to None.

None

Returns:

Type Description
DAG

estimated causal model.

Source code in causalflow/causal_discovery/baseline/LPCMCI.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def run(self, link_assumptions = None) -> DAG:
    """
    Run causal discovery algorithm.

    Executes LPCMCI over ``[min_lag, max_lag]`` with ``alpha`` as the
    significance level, stores the raw output in ``self.result`` and the
    converted model in ``self.CM``.

    Args:
        link_assumptions (dict, optional): prior knowledge on causal model links. Defaults to None.

    Returns:
        (DAG): estimated causal model.
    """
    for banner_line in ('\n', DASH, "Running Causal Discovery Algorithm"):
        CP.info(banner_line)

    lpcmci_kwargs = dict(link_assumptions = link_assumptions,
                         tau_max = self.max_lag,
                         tau_min = self.min_lag,
                         pc_alpha = self.alpha)
    self.result = self.lpcmci.run_lpcmci(**lpcmci_kwargs)
    self.CM = self._to_DAG()

    # a logger only exists when a result folder was requested
    if self.resfolder is not None:
        self.logger.close()
    return self.CM

This module provides the PCMCI class.

Classes

PCMCI: class containing the PCMCI causal discovery algorithm.

PCMCI

Bases: CausalDiscoveryMethod

PCMCI causal discovery method.

Source code in causalflow/causal_discovery/baseline/PCMCI.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class PCMCI(CausalDiscoveryMethod):
    """PCMCI causal discovery method."""

    def __init__(self, 
                 data: Data, 
                 min_lag, max_lag, 
                 val_condtest: CondIndTest, 
                 verbosity: CPLevel,
                 pc_alpha = 0.05, 
                 alpha = 0.05, 
                 resfolder = None, 
                 neglect_only_autodep = False,
                 clean_cls = True):
        """
        Class constructor.

        Args:
            data (Data): data to analyse.
            min_lag (int): minimum time lag.
            max_lag (int): maximum time lag.
            val_condtest (CondIndTest): validation method.
            verbosity (CPLevel): verbosity level.
            pc_alpha (float, optional): PC significance level. Defaults to 0.05.
            alpha (float, optional): PCMCI significance level. Defaults to 0.05.
            resfolder (string, optional): result folder to create. Defaults to None.
            neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
            clean_cls (bool): Clean console bit. Defaults to True.
        """
        super().__init__(data, min_lag, max_lag, verbosity, alpha, resfolder, neglect_only_autodep, clean_cls)
        self.pc_alpha = pc_alpha

        # build tigramite dataset: every value is cast to float
        vector = np.vectorize(float)
        d = vector(data.d)

        # init pcmci
        self.pcmci = pcmci(dataframe = pp.DataFrame(data = d, var_names = data.features),
                           cond_ind_test = val_condtest,
                           verbosity = verbosity.value)


    def run(self, link_assumptions = None) -> DAG:
        """
        Run causal discovery algorithm.

        Args:
            link_assumptions (dict, optional): prior knowledge on causal model links. Defaults to None.

        Returns:
            (DAG): estimated causal model.
        """
        CP.info('\n')
        CP.info(DASH)
        CP.info("Running Causal Discovery Algorithm")

        self.result = self.pcmci.run_pcmci(link_assumptions=link_assumptions,
                                           tau_max = self.max_lag,
                                           tau_min = self.min_lag,
                                           alpha_level = self.alpha,
                                           pc_alpha = self.pc_alpha)

        self.CM = self._to_DAG()

        if self.resfolder is not None: self.logger.close()        
        return self.CM


    def _to_DAG(self):
        """
        Re-elaborate the PCMCI result into a DAG.

        ``self.result['graph'][s][t, lag]`` holds the link type from variable
        ``s`` (at lag ``lag``) to variable ``t``. Directed, HalfUncertain,
        Bidirected and Uncertain links are added to the DAG; other link types
        are ignored. Bidirected and Uncertain links at lag 0 are listed by
        tigramite in both directions and are added only once (first
        orientation met in the scan).

        Returns:
            (DAG): pcmci result re-elaborated.
        """
        var_names = self.data.features
        # honour neglect_only_autodep like the base class and DYNOTEARS do
        tmp_dag = DAG(var_names, self.min_lag, self.max_lag, self.neglect_only_autodep)
        tmp_dag.sys_context = dict()

        graph = self.result['graph']
        val_matrix = self.result['val_matrix']
        p_matrix = self.result['p_matrix']
        symmetric_types = (LinkType.Bidirected.value, LinkType.Uncertain.value)
        accepted_types = symmetric_types + (LinkType.Directed.value, LinkType.HalfUncertain.value)

        N, lags = graph[0].shape
        for s in range(len(graph)):
            for t in range(N):
                for lag in range(lags):
                    arrowtype = graph[s][t, lag]
                    if arrowtype not in accepted_types:
                        continue
                    if arrowtype in symmetric_types and lag == 0:
                        # the same contemporaneous link also appears as graph[t][s, 0]:
                        # skip it if that mirror orientation was already added
                        mirror_sources = tmp_dag.g[var_names[s]].sources
                        mirror_key = (var_names[t], 0)
                        if mirror_key in mirror_sources and mirror_sources[mirror_key][TYPE] == arrowtype:
                            continue
                    tmp_dag.add_source(var_names[t],
                                       var_names[s],
                                       val_matrix[s][t, lag],
                                       p_matrix[s][t, lag],
                                       lag,
                                       arrowtype)
        return tmp_dag

__init__(data, min_lag, max_lag, val_condtest, verbosity, pc_alpha=0.05, alpha=0.05, resfolder=None, neglect_only_autodep=False, clean_cls=True)

Class constructor.

Parameters:

Name Type Description Default
data Data

data to analyse.

required
min_lag int

minimum time lag.

required
max_lag int

maximum time lag.

required
val_condtest CondIndTest

validation method.

required
verbosity CPLevel

verbosity level.

required
pc_alpha float

PC significance level. Defaults to 0.05.

0.05
alpha float

PCMCI significance level. Defaults to 0.05.

0.05
resfolder string

result folder to create. Defaults to None.

None
neglect_only_autodep bool

Bit for neglecting variables with only autodependency. Defaults to False.

False
clean_cls bool

Clean console bit. Defaults to True.

True
Source code in causalflow/causal_discovery/baseline/PCMCI.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def __init__(self, 
             data: Data, 
             min_lag, max_lag, 
             val_condtest: CondIndTest, 
             verbosity: CPLevel,
             pc_alpha = 0.05, 
             alpha = 0.05, 
             resfolder = None, 
             neglect_only_autodep = False,
             clean_cls = True):
    """
    Class constructor.

    Initialises the base method, remembers the PC-stage significance level,
    then wraps the data in a tigramite dataframe and builds the PCMCI runner.

    Args:
        data (Data): data to analyse.
        min_lag (int): minimum time lag.
        max_lag (int): maximum time lag.
        val_condtest (CondIndTest): validation method.
        verbosity (CPLevel): verbosity level.
        pc_alpha (float, optional): PC significance level. Defaults to 0.05.
        alpha (float, optional): PCMCI significance level. Defaults to 0.05.
        resfolder (string, optional): result folder to create. Defaults to None.
        neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
        clean_cls (bool): Clean console bit. Defaults to True.
    """
    super().__init__(data=data,
                     min_lag=min_lag,
                     max_lag=max_lag,
                     verbosity=verbosity,
                     alpha=alpha,
                     resfolder=resfolder,
                     neglect_only_autodep=neglect_only_autodep,
                     clean_cls=clean_cls)
    self.pc_alpha = pc_alpha

    # tigramite works on numbers: cast every cell to float
    numeric_values = np.vectorize(float)(data.d)
    tigramite_df = pp.DataFrame(data = numeric_values, var_names = data.features)

    self.pcmci = pcmci(dataframe = tigramite_df,
                       cond_ind_test = val_condtest,
                       verbosity = verbosity.value)

run(link_assumptions=None)

Run causal discovery algorithm.

Parameters:

Name Type Description Default
link_assumptions dict

prior knowledge on causal model links. Defaults to None.

None

Returns:

Type Description
DAG

estimated causal model.

Source code in causalflow/causal_discovery/baseline/PCMCI.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def run(self, link_assumptions = None) -> DAG:
    """
    Run causal discovery algorithm.

    Delegates to tigramite's ``run_pcmci`` with the lag range and the two
    significance levels stored on the instance, then converts the result.

    Args:
        link_assumptions (dict, optional): prior knowledge on causal model links. Defaults to None.

    Returns:
        (DAG): estimated causal model.
    """
    for banner_line in ('\n', DASH, "Running Causal Discovery Algorithm"):
        CP.info(banner_line)

    self.result = self.pcmci.run_pcmci(
        link_assumptions=link_assumptions,
        tau_max=self.max_lag,
        tau_min=self.min_lag,
        alpha_level=self.alpha,
        pc_alpha=self.pc_alpha,
    )
    self.CM = self._to_DAG()

    # a logger only exists when a result folder was requested
    if self.resfolder is not None:
        self.logger.close()
    return self.CM

This module provides the PCMCI+ class.

Classes

PCMCIplus: class containing the PCMCI+ causal discovery algorithm.

PCMCIplus

Bases: CausalDiscoveryMethod

PCMCI+ causal discovery method.

Source code in causalflow/causal_discovery/baseline/PCMCIplus.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class PCMCIplus(CausalDiscoveryMethod):
    """PCMCI+ causal discovery method."""

    def __init__(self, 
                 data: Data, 
                 min_lag, max_lag, 
                 val_condtest: CondIndTest, 
                 verbosity: CPLevel,
                 alpha = 0.05, 
                 resfolder = None, 
                 neglect_only_autodep = False,
                 clean_cls = True):
        """
        Class constructor.

        Args:
            data (Data): data to analyse.
            min_lag (int): minimum time lag.
            max_lag (int): maximum time lag.
            val_condtest (CondIndTest): validation method.
            verbosity (CPLevel): verbosity level.
            alpha (float, optional): significance level. Defaults to 0.05.
            resfolder (string, optional): result folder to create. Defaults to None.
            neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
            clean_cls (bool): Clean console bit. Defaults to True.
        """
        super().__init__(data, min_lag, max_lag, verbosity, alpha, resfolder, neglect_only_autodep, clean_cls)

        # build tigramite dataset (tigramite expects a float array)
        vector = np.vectorize(float)
        d = vector(data.d)

        # init pcmci
        self.pcmci = pcmci(dataframe = pp.DataFrame(data = d, var_names = data.features),
                           cond_ind_test = val_condtest,
                           verbosity = verbosity.value)


    def run(self, link_assumptions=None) -> DAG:
        """
        Run causal discovery algorithm.

        Args:
            link_assumptions (dict, optional): prior knowledge on causal model links. Defaults to None.

        Returns:
            (DAG): estimated causal model.
        """
        CP.info('\n')
        CP.info(DASH)
        CP.info("Running Causal Discovery Algorithm")

        # NOTE: PCMCI+ is always run with tau_min=0 (contemporaneous links
        # included), independently of self.min_lag; self.alpha is used as pc_alpha.
        self.result = self.pcmci.run_pcmciplus(link_assumptions=link_assumptions,
                                               tau_max = self.max_lag,
                                               tau_min = 0,
                                               pc_alpha = self.alpha)

        self.CM = self._to_DAG()

        if self.resfolder is not None: self.logger.close()
        return self.CM


    def _to_DAG(self):
        """
        Re-elaborate the PCMCI+ result into a DAG.

        ``self.result['graph']`` is read as ``graph[s][t, lag]`` (source index
        ``s``, target index ``t``). Every non-empty entry of a supported link
        type is added as a source of ``t``:

        - bidirected links are skipped if the target already has that source
          registered as bidirected at the same lag;
        - uncertain links are skipped if the reverse link (``t`` as a source of
          ``s``) is already registered as uncertain, so each pair is added once;
        - directed and half-uncertain links are always added.

        Any other link mark is ignored.

        Returns:
            (DAG): pcmci result re-elaborated.
        """
        names = self.data.features
        # honour neglect_only_autodep like the base class and the other methods do
        tmp_dag = DAG(names, self.min_lag, self.max_lag, self.neglect_only_autodep)
        tmp_dag.sys_context = dict()

        graph = self.result['graph']
        val_matrix = self.result['val_matrix']
        p_matrix = self.result['p_matrix']
        N, lags = graph[0].shape

        for s in range(len(graph)):
            for t in range(N):
                for lag in range(lags):
                    arrowtype = graph[s][t, lag]
                    if arrowtype == '':
                        continue

                    if arrowtype == LinkType.Bidirected.value:
                        key = (names[s], abs(lag))
                        # index the DAG by variable name, never by position
                        sources = tmp_dag.g[names[t]].sources
                        if key in sources and sources[key][TYPE] == LinkType.Bidirected.value:
                            continue

                    elif arrowtype == LinkType.Uncertain.value:
                        key = (names[t], abs(lag))
                        sources = tmp_dag.g[names[s]].sources
                        if key in sources and sources[key][TYPE] == LinkType.Uncertain.value:
                            continue

                    elif arrowtype not in (LinkType.Directed.value,
                                           LinkType.HalfUncertain.value):
                        # unsupported link mark: ignore it
                        continue

                    tmp_dag.add_source(names[t],
                                       names[s],
                                       val_matrix[s][t, lag],
                                       p_matrix[s][t, lag],
                                       lag,
                                       arrowtype)
        return tmp_dag

__init__(data, min_lag, max_lag, val_condtest, verbosity, alpha=0.05, resfolder=None, neglect_only_autodep=False, clean_cls=True)

Class constructor.

Parameters:

Name Type Description Default
data Data

data to analyse.

required
min_lag int

minimum time lag.

required
max_lag int

maximum time lag.

required
val_condtest CondIndTest

validation method.

required
verbosity CPLevel

verbosity level.

required
alpha float

significance level. Defaults to 0.05.

0.05
resfolder string

result folder to create. Defaults to None.

None
neglect_only_autodep bool

Bit for neglecting variables with only autodependency. Defaults to False.

False
clean_cls bool

Clean console bit. Defaults to True.

True
Source code in causalflow/causal_discovery/baseline/PCMCIplus.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(self, 
             data: Data, 
             min_lag, max_lag, 
             val_condtest: CondIndTest, 
             verbosity: CPLevel,
             alpha = 0.05, 
             resfolder = None, 
             neglect_only_autodep = False,
             clean_cls = True):
    """
    Class constructor.

    Wraps the dataset into a tigramite ``PCMCI`` object that ``run`` later
    drives through ``run_pcmciplus``.

    Args:
        data (Data): data to analyse.
        min_lag (int): minimum time lag.
        max_lag (int): maximum time lag.
        val_condtest (CondIndTest): conditional independence test handed to tigramite.
        verbosity (CPLevel): verbosity level; its ``value`` is forwarded to tigramite.
        alpha (float, optional): significance level. Defaults to 0.05.
        resfolder (string, optional): result folder to create. Defaults to None.
        neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
        clean_cls (bool): Clean console bit. Defaults to True.
    """
    super().__init__(data, min_lag, max_lag, verbosity,
                     alpha, resfolder, neglect_only_autodep, clean_cls)

    # tigramite works on a numeric array: cast every entry to float first
    as_float = np.vectorize(float)
    frame = pp.DataFrame(data=as_float(data.d), var_names=data.features)

    self.pcmci = pcmci(dataframe=frame,
                       cond_ind_test=val_condtest,
                       verbosity=verbosity.value)

run(link_assumptions=None)

Run causal discovery algorithm.

Parameters:

Name Type Description Default
link_assumptions dict

prior knowledge on causal model links. Defaults to None.

None

Returns:

Type Description
DAG

estimated causal model.

Source code in causalflow/causal_discovery/baseline/PCMCIplus.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def run(self, link_assumptions=None) -> DAG:
    """
    Run causal discovery algorithm.

    Calls tigramite's ``run_pcmciplus`` with ``tau_min`` fixed to 0 and the
    instance ``alpha`` as ``pc_alpha``, then converts the result.

    Args:
        link_assumptions (dict, optional): prior knowledge on causal model links. Defaults to None.

    Returns:
        (DAG): estimated causal model.
    """
    for banner_line in ('\n', DASH, "Running Causal Discovery Algorithm"):
        CP.info(banner_line)

    self.result = self.pcmci.run_pcmciplus(
        link_assumptions=link_assumptions,
        tau_max=self.max_lag,
        tau_min=0,
        pc_alpha=self.alpha,
    )
    self.CM = self._to_DAG()

    # a logger only exists when a result folder was requested
    if self.resfolder is not None:
        self.logger.close()
    return self.CM

This module provides the TCDF class.

Classes

TCDF: class containing the TCDF causal discovery algorithm.

TCDF

Bases: CausalDiscoveryMethod

TCDF causal discovery method.

Source code in causalflow/causal_discovery/baseline/TCDF.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class TCDF(CausalDiscoveryMethod):
    """TCDF causal discovery method."""

    def __init__(self, 
                 data, 
                 min_lag,
                 max_lag, 
                 verbosity, 
                 resfolder = None,
                 neglect_only_autodep = False,
                 clean_cls = True):
        """
        Class constructor.

        Args:
            data (Data): data to analyse.
            min_lag (int): minimum time lag.
            max_lag (int): maximum time lag.
            verbosity (CPLevel): verbosity level.
            resfolder (string, optional): result folder to create. Defaults to None.
            neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
            clean_cls (bool): Clean console bit. Defaults to True.
        """
        super().__init__(data, min_lag, max_lag, verbosity, resfolder=resfolder, neglect_only_autodep=neglect_only_autodep, clean_cls=clean_cls)


    def run(self, 
            epochs=1000,  
            kernel_size=4, 
            dilation_coefficient=4, 
            hidden_layers=0, 
            learning_rate=0.01,
            cuda=False,
            significance=0.8) -> DAG:
        """
        Run causal discovery algorithm.

        The TCDF reference script (``pkgs/TCDF-master/runTCDF.py``) is executed
        in a child process of the current interpreter. The data is written to
        ``args/data.csv`` and the result is read back from
        ``results/tcdf_result.txt``. Both folders live next to this module and
        are removed afterwards with ``utils.clean``.

        Args:
            epochs (int, optional): forwarded as ``--epochs``. Defaults to 1000.
            kernel_size (int, optional): forwarded as ``--kernel_size``. Defaults to 4.
            dilation_coefficient (int, optional): forwarded as ``--dilation_coefficient``. Defaults to 4.
            hidden_layers (int, optional): forwarded as ``--hidden_layers``. Defaults to 0.
            learning_rate (float, optional): forwarded as ``--learning_rate``. Defaults to 0.01.
            cuda (bool, optional): if True, ``--cuda`` is passed. Defaults to False.
            significance (float, optional): forwarded as ``--significance``. Defaults to 0.8.

        Returns:
            (DAG): estimated causal model.

        Raises:
            SystemExit: with status 1 if the TCDF process fails.
        """
        import sys

        # Create the working folders used to exchange data with the TCDF script
        dir_path = os.path.dirname(os.path.realpath(__file__))
        Path(dir_path + "/args").mkdir(exist_ok=True)
        Path(dir_path + "/results").mkdir(exist_ok=True)
        script = dir_path + "/pkgs/TCDF-master/runTCDF" + ".py"
        data_file = dir_path + "/args/data.csv"

        r_arg_list = ["--epochs", str(epochs),
                      "--kernel_size", str(kernel_size),
                      "--dilation_coefficient", str(dilation_coefficient),
                      "--hidden_layers", str(hidden_layers),
                      "--learning_rate", str(learning_rate),
                      "--significance", str(significance)]
        self.data.d.to_csv(data_file, index=False)
        r_arg_list += ["--data", data_file]
        if cuda: r_arg_list.append("--cuda")
        r_arg_list += ["--path", dir_path]

        # Use the running interpreter so the child sees the same environment
        cmd = [sys.executable, script] + r_arg_list
        p = Popen(cmd, cwd="./", stdin=PIPE, stdout=PIPE, stderr=PIPE)

        # Forward the script output; abort on error
        output, error = p.communicate()
        CP.info(output.decode('utf-8'))
        if p.returncode != 0:
            utils.clean(dir_path)
            CP.warning('Python Error:\n {0}'.format(error.decode('utf-8', errors='replace')))
            sys.exit(1)

        try:
            with open(dir_path + "/results/tcdf_result.txt") as result_file:
                g_dict = json.load(result_file)
        finally:
            utils.clean(dir_path)

        # JSON stores (source, lag) pairs as lists: turn them back into tuples
        g_dict = {key: [tuple(elem) for elem in links] for key, links in g_dict.items()}
        self.CM = self._to_DAG(g_dict)

        if self.resfolder is not None: self.logger.close()
        return self.CM


    def _to_DAG(self, graph):
        """
        Re-elaborate the result in a DAG.

        Only links whose absolute lag lies in ``[min_lag, max_lag]`` are kept.
        Each is added with score ``utils.DSCORE`` and p-value 0.

        Args:
            graph (dict): maps each target name to a list of ``(source, lag)`` tuples.

        Returns:
            (DAG): result re-elaborated.
        """
        tmp_dag = DAG(self.data.features, self.min_lag, self.max_lag, self.neglect_only_autodep)
        tmp_dag.sys_context = dict()
        for t in graph.keys():
            for s in graph[t]:
                lag = abs(s[1])
                if lag >= self.min_lag and lag <= self.max_lag:
                    tmp_dag.add_source(t, s[0], utils.DSCORE, 0, s[1])
        return tmp_dag

__init__(data, min_lag, max_lag, verbosity, resfolder=None, neglect_only_autodep=False, clean_cls=True)

Class constructor.

Parameters:

Name Type Description Default
data Data

data to analyse.

required
min_lag int

minimum time lag.

required
max_lag int

maximum time lag.

required
verbosity CPLevel

verbosity level.

required
resfolder string

result folder to create. Defaults to None.

None
neglect_only_autodep bool

Bit for neglecting variables with only autodependency. Defaults to False.

False
clean_cls bool

Clean console bit. Defaults to True.

True
Source code in causalflow/causal_discovery/baseline/TCDF.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(self, 
             data, 
             min_lag,
             max_lag, 
             verbosity, 
             resfolder = None,
             neglect_only_autodep = False,
             clean_cls = True):
    """
    Class constructor.

    TCDF takes no significance level at construction time, so the base
    class keeps its default ``alpha``.

    Args:
        data (Data): data to analyse.
        min_lag (int): minimum time lag.
        max_lag (int): maximum time lag.
        verbosity (CPLevel): verbosity level.
        resfolder (string, optional): result folder to create. Defaults to None.
        neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
        clean_cls (bool): Clean console bit. Defaults to True.
    """
    super().__init__(
        data,
        min_lag,
        max_lag,
        verbosity,
        resfolder=resfolder,
        neglect_only_autodep=neglect_only_autodep,
        clean_cls=clean_cls,
    )

run(epochs=1000, kernel_size=4, dilation_coefficient=4, hidden_layers=0, learning_rate=0.01, cuda=False)

Run causal discovery algorithm.

Returns:

Type Description
DAG

estimated causal model.

Source code in causalflow/causal_discovery/baseline/TCDF.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def run(self, 
        epochs=1000,  
        kernel_size=4, 
        dilation_coefficient=4, 
        hidden_layers=0, 
        learning_rate=0.01,
        cuda=False,
        significance=0.8) -> DAG:
    """
    Run causal discovery algorithm.

    The TCDF reference script (``pkgs/TCDF-master/runTCDF.py``) is executed
    in a child process of the current interpreter. The data is written to
    ``args/data.csv`` and the result is read back from
    ``results/tcdf_result.txt``. Both folders live next to this module and
    are removed afterwards with ``utils.clean``.

    Args:
        epochs (int, optional): forwarded as ``--epochs``. Defaults to 1000.
        kernel_size (int, optional): forwarded as ``--kernel_size``. Defaults to 4.
        dilation_coefficient (int, optional): forwarded as ``--dilation_coefficient``. Defaults to 4.
        hidden_layers (int, optional): forwarded as ``--hidden_layers``. Defaults to 0.
        learning_rate (float, optional): forwarded as ``--learning_rate``. Defaults to 0.01.
        cuda (bool, optional): if True, ``--cuda`` is passed. Defaults to False.
        significance (float, optional): forwarded as ``--significance``. Defaults to 0.8.

    Returns:
        (DAG): estimated causal model.

    Raises:
        SystemExit: with status 1 if the TCDF process fails.
    """
    import sys

    # Create the working folders used to exchange data with the TCDF script
    dir_path = os.path.dirname(os.path.realpath(__file__))
    Path(dir_path + "/args").mkdir(exist_ok=True)
    Path(dir_path + "/results").mkdir(exist_ok=True)
    script = dir_path + "/pkgs/TCDF-master/runTCDF" + ".py"
    data_file = dir_path + "/args/data.csv"

    r_arg_list = ["--epochs", str(epochs),
                  "--kernel_size", str(kernel_size),
                  "--dilation_coefficient", str(dilation_coefficient),
                  "--hidden_layers", str(hidden_layers),
                  "--learning_rate", str(learning_rate),
                  "--significance", str(significance)]
    self.data.d.to_csv(data_file, index=False)
    r_arg_list += ["--data", data_file]
    if cuda: r_arg_list.append("--cuda")
    r_arg_list += ["--path", dir_path]

    # Use the running interpreter so the child sees the same environment
    cmd = [sys.executable, script] + r_arg_list
    p = Popen(cmd, cwd="./", stdin=PIPE, stdout=PIPE, stderr=PIPE)

    # Forward the script output; abort on error
    output, error = p.communicate()
    CP.info(output.decode('utf-8'))
    if p.returncode != 0:
        utils.clean(dir_path)
        CP.warning('Python Error:\n {0}'.format(error.decode('utf-8', errors='replace')))
        sys.exit(1)

    try:
        with open(dir_path + "/results/tcdf_result.txt") as result_file:
            g_dict = json.load(result_file)
    finally:
        utils.clean(dir_path)

    # JSON stores (source, lag) pairs as lists: turn them back into tuples
    g_dict = {key: [tuple(elem) for elem in links] for key, links in g_dict.items()}
    self.CM = self._to_DAG(g_dict)

    if self.resfolder is not None: self.logger.close()
    return self.CM

This module provides the tsFCI class.

Classes

tsFCI: class containing the tsFCI causal discovery algorithm.

tsFCI

Bases: CausalDiscoveryMethod

tsFCI causal discovery method.

Source code in causalflow/causal_discovery/baseline/tsFCI.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class tsFCI(CausalDiscoveryMethod):
    """tsFCI causal discovery method."""

    def __init__(self, 
                 data, 
                 min_lag,
                 max_lag, 
                 verbosity, 
                 alpha = 0.05, 
                 resfolder = None,
                 neglect_only_autodep = False,
                 clean_cls = True):
        """
        Class constructor.

        Args:
            data (Data): data to analyse.
            min_lag (int): minimum time lag.
            max_lag (int): maximum time lag.
            verbosity (CPLevel): verbosity level.
            alpha (float, optional): significance level passed to the tsFCI R script. Defaults to 0.05.
            resfolder (string, optional): result folder to create. Defaults to None.
            neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
            clean_cls (bool): Clean console bit. Defaults to True.
        """
        super().__init__(data, min_lag, max_lag, verbosity, alpha, resfolder, neglect_only_autodep, clean_cls)


    def run(self) -> DAG:
        """
        Run causal discovery algorithm.

        The R script ``pkgs/tsfci.R`` is executed through ``Rscript`` with the
        data CSV path, ``alpha``, ``max_lag`` and the working folder as
        arguments. Its output matrix is read from ``results/result.csv``.

        Returns:
            (DAG): estimated causal model.

        Raises:
            SystemExit: with status 1 if the R process fails.
        """
        import sys

        # Create the working folders used to exchange data with the R script
        dir_path = os.path.dirname(os.path.realpath(__file__))
        Path(dir_path + "/args").mkdir(exist_ok=True)
        Path(dir_path + "/results").mkdir(exist_ok=True)

        script = dir_path + "/pkgs/tsfci.R"
        r_arg_list = []

        # COMMAND WITH ARGUMENTS
        self.data.d.to_csv(dir_path + "/args/data.csv", index=False)
        r_arg_list.append(dir_path + "/args/data.csv")
        r_arg_list.append(str(self.alpha))
        r_arg_list.append(str(self.max_lag))

        r_arg_list.append(dir_path)
        cmd = ["Rscript", script] + r_arg_list

        p = Popen(cmd, cwd="./", stdin=PIPE, stdout=PIPE, stderr=PIPE)

        # Forward the R output; abort with a non-zero status on error
        output, error = p.communicate()
        print(output.decode('utf-8'))
        if p.returncode != 0:
            utils.clean(dir_path)
            print('R Error:\n {0}'.format(error.decode('utf-8')))
            sys.exit(1)

        # always remove the working folders, even if the result cannot be read
        try:
            g_df = pd.read_csv(dir_path + "/results/result.csv", header=0, index_col=0)
        finally:
            utils.clean(dir_path)

        g_dict = self.ts_fci_dataframe_to_dict(g_df, self.data.features, self.max_lag)
        self.CM = self._to_DAG(g_dict)

        if self.resfolder is not None: self.logger.close()
        return self.CM


    def _to_DAG(self, graph):
        """
        Re-elaborate the result in a DAG.

        Only links whose absolute lag lies in ``[min_lag, max_lag]`` are kept.
        Each is added with score ``utils.DSCORE`` and p-value 0.

        Args:
            graph (dict): maps each target name to a list of ``(source, lag)`` tuples.

        Returns:
            (DAG): result re-elaborated.
        """
        tmp_dag = DAG(self.data.features, self.min_lag, self.max_lag, self.neglect_only_autodep)
        tmp_dag.sys_context = dict()
        for t in graph.keys():
            for s in graph[t]:
                lag = abs(s[1])
                if lag >= self.min_lag and lag <= self.max_lag:
                    tmp_dag.add_source(t, s[0], utils.DSCORE, 0, s[1])
        return tmp_dag


    def ts_fci_dataframe_to_dict(self, df, names, nlags) -> dict:
        """
        Convert tsFCI result into a dict for _to_DAG.

        ``df`` is read as a square mark matrix whose columns are laid out in
        ``nlags`` consecutive groups of ``len(names)`` variables. A mark
        ``df[col_y].loc[col_x] == 2`` adds ``(name_x, tx - ty)`` to the list
        of ``name_y``, where ``tx``/``ty`` are the group indices of the two
        columns.

        Args:
            df (DataFrame): graph.
            names (list[str]): variables' name.
            nlags (int): max time lag.

        Returns:
            dict: dict graph.
        """
        # todo: check if its correct
        # Diagnostic only: print column pairs carrying mark 2 in both directions.
        for i in range(df.shape[1]):
            for j in range(i+1, df.shape[1]):
                if df[df.columns[i]].loc[df.columns[j]] == 2:
                    if df[df.columns[j]].loc[df.columns[i]] == 2:
                        print(df.columns[i] + " <-> " + df.columns[j])

        g_dict = dict()
        for name_y in names:
            g_dict[name_y] = []
        for ty in range(nlags):
            for name_y in names:
                t_name_y = df.columns[ty*len(names)+names.index(name_y)]
                for tx in range(nlags):
                    for name_x in names:
                        t_name_x = df.columns[tx * len(names) + names.index(name_x)]
                        if df[t_name_y].loc[t_name_x] == 2:
                            if (name_x, tx-ty) not in g_dict[name_y]:
                                g_dict[name_y].append((name_x, tx - ty))
        print(g_dict)
        return g_dict

__init__(data, min_lag, max_lag, verbosity, alpha=0.05, resfolder=None, neglect_only_autodep=False, clean_cls=True)

Class constructor.

Parameters:

Name Type Description Default
data Data

data to analyse.

required
min_lag int

minimum time lag.

required
max_lag int

maximum time lag.

required
verbosity CPLevel

verbosity level.

required
alpha float

Significance level passed to the tsFCI R script. Defaults to 0.05.

0.05
resfolder string

result folder to create. Defaults to None.

None
neglect_only_autodep bool

Bit for neglecting variables with only autodependency. Defaults to False.

False
clean_cls bool

Clean console bit. Defaults to True.

True
Source code in causalflow/causal_discovery/baseline/tsFCI.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(self, 
             data, 
             min_lag,
             max_lag, 
             verbosity, 
             alpha = 0.05, 
             resfolder = None,
             neglect_only_autodep = False,
             clean_cls = True):
    """
    Class constructor.

    All arguments are handed unchanged to ``CausalDiscoveryMethod``.

    Args:
        data (Data): data to analyse.
        min_lag (int): minimum time lag.
        max_lag (int): maximum time lag.
        verbosity (CPLevel): verbosity level.
        alpha (float, optional): significance level passed to the tsFCI R script. Defaults to 0.05.
        resfolder (string, optional): result folder to create. Defaults to None.
        neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
        clean_cls (bool): Clean console bit. Defaults to True.
    """
    super().__init__(
        data, min_lag, max_lag, verbosity,
        alpha, resfolder, neglect_only_autodep, clean_cls,
    )

run()

Run causal discovery algorithm.

Returns:

Type Description
DAG

estimated causal model.

Source code in causalflow/causal_discovery/baseline/tsFCI.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def run(self) -> DAG:
    """
    Run causal discovery algorithm.

    The R script ``pkgs/tsfci.R`` is executed through ``Rscript`` with the
    data CSV path, ``alpha``, ``max_lag`` and the working folder as
    arguments. Its output matrix is read from ``results/result.csv``.

    Returns:
        (DAG): estimated causal model.

    Raises:
        SystemExit: with status 1 if the R process fails.
    """
    import sys

    # Create the working folders used to exchange data with the R script
    dir_path = os.path.dirname(os.path.realpath(__file__))
    Path(dir_path + "/args").mkdir(exist_ok=True)
    Path(dir_path + "/results").mkdir(exist_ok=True)

    script = dir_path + "/pkgs/tsfci.R"
    r_arg_list = []

    # COMMAND WITH ARGUMENTS
    self.data.d.to_csv(dir_path + "/args/data.csv", index=False)
    r_arg_list.append(dir_path + "/args/data.csv")
    r_arg_list.append(str(self.alpha))
    r_arg_list.append(str(self.max_lag))

    r_arg_list.append(dir_path)
    cmd = ["Rscript", script] + r_arg_list

    p = Popen(cmd, cwd="./", stdin=PIPE, stdout=PIPE, stderr=PIPE)

    # Forward the R output; abort with a non-zero status on error
    output, error = p.communicate()
    print(output.decode('utf-8'))
    if p.returncode != 0:
        utils.clean(dir_path)
        print('R Error:\n {0}'.format(error.decode('utf-8')))
        sys.exit(1)

    # always remove the working folders, even if the result cannot be read
    try:
        g_df = pd.read_csv(dir_path + "/results/result.csv", header=0, index_col=0)
    finally:
        utils.clean(dir_path)

    g_dict = self.ts_fci_dataframe_to_dict(g_df, self.data.features, self.max_lag)
    self.CM = self._to_DAG(g_dict)

    if self.resfolder is not None: self.logger.close()
    return self.CM

ts_fci_dataframe_to_dict(df, names, nlags)

Convert tsFCI result into a dict for _to_DAG.

Parameters:

Name Type Description Default
df DataFrame

graph.

required
names list[str]

variables' name.

required
nlags int

max time lag.

required

Returns:

Name Type Description
dict dict

dict graph.

Source code in causalflow/causal_discovery/baseline/tsFCI.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def ts_fci_dataframe_to_dict(self, df, names, nlags) -> dict:
    """
    Convert tsFCI result into a dict for _to_DAG.

    The columns of ``df`` are read as ``nlags`` consecutive groups of
    ``len(names)`` variables. Whenever ``df[col_y].loc[col_x] == 2``, the
    pair ``(name_x, tx - ty)`` is recorded once under ``name_y``. Here
    ``tx`` and ``ty`` are the group indices of the two columns.

    Args:
        df (DataFrame): graph.
        names (list[str]): variables' name.
        nlags (int): max time lag.

    Returns:
        dict: dict graph.
    """
    # todo: check if its correct
    columns = df.columns
    n_cols = df.shape[1]
    # Diagnostic only: report column pairs carrying mark 2 in both directions.
    for i in range(n_cols):
        for j in range(i + 1, n_cols):
            col_a, col_b = columns[i], columns[j]
            if df[col_a].loc[col_b] == 2 and df[col_b].loc[col_a] == 2:
                print(col_a + " <-> " + col_b)

    n_names = len(names)
    g_dict = {name_y: [] for name_y in names}
    for ty in range(nlags):
        for name_y in names:
            col_y = columns[ty * n_names + names.index(name_y)]
            for tx in range(nlags):
                for name_x in names:
                    col_x = columns[tx * n_names + names.index(name_x)]
                    if not df[col_y].loc[col_x] == 2:
                        continue
                    link = (name_x, tx - ty)
                    if link not in g_dict[name_y]:
                        g_dict[name_y].append(link)
    print(g_dict)
    return g_dict

This module provides the VarLiNGAM class.

Classes

VarLiNGAM: class containing the VarLiNGAM causal discovery algorithm.

VarLiNGAM

Bases: CausalDiscoveryMethod

VarLiNGAM causal discovery method.

Source code in causalflow/causal_discovery/baseline/VarLiNGAM.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class VarLiNGAM(CausalDiscoveryMethod):
    """VarLiNGAM causal discovery method."""

    def __init__(self, 
                 data, 
                 min_lag,
                 max_lag, 
                 verbosity, 
                 alpha = 0.05, 
                 resfolder = None,
                 neglect_only_autodep = False,
                 clean_cls = True):
        """
        Class constructor.

        Args:
            data (Data): data to analyse.
            min_lag (int): minimum time lag.
            max_lag (int): maximum time lag.
            verbosity (CPLevel): verbosity level.
            alpha (float, optional): threshold on the absolute value of the estimated VarLiNGAM coefficients. Defaults to 0.05.
            resfolder (string, optional): result folder to create. Defaults to None.
            neglect_only_autodep (bool, optional): Bit for neglecting variables with only autodependency. Defaults to False.
            clean_cls (bool): Clean console bit. Defaults to True.
        """
        super().__init__(data, min_lag, max_lag, verbosity, alpha, resfolder, neglect_only_autodep, clean_cls)


    def run(self) -> DAG:
        """
        Run causal discovery algorithm.

        Fits ``VARLiNGAM`` with up to ``max_lag`` lags (BIC lag selection,
        pruning enabled). Every coefficient whose absolute value exceeds
        ``alpha`` is kept as a link.

        Returns:
            (DAG): estimated causal model.
        """
        model = VARLiNGAM(lags = self.max_lag, criterion='bic', prune=True)
        model.fit(self.data.d)

        # adjacency_matrices_ is [B_0, B_1, ..., B_p]; B_tau[i, j] != 0 encodes
        # x_j(t - tau) -> x_i(t): rows are effects, columns are causes.
        am = np.concatenate([*model.adjacency_matrices_], axis=1)
        n_vars = am.shape[0]

        names = self.data.features
        res_dict = {names[e]: [] for e in range(n_vars)}
        effects, columns = np.nonzero(np.abs(am) > self.alpha)
        for effect, col in zip(effects, columns):
            cause, tau = divmod(int(col), n_vars)[::-1]
            res_dict[names[int(effect)]].append((names[cause], -tau))
        self.CM = self._to_DAG(res_dict)

        if self.resfolder is not None: self.logger.close()
        return self.CM

    def _to_DAG(self, graph):
        """
        Re-elaborates the result in a DAG.

        Only links whose absolute lag lies in ``[min_lag, max_lag]`` are kept.
        Each is added with score ``utils.DSCORE`` and p-value 0.

        Args:
            graph (dict): maps each target name to a list of ``(source, lag)`` tuples.

        Returns:
            (DAG): result re-elaborated.
        """
        tmp_dag = DAG(self.data.features, self.min_lag, self.max_lag, self.neglect_only_autodep)
        tmp_dag.sys_context = dict()
        for t in graph.keys():
            for s in graph[t]:
                lag = abs(s[1])
                if lag >= self.min_lag and lag <= self.max_lag:
                    tmp_dag.add_source(t, s[0], utils.DSCORE, 0, s[1])
        return tmp_dag

__init__(data, min_lag, max_lag, verbosity, alpha=0.05, resfolder=None, neglect_only_autodep=False, clean_cls=True)

Class constructor.

Parameters:

Name Type Description Default
data Data

data to analyse.

required
min_lag int

minimum time lag.

required
max_lag int

maximum time lag.

required
verbosity CPLevel

verbosity level.

required
alpha float

Threshold on the absolute value of the estimated VarLiNGAM coefficients. Defaults to 0.05.

0.05
resfolder string

result folder to create. Defaults to None.

None
neglect_only_autodep bool

Bit for neglecting variables with only autodependency. Defaults to False.

False
clean_cls bool

Clean console bit. Defaults to True.

True
Source code in causalflow/causal_discovery/baseline/VarLiNGAM.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, 
             data, 
             min_lag,
             max_lag, 
             verbosity, 
             alpha = 0.05, 
             resfolder = None,
             neglect_only_autodep = False,
             clean_cls = True):
    """
    Class constructor.

    All arguments are forwarded unchanged to the base-class constructor.

    Args:
        data (Data): data to analyse.
        min_lag (int): minimum time lag.
        max_lag (int): maximum time lag (also the maximum VAR order tried).
        verbosity (CPLevel): verbosity level.
        alpha (float, optional): threshold on the absolute value of the
            estimated VARLiNGAM coefficients; only links whose coefficient
            magnitude exceeds it are kept. Defaults to 0.05.
        resfolder (string, optional): result folder to create. Defaults to None.
        neglect_only_autodep (bool, optional): flag for neglecting variables that only have an autodependency. Defaults to False.
        clean_cls (bool): Clean console bit. Defaults to True.
    """
    super().__init__(data, min_lag, max_lag, verbosity, alpha, resfolder, neglect_only_autodep, clean_cls)

run()

Run causal discovery algorithm.

Returns:

Type Description
DAG

estimated causal model.

Source code in causalflow/causal_discovery/baseline/VarLiNGAM.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def run(self) -> DAG:
    """
    Run causal discovery algorithm.

    Fits a VARLiNGAM model on ``self.data.d`` (lag order up to
    ``self.max_lag`` selected by BIC, with pruning). Every estimated
    coefficient whose absolute value exceeds ``self.alpha`` becomes a causal
    link. Links outside ``[self.min_lag, self.max_lag]`` are then discarded
    by ``_to_DAG``.

    Returns:
        (DAG): estimated causal model.
    """
    model = VARLiNGAM(lags = self.max_lag, criterion='bic', prune=True)
    model.fit(self.data.d)

    # adjacency_matrices_ holds one (N x N) matrix per lag tau = 0..L; they are
    # stacked side by side into an (N x N*(L+1)) matrix whose column index is
    # tau * N + cause. lingam convention: B_tau[i, j] != 0 means
    # x_j(t - tau) -> x_i(t), i.e. the ROW is the effect and the COLUMN is the
    # cause (the previous code had these swapped, reversing every edge).
    am = np.concatenate([*model.adjacency_matrices_], axis=1)
    n_vars = am.shape[0]

    # A link exists where the coefficient is non-zero and its magnitude exceeds
    # alpha. This is exactly what the former sign-splitting step produced once
    # np.abs collapsed the signs back to 1.
    links = (np.abs(am) > self.alpha) & (am != 0)

    names = self.data.features
    # target variable -> list of (source variable, -lag)
    res_dict = {names[i]: [] for i in range(n_vars)}
    # np.nonzero yields indices in row-major order, like the former nested loop
    for row, col in zip(*np.nonzero(links)):
        lag, cause = divmod(int(col), n_vars)
        res_dict[names[int(row)]].append((names[cause], -lag))
    self.CM = self._to_DAG(res_dict)

    if self.resfolder is not None: self.logger.close()
    return self.CM