When the data fits comfortably on the local machine we should not use dask; plain pandas is the better choice. But when the data is larger than RAM (e.g. > 16 GB) dask becomes useful. Pandas may need up to 10 times as much RAM as the on-disk data size (e.g. if the data is 2 GB, pandas may crash on a 16 GB machine and we may need dask).
Dask does not define its own dataframe type; under the hood it uses pandas, although many pandas operations are not available in dask. Dask simply distributes tasks among workers, makes the computation lazy, and uses a DAG (directed acyclic graph) to schedule it.
If the data runs to hundreds of GB, we need pyspark rather than dask; dask may struggle with data at that scale (~100 GB and beyond), while Spark is built to handle essentially any size. Spark, however, is written in Scala, not Python. The Python wrapper pyspark (and the related module koalas) exposes some of Spark's functionality through Python syntax, but not all of it, and when the code fails we are shown Java (JVM) stack traces that are genuinely difficult to debug.
In this notebook I use the big-data analysis tool called dask.
Dask utilizes multiple cores and performs distributed operations. It uses lazy evaluation, meaning work is done only when necessary, and it builds a DAG to carry out the computations.
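As a minimal sketch of this lazy / DAG behaviour (the inc and add functions below are made up purely for illustration):
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add(x, y):
    return x + y

a = inc(1)         # nothing is computed yet
b = inc(2)
total = add(a, b)  # this only builds the task graph (a DAG)

print(total.compute())  # 5 -- evaluation happens only here
# total.visualize()     # draws the DAG (needs graphviz)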
import numpy as np
import pandas as pd
import seaborn as sns
sns.set(color_codes=True)
import matplotlib.pyplot as plt
import os
import time
import collections
import joblib
# settings
%matplotlib inline
# random state
SEED = 0
RNG = np.random.RandomState(SEED)
from tqdm import tqdm, tqdm_notebook, tnrange
import ipywidgets as widgets
# my personal library
from bhishan import bp
# dask
import dask
import dask_ml
import dask.array as da
import dask.dataframe as dd
from dask_ml.xgboost import XGBClassifier
import xgboost
import dask_xgboost
from dask_ml.model_selection import train_test_split
# versions
import watermark
%load_ext watermark
%watermark -a "Bhishan Poudel" -d -v -m
print()
%watermark -iv
The watermark extension is already loaded. To reload it, use: %reload_ext watermark
Bhishan Poudel 2020-10-30
CPython 3.7.7
IPython 7.18.1
compiler   : Clang 4.0.1 (tags/RELEASE_401/final)
system     : Darwin
release    : 19.6.0
machine    : x86_64
processor  : i386
CPU cores  : 4
interpreter: 64bit
joblib 0.17.0
pandas 1.1.3
watermark 2.0.2
dask 2.30.0
seaborn 0.10.1
dask_ml 1.7.0
dask_xgboost 0.1.10
xgboost 1.1.1
ipywidgets 7.5.1
numpy 1.18.5
df_eval = pd.DataFrame({'Model': [],
'Description':[],
'Accuracy':[],
'Precision':[],
'Recall':[],
'F1':[],
'AUC':[],
})
df_eval
Model | Description | Accuracy | Precision | Recall | F1 | AUC |
---|---|---|---|---|---|---|
from dask.distributed import Client
client = Client(n_workers=4)
client
(Client and Cluster summary widget: local cluster with 4 workers)
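If the defaults are too heavy for the machine, the local cluster can also be sized explicitly when the client is created; the numbers below are purely illustrative:
# illustrative sizing: 4 single-threaded workers with ~3 GB of memory each
# client = Client(n_workers=4, threads_per_worker=1, memory_limit='3GB')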
bp.show_methods(dask,6)
0 | 1 | 2 | 3 | 4 | 5 | |
---|---|---|---|---|---|---|
0 | array | compute | datasets | highlevelgraph | optimization | system |
1 | base | config | delayed | is_dask_collection | optimize | threaded |
2 | blockwise | context | distributed | istask | order | utils |
3 | bytes | core | get | local | persist | utils_test |
4 | callbacks | dataframe | hashing | multiprocessing | sizeof | visualize |
5 | compatibility |
bp.show_methods(dask_ml)
0 | 1 | 2 | |
---|---|---|---|
0 | metrics | utils | xgboost |
1 | model_selection | wrappers |
bp.show_methods(dd,6)
0 | 1 | 2 | 3 | 4 | 5 | |
---|---|---|---|---|---|---|
0 | Aggregation | concat | get_dummies | multi | read_parquet | to_datetime |
1 | DataFrame | core | groupby | numeric | read_sql_table | to_hdf |
2 | Index | demo | io | optimize | read_table | to_json |
3 | Series | extensions | isna | pivot_table | repartition | to_numeric |
4 | accessor | from_array | map_partitions | read_csv | reshape | to_parquet |
5 | assert_eq | from_bcolz | melt | read_fwf | rolling | to_records |
6 | backends | from_dask_array | merge | read_hdf | shuffle | to_sql |
7 | categorical | from_delayed | merge_asof | read_json | to_bag | to_timedelta |
8 | compute | from_pandas | methods | read_orc | to_csv | utils |
%%bash
# unzip ../data/raw/creditcard.csv.zip -d ../data/raw/
ls ../data/raw
creditcard.csv.zip
ifile = "../data/raw/creditcard.csv.zip"
# peek at data using pandas
dfx = pd.read_csv(ifile,compression='zip',nrows=10)
dfx
Time | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | ... | V21 | V22 | V23 | V24 | V25 | V26 | V27 | V28 | Amount | Class | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | -1.359807 | -0.072781 | 2.536347 | 1.378155 | -0.338321 | 0.462388 | 0.239599 | 0.098698 | 0.363787 | ... | -0.018307 | 0.277838 | -0.110474 | 0.066928 | 0.128539 | -0.189115 | 0.133558 | -0.021053 | 149.62 | 0 |
1 | 0 | 1.191857 | 0.266151 | 0.166480 | 0.448154 | 0.060018 | -0.082361 | -0.078803 | 0.085102 | -0.255425 | ... | -0.225775 | -0.638672 | 0.101288 | -0.339846 | 0.167170 | 0.125895 | -0.008983 | 0.014724 | 2.69 | 0 |
2 | 1 | -1.358354 | -1.340163 | 1.773209 | 0.379780 | -0.503198 | 1.800499 | 0.791461 | 0.247676 | -1.514654 | ... | 0.247998 | 0.771679 | 0.909412 | -0.689281 | -0.327642 | -0.139097 | -0.055353 | -0.059752 | 378.66 | 0 |
3 | 1 | -0.966272 | -0.185226 | 1.792993 | -0.863291 | -0.010309 | 1.247203 | 0.237609 | 0.377436 | -1.387024 | ... | -0.108300 | 0.005274 | -0.190321 | -1.175575 | 0.647376 | -0.221929 | 0.062723 | 0.061458 | 123.50 | 0 |
4 | 2 | -1.158233 | 0.877737 | 1.548718 | 0.403034 | -0.407193 | 0.095921 | 0.592941 | -0.270533 | 0.817739 | ... | -0.009431 | 0.798278 | -0.137458 | 0.141267 | -0.206010 | 0.502292 | 0.219422 | 0.215153 | 69.99 | 0 |
5 | 2 | -0.425966 | 0.960523 | 1.141109 | -0.168252 | 0.420987 | -0.029728 | 0.476201 | 0.260314 | -0.568671 | ... | -0.208254 | -0.559825 | -0.026398 | -0.371427 | -0.232794 | 0.105915 | 0.253844 | 0.081080 | 3.67 | 0 |
6 | 4 | 1.229658 | 0.141004 | 0.045371 | 1.202613 | 0.191881 | 0.272708 | -0.005159 | 0.081213 | 0.464960 | ... | -0.167716 | -0.270710 | -0.154104 | -0.780055 | 0.750137 | -0.257237 | 0.034507 | 0.005168 | 4.99 | 0 |
7 | 7 | -0.644269 | 1.417964 | 1.074380 | -0.492199 | 0.948934 | 0.428118 | 1.120631 | -3.807864 | 0.615375 | ... | 1.943465 | -1.015455 | 0.057504 | -0.649709 | -0.415267 | -0.051634 | -1.206921 | -1.085339 | 40.80 | 0 |
8 | 7 | -0.894286 | 0.286157 | -0.113192 | -0.271526 | 2.669599 | 3.721818 | 0.370145 | 0.851084 | -0.392048 | ... | -0.073425 | -0.268092 | -0.204233 | 1.011592 | 0.373205 | -0.384157 | 0.011747 | 0.142404 | 93.20 | 0 |
9 | 9 | -0.338262 | 1.119593 | 1.044367 | -0.222187 | 0.499361 | -0.246761 | 0.651583 | 0.069539 | -0.736727 | ... | -0.246914 | -0.633753 | -0.120794 | -0.385050 | -0.069733 | 0.094199 | 0.246219 | 0.083076 | 3.68 | 0 |
10 rows × 31 columns
# dfx.dtypes.to_dict()
dtypes = {'Time': np.dtype('float32'),
'V1': np.dtype('float32'),
'V2': np.dtype('float32'),
'V3': np.dtype('float32'),
'V4': np.dtype('float32'),
'V5': np.dtype('float32'),
'V6': np.dtype('float32'),
'V7': np.dtype('float32'),
'V8': np.dtype('float32'),
'V9': np.dtype('float32'),
'V10': np.dtype('float32'),
'V11': np.dtype('float32'),
'V12': np.dtype('float32'),
'V13': np.dtype('float32'),
'V14': np.dtype('float32'),
'V15': np.dtype('float32'),
'V16': np.dtype('float32'),
'V17': np.dtype('float32'),
'V18': np.dtype('float32'),
'V19': np.dtype('float32'),
'V20': np.dtype('float32'),
'V21': np.dtype('float32'),
'V22': np.dtype('float32'),
'V23': np.dtype('float32'),
'V24': np.dtype('float32'),
'V25': np.dtype('float32'),
'V26': np.dtype('float32'),
'V27': np.dtype('float32'),
'V28': np.dtype('float32'),
'Amount': np.dtype('float32'),
'Class': np.dtype('int8')}
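Instead of typing the mapping by hand, the same dictionary can be built from the pandas peek above; a small sketch, assuming dfx from the earlier cell:
# every feature column is float32, the target fits in int8
dtypes = {col: np.float32 for col in dfx.columns}
dtypes['Class'] = np.int8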
# there are some bad values in the raw file
# dfx = pd.read_csv(ifile,compression='zip',dtype=dtypes)
# reading Time as np.int32 fails, so use float32 instead
ddf = dd.read_csv(ifile,compression='zip',
blocksize=None,
dtype=dtypes,
assume_missing=True)
print(ddf.shape)
ddf.head()
(Delayed('int-5f655894-5586-4a71-bd1a-5b1ceaba8a02'), 31)
Time | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | ... | V21 | V22 | V23 | V24 | V25 | V26 | V27 | V28 | Amount | Class | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0.0 | -1.359807 | -0.072781 | 2.536347 | 1.378155 | -0.338321 | 0.462388 | 0.239599 | 0.098698 | 0.363787 | ... | -0.018307 | 0.277838 | -0.110474 | 0.066928 | 0.128539 | -0.189115 | 0.133558 | -0.021053 | 149.619995 | 0 |
1 | 0.0 | 1.191857 | 0.266151 | 0.166480 | 0.448154 | 0.060018 | -0.082361 | -0.078803 | 0.085102 | -0.255425 | ... | -0.225775 | -0.638672 | 0.101288 | -0.339846 | 0.167170 | 0.125895 | -0.008983 | 0.014724 | 2.690000 | 0 |
2 | 1.0 | -1.358354 | -1.340163 | 1.773209 | 0.379780 | -0.503198 | 1.800499 | 0.791461 | 0.247676 | -1.514654 | ... | 0.247998 | 0.771679 | 0.909412 | -0.689281 | -0.327642 | -0.139097 | -0.055353 | -0.059752 | 378.660004 | 0 |
3 | 1.0 | -0.966272 | -0.185226 | 1.792993 | -0.863291 | -0.010309 | 1.247203 | 0.237609 | 0.377436 | -1.387024 | ... | -0.108300 | 0.005274 | -0.190321 | -1.175575 | 0.647376 | -0.221929 | 0.062723 | 0.061458 | 123.500000 | 0 |
4 | 2.0 | -1.158233 | 0.877737 | 1.548718 | 0.403034 | -0.407193 | 0.095921 | 0.592941 | -0.270533 | 0.817739 | ... | -0.009431 | 0.798279 | -0.137458 | 0.141267 | -0.206010 | 0.502292 | 0.219422 | 0.215153 | 69.989998 | 0 |
5 rows × 31 columns
ddf.npartitions
1
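With blocksize=None and a zipped source everything lands in a single partition, so the 4 workers have little to share. A sketch of how the data could be split up after loading (not applied in this notebook):
# hypothetical: spread the rows across as many partitions as workers
ddf_repart = ddf.repartition(npartitions=4)
ddf_repart.npartitions  # 4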
# keep data in memory
ddf = client.persist(ddf)
ddf.tail()
Time | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | ... | V21 | V22 | V23 | V24 | V25 | V26 | V27 | V28 | Amount | Class | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
284802 | 172786.0 | -11.881118 | 10.071785 | -9.834784 | -2.066656 | -5.364473 | -2.606837 | -4.918215 | 7.305334 | 1.914428 | ... | 0.213454 | 0.111864 | 1.014480 | -0.509348 | 1.436807 | 0.250034 | 0.943651 | 0.823731 | 0.770000 | 0 |
284803 | 172787.0 | -0.732789 | -0.055080 | 2.035030 | -0.738589 | 0.868229 | 1.058415 | 0.024330 | 0.294869 | 0.584800 | ... | 0.214205 | 0.924384 | 0.012463 | -1.016226 | -0.606624 | -0.395255 | 0.068472 | -0.053527 | 24.790001 | 0 |
284804 | 172788.0 | 1.919565 | -0.301254 | -3.249640 | -0.557828 | 2.630515 | 3.031260 | -0.296827 | 0.708417 | 0.432454 | ... | 0.232045 | 0.578229 | -0.037501 | 0.640134 | 0.265745 | -0.087371 | 0.004455 | -0.026561 | 67.879997 | 0 |
284805 | 172788.0 | -0.240440 | 0.530483 | 0.702510 | 0.689799 | -0.377961 | 0.623708 | -0.686180 | 0.679145 | 0.392087 | ... | 0.265245 | 0.800049 | -0.163298 | 0.123205 | -0.569159 | 0.546668 | 0.108821 | 0.104533 | 10.000000 | 0 |
284806 | 172792.0 | -0.533413 | -0.189733 | 0.703337 | -0.506271 | -0.012546 | -0.649617 | 1.577006 | -0.414650 | 0.486180 | ... | 0.261057 | 0.643078 | 0.376777 | 0.008797 | -0.473649 | -0.818267 | -0.002415 | 0.013649 | 217.000000 | 0 |
5 rows × 31 columns
len(ddf), ddf.shape
(284807, (Delayed('int-6415b2e2-10eb-4910-bf5a-c5801a82b715'), 31))
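ddf.shape keeps the row count as a Delayed object because dask cannot know it without scanning the data; it can be materialised explicitly when needed:
# compute the lazy row count; the column count is already a plain int
nrows, ncols = ddf.shape
print(nrows.compute(), ncols)  # 284807 31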
bp.show_methods(ddf,8,exclude=['V'+str(i) for i in range(1,29)])
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | |
---|---|---|---|---|---|---|---|---|
0 | Amount | columns | drop_duplicates | idxmin | max | pipe | rmul | tail |
1 | Class | combine | dropna | iloc | mean | pivot_table | rolling | to_bag |
2 | Time | combine_first | dtypes | index | melt | pop | round | to_csv |
3 | abs | compute | empty | info | memory_usage | pow | rpow | to_dask_array |
4 | add | copy | eq | isin | memory_usage_per_partition | prod | rsub | to_delayed |
5 | align | corr | eval | isna | merge | quantile | rtruediv | to_hdf |
6 | all | count | explode | isnull | min | query | sample | to_html |
7 | any | cov | ffill | items | mod | radd | select_dtypes | to_json |
8 | append | cummax | fillna | iterrows | mode | random_split | sem | to_parquet |
9 | apply | cummin | first | itertuples | mul | rdiv | set_index | to_records |
10 | applymap | cumprod | floordiv | join | ndim | reduction | shape | to_sql |
11 | assign | cumsum | ge | known_divisions | ne | rename | shift | to_string |
12 | astype | dask | get_dtype_counts | last | nlargest | repartition | shuffle | to_timestamp |
13 | bfill | describe | get_ftype_counts | le | notnull | replace | size | truediv |
14 | categorize | diff | get_partition | loc | npartitions | resample | squeeze | values |
15 | clear_divisions | div | groupby | lt | nsmallest | reset_index | std | var |
16 | clip | divide | gt | map_overlap | nunique_approx | rfloordiv | sub | visualize |
17 | clip_lower | divisions | head | map_partitions | partitions | rmod | sum | where |
18 | clip_upper | drop | idxmax | mask | persist |
# make sure there are no nans
ddf.isna().sum().sum().compute()
0
from dask_ml.model_selection import train_test_split
# train_test_split?
target = 'Class'
ddf_Xtrain, ddf_Xtest, dser_ytrain, dser_ytest = train_test_split(
ddf.drop(target,axis=1),
ddf[target],
test_size=0.2,
random_state=SEED,
shuffle=True
)
Xtr = ddf_Xtrain
Xtx = ddf_Xtest
ytr = dser_ytrain
ytx = dser_ytest
type(ddf_Xtrain), type(dser_ytrain)
(dask.dataframe.core.DataFrame, dask.dataframe.core.Series)
# dser_ytrain.value_counts().compute() # the normalize parameter does not exist in dask
# dser_ytest.value_counts().compute()
dser_ytrain.compute().value_counts(normalize=True) # convert to pandas, then normalized value counts
0    0.998246
1    0.001754
Name: Class, dtype: float64
dser_ytest.compute().value_counts(normalize=True)
0    0.998377
1    0.001623
Name: Class, dtype: float64
# dask's train_test_split does not support stratify, but the split is well balanced anyway:
# train has 99.82% zeros and test has 99.83% zeros.
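Converting the whole label series to pandas just to normalize moves more data than necessary; a cheaper sketch is to compute only the two-row class counts and normalize locally:
# compute only the class counts (a tiny pandas Series), then normalize in pandas
counts = dser_ytrain.value_counts().compute()
print(counts / counts.sum())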
from dask_ml.xgboost import XGBClassifier
model = XGBClassifier(n_jobs=-1, random_state=SEED,
objective='binary:logistic')
%%time
model.fit(Xtr,ytr)
ypreds = model.predict(Xtx).compute()
ctr = collections.Counter(ypreds)
print(ctr)
CPU times: user 2.44 s, sys: 246 ms, total: 2.69 s
Wall time: 1min 1s
XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1, colsample_bynode=1, colsample_bytree=1, gamma=0, gpu_id=-1, importance_type='gain', interaction_constraints='', learning_rate=0.300000012, max_delta_step=0, max_depth=6, min_child_weight=1, missing=nan, monotone_constraints='()', n_estimators=100, n_jobs=-1, num_parallel_tree=1, random_state=100, reg_alpha=0, reg_lambda=1, scale_pos_weight=1, subsample=1, tree_method='approx', validate_parameters=1, verbosity=None)
from sklearn import metrics
ytx = ytx.compute()
# model evaluation
average = 'binary'
row_eval = ['dask xgboost','default',
metrics.accuracy_score(ytx, ypreds),
metrics.precision_score(ytx, ypreds, average=average),
metrics.recall_score(ytx, ypreds, average=average),
metrics.f1_score(ytx, ypreds, average=average),
metrics.roc_auc_score(ytx, ypreds),
]
df_eval.loc[len(df_eval)] = row_eval
df_eval = df_eval.drop_duplicates()
df_eval
Model | Description | Accuracy | Precision | Recall | F1 | AUC | |
---|---|---|---|---|---|---|---|
0 | dask xgboost | default | 0.001623 | 0.001623 | 1.0 | 0.00324 | 0.5 |
ctr = collections.Counter(ytr)
ctr
Counter({0: 227708, 1: 400})
spw = ctr[0]/ctr[1] # ratio of negatives to positives
spw
569.27
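By convention scale_pos_weight is set to the ratio of negatives to positives, so the exact value computed above (~569.27) could be passed straight into the classifier instead of the rounded 600 used in the next cell; a sketch:
# hypothetical variant: use the exact imbalance ratio
model_spw = XGBClassifier(n_jobs=-1, random_state=SEED,
                          objective='binary:logistic',
                          scale_pos_weight=spw)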
%%time
# parameters
params = dict(
learning_rate =0.1,
n_estimators=1000,
max_depth=5,
min_child_weight=1,
gamma=0,
subsample=0.8,
colsample_bytree=0.8,
scale_pos_weight=600,
reg_alpha=0.005,
seed=100
)
# build the model
model = XGBClassifier(n_jobs=-1, random_state=SEED,
objective='binary:logistic',**params)
# fit the model
model.fit(Xtr,ytr)
# prediction
ypreds = model.predict(Xtx).compute()
# model evaluation
acc = metrics.accuracy_score(ytx, ypreds)
f1 = metrics.f1_score(ytx, ypreds, average=average)
print(f'test : {collections.Counter(ytx)}')
print(f'prediction : {collections.Counter(ypreds)}')
print(f'acc={acc:.4f}, f1={f1:.4f}')
print()
test       : Counter({0: 56607, 1: 92})
prediction : Counter({1: 56699})
acc=0.0016, f1=0.0032
CPU times: user 17.5 s, sys: 1.69 s, total: 19.2 s
Wall time: 7min 27s