This dataset contains house sale prices for King County, Washington, which includes Seattle. It covers homes sold between May 2014 and May 2015.
Task: estimate the sale price from the given features.
import numpy as np
import pandas as pd
import seaborn as sns
sns.set(color_codes=True)
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import os
import time
# random state
SEED = 0
RNG = np.random.RandomState(SEED)
# Jupyter notebook settings for pandas
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_rows', 100) # None for all the rows
pd.set_option('display.max_colwidth', 50)
print([(x.__name__,x.__version__) for x in [np, pd,sns,matplotlib]])
[('numpy', '1.17.1'), ('pandas', '0.25.1'), ('seaborn', '0.9.0'), ('matplotlib', '3.1.1')]
%%javascript
IPython.OutputArea.auto_scroll_threshold = 9999;
# pyspark
import pyspark
spark = pyspark.sql.SparkSession.builder.appName('bhishan').getOrCreate()
print([(x.__name__,x.__version__) for x in [np, pd, pyspark]])
[('numpy', '1.17.1'), ('pandas', '0.25.1'), ('pyspark', '2.4.4')]
# pyspark sql
from pyspark.sql.functions import col
from pyspark.sql.functions import udf # @udf("integer") def myfunc(x,y): return x - y
from pyspark.sql import functions as F # stddev format_number date_format, dayofyear, when
from pyspark.sql.types import StructField, StringType, IntegerType, FloatType, StructType, DateType
# pyspark ml feature
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderEstimator
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
# regressors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
# cross validation
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import CrossValidatorModel
# model evaluation regression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics
def show_method_attributes(method, ncols=2):
    """Show the public (lowercase) attributes of a given object as a table.

    Example:
    ========
    show_method_attributes(list)
    """
    attrs = [a for a in dir(method) if a[0].islower()]
    attrs = [a for a in attrs if a not in 'os np pd sys time psycopg2'.split()]
    return pd.DataFrame(np.array_split(attrs, ncols)).T.fillna('')
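For example, to browse the public methods of the Spark session created above in a four-column table:
show_method_attributes(spark, ncols=4)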
df = spark.read.csv('../data/raw/kc_house_data.csv', header=True, inferSchema=True).cache()
print('nrows = ', df.count(), 'ncols = ', len(df.columns))
df.limit(5).toPandas()
nrows = 21613 ncols = 21
|   | id | date | price | bedrooms | bathrooms | sqft_living | sqft_lot | floors | waterfront | view | condition | grade | sqft_above | sqft_basement | yr_built | yr_renovated | zipcode | lat | long | sqft_living15 | sqft_lot15 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 7129300520 | 20141013T000000 | 221900 | 3 | 1.00 | 1180 | 5650 | 1.0 | 0 | 0 | 3 | 7 | 1180 | 0 | 1955 | 0 | 98178 | 47.5112 | -122.257 | 1340 | 5650 |
| 1 | 6414100192 | 20141209T000000 | 538000 | 3 | 2.25 | 2570 | 7242 | 2.0 | 0 | 0 | 3 | 7 | 2170 | 400 | 1951 | 1991 | 98125 | 47.7210 | -122.319 | 1690 | 7639 |
| 2 | 5631500400 | 20150225T000000 | 180000 | 2 | 1.00 | 770 | 10000 | 1.0 | 0 | 0 | 3 | 6 | 770 | 0 | 1933 | 0 | 98028 | 47.7379 | -122.233 | 2720 | 8062 |
| 3 | 2487200875 | 20141209T000000 | 604000 | 4 | 3.00 | 1960 | 5000 | 1.0 | 0 | 0 | 5 | 7 | 1050 | 910 | 1965 | 0 | 98136 | 47.5208 | -122.393 | 1360 | 5000 |
| 4 | 1954400510 | 20150218T000000 | 510000 | 3 | 2.00 | 1680 | 8080 | 1.0 | 0 | 0 | 3 | 8 | 1680 | 0 | 1987 | 0 | 98074 | 47.6168 | -122.045 | 1800 | 7503 |
df.printSchema()
root
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- price: decimal(7,0) (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- sqft_living15: integer (nullable = true)
 |-- sqft_lot15: integer (nullable = true)
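Note that inferSchema read price as decimal(7,0). The models below train on it as-is, but casting the label to an explicit double is a common precaution; a minimal sketch (not applied in this run):
# optional: cast the decimal label to double before modeling
df_double = df.withColumn('price', col('price').cast('double'))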
print(df.columns)
['id', 'date', 'price', 'bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'view', 'condition', 'grade', 'sqft_above', 'sqft_basement', 'yr_built', 'yr_renovated', 'zipcode', 'lat', 'long', 'sqft_living15', 'sqft_lot15']
selected_features = [ 'bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'view', 'condition', 'grade', 'sqft_above', 'sqft_basement', 'yr_built', 'yr_renovated', 'zipcode', 'lat', 'long', 'sqft_living15', 'sqft_lot15']
assembler = VectorAssembler(
inputCols= selected_features,
outputCol='features'
)
df = assembler.transform(df)
df.printSchema()
root
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- price: decimal(7,0) (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- sqft_living15: integer (nullable = true)
 |-- sqft_lot15: integer (nullable = true)
 |-- features: vector (nullable = true)
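StandardScaler was imported above but is not used in this run. A minimal sketch of how it could be chained after the assembler (scaled_features is a hypothetical column name; the tree models below do not require scaling):
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')  # withStd=True by default
df_scaled = scaler.fit(df).transform(df)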
train, test = df.randomSplit([0.8, 0.2], seed=SEED)
train.count(), test.count()
(17323, 4290)
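A quick sanity-check sketch that the label distribution is comparable across the split:
# compare the mean price on each side of the split
train.agg(F.avg('price').alias('avg_price_train')).show()
test.agg(F.avg('price').alias('avg_price_test')).show()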
from pyspark.ml.regression import RandomForestRegressor
pred_col = 'prediction_rf'
model = RandomForestRegressor(featuresCol='features',
                              labelCol='price',
                              predictionCol=pred_col,
                              seed=SEED)
model = model.fit(train)
preds_rf = model.transform(test)
preds_and_labels = preds_rf.select([pred_col,'price'])
metrics = ['rmse','mae','r2']
metrics_values = [ RegressionEvaluator(predictionCol=pred_col,
labelCol='price',
metricName=m).evaluate(preds_and_labels)
for m in metrics ]
df_metrics = pd.DataFrame({'metric': metrics,
'value': metrics_values})
df_metrics
|   | metric | value |
|---|---|---|
| 0 | rmse | 171490.447891 |
| 1 | mae | 105460.153789 |
| 2 | r2 | 0.778316 |
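The fitted forest also exposes impurity-based feature importances; a minimal sketch pairing them with selected_features:
imp = model.featureImportances  # one weight per assembled input column
pd.DataFrame({'feature': selected_features,
              'importance': imp.toArray()}).sort_values('importance', ascending=False)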
print(train.columns)
['id', 'date', 'price', 'bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'view', 'condition', 'grade', 'sqft_above', 'sqft_basement', 'yr_built', 'yr_renovated', 'zipcode', 'lat', 'long', 'sqft_living15', 'sqft_lot15', 'features']
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
import time
t0 = time.time()
model_rf = RandomForestRegressor(labelCol="price", numTrees=50)
grid_vals = [10, 20]  # candidate values for both maxDepth and minInstancesPerNode
paramGrid = ParamGridBuilder()\
    .addGrid(model_rf.maxDepth, grid_vals)\
    .addGrid(model_rf.minInstancesPerNode, grid_vals)\
    .build()
pipeline = Pipeline(stages = [model_rf])
evaluator = RegressionEvaluator(labelCol = "price",
predictionCol = "prediction",
metricName = "r2")
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          seed=SEED,
                          numFolds=5)
model_cv = crossval.fit(train)
t1 = time.time() - t0
print('Time taken: {:.0f} min {:.0f} secs'.format(*divmod(t1,60)))
Time taken: 5 min 14 secs
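The grid has 2 x 2 = 4 parameter combinations, so 5-fold cross-validation fits 4 x 5 = 20 forests plus one final refit on the full training set, which explains the runtime. A quick check of the grid size:
print(len(paramGrid))  # 4 param maps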
evaluator.evaluate(model_cv.transform(test))
0.8717655339631416
model_cv.bestModel.stages
[RandomForestRegressionModel (uid=RandomForestRegressor_06afffc7fa1c) with 50 trees]
prediction = model_cv.transform(test)
print(prediction.columns)
['id', 'date', 'price', 'bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'view', 'condition', 'grade', 'sqft_above', 'sqft_basement', 'yr_built', 'yr_renovated', 'zipcode', 'lat', 'long', 'sqft_living15', 'sqft_lot15', 'features', 'prediction']
model_cv.params
[Param(parent='CrossValidatorModel_7d1d2b9cacda', name='estimator', doc='estimator to be cross-validated'),
 Param(parent='CrossValidatorModel_7d1d2b9cacda', name='estimatorParamMaps', doc='estimator param maps'),
 Param(parent='CrossValidatorModel_7d1d2b9cacda', name='evaluator', doc='evaluator used to select hyper-parameters that maximize the validator metric'),
 Param(parent='CrossValidatorModel_7d1d2b9cacda', name='seed', doc='random seed.')]
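model_cv.params only lists the CrossValidator's own top-level params. To see how each grid combination actually scored, a sketch pairing every param map with its mean cross-validated r2:
for params, score in zip(model_cv.getEstimatorParamMaps(), model_cv.avgMetrics):
    print({p.name: v for p, v in params.items()}, round(score, 4))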
pred_col = 'prediction'
preds_and_labels = prediction.select([pred_col,'price'])
metrics = ['rmse','mae','r2']
metrics_values = [ RegressionEvaluator(predictionCol=pred_col,
labelCol='price',
metricName=m).evaluate(preds_and_labels)
for m in metrics ]
df_metrics = pd.DataFrame({'metric': metrics,
'value': metrics_values})
df_metrics
|   | metric | value |
|---|---|---|
| 0 | rmse | 130429.439401 |
| 1 | mae | 73749.773746 |
| 2 | r2 | 0.871766 |
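Cross-validation improved the hold-out RMSE from about 171,490 to 130,429 and r2 from 0.778 to 0.872. The winning pipeline can be persisted for reuse; the path below is an assumption:
# save the best fitted PipelineModel (path is hypothetical)
model_cv.bestModel.write().overwrite().save('../models/rf_best_pipeline')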