The dataset contains transactions made by credit cards in September 2013 by European cardholders.
It covers transactions that occurred over two days, with 492 frauds out of 284,807 transactions.
The dataset is highly unbalanced: the positive class (frauds) accounts for 0.172% of all transactions.
It contains only numerical input variables which are the result of a PCA transformation.
Unfortunately, due to confidentiality issues, we cannot provide the original features and more background information about the data.
Features V1, V2, ..., V28 are the principal components obtained with PCA; the only features that have not been transformed with PCA are 'Time' and 'Amount'.
Feature 'Time' contains the seconds elapsed between each transaction and the first transaction in the dataset. Feature 'Amount' is the transaction amount; it can be used, for instance, for example-dependent cost-sensitive learning.
Feature 'Class' is the response variable and it takes value 1 in case of fraud and 0 otherwise.
Remember that Recall = TP / (TP + FN). In fraud detection, classifying a fraud as non-fraud (FN) is the riskier error, so we use recall to compare the performance of the models. The higher the recall, the better the model.
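To make the recall formula concrete, here is a minimal sketch in plain Python. The helper names `recall` and `accuracy` are illustrative and not part of the notebook. The counts describe a hypothetical "always predict non-fraud" baseline applied to the full dataset (492 frauds out of 284,807 transactions), which shows why accuracy alone is misleading here.

```python
def recall(tp, fn):
    """Recall = TP / (TP + FN); returns 0.0 when there are no positives."""
    return tp / (tp + fn) if (tp + fn) else 0.0


def accuracy(tp, fn, tn, fp):
    """Fraction of all predictions that are correct."""
    return (tp + tn) / (tp + fn + tn + fp)


# Hypothetical baseline that labels every transaction as non-fraud:
# all 492 frauds become false negatives, all other rows are true negatives.
n_total, n_fraud = 284807, 492
baseline_recall = recall(tp=0, fn=n_fraud)
baseline_accuracy = accuracy(tp=0, fn=n_fraud, tn=n_total - n_fraud, fp=0)

print(f"recall   = {baseline_recall:.4f}")
print(f"accuracy = {baseline_accuracy:.4f}")
```

The baseline catches no fraud at all, yet its accuracy is above 99.8%, which is why recall on the fraud class is the quantity to watch.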
The dataset is highly imbalanced. It has about 284k non-frauds and only 492 frauds. This means that out of every 1,000 transactions, roughly 998 are normal and 2 are fraud cases.
Also, note that the data covers just two days; we implicitly assume that these two days are representative of the overall trend and properly reflect the properties of the population.
There could have been more or fewer fraudulent transactions on those particular days, but we do not take that into consideration and we generalize the result. Put differently, the conclusions drawn from these two days are appropriate for any population whose data distribution is similar to that of these two days.
We are more interested in finding the fraud cases. Predicting fraud as non-fraud (a false negative, FN) is riskier than predicting non-fraud as fraud (a false positive), so the suitable metric for model evaluation is recall.
In banking, it is always the case that there are a lot of normal transactions, and only a few of them are fraudulent. We may train our model with any transformation of the training data, but when testing the model the test set should look like real life, i.e., it has lots of normal cases and very few fraudulent cases.
This means we can train our model on imbalanced or balanced (undersampled or oversampled) data, but we should always test it on an IMBALANCED dataset.
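The "resample the training set only" rule can be sketched as follows. This is a plain-Python illustration on hypothetical toy rows, not the notebook's PySpark pipeline; the helper `undersample_majority` and the toy data are assumptions made for the example.

```python
import random


def undersample_majority(rows, label_key="Class", seed=0):
    """Return a class-balanced copy of `rows` by randomly dropping majority rows."""
    rng = random.Random(seed)
    pos = [r for r in rows if r[label_key] == 1]
    neg = [r for r in rows if r[label_key] == 0]
    minority, majority = (pos, neg) if len(pos) <= len(neg) else (neg, pos)
    balanced = minority + rng.sample(majority, len(minority))
    rng.shuffle(balanced)
    return balanced


# Hypothetical toy data: 1,000 rows, 20 of them fraud (Class == 1).
rows = [{"id": i, "Class": 1 if i < 20 else 0} for i in range(1000)]
random.Random(0).shuffle(rows)

# Split first, then balance ONLY the training part.
cut = int(0.8 * len(rows))
train_rows, test_rows = rows[:cut], rows[cut:]
train_balanced = undersample_majority(train_rows)

print(len(train_rows), len(train_balanced), len(test_rows))
```

The test split is left untouched, so it keeps the real-life class ratio on which the model will eventually be judged.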
import numpy as np
import pandas as pd
import seaborn as sns
sns.set(color_codes=True)
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import os
import time
# random state
SEED = 0
random_state = SEED  # later cells pass `random_state` as the seed; assumed here to equal SEED
RNG = np.random.RandomState(SEED)
# Jupyter notebook settings for pandas
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_rows', 100) # None for all the rows
pd.set_option('display.max_colwidth', 50)
print([(x.__name__,x.__version__) for x in [np, pd,sns,matplotlib]])
[('numpy', '1.17.1'), ('pandas', '0.25.1'), ('seaborn', '0.9.0'), ('matplotlib', '3.1.1')]
%%javascript
IPython.OutputArea.auto_scroll_threshold = 9999;
# pyspark
import pyspark
spark = pyspark.sql.SparkSession.builder.appName('bhishan').getOrCreate()
print([(x.__name__,x.__version__) for x in [np, pd, pyspark]])
[('numpy', '1.17.1'), ('pandas', '0.25.1'), ('pyspark', '2.4.4')]
# pyspark sql
from pyspark.sql.functions import col
from pyspark.sql.functions import udf # @udf("integer") def myfunc(x,y): return x - y
from pyspark.sql import functions as F # stddev format_number date_format, dayofyear, when
from pyspark.sql.types import StructField, StringType, IntegerType, FloatType, StructType, DateType
# pyspark ml feature
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.feature import OneHotEncoder,OneHotEncoderEstimator
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
# classifiers
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
# cross validation
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import CrossValidatorModel
# model evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import ClusteringEvaluator
def show_method_attributes(method, ncols=7):
    """ Show all the attributes of a given method.
    Example:
    ========
    show_method_attributes(list)
    """
    x = [i for i in dir(method) if i[0]!='_' ]
    x = [i for i in x
         if i not in 'os np pd sys time psycopg2'.split() ]
    return pd.DataFrame(np.array_split(x,ncols)).T.fillna('')
df_eval = pd.DataFrame({
'model_name':[],
'desc':[],
'f1':[],
'weightedPrecision':[],
'weightedRecall':[],
'accuracy':[],
'areaUnderROC':[],
'areaUnderPR':[]
})
df_eval
model_name | desc | f1 | weightedPrecision | weightedRecall | accuracy | areaUnderROC | areaUnderPR |
---|
%%bash
# unzip ../data/raw/creditcard.csv.zip -d ../data/raw/
ls ../data/raw
creditcard.csv creditcard.csv.zip
%%bash
head -3 ../data/raw/creditcard.csv
"Time","V1","V2","V3","V4","V5","V6","V7","V8","V9","V10","V11","V12","V13","V14","V15","V16","V17","V18","V19","V20","V21","V22","V23","V24","V25","V26","V27","V28","Amount","Class" 0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,"0" 0,1.19185711131486,0.26615071205963,0.16648011335321,0.448154078460911,0.0600176492822243,-0.0823608088155687,-0.0788029833323113,0.0851016549148104,-0.255425128109186,-0.166974414004614,1.61272666105479,1.06523531137287,0.48909501589608,-0.143772296441519,0.635558093258208,0.463917041022171,-0.114804663102346,-0.183361270123994,-0.145783041325259,-0.0690831352230203,-0.225775248033138,-0.638671952771851,0.101288021253234,-0.339846475529127,0.167170404418143,0.125894532368176,-0.00898309914322813,0.0147241691924927,2.69,"0"
%%bash
head -5 ../data/raw/creditcard.csv | cut -d ',' -f 1
tail -5 ../data/raw/creditcard.csv | cut -d ',' -f 1
"Time" 0 0 1 1 172786 172787 172788 172788 172792
df = spark.read.csv('../data/raw/creditcard.csv', header=True, inferSchema=True).cache()
print('nrows = ', df.count(), 'ncols = ', len(df.columns))
df.limit(5).toPandas()
nrows = 284807 ncols = 31
Time | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 | V11 | V12 | V13 | V14 | V15 | V16 | V17 | V18 | V19 | V20 | V21 | V22 | V23 | V24 | V25 | V26 | V27 | V28 | Amount | Class | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | -1.359807 | -0.072781 | 2.536347 | 1.378155 | -0.338321 | 0.462388 | 0.239599 | 0.098698 | 0.363787 | 0.090794 | -0.551600 | -0.617801 | -0.991390 | -0.311169 | 1.468177 | -0.470401 | 0.207971 | 0.025791 | 0.403993 | 0.251412 | -0.018307 | 0.277838 | -0.110474 | 0.066928 | 0.128539 | -0.189115 | 0.133558 | -0.021053 | 149.62 | 0 |
1 | 0 | 1.191857 | 0.266151 | 0.166480 | 0.448154 | 0.060018 | -0.082361 | -0.078803 | 0.085102 | -0.255425 | -0.166974 | 1.612727 | 1.065235 | 0.489095 | -0.143772 | 0.635558 | 0.463917 | -0.114805 | -0.183361 | -0.145783 | -0.069083 | -0.225775 | -0.638672 | 0.101288 | -0.339846 | 0.167170 | 0.125895 | -0.008983 | 0.014724 | 2.69 | 0 |
2 | 1 | -1.358354 | -1.340163 | 1.773209 | 0.379780 | -0.503198 | 1.800499 | 0.791461 | 0.247676 | -1.514654 | 0.207643 | 0.624501 | 0.066084 | 0.717293 | -0.165946 | 2.345865 | -2.890083 | 1.109969 | -0.121359 | -2.261857 | 0.524980 | 0.247998 | 0.771679 | 0.909412 | -0.689281 | -0.327642 | -0.139097 | -0.055353 | -0.059752 | 378.66 | 0 |
3 | 1 | -0.966272 | -0.185226 | 1.792993 | -0.863291 | -0.010309 | 1.247203 | 0.237609 | 0.377436 | -1.387024 | -0.054952 | -0.226487 | 0.178228 | 0.507757 | -0.287924 | -0.631418 | -1.059647 | -0.684093 | 1.965775 | -1.232622 | -0.208038 | -0.108300 | 0.005274 | -0.190321 | -1.175575 | 0.647376 | -0.221929 | 0.062723 | 0.061458 | 123.50 | 0 |
4 | 2 | -1.158233 | 0.877737 | 1.548718 | 0.403034 | -0.407193 | 0.095921 | 0.592941 | -0.270533 | 0.817739 | 0.753074 | -0.822843 | 0.538196 | 1.345852 | -1.119670 | 0.175121 | -0.451449 | -0.237033 | -0.038195 | 0.803487 | 0.408542 | -0.009431 | 0.798278 | -0.137458 | 0.141267 | -0.206010 | 0.502292 | 0.219422 | 0.215153 | 69.99 | 0 |
df.printSchema()
root
 |-- Time: decimal(10,0) (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Class: integer (nullable = true)
print(df.columns)
['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount', 'Class']
feat = 'Amount'
target = 'Class'
df.select([feat,target]).show(5)
+------+-----+
|Amount|Class|
+------+-----+
|149.62|    0|
|  2.69|    0|
|378.66|    0|
| 123.5|    0|
| 69.99|    0|
+------+-----+
only showing top 5 rows
df.select(F.mean(feat), F.max(feat)).show()
+----------------+-----------+
|     avg(Amount)|max(Amount)|
+----------------+-----------+
|88.3496192509521|   25691.16|
+----------------+-----------+
df.select([x(df[feat])
for x in [F.max, F.min ]]).show()
+-----------+-----------+
|max(Amount)|min(Amount)|
+-----------+-----------+
|   25691.16|        0.0|
+-----------+-----------+
df.select(target).distinct().show()
+-----+
|Class|
+-----+
|    1|
|    0|
+-----+
df.groupBy(target).agg(F.max(feat)).show()
+-----+-----------+
|Class|max(Amount)|
+-----+-----------+
|    1|    2125.87|
|    0|   25691.16|
+-----+-----------+
df.filter(df['Amount']>10000).agg({'Class': 'count'}).show()
+------------+
|count(Class)|
+------------+
|           7|
+------------+
df.select('Time', 'Amount').filter(df['Class'].isin(0,1)).show(2)
+----+------+
|Time|Amount|
+----+------+
|   0|149.62|
|   0|  2.69|
+----+------+
only showing top 2 rows
grp = df.groupBy("Class").count().sort("Class", ascending=False)
grp.toPandas().plot.bar(x='Class', y='count')  # one bar per class, height = number of rows
df.drop('Time','Amount','Class').describe().toPandas().set_index('summary').astype(float).round(3)
V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 | V11 | V12 | V13 | V14 | V15 | V16 | V17 | V18 | V19 | V20 | V21 | V22 | V23 | V24 | V25 | V26 | V27 | V28 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
summary | ||||||||||||||||||||||||||||
count | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 | 284807.000 |
mean | 0.000 | 0.000 | -0.000 | 0.000 | 0.000 | 0.000 | -0.000 | -0.000 | -0.000 | 0.000 | -0.000 | -0.000 | 0.000 | 0.000 | 0.000 | 0.000 | -0.000 | 0.000 | 0.000 | 0.000 | 0.000 | -0.000 | 0.000 | 0.000 | 0.000 | 0.000 | -0.000 | -0.000 |
stddev | 1.959 | 1.651 | 1.516 | 1.416 | 1.380 | 1.332 | 1.237 | 1.194 | 1.099 | 1.089 | 1.021 | 0.999 | 0.995 | 0.959 | 0.915 | 0.876 | 0.849 | 0.838 | 0.814 | 0.771 | 0.735 | 0.726 | 0.624 | 0.606 | 0.521 | 0.482 | 0.404 | 0.330 |
min | -56.408 | -72.716 | -48.326 | -5.683 | -113.743 | -26.161 | -43.557 | -73.217 | -13.434 | -24.588 | -4.797 | -18.684 | -5.792 | -19.214 | -4.499 | -14.130 | -25.163 | -9.499 | -7.214 | -54.498 | -34.830 | -10.933 | -44.808 | -2.837 | -10.295 | -2.605 | -22.566 | -15.430 |
max | 2.455 | 22.058 | 9.383 | 16.875 | 34.802 | 73.302 | 120.589 | 20.007 | 15.595 | 23.745 | 12.019 | 7.848 | 7.127 | 10.527 | 8.878 | 17.315 | 9.254 | 5.041 | 5.592 | 39.421 | 27.203 | 10.503 | 22.528 | 4.585 | 7.520 | 3.517 | 31.612 | 33.848 |
show_method_attributes(df.select('Time'))
0 | 1 | 2 | 3 | 4 | 5 | 6 | |
---|---|---|---|---|---|---|---|
0 | agg | createOrReplaceGlobalTempView | dtypes | hint | printSchema | selectExpr | toLocalIterator |
1 | alias | createOrReplaceTempView | exceptAll | intersect | randomSplit | show | toPandas |
2 | approxQuantile | createTempView | explain | intersectAll | rdd | sort | union |
3 | cache | crossJoin | fillna | isLocal | registerTempTable | sortWithinPartitions | unionAll |
4 | checkpoint | crosstab | filter | isStreaming | repartition | sql_ctx | unionByName |
5 | coalesce | cube | first | is_cached | repartitionByRange | stat | unpersist |
6 | colRegex | describe | foreach | join | replace | storageLevel | where |
7 | collect | distinct | foreachPartition | limit | rollup | subtract | withColumn |
8 | columns | drop | freqItems | localCheckpoint | sample | summary | withColumnRenamed |
9 | corr | dropDuplicates | groupBy | na | sampleBy | take | withWatermark |
10 | count | drop_duplicates | groupby | orderBy | schema | toDF | write |
11 | cov | dropna | head | persist | select | toJSON | writeStream |
12 | createGlobalTempView |
df.select('Time').show(3)
+----+
|Time|
+----+
|   0|
|   0|
|   1|
+----+
only showing top 3 rows
df.select('Time').describe().show()
+-------+------------------+
|summary|              Time|
+-------+------------------+
|  count|            284807|
|   mean|        94813.8596|
| stddev|47488.145954566324|
|    min|                 0|
|    max|            172792|
+-------+------------------+
time_max = df.select(F.max('Time')).collect()[0][0]
time_max / 60 / 60 / 24 # we have data of two days.
Decimal('1.999907407407407407407407408')
from pyspark.sql.functions import expr, hour
df_time_hour = (df.select('Time').withColumn('Date',
expr("timestamp(unix_timestamp('2019-01-01 00:00:00') + Time)"))
.withColumn('hour', hour('Date')))
df_time_hour.show(n=5,truncate=False)
+----+-------------------+----+
|Time|Date               |hour|
+----+-------------------+----+
|0   |2019-01-01 00:00:00|0   |
|0   |2019-01-01 00:00:00|0   |
|1   |2019-01-01 00:00:01|0   |
|1   |2019-01-01 00:00:01|0   |
|2   |2019-01-01 00:00:02|0   |
+----+-------------------+----+
only showing top 5 rows
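The hour-of-day derivation above can be checked with plain arithmetic. The sketch below assumes, as the cell above does with its arbitrary base date of 2019-01-01 00:00:00, that the first transaction happened at midnight; the dataset itself does not state the real clock time. The helper name `hour_of_day` is illustrative.

```python
def hour_of_day(seconds_elapsed):
    """Hour (0-23) for a 'Time' value, assuming Time == 0 falls at midnight."""
    return (int(seconds_elapsed) // 3600) % 24


# First row, start of the second day, and the largest 'Time' value in the data.
for t in (0, 86400, 172792):
    print(t, hour_of_day(t))
```

Because the data spans two days, each hour bucket pools transactions from both days.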
grp = df_time_hour.groupBy('hour').count().sort('hour')
grp.toPandas().plot.bar(x='hour', y='count')  # hours in chronological order on the x-axis
df.select('Amount').toPandas().plot.hist(bins=100)
df.select('Amount').describe().show()
+-------+-----------------+
|summary|           Amount|
+-------+-----------------+
|  count|           284807|
|   mean| 88.3496192509521|
| stddev|250.1201092401885|
|    min|              0.0|
|    max|         25691.16|
+-------+-----------------+
# pull the min and max of Amount out of Spark as plain Python floats
amt_min = df.agg({"Amount": "min"}).collect()[0][0]
amt_max = df.agg({"Amount": "max"}).collect()[0][0]
amt_min, amt_max
(0.0, 25691.16)
splits = np.linspace(amt_min, amt_max+1, 10)
splits
array([ 0. , 2854.68444444, 5709.36888889, 8564.05333333, 11418.73777778, 14273.42222222, 17128.10666667, 19982.79111111, 22837.47555556, 25692.16 ])
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=splits,inputCol="Amount", outputCol="Amount_cat")
df = bucketizer.setHandleInvalid("keep").transform(df)
df.select('Amount','Amount_cat').show(5)
+------+----------+
|Amount|Amount_cat|
+------+----------+
|149.62|       0.0|
|  2.69|       0.0|
|378.66|       0.0|
| 123.5|       0.0|
| 69.99|       0.0|
+------+----------+
only showing top 5 rows
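To see why every row shown above lands in bucket 0, the sketch below reproduces the equal-width edges and the bucket lookup in plain Python. It illustrates the rule "bucket i holds values with splits[i] <= x < splits[i+1], with the top edge inclusive" and is not Spark's implementation; `equal_width_edges` and `bucket_index` are hypothetical helper names, and out-of-range values are not handled.

```python
from bisect import bisect_right


def equal_width_edges(lo, hi, n_edges):
    """n_edges evenly spaced split points from lo to hi (like np.linspace)."""
    step = (hi - lo) / (n_edges - 1)
    return [lo + i * step for i in range(n_edges)]


def bucket_index(value, edges):
    """Index i with edges[i] <= value < edges[i + 1]; the last edge is inclusive."""
    if value == edges[-1]:
        return len(edges) - 2
    return bisect_right(edges, value) - 1


# Same range as the notebook: min Amount 0.0, max Amount 25691.16 (+1).
edges = equal_width_edges(0.0, 25691.16 + 1, 10)

for amount in (2.69, 149.62, 2900.0, 25691.16):
    print(amount, bucket_index(amount, edges))
```

With equal-width bins the first bucket spans roughly 0 to 2,855, so it absorbs almost all transactions (only 7 rows exceed 10,000 according to the count shown earlier). Quantile-based edges would be one way to spread rows more evenly, if that is the goal.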
cols_cat = ['Amount_cat']
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index")
for column in cols_cat]
encoder = OneHotEncoderEstimator(
inputCols=[indexer.getOutputCol() for indexer in indexers],
outputCols=["{0}_encoded".format(indexer.getOutputCol()) for indexer in indexers]
)
assembler = VectorAssembler(
inputCols=encoder.getOutputCols(),
outputCol="Amount_cat_vec"
)
pipeline = Pipeline(stages=indexers + [encoder, assembler])
df = pipeline.fit(df).transform(df)
df.printSchema()
root
 |-- Time: decimal(10,0) (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Class: integer (nullable = true)
 |-- Amount_cat: double (nullable = true)
 |-- Amount_cat_index: double (nullable = false)
 |-- Amount_cat_index_encoded: vector (nullable = true)
 |-- Amount_cat_vec: vector (nullable = true)
feat = 'Amount'
df.select(feat,f'{feat}_cat',f'{feat}_cat_index',f'{feat}_cat_index_encoded').show(5)
+------+----------+----------------+------------------------+
|Amount|Amount_cat|Amount_cat_index|Amount_cat_index_encoded|
+------+----------+----------------+------------------------+
|149.62|       0.0|             0.0|           (6,[0],[1.0])|
|  2.69|       0.0|             0.0|           (6,[0],[1.0])|
|378.66|       0.0|             0.0|           (6,[0],[1.0])|
| 123.5|       0.0|             0.0|           (6,[0],[1.0])|
| 69.99|       0.0|             0.0|           (6,[0],[1.0])|
+------+----------+----------------+------------------------+
only showing top 5 rows
We generally apply a log or Box-Cox transformation to features with large, skewed values to make their distribution look more Gaussian.
df.select('Amount').withColumn('log1p_Amount', F.log1p("Amount")).show(2)
+------+------------------+
|Amount|      log1p_Amount|
+------+------------------+
|149.62| 5.014760108673205|
|  2.69|1.3056264580524357|
+------+------------------+
only showing top 2 rows
np.log1p(149.62)
5.014760108673205
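As a quick check of what `log1p` does, the standard-library sketch below computes log(1 + x) for the first `Amount` value shown above, inverts it with `expm1`, and confirms that a zero amount stays finite (a plain `log` would not be defined at 0). The variable names are illustrative.

```python
import math

amount = 149.62

log_amount = math.log1p(amount)    # log(1 + amount)
restored = math.expm1(log_amount)  # exp(value) - 1, the inverse of log1p
zero_amount = math.log1p(0.0)      # stays finite, unlike log(0)

print(log_amount, restored, zero_amount)
```

This matters here because the minimum `Amount` (and the first `Time` value) is 0.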
cols_log = ['Time', 'Amount']
for c in cols_log:
    df = df.withColumn('log1p_' + c , F.log1p(c))
print(df.columns)
['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount', 'Class', 'Amount_cat', 'Amount_cat_index', 'Amount_cat_index_encoded', 'Amount_cat_vec', 'log1p_Time', 'log1p_Amount']
df.select(['log1p_Time', 'log1p_Amount']).show(2)
+----------+------------------+
|log1p_Time|      log1p_Amount|
+----------+------------------+
|       0.0| 5.014760108673205|
|       0.0|1.3056264580524357|
+----------+------------------+
only showing top 2 rows
cols_after_log = ['log1p_' + i for i in cols_log]
cols_after_log
['log1p_Time', 'log1p_Amount']
print(df.columns)
['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount', 'Class', 'Amount_cat', 'Amount_cat_index', 'Amount_cat_index_encoded', 'Amount_cat_vec', 'log1p_Time', 'log1p_Amount']
selected_features = [ 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9',
'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17',
'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25',
'V26', 'V27', 'V28',
'log1p_Time', 'log1p_Amount']
assembler = VectorAssembler(
inputCols= selected_features,
outputCol='features'
)
df = assembler.transform(df)
df.printSchema()
root
 |-- Time: decimal(10,0) (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Class: integer (nullable = true)
 |-- Amount_cat: double (nullable = true)
 |-- Amount_cat_index: double (nullable = false)
 |-- Amount_cat_index_encoded: vector (nullable = true)
 |-- Amount_cat_vec: vector (nullable = true)
 |-- log1p_Time: double (nullable = true)
 |-- log1p_Amount: double (nullable = true)
 |-- features: vector (nullable = true)
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='features', outputCol='features_scaled')
df = scaler.fit(df).transform(df)
df.select(["features","features_scaled"]).limit(2).toPandas()
features | features_scaled | |
---|---|---|
0 | [-1.3598071336738, -0.0727811733098497, 2.5363... | [-0.6942411021638845, -0.04407484719355172, 1.... |
1 | [1.19185711131486, 0.26615071205963, 0.1664801... | [0.6084952594310945, 0.16117563692663966, 0.10... |
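The scaled values above are consistent with `StandardScaler`'s default settings, which divide each feature by its sample standard deviation without subtracting the mean. The sketch below is a plain-Python illustration of that scaling, not Spark's code; `scale_by_std` is a hypothetical helper, and the V1 check uses the rounded standard deviation 1.959 from the `describe()` table, so it only matches approximately.

```python
import statistics


def scale_by_std(values):
    """Divide by the sample standard deviation (n - 1); no mean centering."""
    sd = statistics.stdev(values)
    return [v / sd for v in values]


# Toy column, purely illustrative.
scaled = scale_by_std([1.0, 2.0, 3.0, 4.0, 5.0])
print(scaled)

# First V1 value divided by V1's (rounded) stddev from the describe() table.
v1_scaled_approx = -1.3598071336738 / 1.959
print(v1_scaled_approx)  # close to the -0.6942... shown in features_scaled
```

If centered features are wanted, `StandardScaler` exposes a `withMean` parameter for that purpose.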
train, test = df.randomSplit([0.8,0.2], seed=random_state)
train.count(), test.count()
(227519, 57288)
from pyspark.ml.classification import LogisticRegression
clf_lr = LogisticRegression(featuresCol='features_scaled',
labelCol='Class',
predictionCol='prediction',
maxIter=10).fit(train)
show_method_attributes(clf_lr,7)
0 | 1 | 2 | 3 | 4 | 5 | 6 | |
---|---|---|---|---|---|---|---|
0 | aggregationDepth | explainParams | hasDefault | labelCol | params | set | transform |
1 | coefficientMatrix | extractParamMap | hasParam | load | predictionCol | standardization | uid |
2 | coefficients | family | hasSummary | lowerBoundsOnCoefficients | probabilityCol | summary | upperBoundsOnCoefficients |
3 | copy | featuresCol | intercept | lowerBoundsOnIntercepts | rawPredictionCol | threshold | upperBoundsOnIntercepts |
4 | elasticNetParam | fitIntercept | interceptVector | maxIter | read | thresholds | weightCol |
5 | evaluate | getOrDefault | isDefined | numClasses | regParam | tol | write |
6 | explainParam | getParam | isSet | numFeatures | save |
results_lr = clf_lr.transform(test)
preds_lr = results_lr.select('prediction')
preds_lr.groupBy('prediction').count().show()
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|57215|
|       1.0|   73|
+----------+-----+
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/evaluation.html
Binary Evaluator metrics: areaUnderROC|areaUnderPR
Multiclass Evaluator metrics: f1|weightedPrecision|weightedRecall|accuracy
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import ClusteringEvaluator
def get_classification_eval_metrics(results,label_col,pred_col='prediction'):
    """Get Classification Evaluation Metrics for PySpark Modelling.
    """
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    # binary classification
    # NOTE: pred_col holds hard 0/1 predictions here, so both areas are computed
    # from a single operating point rather than from the full score ranking.
    bc_metrics = ['areaUnderROC', 'areaUnderPR']
    bc_metrics_values = [BinaryClassificationEvaluator(
                             rawPredictionCol=pred_col,
                             labelCol=label_col,
                             metricName=m).evaluate(results)
                         for m in bc_metrics ]
    bc_df_metrics = pd.DataFrame({'metric': bc_metrics,
                                  'value': bc_metrics_values})
    # multiclass classification
    mc_metrics = ['f1', 'weightedPrecision', 'weightedRecall', 'accuracy']
    mc_metrics_values = [ MulticlassClassificationEvaluator(
                              predictionCol=pred_col,
                              labelCol=label_col,
                              metricName=m).evaluate(results)
                          for m in mc_metrics ]
    mc_df_metrics = pd.DataFrame({'metric': mc_metrics,
                                  'value': mc_metrics_values})
    # combine two dataframes (pd.concat also works on pandas versions without DataFrame.append)
    df_metrics = pd.concat([bc_df_metrics, mc_df_metrics]).set_index('metric')
    return df_metrics
dfe = get_classification_eval_metrics(results_lr,'Class',pred_col='prediction')
# dfe lists areaUnderROC and areaUnderPR first; reorder to match the metric columns of df_eval
row = dfe.loc[df_eval.columns[2:], 'value'].tolist()
df_eval.loc[len(df_eval)] = ['Logistic Regression','log features'] + row
df_eval.drop_duplicates().sort_values('weightedRecall')
value | |
---|---|
metric | |
areaUnderROC | 0.846066 |
areaUnderPR | 0.730487 |
f1 | 0.999300 |
weightedPrecision | 0.999294 |
weightedRecall | 0.999337 |
accuracy | 0.999337 |
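Note that `weightedRecall` and `accuracy` are identical in the table above. That is expected: a recall averaged over classes with class-frequency weights reduces to overall accuracy, so it says little about how many frauds were caught. The sketch below shows this in plain Python with hypothetical confusion counts; they are not the notebook's actual results, and the helper names are illustrative.

```python
def per_class_recall(tp, fn, tn, fp):
    """Recall of the positive (fraud) class and of the negative class."""
    return tp / (tp + fn), tn / (tn + fp)


def weighted_recall(tp, fn, tn, fp):
    """Per-class recalls averaged with class-frequency weights."""
    n_pos, n_neg = tp + fn, tn + fp
    r_pos, r_neg = per_class_recall(tp, fn, tn, fp)
    return (n_pos * r_pos + n_neg * r_neg) / (n_pos + n_neg)


def overall_accuracy(tp, fn, tn, fp):
    return (tp + tn) / (tp + fn + tn + fp)


# Hypothetical test-set counts: 100 frauds, 57,000 normal transactions.
tp, fn, tn, fp = 60, 40, 56990, 10

fraud_recall, normal_recall = per_class_recall(tp, fn, tn, fp)
w_recall = weighted_recall(tp, fn, tn, fp)
acc = overall_accuracy(tp, fn, tn, fp)

print(f"fraud recall    = {fraud_recall:.4f}")
print(f"weighted recall = {w_recall:.6f}")
print(f"accuracy        = {acc:.6f}")
```

In this toy case 40% of the frauds are missed while the weighted recall stays above 0.999, so the fraud-class recall TP / (TP + FN) is the number that matches the goal stated at the top of the notebook.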
Evaluator for Clustering results expects
two input columns: prediction and features.
The metric computes the Silhouette measure using the squared Euclidean distance.
The Silhouette is a measure for the validation of the consistency
within clusters. It ranges between 1 and -1, where a value close to
1 means that the points in a cluster are close to the other points
in the same cluster and far from the points of the other clusters.
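For intuition, the Silhouette definition can be written out directly. The sketch below is a naive O(n²) plain-Python version using squared Euclidean distance; it illustrates the definition rather than Spark's implementation, assumes at least two clusters, and uses made-up toy points. The function names are illustrative.

```python
def sq_dist(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def silhouette(points, labels):
    """Mean silhouette coefficient; assumes at least two distinct clusters."""
    clusters = sorted(set(labels))
    scores = []
    for i, (p, lab) in enumerate(zip(points, labels)):
        # Mean distance from p to the members of each cluster (excluding p itself).
        mean_dist = {}
        for c in clusters:
            members = [q for j, (q, l) in enumerate(zip(points, labels))
                       if l == c and j != i]
            if members:
                mean_dist[c] = sum(sq_dist(p, q) for q in members) / len(members)
        if lab not in mean_dist:  # p is alone in its cluster: score 0 by convention
            scores.append(0.0)
            continue
        a = mean_dist[lab]                                      # cohesion
        b = min(d for c, d in mean_dist.items() if c != lab)    # separation
        scores.append((b - a) / max(a, b) if max(a, b) > 0 else 0.0)
    return sum(scores) / len(scores)


# Toy data: two tight, well-separated groups.
points = [(0.0, 0.0), (0.0, 1.0), (10.0, 10.0), (10.0, 11.0)]
good = silhouette(points, [0, 0, 1, 1])  # sensible clustering
bad = silhouette(points, [0, 1, 0, 1])   # clusters mixed up

print(good, bad)
```

A sensible assignment scores close to 1, while a mixed-up assignment scores below 0.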
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
clf_knn = KMeans(featuresCol='features_scaled',
#labelCol='Class', # labelCol does not exist for KMeans
seed=random_state,
predictionCol='prediction',
k=2).fit(train)
results_knn = clf_knn.transform(test)
preds_knn = results_knn.select('prediction')
preds_knn.groupBy('prediction').count().show()
+----------+-----+
|prediction|count|
+----------+-----+
|         1|53836|
|         0| 3452|
+----------+-----+
cluster_eval = ClusteringEvaluator(featuresCol='features_scaled',
#labelCol='Class', # label col does not exist for KMeans (unsupervised clustering, not KNN)
predictionCol='prediction',
distanceMeasure="squaredEuclidean",
metricName="silhouette")
cluster_eval.evaluate(results_knn)
0.16745838107517577
from pyspark.ml.classification import DecisionTreeClassifier
clf_dt = DecisionTreeClassifier(featuresCol='features_scaled',
labelCol='Class',
seed=random_state,
predictionCol='prediction',
).fit(train)
show_method_attributes(clf_dt)
0 | 1 | 2 | 3 | 4 | 5 | 6 | |
---|---|---|---|---|---|---|---|
0 | cacheNodeIds | extractParamMap | hasParam | maxBins | numFeatures | rawPredictionCol | thresholds |
1 | checkpointInterval | featureImportances | impurity | maxDepth | numNodes | read | toDebugString |
2 | copy | featuresCol | isDefined | maxMemoryInMB | params | save | transform |
3 | depth | getOrDefault | isSet | minInfoGain | predictionCol | seed | uid |
4 | explainParam | getParam | labelCol | minInstancesPerNode | probabilityCol | set | write |
5 | explainParams | hasDefault | load | numClasses |
results_dt = clf_dt.transform(test)
preds_dt = results_dt.select('prediction')
preds_dt.groupBy('prediction').count().show()
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|57212|
|       1.0|   76|
+----------+-----+
dfe = get_classification_eval_metrics(results_dt,'Class',pred_col='prediction')
# dfe lists areaUnderROC and areaUnderPR first; reorder to match the metric columns of df_eval
row = dfe.loc[df_eval.columns[2:], 'value'].tolist()
df_eval.loc[len(df_eval)] = ['Decision Tree Classifier','log features'] + row
df_eval.drop_duplicates().sort_values('weightedRecall')
model_name | desc | f1 | weightedPrecision | weightedRecall | accuracy | areaUnderROC | areaUnderPR | |
---|---|---|---|---|---|---|---|---|
0 | Logistic Regression | log features | 0.999300 | 0.999294 | 0.999337 | 0.999337 | 0.846066 | 0.730487 |
3 | Decision Tree Classifier | log features | 0.999398 | 0.999393 | 0.999424 | 0.999424 | 0.868053 | 0.765536 |
df_feature_imp = pd.DataFrame(clf_dt.featureImportances.toArray(),
columns=['feature_importance'],
index=selected_features)
df_feature_imp.loc[lambda x: x['feature_importance'] >0]\
.sort_values('feature_importance',ascending=False)\
.plot.bar()
from pyspark.ml.classification import RandomForestClassifier
clf_rfc = RandomForestClassifier(featuresCol='features_scaled',
labelCol='Class',
seed=random_state,
predictionCol='prediction',
).fit(train)
show_method_attributes(clf_rfc)
0 | 1 | 2 | 3 | 4 | 5 | 6 | |
---|---|---|---|---|---|---|---|
0 | cacheNodeIds | featureSubsetStrategy | impurity | maxDepth | numTrees | save | totalNumNodes |
1 | checkpointInterval | featuresCol | isDefined | maxMemoryInMB | params | seed | transform |
2 | copy | getNumTrees | isSet | minInfoGain | predictionCol | set | treeWeights |
3 | explainParam | getOrDefault | labelCol | minInstancesPerNode | probabilityCol | subsamplingRate | trees |
4 | explainParams | getParam | load | numClasses | rawPredictionCol | thresholds | uid |
5 | extractParamMap | hasDefault | maxBins | numFeatures | read | toDebugString | write |
6 | featureImportances | hasParam |
results_rfc = clf_rfc.transform(test)
preds_rfc = results_rfc.select('prediction')
preds_rfc.groupBy('prediction').count().show()
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|57209|
|       1.0|   79|
+----------+-----+
dfe = get_classification_eval_metrics(results_rfc,'Class',pred_col='prediction')
# dfe lists areaUnderROC and areaUnderPR first; reorder to match the metric columns of df_eval
row = dfe.loc[df_eval.columns[2:], 'value'].tolist()
df_eval.loc[len(df_eval)] = ['Random Forest Classifier','log features'] + row
df_eval.drop_duplicates().sort_values('weightedRecall')
model_name | desc | f1 | weightedPrecision | weightedRecall | accuracy | areaUnderROC | areaUnderPR | |
---|---|---|---|---|---|---|---|---|
0 | Logistic Regression | log features | 0.999300 | 0.999294 | 0.999337 | 0.999337 | 0.846066 | 0.730487 |
3 | Decision Tree Classifier | log features | 0.999398 | 0.999393 | 0.999424 | 0.999424 | 0.868053 | 0.765536 |
4 | Random Forest Classifier | log features | 0.999494 | 0.999490 | 0.999511 | 0.999511 | 0.890040 | 0.800147 |
df_feature_imp = pd.DataFrame(clf_rfc.featureImportances.toArray(),
columns=['feature_importance'],
index=selected_features)
df_feature_imp.loc[lambda x: x['feature_importance'] >0]\
.sort_values('feature_importance',ascending=False)\
.plot.bar()
print(train.columns)
['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount', 'Class', 'Amount_cat', 'Amount_cat_index', 'Amount_cat_index_encoded', 'Amount_cat_vec', 'log1p_Time', 'log1p_Amount', 'features', 'features_scaled']
print(selected_features)
['V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'log1p_Time', 'log1p_Amount']
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
import time
t0 = time.time()
clf_rfc = RandomForestClassifier(featuresCol='features_scaled',
                                 labelCol="Class",
                                 numTrees=50,
                                 seed=random_state)
# The same candidate values are used for both parameters: 2 x 2 = 4 combinations.
grid_depths = [10,20]
paramGrid = ParamGridBuilder()\
    .addGrid(clf_rfc.maxDepth, grid_depths)\
    .addGrid(clf_rfc.minInstancesPerNode, grid_depths)\
    .build()
pipeline = Pipeline(stages = [clf_rfc])
evaluator = MulticlassClassificationEvaluator(labelCol="Class",
                                              metricName="weightedRecall")
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          seed=random_state,
                          parallelism=50,  # number of threads to use
                          numFolds=5)
# Fit the cross-validated grid search; this is slow and later cells depend on model_cv.
model_cv = crossval.fit(train)
t1 = time.time() - t0
print('Time taken: {:.0f} min {:.0f} secs'.format(*divmod(t1,60)))
# Time taken: 8 min 16 secs
Exception ignored in: <function JavaWrapper.__del__ at 0x11ddf3440>
Traceback (most recent call last):
  File "/Users/poudel/miniconda3/envs/spk/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'MulticlassClassificationEvaluator' object has no attribute '_java_obj'
Time taken: 8 min 16 secs
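As a rough check on the run time, the size of the search can be counted by hand. The sketch below is plain Python rather than the Spark API: it enumerates the same Cartesian product that `ParamGridBuilder` builds from the two `addGrid` calls and multiplies by the number of folds. The names `param_grid` and `n_fits` are illustrative only and do not come from the notebook.

```python
from itertools import product

grid_depths = [10, 20]   # same candidate values as in the grid above
num_folds = 5            # same as numFolds in CrossValidator

# Every (maxDepth, minInstancesPerNode) pair that the search will try.
param_grid = [
    {"maxDepth": depth, "minInstancesPerNode": min_inst}
    for depth, min_inst in product(grid_depths, grid_depths)
]

# Each combination is fitted once per fold; the best combination is then
# refit once on the full training set.
n_fits = len(param_grid) * num_folds + 1

for params in param_grid:
    print(params)
print("combinations:", len(param_grid), "model fits:", n_fits)
```

Each of those fits trains a 50-tree forest, which is consistent with a run time of several minutes.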
show_method_attributes(model_cv,7)
 | 0 | 1 | 2 | 3 | 4 | 5 | 6 |
---|---|---|---|---|---|---|---|
0 | avgMetrics | evaluator | getEstimatorParamMaps | hasDefault | params | setEstimator | subModels |
1 | bestModel | explainParam | getEvaluator | hasParam | read | setEstimatorParamMaps | transform |
2 | copy | explainParams | getOrDefault | isDefined | save | setEvaluator | uid |
3 | estimator | extractParamMap | getParam | isSet | seed | setSeed | write |
4 | estimatorParamMaps | getEstimator | getSeed | load | set |
# model_cv.save('../models/serialization/pyspark_clf_rfc_grid_search.pkl')
# AttributeError: 'Pipeline' object has no attribute '_transfer_param_map_to_java'
results_rfc_grid = model_cv.transform(test)
print(results_rfc_grid.columns)
['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount', 'Class', 'Amount_cat', 'Amount_cat_index', 'Amount_cat_index_encoded', 'Amount_cat_vec', 'log1p_Time', 'log1p_Amount', 'features', 'features_scaled', 'rawPrediction', 'probability', 'prediction']
evaluator.evaluate(results_rfc_grid)
0.9995636084345761
model_cv.bestModel.stages
[RandomForestClassificationModel (uid=RandomForestClassifier_dae08062b786) with 50 trees]
model_cv.params
[Param(parent='CrossValidatorModel_e443802db3db', name='estimator', doc='estimator to be cross-validated'), Param(parent='CrossValidatorModel_e443802db3db', name='estimatorParamMaps', doc='estimator param maps'), Param(parent='CrossValidatorModel_e443802db3db', name='evaluator', doc='evaluator used to select hyper-parameters that maximize the validator metric'), Param(parent='CrossValidatorModel_e443802db3db', name='seed', doc='random seed.')]
dfe = get_classification_eval_metrics(results_rfc_grid,'Class',pred_col='prediction')
row = dfe.values.ravel().tolist()
df_eval.loc[len(df_eval)] = ['Random Forest Classifier','log features, grid search'] + row
df_eval.drop_duplicates().sort_values('weightedRecall')
 | model_name | desc | f1 | weightedPrecision | weightedRecall | accuracy | areaUnderROC | areaUnderPR |
---|---|---|---|---|---|---|---|---|
0 | Logistic Regression | log features | 0.846066 | 0.730487 | 0.999300 | 0.999294 | 0.999337 | 0.999337 |
3 | Decision Tree Classifier | log features | 0.868053 | 0.765536 | 0.999398 | 0.999393 | 0.999424 | 0.999424 |
4 | Random Forest Classifier | log features | 0.890040 | 0.800147 | 0.999494 | 0.999490 | 0.999511 | 0.999511 |
6 | Random Forest Classifier | log features, grid search | 0.884580 | 0.836982 | 0.999541 | 0.999548 | 0.999564 | 0.999564 |
model_cv.bestModel.stages[-1].featureImportances
SparseVector(30, {0: 0.0022, 1: 0.0085, 2: 0.0628, 3: 0.0334, 4: 0.0116, 5: 0.0056, 6: 0.1099, 7: 0.0146, 8: 0.0774, 9: 0.0748, 10: 0.0384, 11: 0.1133, 12: 0.0055, 13: 0.0883, 14: 0.008, 15: 0.0818, 16: 0.1451, 17: 0.0257, 18: 0.0024, 19: 0.0077, 20: 0.0058, 21: 0.0085, 22: 0.0058, 23: 0.0087, 24: 0.0019, 25: 0.0145, 26: 0.006, 27: 0.0077, 28: 0.0122, 29: 0.012})
prediction.schema["features"].metadata["ml_attr"]["attrs"]
{'numeric': [{'idx': 0, 'name': 'V1'}, {'idx': 1, 'name': 'V2'}, {'idx': 2, 'name': 'V3'}, {'idx': 3, 'name': 'V4'}, {'idx': 4, 'name': 'V5'}, {'idx': 5, 'name': 'V6'}, {'idx': 6, 'name': 'V7'}, {'idx': 7, 'name': 'V8'}, {'idx': 8, 'name': 'V9'}, {'idx': 9, 'name': 'V10'}, {'idx': 10, 'name': 'V11'}, {'idx': 11, 'name': 'V12'}, {'idx': 12, 'name': 'V13'}, {'idx': 13, 'name': 'V14'}, {'idx': 14, 'name': 'V15'}, {'idx': 15, 'name': 'V16'}, {'idx': 16, 'name': 'V17'}, {'idx': 17, 'name': 'V18'}, {'idx': 18, 'name': 'V19'}, {'idx': 19, 'name': 'V20'}, {'idx': 20, 'name': 'V21'}, {'idx': 21, 'name': 'V22'}, {'idx': 22, 'name': 'V23'}, {'idx': 23, 'name': 'V24'}, {'idx': 24, 'name': 'V25'}, {'idx': 25, 'name': 'V26'}, {'idx': 26, 'name': 'V27'}, {'idx': 27, 'name': 'V28'}, {'idx': 28, 'name': 'log1p_Time'}, {'idx': 29, 'name': 'log1p_Amount'}]}
# https://stackoverflow.com/questions/42935914/how-to-map-features-from-the-output-of-a-vectorassembler-back-to-the-column-name
def get_feature_importance_df(model_cv, dataset, featuresCol):
    """Map the best model's feature importances back to the input column names."""
    feat_imp = model_cv.bestModel.stages[-1].featureImportances
    # The metadata groups attributes by type (e.g. 'numeric'); flatten all groups.
    attrs = dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]
    list_extract = []
    for attr_type in attrs:
        list_extract += attrs[attr_type]
    pandas_df_feat_imp = pd.DataFrame(list_extract)
    pandas_df_feat_imp['importance'] = pandas_df_feat_imp['idx'].apply(lambda x: feat_imp[x])
    pandas_df_feat_imp = pandas_df_feat_imp.sort_values('importance', ascending = False)
    return pandas_df_feat_imp
pandas_df_feat_imp = get_feature_importance_df(model_cv, prediction, "features")
pandas_df_feat_imp.style.background_gradient(subset=['importance'])
 | idx | name | importance |
---|---|---|---|
16 | 16 | V17 | 0.1451 |
11 | 11 | V12 | 0.1133 |
6 | 6 | V7 | 0.109919 |
13 | 13 | V14 | 0.088303 |
15 | 15 | V16 | 0.0818398 |
8 | 8 | V9 | 0.0773547 |
9 | 9 | V10 | 0.0747522 |
2 | 2 | V3 | 0.0628027 |
10 | 10 | V11 | 0.0383619 |
3 | 3 | V4 | 0.0334011 |
17 | 17 | V18 | 0.0256938 |
7 | 7 | V8 | 0.0145529 |
25 | 25 | V26 | 0.0145493 |
28 | 28 | log1p_Time | 0.0121996 |
29 | 29 | log1p_Amount | 0.0120428 |
4 | 4 | V5 | 0.0115613 |
23 | 23 | V24 | 0.00865824 |
1 | 1 | V2 | 0.00851461 |
21 | 21 | V22 | 0.00848692 |
14 | 14 | V15 | 0.00796113 |
19 | 19 | V20 | 0.00774602 |
27 | 27 | V28 | 0.00773218 |
26 | 26 | V27 | 0.00596056 |
20 | 20 | V21 | 0.00583038 |
22 | 22 | V23 | 0.00577398 |
5 | 5 | V6 | 0.00555172 |
12 | 12 | V13 | 0.00552945 |
18 | 18 | V19 | 0.00241755 |
0 | 0 | V1 | 0.00218484 |
24 | 24 | V25 | 0.00191832 |
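When the ordered list of input column names is already at hand (here `selected_features`), a similar ranking can be built without reading the schema metadata. The sketch below is plain Python and assumes the assembler received the columns in exactly that order. The four values are the first entries of the `SparseVector` output above (V1 to V4), and `rank_importances` is a hypothetical helper name, not part of the notebook.

```python
def rank_importances(names, importances):
    """Pair each feature name with its importance, largest first."""
    if len(names) != len(importances):
        raise ValueError("names and importances must have the same length")
    return sorted(zip(names, importances), key=lambda pair: pair[1], reverse=True)

# First four entries of the importance vector, for illustration only.
names = ["V1", "V2", "V3", "V4"]
importances = [0.0022, 0.0085, 0.0628, 0.0334]

ranked = rank_importances(names, importances)
for name, score in ranked:
    print(f"{name:>4}  {score:.4f}")
```

With the fitted model, the second argument would be `model_cv.bestModel.stages[-1].featureImportances.toArray()`.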
results_rfc.select('prediction','Class').show(5)
+----------+-----+
|prediction|Class|
+----------+-----+
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
+----------+-----+
only showing top 5 rows
def pyspark_confusion_matrix(model_name, results,
                             label_col='Class',pred_col='prediction'):
    """Print confusion-matrix counts, precision and recall for the fraud class."""
    tp = results[(results[label_col] == 1) & (results[pred_col] == 1.0)].count()
    tn = results[(results[label_col] == 0) & (results[pred_col] == 0.0)].count()
    fp = results[(results[label_col] == 0) & (results[pred_col] == 1.0)].count()
    fn = results[(results[label_col] == 1) & (results[pred_col] == 0.0)].count()
    total = tp + tn + fp + fn
    # Guard against division by zero when no frauds are present or none are predicted.
    recall = float(tp) / (tp + fn) if (tp + fn) else 0.0
    precision = float(tp) / (tp + fp) if (tp + fp) else 0.0
    print('Model name : ', model_name)
    print("True Positives : ", tp)
    print("True Negatives : ", tn)
    print()
    print("False Positives: ", fp)
    print("False Negatives: ", fn)
    print()
    print("Total : ", total)
    print("Precision : ", round(precision,3))
    print("Recall : ", round(recall,3))
pyspark_confusion_matrix('Random Forest, grid search', results_rfc_grid)
Model name :  Random Forest, grid search
True Positives :  70
True Negatives :  57193

False Positives:  4
False Negatives:  21

Total :  57288
Precision :  0.946
Recall :  0.769
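The counts above also explain why the `weightedRecall` metric used by the evaluator is so close to 1. Weighted recall averages the per-class recalls using class frequencies as weights, which reduces algebraically to (TP + TN) / total, so it is dominated by the non-fraud class. A small check in plain Python, using only the four counts printed above (the function name `recall_summary` is illustrative):

```python
def recall_summary(tp, tn, fp, fn):
    """Return fraud recall, non-fraud recall and their support-weighted average."""
    total = tp + tn + fp + fn
    n_pos, n_neg = tp + fn, tn + fp
    recall_pos = tp / n_pos if n_pos else 0.0   # fraud class
    recall_neg = tn / n_neg if n_neg else 0.0   # non-fraud class
    weighted = (n_pos * recall_pos + n_neg * recall_neg) / total
    return recall_pos, recall_neg, weighted

# Counts for the grid-search random forest on the test set.
recall_pos, recall_neg, weighted = recall_summary(tp=70, tn=57193, fp=4, fn=21)
print("fraud recall     :", round(recall_pos, 3))
print("non-fraud recall :", round(recall_neg, 5))
print("weighted recall  :", round(weighted, 6))
```

The weighted value agrees with the `evaluator.evaluate(results_rfc_grid)` result of about 0.99956, while the fraud-class recall is only 0.769. When the goal is to compare models by how many frauds they catch, the fraud-class recall is the more informative number.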
cm = np.array([[tn,fp],
[fn,tp]
])
cm
array([[57189,     8],
       [   20,    71]])
from sklearn.metrics import confusion_matrix
original_ytest = results_rfc.select('Class').toPandas().values
y_preds = results_rfc.select('prediction').toPandas().values.astype(int)
confusion_matrix(original_ytest,y_preds)
array([[57189,     8],
       [   20,    71]])
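The array follows the scikit-learn convention: rows are true classes and columns are predicted classes, so for 0/1 labels the layout is `[[tn, fp], [fn, tp]]`. It describes `results_rfc`, the random forest without grid search. A minimal pure-Python sketch of how such a matrix is tallied, on made-up labels (the helper name `binary_confusion_matrix` is illustrative):

```python
def binary_confusion_matrix(y_true, y_pred):
    """Return [[tn, fp], [fn, tp]] for 0/1 labels."""
    matrix = [[0, 0], [0, 0]]
    for actual, predicted in zip(y_true, y_pred):
        # Row index is the true class, column index is the predicted class.
        matrix[actual][predicted] += 1
    return matrix

# Toy labels, not taken from the dataset.
y_true = [0, 0, 0, 1, 1, 0]
y_pred = [0, 1, 0, 1, 0, 0]

cm = binary_confusion_matrix(y_true, y_pred)
(tn, fp), (fn, tp) = cm
print(cm)
```

Read the same way, the array above gives a fraud recall of 71 / (71 + 20) ≈ 0.780 and a precision of 71 / (71 + 8) ≈ 0.899 for the random forest without grid search.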
# help(confusion_matrix)