The Allstate Corporation is an American insurance company headquartered in the United States. The company also has personal lines insurance operations in Canada.
Data Source Kaggle: https://www.kaggle.com/c/allstate-claims-severity/data
# --- Core scientific-Python stack ---
import numpy as np
import pandas as pd
import seaborn as sns
pd.plotting.register_matplotlib_converters()
from tqdm import tqdm_notebook as tqdm
import matplotlib.pyplot as plt
# Notebook-only magics: inline plots at retina resolution.
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
plt.style.use('ggplot')
# random state
SEED=100
np.random.seed(SEED)
# Report library versions for reproducibility (notebook cell output).
[(x.__name__,x.__version__) for x in [np,pd,sns]]
import re
from datetime import datetime
# pyspark
# Create (or reuse) a local SparkSession named 'app'.
import pyspark
spark = pyspark.sql\
.SparkSession\
.builder\
.appName('app')\
.getOrCreate()
# sql
from pyspark.sql.functions import col as _col
from pyspark.sql.functions import udf
# @udf("integer") def myfunc(x,y): return x - y
# stddev format_number date_format, dayofyear, when
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Aliased to avoid shadowing Python builtins (min, max, ...).
from pyspark.sql.functions import (mean as _mean, min as _min,
max as _max, avg as _avg,
when as _when
)
from pyspark.sql.types import (StructField,StringType,
IntegerType, FloatType,
DoubleType,StructType)
# Legacy SQLContext kept around for pandas<->spark conversion helpers.
from pyspark import SparkConf, SparkContext, SQLContext
sc = spark.sparkContext
sqlContext = SQLContext(sc)
# spark_df = sqlContext.createDataFrame(pandas_df)
# pyspark ml feature
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.feature import OneHotEncoder,OneHotEncoderEstimator
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline, PipelineModel
# regressors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
# cross validation
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import CrossValidatorModel
# model evaluation regression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics
# Auto-reload local modules on edit (notebook development convenience).
%load_ext autoreload
%autoreload 2
# my personal module
from bhishan import bp
# Shell escapes: peek at the raw data layout before loading.
!ls ../data
!head -2 ../data/raw/train.csv
# Load the raw Allstate training data; inferSchema lets Spark type the
# cat*/cont* columns automatically. cache() keeps it in memory for the
# repeated count()/distinct() actions below.
df_train_raw = spark.read.csv('../data/raw/train.csv',
header=True,
inferSchema=True).cache()
print(f"""nrows = {df_train_raw.count():,d}
ncols = {len(df_train_raw.columns):,d}""" )
df_train_raw.limit(5).toPandas()
# df_train_raw.printSchema()
print(df_train_raw.columns)
# Test set has the same columns minus the target `loss`.
df_test_raw = spark.read.csv('../data/raw/test.csv',
header=True,
inferSchema=True).cache()
print(f"""nrows = {df_test_raw.count():,d}
ncols = {len(df_test_raw.columns):,d}""" )
df_test_raw.limit(5).toPandas()
# Spark ML estimators expect the target column to be named "label".
df_train_raw = df_train_raw.withColumnRenamed("loss", "label")
# 80/20 train/validation split, seeded for reproducibility.
train, valid = df_train_raw.randomSplit([0.8,0.2], seed=SEED)
train = train.cache()
valid = valid.cache()
train.count(), valid.count()
test = df_test_raw.cache()
# Fit one StringIndexer per categorical ("cat*") column. Each indexer is
# fit on the UNION of train and test values for that column so that
# categories appearing only in the test set still receive an index.
def is_cat(c):
    """Return True when column name `c` is a categorical feature."""
    return c.startswith("cat")

def make_new_cat(c):
    """Indexer output column name: 'idx_<col>' for categoricals, else unchanged."""
    return "idx_{0}".format(c) if is_cat(c) else c

# NOTE(review): named defs replace the original `lambda` assignments
# (PEP 8 E731), and the lazy map/filter + list() pair is collapsed into
# one eager list comprehension. Same fitted indexers, same order.
indexers = [
    StringIndexer(
        inputCol=c,
        outputCol=make_new_cat(c),
    ).fit(
        df_train_raw.select(c).union(df_test_raw.select(c))
    )
    for c in train.columns
    if is_cat(c)
]
indexers[0]  # sanity check: inspect the first fitted indexer
mycols = df_train_raw.columns
# mycols
# Example: inspect distinct levels of one categorical column.
df_train_raw.select('cat1').distinct().show()
df_train_raw.select('cat1').distinct().count()
# All categorical feature names (cat1..cat116).
cols_cat = [i for i in mycols if i.startswith('cat')]
len(cols_cat)
%%time
# Cardinality of each categorical column (one distinct/count job each —
# this is the slow cell, hence the %%time magic).
n_distinct_cats = [df_train_raw.select(c).distinct().count()
for c in cols_cat]
len(n_distinct_cats)
# df_distinct = pd.DataFrame({
# 'feature': cols_cat,
# 'n_distinct': n_distinct_cats
# })
# df_distinct.tail()
# df_train.select('cat116').distinct().count()
# large_cats = df_distinct['feature'][df_distinct['n_distinct']>50].tolist()
# large_cats
# High-cardinality categoricals (>50 levels) are dropped to keep the
# indexed feature space manageable.
large_cats = [cols_cat[i] for i,n in enumerate(n_distinct_cats)
if n>50]
large_cats
for c in large_cats:
n = df_train_raw.select(c).distinct().count()
print(f"{c} ==> {n}")
# Keep only the low-cardinality categoricals; their indexed versions
# carry the 'idx_' prefix produced by the StringIndexers.
cats_selected = [i for i in cols_cat
if i not in large_cats]
cats_idx = ['idx_'+i for i in cats_selected]
# continuous features
cols_cont = [i for i in train.columns
if i.startswith('cont')]
print(cols_cont)
# we do not include id and label column in selected features
features_selected = cats_idx + cols_cont
print(features_selected)
# Assemble all selected features into a single vector column "features".
assembler = VectorAssembler(inputCols=features_selected,
outputCol="features")
# Random-forest regressor: consumes the assembled "features" vector and
# predicts "label" (the renamed `loss` column).
from pyspark.ml.regression import RandomForestRegressor  # re-import is a no-op; already imported at top
model = RandomForestRegressor(featuresCol="features",
                              labelCol="label")
model
# Pipeline stages: all string indexers, then the vector assembler, then
# the regressor. Built with concatenation instead of the original
# `stages = indexers; stages.append(...)`, which ALIASED the list and
# silently appended the assembler and model onto `indexers` as well.
stages = indexers + [assembler, model]
pipeline = Pipeline(stages=stages)
# Re-imports are no-ops (already imported at the top of the notebook).
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
train.count()
# help(train.sample)
# Subsample everything to ~10% so cross-validation below runs quickly;
# seeded so the sample is reproducible.
train = train.sample(fraction=0.1,seed=SEED)
valid = valid.sample(fraction=0.1,seed=SEED)
test = test.sample(fraction=0.1,seed=SEED)
%%time
# numTrees = [5, 20]
# maxDepth = [4, 6]
# maxBins = [32]
# numFolds = 3
# Deliberately tiny grid (single combination, 2 folds) to keep this demo
# fast; the commented values above are a more realistic search space.
numTrees = [5]
maxDepth = [2]
maxBins = [32]
numFolds = 2
paramGrid = (
ParamGridBuilder()
.addGrid(model.numTrees, numTrees)
.addGrid(model.maxDepth, maxDepth)
.addGrid(model.maxBins, maxBins)
.build()
)
# RegressionEvaluator defaults to RMSE on the "label"/"prediction" columns.
cv = CrossValidator(estimator=pipeline,
evaluator=RegressionEvaluator(),
estimatorParamMaps=paramGrid,
numFolds=numFolds)
cvModel = cv.fit(train)
bestModel = cvModel.bestModel
from pyspark.mllib.evaluation import RegressionMetrics
# RegressionMetrics (mllib API) wants an RDD of (label, prediction) rows.
train_preds_and_labels = cvModel.transform(train)\
.select("label", "prediction").rdd
valid_preds_and_labels = cvModel.transform(valid)\
.select("label", "prediction").rdd
metrics = RegressionMetrics(train_preds_and_labels)
# List the available metric attributes (helper from personal `bp` module).
bp.show_method_attributes(metrics,4)
def get_metrics(preds_and_labels):
    """Summarize regression quality as a one-column pandas DataFrame.

    Parameters
    ----------
    preds_and_labels : RDD of (label, prediction) rows, as produced by
        ``model.transform(df).select("label", "prediction").rdd``.

    Returns
    -------
    pd.DataFrame with metric names as the index and the metric values in
    a single (default-named) column.
    """
    m = RegressionMetrics(preds_and_labels)
    # Metric name -> value, in the display order used throughout the notebook.
    summary = {
        'Mean Squared Error': m.meanSquaredError,
        'root MSE': m.rootMeanSquaredError,
        'R-squared': m.r2,
        'MAE': m.meanAbsoluteError,
        'Explained Variance': m.explainedVariance,
    }
    return pd.DataFrame(data=list(summary.values()),
                        index=list(summary.keys()))
# Side-by-side train vs validation metrics table.
df_metrics_train = get_metrics(train_preds_and_labels)
df_metrics_valid = get_metrics(valid_preds_and_labels)
df_metrics = pd.concat([df_metrics_train, df_metrics_valid],axis=1)
df_metrics.columns = ['Train','Validation']
df_metrics.round(2)
# The regressor is the last pipeline stage; its importances align with
# the VectorAssembler's input order (features_selected).
featureImportances = bestModel.stages[-1].featureImportances.toArray()
df_feat_imp = pd.DataFrame({
'Feature': features_selected,
'Importance': featureImportances
})
# Show only features the forest actually used.
df_feat_imp[df_feat_imp['Importance']>0]
# Score the (subsampled) test set and write a Kaggle-style submission:
# id + loss. coalesce(1) forces a single output CSV part file.
cvModel.transform(test)\
.select("id", "prediction")\
.withColumnRenamed("prediction", "loss")\
.coalesce(1)\
.write.format("csv")\
.option("header", "true")\
.save("../data/outputs/test_output_dir")
# Spark writes into a directory; grab the single part-*.csv inside it.
lst_ofile = !ls ../data/outputs/test_output_dir/*.csv
ofile = lst_ofile[0]
ofile
!head -2 $ofile