Description

The Allstate Corporation is an American insurance company that is in the United States. The company also has personal lines insurance operations in Canada.

Data Source Kaggle: https://www.kaggle.com/c/allstate-claims-severity/data

Imports

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
pd.plotting.register_matplotlib_converters()

from tqdm import tqdm_notebook as tqdm
import matplotlib.pyplot as plt

%matplotlib inline
%config InlineBackend.figure_format = 'retina'
plt.style.use('ggplot') 

# random state
SEED=100
np.random.seed(SEED)

[(x.__name__,x.__version__) for x in [np,pd,sns]]
/Users/poudel/miniconda3/envs/dataSc/lib/python3.7/site-packages/statsmodels/tools/_testing.py:19: FutureWarning: pandas.util.testing is deprecated. Use the functions in the public API at pandas.testing instead.
  import pandas.util.testing as tm
Out[1]:
[('numpy', '1.18.1'), ('pandas', '1.0.1'), ('seaborn', '0.9.0')]
In [2]:
import re
from datetime import datetime
In [3]:
# pyspark
import pyspark
spark = pyspark.sql\
          .SparkSession\
          .builder\
          .appName('app')\
          .getOrCreate()

# sql
from pyspark.sql.functions import col as _col
from pyspark.sql.functions import udf

# @udf("integer") def myfunc(x,y): return x - y
# stddev format_number date_format, dayofyear, when
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import (mean as _mean, min as _min,
                                   max as _max, avg as _avg,
                                   when as _when
                                  )

from pyspark.sql.types import (StructField,StringType,
                               IntegerType, FloatType,
                               DoubleType,StructType)

from pyspark import SparkConf, SparkContext, SQLContext

sc = spark.sparkContext
sqlContext = SQLContext(sc) 
# spark_df = sqlContext.createDataFrame(pandas_df)
In [4]:
# pyspark ml feature
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.feature import OneHotEncoder,OneHotEncoderEstimator
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline, PipelineModel
In [5]:
# regressors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
In [6]:
# cross validation
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import CrossValidatorModel
In [7]:
# model evaluation regression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics
In [8]:
%load_ext autoreload
In [9]:
%autoreload 2
In [10]:
# my personal module
from bhishan import bp

Load the data

In [11]:
!ls ../data
outputs   processed raw
In [12]:
!head -2 ../data/raw/train.csv
id,cat1,cat2,cat3,cat4,cat5,cat6,cat7,cat8,cat9,cat10,cat11,cat12,cat13,cat14,cat15,cat16,cat17,cat18,cat19,cat20,cat21,cat22,cat23,cat24,cat25,cat26,cat27,cat28,cat29,cat30,cat31,cat32,cat33,cat34,cat35,cat36,cat37,cat38,cat39,cat40,cat41,cat42,cat43,cat44,cat45,cat46,cat47,cat48,cat49,cat50,cat51,cat52,cat53,cat54,cat55,cat56,cat57,cat58,cat59,cat60,cat61,cat62,cat63,cat64,cat65,cat66,cat67,cat68,cat69,cat70,cat71,cat72,cat73,cat74,cat75,cat76,cat77,cat78,cat79,cat80,cat81,cat82,cat83,cat84,cat85,cat86,cat87,cat88,cat89,cat90,cat91,cat92,cat93,cat94,cat95,cat96,cat97,cat98,cat99,cat100,cat101,cat102,cat103,cat104,cat105,cat106,cat107,cat108,cat109,cat110,cat111,cat112,cat113,cat114,cat115,cat116,cont1,cont2,cont3,cont4,cont5,cont6,cont7,cont8,cont9,cont10,cont11,cont12,cont13,cont14,loss
1,A,B,A,B,A,A,A,A,B,A,B,A,A,A,A,A,A,A,A,A,A,A,B,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,A,B,A,D,B,B,D,D,B,D,C,B,D,B,A,A,A,A,A,D,B,C,E,A,C,T,B,G,A,A,I,E,G,J,G,BU,BC,C,AS,S,A,O,LB,0.7263,0.245921,0.187583,0.789639,0.310061,0.718367,0.33506,0.3026,0.67135,0.8351,0.569745,0.594646,0.822493,0.714843,2213.18
In [13]:
df_train_raw = spark.read.csv('../data/raw/train.csv',
                          header=True,
                          inferSchema=True).cache()

print(f"""nrows = {df_train_raw.count():,d}
          ncols =  {len(df_train_raw.columns):,d}""" )

df_train_raw.limit(5).toPandas()
nrows = 188,318
          ncols =  132
Out[13]:
id cat1 cat2 cat3 cat4 cat5 cat6 cat7 cat8 cat9 ... cont6 cont7 cont8 cont9 cont10 cont11 cont12 cont13 cont14 loss
0 1 A B A B A A A A B ... 0.718367 0.335060 0.30260 0.67135 0.83510 0.569745 0.594646 0.822493 0.714843 2213.18
1 2 A B A A A A A A B ... 0.438917 0.436585 0.60087 0.35127 0.43919 0.338312 0.366307 0.611431 0.304496 1283.60
2 5 A B A A B A A A B ... 0.289648 0.315545 0.27320 0.26076 0.32446 0.381398 0.373424 0.195709 0.774425 3005.09
3 10 B B A B A A A A B ... 0.440945 0.391128 0.31796 0.32128 0.44467 0.327915 0.321570 0.605077 0.602642 939.85
4 11 A B A B A A A A B ... 0.178193 0.247408 0.24564 0.22089 0.21230 0.204687 0.202213 0.246011 0.432606 2763.85

5 rows × 132 columns

In [14]:
# df_train_raw.printSchema()
In [15]:
print(df_train_raw.columns)
['id', 'cat1', 'cat2', 'cat3', 'cat4', 'cat5', 'cat6', 'cat7', 'cat8', 'cat9', 'cat10', 'cat11', 'cat12', 'cat13', 'cat14', 'cat15', 'cat16', 'cat17', 'cat18', 'cat19', 'cat20', 'cat21', 'cat22', 'cat23', 'cat24', 'cat25', 'cat26', 'cat27', 'cat28', 'cat29', 'cat30', 'cat31', 'cat32', 'cat33', 'cat34', 'cat35', 'cat36', 'cat37', 'cat38', 'cat39', 'cat40', 'cat41', 'cat42', 'cat43', 'cat44', 'cat45', 'cat46', 'cat47', 'cat48', 'cat49', 'cat50', 'cat51', 'cat52', 'cat53', 'cat54', 'cat55', 'cat56', 'cat57', 'cat58', 'cat59', 'cat60', 'cat61', 'cat62', 'cat63', 'cat64', 'cat65', 'cat66', 'cat67', 'cat68', 'cat69', 'cat70', 'cat71', 'cat72', 'cat73', 'cat74', 'cat75', 'cat76', 'cat77', 'cat78', 'cat79', 'cat80', 'cat81', 'cat82', 'cat83', 'cat84', 'cat85', 'cat86', 'cat87', 'cat88', 'cat89', 'cat90', 'cat91', 'cat92', 'cat93', 'cat94', 'cat95', 'cat96', 'cat97', 'cat98', 'cat99', 'cat100', 'cat101', 'cat102', 'cat103', 'cat104', 'cat105', 'cat106', 'cat107', 'cat108', 'cat109', 'cat110', 'cat111', 'cat112', 'cat113', 'cat114', 'cat115', 'cat116', 'cont1', 'cont2', 'cont3', 'cont4', 'cont5', 'cont6', 'cont7', 'cont8', 'cont9', 'cont10', 'cont11', 'cont12', 'cont13', 'cont14', 'loss']
In [16]:
df_test_raw = spark.read.csv('../data/raw/test.csv',
                         header=True,
                         inferSchema=True).cache()

print(f"""nrows = {df_test_raw.count():,d}
          ncols =  {len(df_test_raw.columns):,d}""" )

df_test_raw.limit(5).toPandas()
nrows = 125,546
          ncols =  131
Out[16]:
id cat1 cat2 cat3 cat4 cat5 cat6 cat7 cat8 cat9 ... cont5 cont6 cont7 cont8 cont9 cont10 cont11 cont12 cont13 cont14
0 4 A B A A A A A A B ... 0.281143 0.466591 0.317681 0.61229 0.34365 0.38016 0.377724 0.369858 0.704052 0.392562
1 6 A B A B A A A A B ... 0.836443 0.482425 0.443760 0.71330 0.51890 0.60401 0.689039 0.675759 0.453468 0.208045
2 9 A B A B B A B A B ... 0.718531 0.212308 0.325779 0.29758 0.34365 0.30529 0.245410 0.241676 0.258586 0.297232
3 12 A A A A B A A A A ... 0.397069 0.369930 0.342355 0.40028 0.33237 0.31480 0.348867 0.341872 0.592264 0.555955
4 15 B A A A A B A A A ... 0.302678 0.398862 0.391833 0.23688 0.43731 0.50556 0.359572 0.352251 0.301535 0.825823

5 rows × 131 columns

Train validation split

In [17]:
df_train_raw = df_train_raw.withColumnRenamed("loss", "label")
In [18]:
train, valid = df_train_raw.randomSplit([0.8,0.2], seed=SEED)

train = train.cache()
valid = valid.cache()

train.count(), valid.count()
Out[18]:
(150506, 37812)
In [19]:
test = df_test_raw.cache()

Feature Engineering

Encoding Categorical Features

  • StringIndexer
  • OneHotEncoder
In [20]:
is_cat = lambda c: c.startswith("cat")
make_new_cat = lambda c: "idx_{0}".format(c) if (is_cat(c)) else c

indexers = map(
    lambda c: StringIndexer(inputCol=c,
        outputCol=make_new_cat(c)
        ).fit(
             df_train_raw.select(c)
             .union(df_test_raw.select(c))
            )
    , filter(is_cat, train.columns)
)

indexers = list(indexers)
indexers[0]
Out[20]:
StringIndexer_0440f2f83818
In [21]:
mycols = df_train_raw.columns
# mycols
In [22]:
df_train_raw.select('cat1').distinct().show()
+----+
|cat1|
+----+
|   B|
|   A|
+----+

In [23]:
df_train_raw.select('cat1').distinct().count()
Out[23]:
2
In [24]:
cols_cat = [i for i in mycols if i.startswith('cat')]
len(cols_cat)
Out[24]:
116
In [25]:
%%time
n_distinct_cats = [df_train_raw.select(c).distinct().count()
                   for c in cols_cat]

len(n_distinct_cats)
CPU times: user 140 ms, sys: 30.6 ms, total: 170 ms
Wall time: 1min 32s
Out[25]:
116
In [26]:
# df_distinct = pd.DataFrame({
#     'feature': cols_cat,
#     'n_distinct': n_distinct_cats
# })

# df_distinct.tail()
In [27]:
# df_train.select('cat116').distinct().count()
In [28]:
# large_cats = df_distinct['feature'][df_distinct['n_distinct']>50].tolist()

# large_cats
In [29]:
large_cats = [cols_cat[i] for i,n in enumerate(n_distinct_cats)
              if n>50]
large_cats
Out[29]:
['cat109', 'cat110', 'cat112', 'cat113', 'cat116']
In [30]:
for c in large_cats:
    n = df_train_raw.select(c).distinct().count()
    print(f"{c} ==> {n}")
cat109 ==> 84
cat110 ==> 131
cat112 ==> 51
cat113 ==> 61
cat116 ==> 326
In [31]:
cats_selected = [i for i in cols_cat
                     if i not in large_cats]

cats_idx = ['idx_'+i for i in cats_selected]
In [32]:
# continuous features
cols_cont = [i for i in train.columns
            if i.startswith('cont')]

print(cols_cont)
['cont1', 'cont2', 'cont3', 'cont4', 'cont5', 'cont6', 'cont7', 'cont8', 'cont9', 'cont10', 'cont11', 'cont12', 'cont13', 'cont14']
In [33]:
# we do not include id and label column in selected features

features_selected = cats_idx + cols_cont

print(features_selected)
['idx_cat1', 'idx_cat2', 'idx_cat3', 'idx_cat4', 'idx_cat5', 'idx_cat6', 'idx_cat7', 'idx_cat8', 'idx_cat9', 'idx_cat10', 'idx_cat11', 'idx_cat12', 'idx_cat13', 'idx_cat14', 'idx_cat15', 'idx_cat16', 'idx_cat17', 'idx_cat18', 'idx_cat19', 'idx_cat20', 'idx_cat21', 'idx_cat22', 'idx_cat23', 'idx_cat24', 'idx_cat25', 'idx_cat26', 'idx_cat27', 'idx_cat28', 'idx_cat29', 'idx_cat30', 'idx_cat31', 'idx_cat32', 'idx_cat33', 'idx_cat34', 'idx_cat35', 'idx_cat36', 'idx_cat37', 'idx_cat38', 'idx_cat39', 'idx_cat40', 'idx_cat41', 'idx_cat42', 'idx_cat43', 'idx_cat44', 'idx_cat45', 'idx_cat46', 'idx_cat47', 'idx_cat48', 'idx_cat49', 'idx_cat50', 'idx_cat51', 'idx_cat52', 'idx_cat53', 'idx_cat54', 'idx_cat55', 'idx_cat56', 'idx_cat57', 'idx_cat58', 'idx_cat59', 'idx_cat60', 'idx_cat61', 'idx_cat62', 'idx_cat63', 'idx_cat64', 'idx_cat65', 'idx_cat66', 'idx_cat67', 'idx_cat68', 'idx_cat69', 'idx_cat70', 'idx_cat71', 'idx_cat72', 'idx_cat73', 'idx_cat74', 'idx_cat75', 'idx_cat76', 'idx_cat77', 'idx_cat78', 'idx_cat79', 'idx_cat80', 'idx_cat81', 'idx_cat82', 'idx_cat83', 'idx_cat84', 'idx_cat85', 'idx_cat86', 'idx_cat87', 'idx_cat88', 'idx_cat89', 'idx_cat90', 'idx_cat91', 'idx_cat92', 'idx_cat93', 'idx_cat94', 'idx_cat95', 'idx_cat96', 'idx_cat97', 'idx_cat98', 'idx_cat99', 'idx_cat100', 'idx_cat101', 'idx_cat102', 'idx_cat103', 'idx_cat104', 'idx_cat105', 'idx_cat106', 'idx_cat107', 'idx_cat108', 'idx_cat111', 'idx_cat114', 'idx_cat115', 'cont1', 'cont2', 'cont3', 'cont4', 'cont5', 'cont6', 'cont7', 'cont8', 'cont9', 'cont10', 'cont11', 'cont12', 'cont13', 'cont14']
In [34]:
assembler = VectorAssembler(inputCols=features_selected,
                            outputCol="features")

Modelling: Random Forest Regressor

Random Forest Regressor

In [35]:
from pyspark.ml.regression import RandomForestRegressor
In [36]:
model = RandomForestRegressor(featuresCol="features",
                              labelCol="label")
model
Out[36]:
RandomForestRegressor_a620f9834c2e
In [37]:
stages = indexers
stages.append(assembler)
stages.append(model)

pipeline = Pipeline(stages=stages)

Cross-Validation

In [38]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
In [42]:
train.count()
Out[42]:
150506
In [46]:
# help(train.sample)
In [49]:
train = train.sample(fraction=0.1,seed=SEED)
valid = valid.sample(fraction=0.1,seed=SEED)
test = test.sample(fraction=0.1,seed=SEED)
In [50]:
%%time

# numTrees = [5, 20]
# maxDepth = [4, 6]
# maxBins = [32]
# numFolds = 3

numTrees = [5]
maxDepth = [2]
maxBins = [32]
numFolds = 2

paramGrid = (
ParamGridBuilder()
.addGrid(model.numTrees, numTrees)
.addGrid(model.maxDepth, maxDepth)
.addGrid(model.maxBins, maxBins)
.build()
)

cv = CrossValidator(estimator=pipeline,
                    evaluator=RegressionEvaluator(),
                    estimatorParamMaps=paramGrid,
                    numFolds=numFolds)

cvModel = cv.fit(train)
CPU times: user 5.92 s, sys: 1.1 s, total: 7.02 s
Wall time: 5min 38s
In [51]:
bestModel = cvModel.bestModel

Model Evaluation

In [52]:
from pyspark.mllib.evaluation import RegressionMetrics
In [53]:
train_preds_and_labels = cvModel.transform(train)\
                     .select("label", "prediction").rdd

valid_preds_and_labels = cvModel.transform(valid)\
                     .select("label", "prediction").rdd
In [65]:
metrics = RegressionMetrics(train_preds_and_labels)
bp.show_method_attributes(metrics,4)
Out[65]:
0 1 2 3
0 call meanAbsoluteError r2 rootMeanSquaredError
1 explainedVariance meanSquaredError
In [61]:
def get_metrics(preds_and_labels):
    metrics = RegressionMetrics(preds_and_labels)
    df_metrics = pd.DataFrame(
        data= [ metrics.meanSquaredError,
                metrics.rootMeanSquaredError,
                metrics.r2,
                metrics.meanAbsoluteError,
                metrics.explainedVariance],
        index= ['Mean Squared Error','root MSE','R-squared',
                  'MAE','Explained Variance']
    )

    return df_metrics
In [62]:
df_metrics_train = get_metrics(train_preds_and_labels)
df_metrics_valid = get_metrics(valid_preds_and_labels)
In [70]:
df_metrics = pd.concat([df_metrics_train, df_metrics_valid],axis=1)
df_metrics.columns = ['Train','Validation']

df_metrics.round(2)
Out[70]:
Train Validation
Mean Squared Error 5980130.81 6909007.27
root MSE 2445.43 2628.50
R-squared -3.03 -3.51
MAE 1547.70 1618.27
Explained Variance 8719876.99 9784700.87

Feature Importances

In [71]:
featureImportances = bestModel.stages[-1].featureImportances.toArray()
In [74]:
df_feat_imp = pd.DataFrame({
    'Feature': features_selected,
    'Importance': featureImportances
})

df_feat_imp[df_feat_imp['Importance']>0]
Out[74]:
Feature Importance
11 idx_cat12 0.086293
56 idx_cat57 0.180114
78 idx_cat79 0.169548
79 idx_cat80 0.446550
88 idx_cat89 0.026806
99 idx_cat100 0.059660
106 idx_cat107 0.014781
107 idx_cat108 0.006957
121 cont11 0.009292

Save the output

In [76]:
cvModel.transform(test)\
.select("id", "prediction")\
.withColumnRenamed("prediction", "loss")\
.coalesce(1)\
.write.format("csv")\
.option("header", "true")\
.save("../data/outputs/test_output_dir")
In [88]:
lst_ofile = !ls ../data/outputs/test_output_dir/*.csv

ofile = lst_ofile[0]
ofile
Out[88]:
'../data/outputs/test_output_dir/part-00000-194c5ce7-e205-496d-b523-5aeb107e0d72-c000.csv'
In [87]:
!head -2 $ofile
id,loss
9,7027.552637111308
In [ ]: