Transition path theory

Transition path theory (TPT) [1][2] is a method to study the ensemble of reactive trajectories, i.e., trajectories which come from a defined set of states AA and go next to BB. It can answer at which rate they occur, as well as depict parallel pathways, traps, sequences of events, etc. Furthermore it introduces the notion of ‘committor functions’, which deals with probabilities of ending up in set AA or BB given the trajectory starts at some state potentially outside ABA\cup B.

The implementation is based on [3]. Coarse-graining by path decomposition is presented in [3] and [4].

To demonstrate the TPT API (API docs here), in the following the example of a drunkard’s walk is presented. The example is motivated by [5] and [6], where a drunkard is placed on a network of states and two special states, home and the bar. When the drunkard reaches either of these special states the trajectory stays there with high probability. One can then ask which paths can be taken and also with which probability the drunkard is going to reach either of the states given a certain current position.

To this end, import numpy for general numerical operations.

[1]:
import numpy as np

We can create a DrunkardsWalk simulator by specifying bar and home locations. As the drunkard lives on a 2-dimensional grid, the locations are given in terms of integer coordinates. Internally, this is related back to width×height\mathrm{width}\times\mathrm{height} states.

There are four kinds of states: - home states: states which denote the location of the home - bar states: states which denote the location of the bar - barrier states: states which either cannot be crossed or can only be crossed by overcoming a potential (i.e., it is less likely to encounter these states in a trajectory) - normal states: the drunkard can move freely by taking a step onto one of the adjacent grid cells with uniform probability unless it is a barrier.

[2]:
from deeptime.data import drunkards_walk

sim = drunkards_walk(grid_size=(10, 10),
                     bar_location=[(0, 0), (0, 1), (1, 0), (1, 1)],
                     home_location=[(8, 8), (8, 9), (9, 8), (9, 9)])

We can add hard and soft barriers by specifying start and end points of the barrier. If no weight is given, the barrier is hard, i.e., cannot be crossed by a trajectory. This enters the jump probabilities from adjacent cells (i,j)(i,j) as

P(xt+1=barrierxt=(i,j))={p1/weight, if weight positive,0, otherwise.\mathbb{P}(x_{t+1}=\mathrm{barrier} \mid x_t = (i, j)) = \begin{cases} p \leq 1/\mathrm{weight} &\text{, if weight positive,}\\ 0 &\text{, otherwise.} \end{cases}

That means the larger the weight, the smaller the probability to cross the barrier.

[3]:
sim.add_barrier((5, 1), (5, 5))
sim.add_barrier((0, 9), (5, 8))
sim.add_barrier((9, 2), (7, 6))
sim.add_barrier((2, 6), (5, 6))

sim.add_barrier((7, 9), (7, 7), weight=5.)
sim.add_barrier((8, 7), (9, 7), weight=5.)

sim.add_barrier((0, 2), (2, 2), weight=5.)
sim.add_barrier((2, 0), (2, 1), weight=5.)

Now we can simulate a trajectory on this grid by specifying a starting point and a number of simulation steps. The effective length of the trajectory might be lower than the number of simulation steps as the simulation stops if the state is home or bar.

[4]:
start = (7, 2)
walk = sim.walk(start=start, n_steps=250, seed=40)
print("Number of steps in the walk:", len(walk))
Number of steps in the walk: 74

The trajectory can be visualized with a few helper functions attached to the simulator:

[5]:
import matplotlib as mpl
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable

fig, ax = plt.subplots(figsize=(10, 10))

ax.scatter(*start, marker='*', label='Start', c='cyan', s=150, zorder=5)
sim.plot_path(ax, walk)
handles, labels = sim.plot_2d_map(ax)
ax.legend(handles=handles, labels=labels);
../_images/notebooks_tpt_10_0.png

Here, the darker red squares denote hard barriers, the lighter red squares denote barriers which can be crossed. So one could imagine, that both home and bar are on a hill.

The simulator internally holds a MarkovStateModel on which one can call reactive_flux(A, B). This computes some quantities related to the ensemble of reactive trajectories between sets of states AA and BB.

[6]:
print(f"Internal Markov state model with {sim.msm.n_states} states to cover the {sim.grid_size} grid.")
print(f"Compute reactive flux from A={sim.home_state} to B={sim.bar_state}")
flux = sim.msm.reactive_flux(sim.home_state, sim.bar_state)
Internal Markov state model with 100 states to cover the (10, 10) grid.
Compute reactive flux from A=[88, 98, 89, 99] to B=[0, 10, 1, 11]

The committor

One of the questions that can be answered with TPT is: If the man is at some state (i,j)(i,j), what is the probability that he reaches the bar before home? If he is already already at the bar, the probability is 11, i.e.,

P((i,j))=0  (i,j)Home\mathbb{P}((i,j)) = 0\;\forall (i,j)\in\mathrm{Home}

and vice versa

P((i,j))=1  (i,j)Bar.\mathbb{P}((i,j)) = 1\;\forall (i,j)\in\mathrm{Bar}.

The law of total probability tells us that to know the probability of an event AA with a known and at most countably infinite set of mutually exclusive events {Cn:nN}\{C_n : n\in\mathbb{N}\} so that iP(Ci)=1\sum_i \mathbb{P}(C_i) = 1, one can evaluate

P(A)=nP(ACn)P(Cn).\mathbb{P}(A) = \sum_n \mathbb{P}(A\mid C_n)\mathbb{P}(C_n).

In our example and ignoring borders and barriers, this means that for a current state s=(i,j)s=(i,j) and A="Home from state s"A=\text{"Home from state }s\text{"} the sample space consists of direct neighbor states, i.e.,

C={move from s to s+(k,l):(k,l){1,0,1}2 and (k,l)(0,0)}}.C = \{ \text{move from }s\text{ to }s+(k,l) : (k,l)\in \{ -1, 0, 1 \}^2 \text{ and } (k,l)\neq (0, 0) \} \}.

Then,

P(Home from s)=C(k,l)CP(Home from s+(k,l))P(C(k,l)).\mathbb{P}(\text{Home from }s) = \sum_{C_{(k,l)}\in C} \mathbb{P}(\text{Home from }s+(k,l))\mathbb{P}(C_{(k,l)}).

But the P(C(k,l))\mathbb{P}(C_{(k,l)}) are exactly the transition probabilities to move from state ss to s+(k,l)s+(k,l), giving rise to the forward committor

qs(+)=q(i,j)(+)=(k,l)(0,0)q(i+k,j+l)(+)p(i,j),(i+k,j+l),q_s^{(+)} = q_{(i,j)}^{(+)} = \sum_{(k,l)\neq(0, 0)} q_{(i+k, j+l)}^{(+)}p_{(i,j),(i+k, j+l)},

where p(i,j),(i+k,j+l)p_{(i,j),(i+k, j+l)} is the probability to transition from state s=(i,j)s=(i,j) to one of its neighboring states.

More formally, the forward committor probability is the probability to reach set of states AA before BB. Using the first hitting time of a set SS given by

TS=inf{t0:XtS},T_{S}=\inf\{t \geq 0 : X_t \in S \},

it can be defined as

qi(+)=Pi(TA<TB).q^{(+)}_i=\mathbb{P}_{i}(T_{A} < T_{B}).

It also satisfies the boundary value problem (BVP)

qi(+)=0, for all iA,qi(+)=1, for all iB,jLijqj(+)=0, for all i∉AB,\begin{aligned} q_i^{(+)} &= 0 &&\text{, for all }i\in A,\\ q_i^{(+)} &= 1 &&\text{, for all }i\in B,\\ \sum_j L_{ij} q^{(+)}_{j}&=0 &&\text{, for all }i\not\in A\cup B,\\ \end{aligned}

where L=PIL=P-\mathbb{I} is the generator matrix and PP the transition matrix. The BVP-formulation is used to numerically find the forward committor.

There is also the notion of a backward committor, which is the probability that given state ii, the system has previously been in set AA rather than BB. With detailed balance, the backward committor is given by qi()=1qi(+)q_i^{(-)} = 1 - q_i^{(+)}.

[7]:
fig, axes = plt.subplots(1, 2, figsize=(15, 10))
dividers = [make_axes_locatable(axes[i]) for i in range(len(axes))]
caxes = [divider.append_axes("right", size="5%", pad=0.05) for divider in dividers]
titles = ["Forward committor", "Backward committor"]

for i, ax in enumerate(axes):
    ax.set_title(titles[i])
    ax.scatter(*start, marker='*', label='Start', c='cyan', s=150, zorder=5)
    handles, labels = sim.plot_2d_map(ax, barrier_mode='hollow')

    if i == 0:
        Q = flux.forward_committor.reshape(sim.grid_size)
    else:
        Q = flux.backward_committor.reshape(sim.grid_size)
    cb = ax.imshow(Q, interpolation='nearest', origin='lower', cmap='coolwarm')
    fig.colorbar(cb, cax=caxes[i])
    if i == 0:
        fig.delaxes(fig.axes[2])

    ax.legend(handles=handles, labels=labels)


plt.tight_layout()
../_images/notebooks_tpt_14_0.png

As one can observe, the forward committor probabilities increase gradually when moving from home to bar, vice versa for the backward committor. This is expected as the Markov state model is reversible.

Reactive probability flux

If one is interested in the current of trajectories making their way from a set of states AA to a set of states BB (without trajectories that, e.g., leave AA just to go back to AA and only then enter BB), TPT offers the reactive flux. The relevant gross flux between two states ii and jj is then given by

fijAB={qi()πipijqj(+), if ij,0, otherwise.f_{ij}^{AB} = \begin{cases}q_i^{(-)}\pi_i p_{ij}q_j^{(+)} &\text{, if }i\neq j,\\ 0 &\text{, otherwise.}\end{cases}

The flux is conserved between intermediate states

jfijABfjiAB=0  i∉AB,\sum_j f_{ij}^{AB} - f_{ji}^{AB} = 0\;\forall i\not\in A\cup B,

and also throughout the entire network

iA,jnAfijAB=i∉B,jBfijAB.\sum_{i\in A,j\not in A}f_{ij}^{AB} = \sum_{i\not\in B, j\in B} f_{ij}^{AB}.

The considered gross flux can include detours of the form AijijBA\rightarrow\ldots\rightarrow i\rightarrow j\rightarrow i\rightarrow j\rightarrow\ldots\rightarrow B, which can be (in the case of detailed balance) excluded by considering the net flux

fijAB,+=max{0,fijABfjiAB}.f_{ij}^{AB,+} = \max\{ 0, f_{ij}^{AB}-f_{ji}^{AB} \}.

In general and without detailed balance, the net flux might still contain such detours.

In deeptime, the gross and net flux are accessible from the ReactiveFlux object as (nstates×nstates)(n_\mathrm{states}\times n_\mathrm{states}) numpy arrays, where the first dimension corresponds to state ii, the second dimension corresponds to state jj.

[8]:
print(f"Gross flux shape {flux.gross_flux.shape}, net flux shape {flux.net_flux.shape}")
Gross flux shape (100, 100), net flux shape (100, 100)

The example simulator offers plotting functionality so that the fluxes can be visualized:

[9]:
fig, axes = plt.subplots(1, 2, figsize=(18, 10))
dividers = [make_axes_locatable(axes[i]) for i in range(len(axes))]
caxes = [divider.append_axes("right", size="5%", pad=0.05) for divider in dividers]

titles = ["Gross flux", "Net flux"]
fluxes = [flux.gross_flux, flux.net_flux]

cmap = plt.cm.copper_r
thresh = [0, 1e-12]

for i in range(len(axes)):
    ax = axes[i]
    F = fluxes[i]
    ax.set_title(titles[i])

    vmin = np.min(F[np.nonzero(F)])
    vmax = np.max(F)

    sim.plot_2d_map(ax)
    sim.plot_network(ax, F, cmap=cmap, connection_threshold=thresh[i])
    norm = mpl.colors.LogNorm(vmin=vmin, vmax=vmax)
    fig.colorbar(plt.cm.ScalarMappable(norm=norm, cmap=cmap), cax=caxes[i]);
../_images/notebooks_tpt_20_0.png

From the gross flux one can derive the total flux, the total number of reactive ABA\rightarrow B trajectories per time unit

ftotAB=iA,j∉AfijAB,f_{\mathrm{tot}}^{AB} = \sum_{i\in A, j\not\in A}f_{ij}^{AB},

which can be accessed by:

[10]:
print(f"Total flux = {flux.total_flux:.3e}/step")
Total flux = 6.165e-05/step

This quantity gives rise to the total transition rate, which is the number of events given we start in AA

kAB=ftotABiπiqi()k_{AB} = \frac{f_\mathrm{tot}^{AB}}{\sum_i\pi_iq_i^{(-)}}
[11]:
print(f'Rate = {flux.rate:.3e}/step')
Rate = 1.264e-04/step

which is also the inverse ABA\to B mean first passage time mfpt=1/kAB\mathrm{mfpt} = 1/k_{AB}

[12]:
print(f'MFPT = {flux.mfpt:.3f} steps')
MFPT = 7911.887 steps

Coarse-graining of fluxes

For better interpretability, one can cluster microstates defined in a Markov state model into metastable sets using, e.g., PCCA+.

In our example, we might select six clusters:

[13]:
pcca = sim.msm.pcca(6)

and depict the membership probabilities for each state

[14]:
fig, axes = plt.subplots(2, 3, figsize=(15, 10), sharex=True, sharey=True)

for i, ax in enumerate(axes.flatten()):
    ax.set_title(f"Memberships for metastable set {i+1}")
    handles, labels = sim.plot_2d_map(ax, barrier_mode='hollow')

    Q = pcca.memberships[:, i].reshape(sim.grid_size)
    cb = ax.imshow(Q, interpolation='nearest', origin='lower', cmap=plt.cm.Blues);
norm = mpl.colors.Normalize(vmin=0, vmax=1)
fig.colorbar(plt.cm.ScalarMappable(norm=norm, cmap=plt.cm.Blues), ax=axes, shrink=.8);
../_images/notebooks_tpt_30_0.png

Also reactive fluxes can be coarse-grained by lumping together states - without systematic error as in the PCCA case. For simplicity, we coarse grain onto AA, BB, and the remainder subdivided into the upper half (j5j\geq 5) and the lower half.

[15]:
remainder_upper = []
remainder_lower = []
for i in range(sim.grid_size[0]):
    for j in range(sim.grid_size[1]):
        state = sim.coordinate_to_state((i, j))
        if state not in sim.home_state + sim.bar_state:
            if j >= 5:
                remainder_upper.append(state)
            else:
                remainder_lower.append(state)
[16]:
sets, tpt = flux.coarse_grain([sim.home_state, sim.bar_state, remainder_upper, remainder_lower])

As expected we obtain four sets which are exactly the home states, the bar states, and the subdivided remainder.

[17]:
# enumerating assignments
assignments = np.zeros(sim.n_states)
for i, flux_set in enumerate(sets):
    assignments[np.array(list(flux_set))] = i
assignments = assignments.reshape(sim.grid_size)

# plot assignments
fig, ax = plt.subplots(1, 1)
cmap = plt.cm.get_cmap('Spectral', 4)
cb = ax.imshow(assignments, interpolation='nearest', origin='lower',
               cmap=cmap, vmin=-.5, vmax=3.5)
cbar = fig.colorbar(cb, ticks=np.arange(4))
cbar.ax.set_yticklabels(['Set 0', 'Set 1', 'Set 2', 'Set 3']);
../_images/notebooks_tpt_35_0.png

The return value also contains the fluxes plus derived quantities: The gross flux has a forward-backward cycle between upper and lower set, while the net flux lacks this cycle.

[18]:
import networkx as nx

fig, axes = plt.subplots(1, 2, figsize=(15, 10))
graphs = [nx.DiGraph() for _ in range(len(axes))]

for i in range(len(axes)):
    ax = axes[i]

    ax.set_title("Gross flux" if i == 0 else "Net flux")

    F = tpt.gross_flux if i == 0 else tpt.net_flux
    G = graphs[i]
    for i in range(len(sets)):
        G.add_node(i, title=f"Set {i+1}")
    for i in range(len(sets)):
        for j in range(len(sets)):
            if F[i, j] > 0:
                G.add_edge(i, j, title=f"{F[i, j]:.3e}")

    edge_labels = nx.get_edge_attributes(G, 'title')
    pos = nx.circular_layout(G)
    nx.draw_networkx_nodes(G, pos, ax=ax, node_size=1100)
    nx.draw_networkx_labels(G, pos, ax=ax, labels=nx.get_node_attributes(G, 'title'));
    fragments = nx.draw_networkx_edge_labels(G, pos, ax=ax, edge_labels=edge_labels)
    nx.draw_networkx_edges(G, pos, ax=ax, arrowstyle='-|>', arrowsize=30,
                           connectionstyle='arc3, rad=0.3');
../_images/notebooks_tpt_37_0.png

Pathway decomposition

A pathway is a sequence of states P=(s1,s2,,sk)P=(s_1,s_2,\ldots,s_k) so that s1As_1\in A and skBs_k\in B. One can associate a capacity (the minimal current) to a pathway via

f(P)=min{fsisi+1AB:i=1,,k1}.f(P) = \min\{ f_{s_i s_{i+1}}^{AB} : i=1,\ldots, k-1\}.

A pathway decomposition is then the repeated choosing of a path and subsequent removal of its capacity along its edges until no flux remains.

The decomposition is not unique and depends on the order in which the paths are choosen. In deeptime, a pathway decomposition is implemented by iteratively removing the currently strongest pathway (i.e., with largest capacity).

A call to pathways() performs said operation. One can optionally give a fraction (default 1.0) which stops the decomposition when the given fraction of the total flux is reached with the decomposition. The default tries to the flux into all dominant pathways which can be computationally intensive for large networks. Furthermore one can set a hard limit on the number of computed pathways via the maxiter argument.

[19]:
paths, capacities = flux.pathways(fraction=.3, maxiter=1000)
print(len(paths))
4

In this example, we can represent 30%30\% of the total flux with four pathways.

[20]:
import matplotlib as mpl
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable

fig, ax = plt.subplots(figsize=(10, 10))

ax.scatter(*start, marker='*', label='Start', c='cyan', s=150, zorder=5)

for capacity, path in zip((capacities / np.array(capacities).sum())[:10], paths[:10]):
    path = np.array([sim.state_to_coordinate(state) for state in path])
    sim.plot_path(ax, path, lw=capacity*10, intermediates=False,
                  color_lerp=False, label=f"Capacity {capacity:.2e}")
    ax.scatter(*path.T, marker='x')

handles, labels = sim.plot_2d_map(ax)
ax.legend(handles=handles, labels=labels);
../_images/notebooks_tpt_42_0.png

Example from trajectories

Using the same example we can generate timeseries which can be used to re-estimate a Markov state model and perform a TPT analysis. To this end, we simulate 1000 trajectories all starting from the same point and return the micro states directly rather than (i,j)(i, j) coordinates.

[21]:
trajs = []
for _ in range(1000):
    trajs.append(sim.walk(start=start, n_steps=2000, return_states=True, stop=False))

From these trajectories we can count state transitions based on the prior knowledge that there are actually 100100 states and select the largest connected submodel - which due to the barriers does not contain all the states.

[22]:
from deeptime.markov import TransitionCountEstimator

count_model = TransitionCountEstimator(1, 'sliding', n_states=sim.n_states) \
    .fit(trajs).fetch_model()
count_model = count_model.submodel_largest()
[23]:
print(f"States not included in the count model: "
      f"{set(range(count_model.n_states_full)) - set(count_model.state_symbols)}")
States not included in the count model: {15, 25, 28, 29, 35, 38, 45, 47, 48, 55, 57, 62, 63, 64, 65, 67, 83, 84, 85, 90, 91, 92, 93}

Based on this we can estimate a Markov state model:

[24]:
from deeptime.markov.msm import MaximumLikelihoodMSM

mlmsm = MaximumLikelihoodMSM().fit(count_model).fetch_model()

While we represent all counts in the markov state model, only a fraction of the states is represented - as expected:

[25]:
print("Count fraction:", mlmsm.count_fraction)
print("State fraction:", mlmsm.state_fraction)
Count fraction: 1.0
State fraction: 0.77

On this we can compute the reactive flux:

[26]:
flux = mlmsm.reactive_flux(
     mlmsm.count_model.symbols_to_states(sim.home_state),
     mlmsm.count_model.symbols_to_states(sim.bar_state))

And perform the above described analyses, for example a pathway decomposition and the forward committor:

[27]:
paths, capacities = flux.pathways(.3)

fig, ax = plt.subplots(figsize=(10, 10))

ax.set_title("Forward committor and pathway decomposition.")

for capacity, path in zip((capacities / np.array(capacities).sum())[:10], paths[:10]):
    path = mlmsm.count_model.states_to_symbols(path)
    path = np.array([sim.state_to_coordinate(state) for state in path])
    sim.plot_path(ax, path, lw=capacity*10, label=f"Capacity {capacity:.2e}",
                  intermediates=False, color_lerp=False)
    ax.scatter(*path.T, marker='x')

handles, labels = sim.plot_2d_map(ax, barriers=False)

Q = np.ones((sim.n_states))*np.nan
Q[mlmsm.state_symbols()] = flux.forward_committor
Q = Q.reshape(sim.grid_size)

cb = ax.imshow(Q, interpolation='nearest', origin='lower')
fig.colorbar(cb, ax=ax)

ax.legend(handles=handles, labels=labels);
../_images/notebooks_tpt_55_0.png

Comparing this plot with the ones above one can notice that the hard barriers are ‘missing’, meaning that they were not sampled in the trajectories and now show up as white cubes due to the image being initialized as 10×1010\times 10 array filled with np.nan.

References