Scalable cohort sampler
Use Dask to paralellize python
I have a some binary cohorts and need to sample for each cohort where the target label 1
is matching a observation from a dataframe where the target is 0
.
sample data
First, I will generate some sample data.
import tqdm
import numpy as np
import pandas as pd
tqdm.pandas()
seed = 47
np.random.seed(seed)
size = 10000
df = pd.DataFrame({i: np.random.randint(1,100,size=size) for i in ['metric']})
df['label'] = np.random.randint(0,2, size=size)
df['group_1'] = pd.Series(np.random.randint(1,12, size=size)).astype(object)
df['group_2'] = pd.Series(np.random.randint(1,10, size=size)).astype(object)
display(df.head())
group_0 = df[df['label'] == 0]
group_0 = group_0.reset_index(drop=True)
group_0 = group_0.rename(index=str, columns={"metric": "metric_group_0"})
join_columns_enrich = ['group_1', 'group_2']
join_real = ['metric_group_0']
join_real.extend(join_columns_enrich)
group_0 = group_0[join_real]
display(group_0.head())
group_1 = df[df['label'] == 1]
group_1 = group_1.reset_index(drop=True)
# group_1['metric_group_0'] = np.nan
display(group_1.head())
This gives two dataframes:
metric_group_0 group_1 group_2
0 87 10 4
1 25 7 9
2 67 3 5
3 37 9 1
4 9 6 7
metric label group_1 group_2
0 92 1 4 6
1 86 1 4 8
2 95 1 6 3
3 97 1 1 3
4 90 1 2 9
In python a naive and slow an implementation to sample the data might look like:
k = 3
resulting_df = None
def knnJoinSingle(original_element, group_0, join_columns, random_state):
limits_dict = original_element[join_columns_enrich].to_dict()
query = ' & '.join([f"{k} == {v}" for k, v in limits_dict.items()])
candidates = group_0.query(query)
if len(candidates) > 0:
to_join = candidates.sample(n=1, random_state=random_state)['metric_group_0'].values[0]
else:
to_join = np.nan
original_element['metric_group_0'] = to_join
return original_element
for i in range(1, k+1):
print(i)
# WARNING:not setting random state, otherwise always the same record is picked
# in case of same values from group selection variables. Is there a better way?
rand_knn1_enriched = group_1.progress_apply(lambda x: knnJoinSingle(x, group_0, join_columns_enrich, random_state=None), axis = 1)
rand_knn1_enriched['run'] = i
if resulting_df is None:
resulting_df = rand_knn1_enriched
else:
resulting_df = pd.concat([resulting_df, rand_knn1_enriched])
resulting_df['difference'] = resulting_df['metric'] - resulting_df['metric_group_0']
resulting_df['differenceAbs'] = np.abs(resulting_df['difference'])
display(resulting_df.head())
print(len(resulting_df))
print(resulting_df.difference.isnull().sum())
It takes about one minute to sample the records. Can the performance be improved?
paralellize the computation.
Dask is a library in native Python to simply scale computations.
a minimal and contrived example
Here a minimal example which outlines how dask neatly parallelizes pandas:
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})
df_first_scattered = client.scatter(df_first, broadcast=True)
display(df_first)
def foo(row, lookup):
# some computation which relies on the lookup
return lookup['foo'].iloc[2]
df_second_dask = dd.from_pandas(df_second, npartitions=2)
df_second_dask['foo'] = df_second_dask.apply(foo, args=[df_first_scattered], axis = 1, meta=('baz', 'int64'))
display(df_second_dask.visualize())
df_second_dask.compute()
There are not many new elements.
Initially, the data is broadcasted to the worker nodes: client.scatter(df_first, broadcast=True)
.
Dask never triggers the computation. It is lazy and only generates a graph df_second_dask.visualize()
. This graph can be visualized. Only when finally pressing the button to actually execute df_second_dask.compute()
, then the computation is initiated.
rewriting the sampler in dask
import dask.dataframe as dd
from dask.distributed import Client
import dask
client = Client()
display(client)
client.cluster
def knnJoinSingle_series(original_element, group_0_scattered, join_columns, random_state):
limits_dict = original_element[join_columns_enrich].to_dict()
query = ' & '.join([f"{k} == {v}" for k, v in limits_dict.items()])
candidates = group_0_scattered.query(query)
if len(candidates) > 0:
return candidates.sample(n=1, random_state=random_state)['metric_group_0'].values[0]
else:
return np.nan
def compute_dfs(gr_0, gr_1, k):
resulting_df = None
group_0_scattered = client.scatter(gr_0, broadcast=True)
group_1_dask = dd.from_pandas(gr_1, npartitions=16)
group_1_dask = client.persist(group_1_dask)
for i in range(1, k+1):
print(i)
dask_result_series = group_1_dask.apply(knnJoinSingle_series,
args=[group_0_scattered, join_columns_enrich, None],
axis = 1,
meta=('metric_group_0', 'int64'))
display(dask_result_series.visualize())
local_result_series = dask_result_series.compute()
group_1['metric_group_0'] = local_result_series
group_1['run'] = i
if resulting_df is None:
resulting_df = group_1
else:
resulting_df = pd.concat([resulting_df, group_1])
resulting_df['difference'] = resulting_df['metric'] - resulting_df['metric_group_0']
resulting_df['differenceAbs'] = np.abs(resulting_df['difference'])
display(resulting_df.head())
print(len(resulting_df))
print(resulting_df.difference.isnull().sum())
return resulting_df
resulting_df = compute_dfs(group_0, group_1, 10)
So what is the difference? Let’s look at the code i a bit more detail:
Instead of a regular pandas.DataFrmae
a parallelized dask.DataFrame
is created. Then the group_0
dataframe is broadcasted to all worker nodes group_0_scattered = client.scatter(gr_0, broadcast=True)
. The original Python function (knnJoinSingle_series
) does not need to be changed.
So far, nothing has happend. Only a graph of operations which need to be calculated has been set up. Now: group_1 = group_1_dask.compute()
to compute the results.
As the for loop is iterating over group_1_dask
for k times, it makes sense to persist the dataframe on the worker nodes: group_1_dask = client.persist(group_1_dask)
ensures that one does not need to re-transform the original pandas data frame over and over to a dask dataframe.
conclusions
Dask can easily release the GIL and not only parallelize on a single node. This nicely speeds up the sampling process. In fact instead of sampling for 3 runs in dask 10 runs are sampled and take about one minute and 16 seconds.
If you have an even better implementation of the sampler, feel free to add a comment with your improvements.
However, you still should rethink if there is not a even better option: as for loops of for loops do not offer the best runtime. Using plain pandas, one could simply sample multiple times from the other dataframe:
def randomMatchingCondition(original_element, group_0, join_columns, k, random_state):
limits_dict = original_element[join_columns_enrich].to_dict()
query = ' & '.join([f"{k} == {v}" for k, v in limits_dict.items()])
candidates = group_0.query(query)
if len(candidates) > 0:
return candidates.sample(n=k, random_state=random_state, replace=True)['metric_group_0'].values
else:
return np.nan
k = 3
group_1['metric_group_0'] = group_1.progress_apply(randomMatchingCondition,
args=[group_0, join_columns_enrich, k, None],
axis = 1)
print(group_1.isnull().sum())
group_1 = group_1[~group_1.metric_group_0.isnull()]
display(group_1.head())
s=pd.DataFrame({'metric_group_0':np.concatenate(group_1.metric_group_0.values)},index=group_1.index.repeat(group_1.metric_group_0.str.len()))
s = s.join(group_1.drop('metric_group_0',1),how='left')
s['pos_in_array'] = s.groupby(s.index).cumcount()
s.head()