import time
time_start_notebook = time.time()
# my local library
import sys
sys.path.append("/Users/poudel/Dropbox/a00_Bhishan_Modules/")
sys.path.append("/Users/poudel/Dropbox/a00_Bhishan_Modules/bhishan")
from bhishan import bp
import numpy as np
import pandas as pd
import functools # reduce
import findspark, pyspark
%load_ext watermark
%watermark -a "Bhishan Poudel" -d -v -m
print()
%watermark -iv
Author: Bhishan Poudel

Python implementation: CPython
Python version       : 3.7.7
IPython version      : 7.16.1

Compiler    : Clang 4.0.1 (tags/RELEASE_401/final)
OS          : Darwin
Release     : 19.6.0
Machine     : x86_64
Processor   : i386
CPU cores   : 4
Architecture: 64bit

numpy    : 1.17.5
pyspark  : 3.1.2
bhishan  : 0.4
findspark: 1.4.2
sys      : 3.7.7 (default, May 6 2020, 04:59:01) [Clang 4.0.1 (tags/RELEASE_401/final)]
pandas   : 1.0.5
# pyspark settings
import os
import findspark
HOME = os.path.expanduser('~')
findspark.init(HOME + "/Softwares/Spark/spark-3.1.2-bin-hadoop3.2")
# Set Spark environment variables
os.environ['PYSPARK_PYTHON'] = f'{HOME}/opt/miniconda3/envs/spk/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = f'{HOME}/opt/miniconda3/envs/spk/bin/python'
import pyspark
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import col as _col
from pyspark.sql.functions import udf # @udf("integer") def myfunc(x,y): return x - y
from pyspark.sql import functions as F # stddev format_number date_format, dayofyear, when
from pyspark.sql.window import Window
from pyspark.sql.types import StructField, StringType, IntegerType, FloatType, StructType
from pyspark.sql.functions import (mean as _mean, min as _min,
max as _max, avg as _avg,
when as _when
)
spark = SparkSession\
.builder\
.appName("bhishan")\
.getOrCreate()
sc = spark.sparkContext
sqlContext = pyspark.SQLContext(sc) # spark_df = sqlContext.createDataFrame(pandas_df)
sc.setLogLevel("INFO")
print(pyspark.__version__)
3.1.2
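Note: SQLContext is kept here for convenience but is deprecated in Spark 3.x; a minimal sketch of the modern route (with a hypothetical pandas_df):

# deprecated route: sqlContext.createDataFrame(pandas_df)
# modern route: the SparkSession creates DataFrames directly
# spark_df = spark.createDataFrame(pandas_df)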
# postgres configs
import os, json
data = json.load(open(os.path.expanduser('~/.config/postgres/credentials.json')))
pw,port = data['password'], data['port'] # port is 5432
dbname = 'wbwarehouse' # make sure the Postgres server is running and the given database exists.
%load_ext sql
%sql postgres://postgres:$pw@localhost:$port/$dbname
%%sql
DROP TABLE IF EXISTS Warehouses CASCADE;
DROP TABLE IF EXISTS Boxes CASCADE;
CREATE TABLE Warehouses (Code INTEGER PRIMARY KEY NOT NULL,
LOCATION TEXT NOT NULL,
Capacity INTEGER NOT NULL);
CREATE TABLE Boxes (Code TEXT PRIMARY KEY NOT NULL,
CONTENTS TEXT NOT NULL,
Value REAL NOT NULL,
Warehouse INTEGER NOT NULL,
CONSTRAINT fk_Warehouses_Code
FOREIGN KEY (Warehouse) REFERENCES Warehouses(Code));
INSERT INTO Warehouses(Code, LOCATION, Capacity)
VALUES(1,'Chicago',3);
INSERT INTO Warehouses(Code, LOCATION, Capacity)
VALUES(2,'Chicago',4);
INSERT INTO Warehouses(Code, LOCATION, Capacity)
VALUES(3,'New York',7);
INSERT INTO Warehouses(Code, LOCATION, Capacity)
VALUES(4,'Los Angeles',2);
INSERT INTO Warehouses(Code, LOCATION, Capacity)
VALUES(5,'San Francisco',8);
INSERT INTO Boxes(Code, CONTENTS, Value, Warehouse)
VALUES('0MN7','Rocks',180,3);
INSERT INTO Boxes(Code, CONTENTS, Value, Warehouse)
VALUES('4H8P','Rocks',250,1);
INSERT INTO Boxes(Code, CONTENTS, Value, Warehouse)
VALUES('4RT3','Scissors',190,4);
INSERT INTO Boxes(Code, CONTENTS, Value, Warehouse)
VALUES('7G3H','Rocks',200,1);
INSERT INTO Boxes(Code, CONTENTS, Value, Warehouse)
VALUES('8JN6','Papers',75,1);
INSERT INTO Boxes(Code, CONTENTS, Value, Warehouse)
VALUES('8Y6U','Papers',50,3);
INSERT INTO Boxes(Code, CONTENTS, Value, Warehouse)
VALUES('9J6F','Papers',175,2);
INSERT INTO Boxes(Code, CONTENTS, Value, Warehouse)
VALUES('LL08','Rocks',140,4);
INSERT INTO Boxes(Code, CONTENTS, Value, Warehouse)
VALUES('P0H6','Scissors',125,1);
INSERT INTO Boxes(Code, CONTENTS, Value, Warehouse)
VALUES('P2T6','Scissors',150,2);
INSERT INTO Boxes(Code, CONTENTS, Value, Warehouse)
VALUES('TU55','Papers',90,5);
SELECT *
FROM Warehouses;
* postgres://postgres:***@localhost:5432/wbwarehouse
Done.
Done.
Done.
Done.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.
5 rows affected.
code | location | capacity |
---|---|---|
1 | Chicago | 3 |
2 | Chicago | 4 |
3 | New York | 7 |
4 | Los Angeles | 2 |
5 | San Francisco | 8 |
x = %sql select * from Warehouses;
dfw = x.DataFrame()
dfw
* postgres://postgres:***@localhost:5432/wbwarehouse 5 rows affected.
 | code | location | capacity |
---|---|---|---|
0 | 1 | Chicago | 3 |
1 | 2 | Chicago | 4 |
2 | 3 | New York | 7 |
3 | 4 | Los Angeles | 2 |
4 | 5 | San Francisco | 8 |
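As an aside, the same table can be pulled into pandas without the %sql magic; a sketch assuming sqlalchemy is installed:

# a sketch using sqlalchemy + pandas.read_sql (assumes sqlalchemy is installed)
# from sqlalchemy import create_engine
# engine = create_engine(f'postgresql://postgres:{pw}@localhost:{port}/{dbname}')
# dfw = pd.read_sql('SELECT * FROM Warehouses', engine)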
dfw.dtypes
code         int64
location    object
capacity     int64
dtype: object
schema = StructType([
StructField('code',IntegerType(),True),
StructField('location',StringType(),True),
StructField('capacity',IntegerType(),True),
])
sdfw = sqlContext.createDataFrame(dfw, schema)
sdfw.show()
+----+-------------+--------+
|code|     location|capacity|
+----+-------------+--------+
|   1|      Chicago|       3|
|   2|      Chicago|       4|
|   3|     New York|       7|
|   4|  Los Angeles|       2|
|   5|San Francisco|       8|
+----+-------------+--------+
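The same schema can also be passed as a DDL string instead of a StructType; a sketch:

# equivalent schema as a DDL string (a sketch)
# sdfw = spark.createDataFrame(dfw, schema='code int, location string, capacity int')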
x = %sql select * from Boxes;
dfb = x.DataFrame()
dfb
* postgres://postgres:***@localhost:5432/wbwarehouse 11 rows affected.
 | code | contents | value | warehouse |
---|---|---|---|---|
0 | 0MN7 | Rocks | 180.0 | 3 |
1 | 4H8P | Rocks | 250.0 | 1 |
2 | 4RT3 | Scissors | 190.0 | 4 |
3 | 7G3H | Rocks | 200.0 | 1 |
4 | 8JN6 | Papers | 75.0 | 1 |
5 | 8Y6U | Papers | 50.0 | 3 |
6 | 9J6F | Papers | 175.0 | 2 |
7 | LL08 | Rocks | 140.0 | 4 |
8 | P0H6 | Scissors | 125.0 | 1 |
9 | P2T6 | Scissors | 150.0 | 2 |
10 | TU55 | Papers | 90.0 | 5 |
dfb.dtypes
code          object
contents      object
value        float64
warehouse      int64
dtype: object
schema = StructType([
StructField('code',StringType(),True),
StructField('contents',StringType(),True),
StructField('value',FloatType(),True),
StructField('warehouse',IntegerType(),True),
])
sdfb = sqlContext.createDataFrame(dfb, schema)
sdfb.show()
+----+--------+-----+---------+
|code|contents|value|warehouse|
+----+--------+-----+---------+
|0MN7|   Rocks|180.0|        3|
|4H8P|   Rocks|250.0|        1|
|4RT3|Scissors|190.0|        4|
|7G3H|   Rocks|200.0|        1|
|8JN6|  Papers| 75.0|        1|
|8Y6U|  Papers| 50.0|        3|
|9J6F|  Papers|175.0|        2|
|LL08|   Rocks|140.0|        4|
|P0H6|Scissors|125.0|        1|
|P2T6|Scissors|150.0|        2|
|TU55|  Papers| 90.0|        5|
+----+--------+-----+---------+
df = dfw.merge(dfb,left_on='code', right_on='warehouse',suffixes=['_ware','_box'])
df
 | code_ware | location | capacity | code_box | contents | value | warehouse |
---|---|---|---|---|---|---|---|
0 | 1 | Chicago | 3 | 4H8P | Rocks | 250.0 | 1 |
1 | 1 | Chicago | 3 | 7G3H | Rocks | 200.0 | 1 |
2 | 1 | Chicago | 3 | 8JN6 | Papers | 75.0 | 1 |
3 | 1 | Chicago | 3 | P0H6 | Scissors | 125.0 | 1 |
4 | 2 | Chicago | 4 | 9J6F | Papers | 175.0 | 2 |
5 | 2 | Chicago | 4 | P2T6 | Scissors | 150.0 | 2 |
6 | 3 | New York | 7 | 0MN7 | Rocks | 180.0 | 3 |
7 | 3 | New York | 7 | 8Y6U | Papers | 50.0 | 3 |
8 | 4 | Los Angeles | 2 | 4RT3 | Scissors | 190.0 | 4 |
9 | 4 | Los Angeles | 2 | LL08 | Rocks | 140.0 | 4 |
10 | 5 | San Francisco | 8 | TU55 | Papers | 90.0 | 5 |
cols_w = sdfw.columns
cols_b = sdfb.columns
cols_common = set(cols_w) & set(cols_b)
cols_w, cols_b, cols_common
(['code', 'location', 'capacity'], ['code', 'contents', 'value', 'warehouse'], {'code'})
# rename common columns of first spark dataframe.
sdfw2 = sdfw.withColumnRenamed('code','code_ware')
sdfw2.show(2)
+---------+--------+--------+
|code_ware|location|capacity|
+---------+--------+--------+
|        1| Chicago|       3|
|        2| Chicago|       4|
+---------+--------+--------+
only showing top 2 rows
# rename common columns of second spark dataframe.
sdfb2 = sdfb.withColumnRenamed('code','code_box')
sdfb2.show(2)
+--------+--------+-----+---------+
|code_box|contents|value|warehouse|
+--------+--------+-----+---------+
|    0MN7|   Rocks|180.0|        3|
|    4H8P|   Rocks|250.0|        1|
+--------+--------+-----+---------+
only showing top 2 rows
# now join two spark dataframes which have different column names.
sdf = sdfw2.join(sdfb2,sdfw2.code_ware==sdfb2.warehouse)
sdf.show()
+---------+-------------+--------+--------+--------+-----+---------+
|code_ware|     location|capacity|code_box|contents|value|warehouse|
+---------+-------------+--------+--------+--------+-----+---------+
|        1|      Chicago|       3|    4H8P|   Rocks|250.0|        1|
|        1|      Chicago|       3|    7G3H|   Rocks|200.0|        1|
|        1|      Chicago|       3|    8JN6|  Papers| 75.0|        1|
|        1|      Chicago|       3|    P0H6|Scissors|125.0|        1|
|        3|     New York|       7|    0MN7|   Rocks|180.0|        3|
|        3|     New York|       7|    8Y6U|  Papers| 50.0|        3|
|        5|San Francisco|       8|    TU55|  Papers| 90.0|        5|
|        4|  Los Angeles|       2|    4RT3|Scissors|190.0|        4|
|        4|  Los Angeles|       2|    LL08|   Rocks|140.0|        4|
|        2|      Chicago|       4|    9J6F|  Papers|175.0|        2|
|        2|      Chicago|       4|    P2T6|Scissors|150.0|        2|
+---------+-------------+--------+--------+--------+-----+---------+
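The rename-then-join above can be generalized over cols_common; a sketch that mimics the pandas suffixes=['_ware','_box'] behaviour:

# a sketch: rename every shared column programmatically before joining
sdfw_r, sdfb_r = sdfw, sdfb
for c in cols_common:
    sdfw_r = sdfw_r.withColumnRenamed(c, c + '_ware')
    sdfb_r = sdfb_r.withColumnRenamed(c, c + '_box')
sdf2 = sdfw_r.join(sdfb_r, sdfw_r.code_ware == sdfb_r.warehouse)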
bp.show_methods(sdfw,4)
# count, fillna, filter, where, groupby, first, corr,
# missing compared to pandas: max, min, sum, etc.
 | 0 | 1 | 2 | 3 |
---|---|---|---|---|
0 | agg | drop_duplicates | localCheckpoint | sql_ctx |
1 | alias | dropna | mapInPandas | stat |
2 | approxQuantile | dtypes | na | storageLevel |
3 | cache | exceptAll | orderBy | subtract |
4 | checkpoint | explain | persist | summary |
5 | coalesce | fillna | printSchema | tail |
6 | colRegex | filter | randomSplit | take |
7 | collect | first | rdd | toDF |
8 | columns | foreach | registerTempTable | toJSON |
9 | corr | foreachPartition | repartition | toLocalIterator |
10 | count | freqItems | repartitionByRange | toPandas |
11 | cov | groupBy | replace | transform |
12 | createGlobalTempView | groupby | rollup | union |
13 | createOrReplaceGlobalTempView | head | sameSemantics | unionAll |
14 | createOrReplaceTempView | hint | sample | unionByName |
15 | createTempView | inputFiles | sampleBy | unpersist |
16 | crossJoin | intersect | schema | where |
17 | crosstab | intersectAll | select | withColumn |
18 | cube | isLocal | selectExpr | withColumnRenamed |
19 | describe | isStreaming | semanticHash | withWatermark |
20 | distinct | is_cached | show | write |
21 | drop | join | sort | writeStream |
22 | dropDuplicates | limit | sortWithinPartitions | writeTo |
Global Temporary View
Temporary views in Spark SQL are session-scoped and disappear when the session that created them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, create a global temporary view. A global temporary view is tied to the system-preserved database global_temp, and we must use the qualified name to refer to it, e.g. SELECT * FROM global_temp.view1.
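A minimal sketch with a hypothetical view name:

# a sketch of a global temporary view (hypothetical name 'warehouses_g');
# note the mandatory global_temp qualifier when querying it
sdfw.createOrReplaceGlobalTempView('warehouses_g')
spark.sql('select * from global_temp.warehouses_g limit 2').show()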
# Register the DataFrame as a SQL temporary view
sdfw.createOrReplaceTempView("Warehouses")
sdfb.createOrReplaceTempView("Boxes")
sdf.createOrReplaceTempView("sdf")
spark.sql('select * from Warehouses limit 2').show()
+----+--------+--------+
|code|location|capacity|
+----+--------+--------+
|   1| Chicago|       3|
|   2| Chicago|       4|
+----+--------+--------+
%%sql
SELECT *
FROM Warehouses
* postgres://postgres:***@localhost:5432/wbwarehouse 5 rows affected.
code | location | capacity |
---|---|---|
1 | Chicago | 3 |
2 | Chicago | 4 |
3 | New York | 7 |
4 | Los Angeles | 2 |
5 | San Francisco | 8 |
# sdfb[sdfb['Value']>150].show()
%%sql
SELECT *
FROM Boxes
WHERE value > 150
* postgres://postgres:***@localhost:5432/wbwarehouse 5 rows affected.
code | contents | value | warehouse |
---|---|---|---|
0MN7 | Rocks | 180.0 | 3 |
4H8P | Rocks | 250.0 | 1 |
4RT3 | Scissors | 190.0 | 4 |
7G3H | Rocks | 200.0 | 1 |
9J6F | Papers | 175.0 | 2 |
sdfb.select('Contents').distinct().show()
+--------+
|Contents|
+--------+
|   Rocks|
|  Papers|
|Scissors|
+--------+
%%sql
SELECT distinct(CONTENTS)
FROM Boxes
* postgres://postgres:***@localhost:5432/wbwarehouse 3 rows affected.
contents |
---|
Scissors |
Papers |
Rocks |
sdfb.select(_mean('Value')).show()
+------------------+
|        avg(Value)|
+------------------+
|147.72727272727272|
+------------------+
%%sql
SELECT avg(value)
FROM Boxes
* postgres://postgres:***@localhost:5432/wbwarehouse 1 rows affected.
avg |
---|
147.727272727273 |
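Several aggregates can also be computed in one pass; a sketch using the aliased imports from the setup cell:

# a sketch: min, max and mean of 'value' in a single agg
sdfb.agg(_min('value'), _max('value'), _mean('value')).show()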
dfb.head(2)
 | code | contents | value | warehouse |
---|---|---|---|---|
0 | 0MN7 | Rocks | 180.0 | 3 |
1 | 4H8P | Rocks | 250.0 | 1 |
# dfb.groupby('warehouse')['value'].mean()
# dfb.groupby('warehouse').agg(mean_value=('value','mean'))
sdfb.groupby('warehouse').agg({'value':'mean'}).show()
+---------+----------+
|warehouse|avg(value)|
+---------+----------+
|        1|     162.5|
|        3|     115.0|
|        5|      90.0|
|        4|     165.0|
|        2|     162.5|
+---------+----------+
%%sql
SELECT warehouse,
avg(value)
FROM Boxes
GROUP BY warehouse
* postgres://postgres:***@localhost:5432/wbwarehouse 5 rows affected.
warehouse | avg |
---|---|
3 | 115.0 |
5 | 90.0 |
4 | 165.0 |
2 | 162.5 |
1 | 162.5 |
dfb.groupby('warehouse')['value'].mean()[lambda x: x>150]
warehouse
1    162.5
2    162.5
4    165.0
Name: value, dtype: float64
# dfb.groupby('warehouse').filter(lambda dfx: dfx['value'].mean()>150)
sdfb.groupby('warehouse').agg({'value':'mean'}).filter('avg(value)>150').show()
+---------+----------+
|warehouse|avg(value)|
+---------+----------+
|        1|     162.5|
|        4|     165.0|
|        2|     162.5|
+---------+----------+
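Aliasing the aggregate avoids the auto-generated 'avg(value)' column name; a sketch:

# a sketch: explicit alias instead of the generated column name
(sdfb.groupby('warehouse')
     .agg(F.avg('value').alias('avg_value'))
     .filter(F.col('avg_value') > 150)
     .show())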
%%sql
SELECT warehouse,
avg(value) AS avg_value
FROM Boxes
GROUP BY warehouse
HAVING avg(value)>150
* postgres://postgres:***@localhost:5432/wbwarehouse 3 rows affected.
warehouse | avg_value |
---|---|
4 | 165.0 |
2 | 162.5 |
1 | 162.5 |
# dfb.head(2)
df.head(2)
 | code_ware | location | capacity | code_box | contents | value | warehouse |
---|---|---|---|---|---|---|---|
0 | 1 | Chicago | 3 | 4H8P | Rocks | 250.0 | 1 |
1 | 1 | Chicago | 3 | 7G3H | Rocks | 200.0 | 1 |
# df[['code_box','location']]
# sdf.select('code_box','location').show()
%%sql
SELECT b.code AS code_box,
w.location AS LOCATION
FROM Boxes b,
Warehouses w
WHERE b.warehouse = w.code
* postgres://postgres:***@localhost:5432/wbwarehouse 11 rows affected.
code_box | location |
---|---|
0MN7 | New York |
4H8P | Chicago |
4RT3 | Los Angeles |
7G3H | Chicago |
8JN6 | Chicago |
8Y6U | New York |
9J6F | Chicago |
LL08 | Los Angeles |
P0H6 | Chicago |
P2T6 | Chicago |
TU55 | San Francisco |
# dfb
# dfb.groupby('warehouse')['code'].count()
# sdfb.groupby('warehouse').agg({'code':'count'}).show()
%%sql
SELECT warehouse,
count(*)
FROM Boxes
GROUP BY warehouse
* postgres://postgres:***@localhost:5432/wbwarehouse 5 rows affected.
warehouse | count |
---|---|
3 | 2 |
5 | 1 |
4 | 2 |
2 | 2 |
1 | 4 |
%%sql
/* taking into account nulls */
SELECT w.code,
count(w.code)
FROM Warehouses w
LEFT JOIN Boxes b ON w.code = b.warehouse
GROUP BY w.code
* postgres://postgres:***@localhost:5432/wbwarehouse 5 rows affected.
code | count |
---|---|
2 | 2 |
3 | 2 |
5 | 1 |
4 | 2 |
1 | 4 |
df['num_boxes_ware'] = df.groupby('warehouse')['code_box'].transform('count')
df.head(2)
 | code_ware | location | capacity | code_box | contents | value | warehouse | num_boxes_ware |
---|---|---|---|---|---|---|---|---|
0 | 1 | Chicago | 3 | 4H8P | Rocks | 250.0 | 1 | 4 |
1 | 1 | Chicago | 3 | 7G3H | Rocks | 200.0 | 1 | 4 |
df[df.num_boxes_ware>df.capacity]['code_ware'].unique()
array([1])
# Spark has no pandas-style groupby transform; use a correlated subquery or
# a window function instead (see the window sketch after this query's output).
# https://github.com/bhishanpdl/Tutorials_and_Lessons/blob/master/Tutorial_PySpark/z01_pyspark_methods/z01_pyspark_groupby/README.md
spark.sql("""
select code from Warehouses w
where capacity <
(
select count(*) from Boxes b
where b.warehouse = w.code
)
""").show()
+----+
|code|
+----+
|   1|
+----+
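A window function reproduces the pandas groupby-transform pattern; a sketch over the joined DataFrame sdf from earlier:

# a sketch: count boxes per warehouse with a window, then compare to capacity
w = Window.partitionBy('warehouse')
(sdf.withColumn('num_boxes_ware', F.count('code_box').over(w))
    .filter(F.col('num_boxes_ware') > F.col('capacity'))
    .select('code_ware').distinct()
    .show())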
# the next query works in Postgres but fails in Spark SQL as-is;
# Spark needs the FIRST aggregate inside HAVING (see the workaround below)
%%sql
SELECT w.code
FROM Warehouses w
JOIN Boxes b ON w.code = b.warehouse
GROUP BY w.code
HAVING count(b.code) > w.capacity
* postgres://postgres:***@localhost:5432/wbwarehouse 1 rows affected.
code |
---|
1 |
spark.sql("""
select w.code
from Warehouses w
join Boxes b
on w.code = b.warehouse
group by w.code
having count(b.code) > first(w.capacity)
""").show()
# NOTE: in Spark SQL we need the FIRST aggregate inside the HAVING clause,
# since w.capacity is not part of the GROUP BY.
+----+
|code|
+----+
|   1|
+----+
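Alternatively, adding capacity to the GROUP BY lets HAVING reference it without FIRST; a sketch:

# a sketch: group by capacity too, so HAVING can use it directly
spark.sql("""
select w.code
from Warehouses w
join Boxes b
on w.code = b.warehouse
group by w.code, w.capacity
having count(b.code) > w.capacity
""").show()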
# df[df.location=='Chicago']['code_box']
# sdf.filter(_col('location')=='Chicago').select('code_box').distinct().show()
%%sql
SELECT b.code
FROM Warehouses w
RIGHT JOIN Boxes b ON w.code=b.warehouse
WHERE w.location='Chicago'
* postgres://postgres:***@localhost:5432/wbwarehouse 6 rows affected.
code |
---|
4H8P |
7G3H |
8JN6 |
9J6F |
P0H6 |
P2T6 |
%%sql
SELECT *
FROM Boxes
LIMIT 1;
* postgres://postgres:***@localhost:5432/wbwarehouse 1 rows affected.
code | contents | value | warehouse |
---|---|---|---|
0MN7 | Rocks | 180.0 | 3 |
%%sql
SELECT code
FROM Boxes
WHERE warehouse IN
(SELECT code
FROM Warehouses
WHERE LOCATION='Chicago' )
* postgres://postgres:***@localhost:5432/wbwarehouse 6 rows affected.
code |
---|
4H8P |
7G3H |
8JN6 |
9J6F |
P0H6 |
P2T6 |
spark.sql("""
select b.code
from Warehouses w
right join Boxes b
on w.code=b.warehouse
where w.location='Chicago'
""").take(1)
[Row(code='4H8P')]
spark.sql("""
select code from Boxes
where warehouse in (
select code from Warehouses where location='Chicago'
)
""").take(1)
[Row(code='4H8P')]
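In DataFrame terms the IN-subquery corresponds to a left-semi join; a sketch:

# a sketch: a left-semi join keeps only the Boxes rows with a match in the
# filtered Warehouses; only Boxes columns survive, so 'code' is unambiguous
sdfw_chi = sdfw.filter(F.col('location') == 'Chicago')
sdfb.join(sdfw_chi, sdfb.warehouse == sdfw_chi.code, 'left_semi').select('code').show()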
%%sql
SELECT *
FROM Warehouses;
* postgres://postgres:***@localhost:5432/wbwarehouse 5 rows affected.
code | location | capacity |
---|---|---|
1 | Chicago | 3 |
2 | Chicago | 4 |
3 | New York | 7 |
4 | Los Angeles | 2 |
5 | San Francisco | 8 |
%%sql
INSERT INTO Warehouses (code, LOCATION, Capacity)
VALUES (6, 'New York',3);
* postgres://postgres:***@localhost:5432/wbwarehouse 1 rows affected.
[]
%%sql
SELECT *
FROM warehouses;
* postgres://postgres:***@localhost:5432/wbwarehouse 6 rows affected.
code | location | capacity |
---|---|---|
1 | Chicago | 3 |
2 | Chicago | 4 |
3 | New York | 7 |
4 | Los Angeles | 2 |
5 | San Francisco | 8 |
6 | New York | 3 |
%%sql
INSERT INTO Boxes
VALUES('H5RT','Papers',200,2);
* postgres://postgres:***@localhost:5432/wbwarehouse 1 rows affected.
[]
%%sql
SELECT *
FROM warehouses;
* postgres://postgres:***@localhost:5432/wbwarehouse 6 rows affected.
code | location | capacity |
---|---|---|
1 | Chicago | 3 |
2 | Chicago | 4 |
3 | New York | 7 |
4 | Los Angeles | 2 |
5 | San Francisco | 8 |
6 | New York | 3 |
%%sql
SELECT *
FROM Boxes;
* postgres://postgres:***@localhost:5432/wbwarehouse 12 rows affected.
code | contents | value | warehouse |
---|---|---|---|
0MN7 | Rocks | 180.0 | 3 |
4H8P | Rocks | 250.0 | 1 |
4RT3 | Scissors | 190.0 | 4 |
7G3H | Rocks | 200.0 | 1 |
8JN6 | Papers | 75.0 | 1 |
8Y6U | Papers | 50.0 | 3 |
9J6F | Papers | 175.0 | 2 |
LL08 | Rocks | 140.0 | 4 |
P0H6 | Scissors | 125.0 | 1 |
P2T6 | Scissors | 150.0 | 2 |
TU55 | Papers | 90.0 | 5 |
H5RT | Papers | 200.0 | 2 |
%%sql
UPDATE Boxes
SET Value = Value * 0.85;
* postgres://postgres:***@localhost:5432/wbwarehouse 12 rows affected.
[]
%%sql
SELECT *
FROM Boxes;
* postgres://postgres:***@localhost:5432/wbwarehouse 12 rows affected.
code | contents | value | warehouse |
---|---|---|---|
0MN7 | Rocks | 153.0 | 3 |
4H8P | Rocks | 212.5 | 1 |
4RT3 | Scissors | 161.5 | 4 |
7G3H | Rocks | 170.0 | 1 |
8JN6 | Papers | 63.75 | 1 |
8Y6U | Papers | 42.5 | 3 |
9J6F | Papers | 148.75 | 2 |
LL08 | Rocks | 119.0 | 4 |
P0H6 | Scissors | 106.25 | 1 |
P2T6 | Scissors | 127.5 | 2 |
TU55 | Papers | 76.5 | 5 |
H5RT | Papers | 170.0 | 2 |
dfb
 | code | contents | value | warehouse |
---|---|---|---|---|
0 | 0MN7 | Rocks | 180.0 | 3 |
1 | 4H8P | Rocks | 250.0 | 1 |
2 | 4RT3 | Scissors | 190.0 | 4 |
3 | 7G3H | Rocks | 200.0 | 1 |
4 | 8JN6 | Papers | 75.0 | 1 |
5 | 8Y6U | Papers | 50.0 | 3 |
6 | 9J6F | Papers | 175.0 | 2 |
7 | LL08 | Rocks | 140.0 | 4 |
8 | P0H6 | Scissors | 125.0 | 1 |
9 | P2T6 | Scissors | 150.0 | 2 |
10 | TU55 | Papers | 90.0 | 5 |
dfb['value'].mean()
147.72727272727272
dfb['discounted_value'] = np.where(dfb['value']>dfb['value'].mean(),
dfb['value']*0.8,
dfb['value'])
dfb
 | code | contents | value | warehouse | discounted_value |
---|---|---|---|---|---|
0 | 0MN7 | Rocks | 180.0 | 3 | 144.0 |
1 | 4H8P | Rocks | 250.0 | 1 | 200.0 |
2 | 4RT3 | Scissors | 190.0 | 4 | 152.0 |
3 | 7G3H | Rocks | 200.0 | 1 | 160.0 |
4 | 8JN6 | Papers | 75.0 | 1 | 75.0 |
5 | 8Y6U | Papers | 50.0 | 3 | 50.0 |
6 | 9J6F | Papers | 175.0 | 2 | 140.0 |
7 | LL08 | Rocks | 140.0 | 4 | 140.0 |
8 | P0H6 | Scissors | 125.0 | 1 | 125.0 |
9 | P2T6 | Scissors | 150.0 | 2 | 120.0 |
10 | TU55 | Papers | 90.0 | 5 | 90.0 |
m = sdfb.select(F.mean('value')).collect()[0][0]
m
147.72727272727272
sdfb.withColumn('discounted_value',
F.when( F.col('value') > m,
F.col('value')*0.8
)
.otherwise(F.col('value'))
).show()
+----+--------+-----+---------+----------------+
|code|contents|value|warehouse|discounted_value|
+----+--------+-----+---------+----------------+
|0MN7|   Rocks|180.0|        3|           144.0|
|4H8P|   Rocks|250.0|        1|           200.0|
|4RT3|Scissors|190.0|        4|           152.0|
|7G3H|   Rocks|200.0|        1|           160.0|
|8JN6|  Papers| 75.0|        1|            75.0|
|8Y6U|  Papers| 50.0|        3|            50.0|
|9J6F|  Papers|175.0|        2|           140.0|
|LL08|   Rocks|140.0|        4|           140.0|
|P0H6|Scissors|125.0|        1|           125.0|
|P2T6|Scissors|150.0|        2|           120.0|
|TU55|  Papers| 90.0|        5|            90.0|
+----+--------+-----+---------+----------------+
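The same conditional can be written in Spark SQL with CASE WHEN and an uncorrelated scalar subquery; a sketch against the Boxes temp view registered earlier:

# a sketch: CASE WHEN with a scalar subquery for the mean
spark.sql("""
select *,
       case when value > (select avg(value) from Boxes)
            then value * 0.8
            else value
       end as discounted_value
from Boxes
""").show()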
%%sql
SELECT *
FROM Boxes;
* postgres://postgres:***@localhost:5432/wbwarehouse 12 rows affected.
code | contents | value | warehouse |
---|---|---|---|
0MN7 | Rocks | 153.0 | 3 |
4H8P | Rocks | 212.5 | 1 |
4RT3 | Scissors | 161.5 | 4 |
7G3H | Rocks | 170.0 | 1 |
8JN6 | Papers | 63.75 | 1 |
8Y6U | Papers | 42.5 | 3 |
9J6F | Papers | 148.75 | 2 |
LL08 | Rocks | 119.0 | 4 |
P0H6 | Scissors | 106.25 | 1 |
P2T6 | Scissors | 127.5 | 2 |
TU55 | Papers | 76.5 | 5 |
H5RT | Papers | 170.0 | 2 |
%%sql
UPDATE Boxes
SET Value = Value * 0.80
WHERE Value >
(SELECT AVG(Value)
FROM
(SELECT *
FROM Boxes) AS X);
* postgres://postgres:***@localhost:5432/wbwarehouse 6 rows affected.
[]
%%sql
SELECT *
FROM Boxes;
* postgres://postgres:***@localhost:5432/wbwarehouse 12 rows affected.
code | contents | value | warehouse |
---|---|---|---|
8JN6 | Papers | 63.75 | 1 |
8Y6U | Papers | 42.5 | 3 |
LL08 | Rocks | 119.0 | 4 |
P0H6 | Scissors | 106.25 | 1 |
P2T6 | Scissors | 127.5 | 2 |
TU55 | Papers | 76.5 | 5 |
0MN7 | Rocks | 122.4 | 3 |
4H8P | Rocks | 170.0 | 1 |
4RT3 | Scissors | 129.2 | 4 |
7G3H | Rocks | 136.0 | 1 |
9J6F | Papers | 119.0 | 2 |
H5RT | Papers | 136.0 | 2 |
# dfb[dfb['value']>100]
# sdfb[sdfb['value']>100].show()
%%sql
SELECT *
FROM Boxes;
* postgres://postgres:***@localhost:5432/wbwarehouse 12 rows affected.
code | contents | value | warehouse |
---|---|---|---|
8JN6 | Papers | 63.75 | 1 |
8Y6U | Papers | 42.5 | 3 |
LL08 | Rocks | 119.0 | 4 |
P0H6 | Scissors | 106.25 | 1 |
P2T6 | Scissors | 127.5 | 2 |
TU55 | Papers | 76.5 | 5 |
0MN7 | Rocks | 122.4 | 3 |
4H8P | Rocks | 170.0 | 1 |
4RT3 | Scissors | 129.2 | 4 |
7G3H | Rocks | 136.0 | 1 |
9J6F | Papers | 119.0 | 2 |
H5RT | Papers | 136.0 | 2 |
%%sql
DELETE
FROM Boxes
WHERE Value < 100;
* postgres://postgres:***@localhost:5432/wbwarehouse 3 rows affected.
[]
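Spark DataFrames are immutable, so a DELETE corresponds to keeping the complement with filter; a sketch (sdfb still holds the original, pre-discount values, so this only illustrates the pattern):

# a sketch: 'delete' in Spark = keep the complement of the condition
sdfb_kept = sdfb.filter(F.col('value') >= 100)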
%%sql
SELECT *
FROM Boxes;
* postgres://postgres:***@localhost:5432/wbwarehouse 9 rows affected.
code | contents | value | warehouse |
---|---|---|---|
LL08 | Rocks | 119.0 | 4 |
P0H6 | Scissors | 106.25 | 1 |
P2T6 | Scissors | 127.5 | 2 |
0MN7 | Rocks | 122.4 | 3 |
4H8P | Rocks | 170.0 | 1 |
4RT3 | Scissors | 129.2 | 4 |
7G3H | Rocks | 136.0 | 1 |
9J6F | Papers | 119.0 | 2 |
H5RT | Papers | 136.0 | 2 |
# df[df.num_boxes_ware > df.capacity]
%%sql
SELECT *
FROM Boxes;
* postgres://postgres:***@localhost:5432/wbwarehouse 9 rows affected.
code | contents | value | warehouse |
---|---|---|---|
LL08 | Rocks | 119.0 | 4 |
P0H6 | Scissors | 106.25 | 1 |
P2T6 | Scissors | 127.5 | 2 |
0MN7 | Rocks | 122.4 | 3 |
4H8P | Rocks | 170.0 | 1 |
4RT3 | Scissors | 129.2 | 4 |
7G3H | Rocks | 136.0 | 1 |
9J6F | Papers | 119.0 | 2 |
H5RT | Papers | 136.0 | 2 |
%%sql
DELETE
FROM Boxes
WHERE Warehouse IN
(SELECT Code
FROM Warehouses
WHERE Capacity <
(SELECT COUNT(*)
FROM Boxes
WHERE Warehouse = Warehouses.Code ) );
* postgres://postgres:***@localhost:5432/wbwarehouse 0 rows affected.
[]
%%sql
SELECT *
FROM Boxes;
* postgres://postgres:***@localhost:5432/wbwarehouse 9 rows affected.
code | contents | value | warehouse |
---|---|---|---|
LL08 | Rocks | 119.0 | 4 |
P0H6 | Scissors | 106.25 | 1 |
P2T6 | Scissors | 127.5 | 2 |
0MN7 | Rocks | 122.4 | 3 |
4H8P | Rocks | 170.0 | 1 |
4RT3 | Scissors | 129.2 | 4 |
7G3H | Rocks | 136.0 | 1 |
9J6F | Papers | 119.0 | 2 |
H5RT | Papers | 136.0 | 2 |
time_taken = time.time() - time_start_notebook
h,m = divmod(time_taken,60*60)
print('Time taken to run whole notebook: {:.0f} hr '\
'{:.0f} min {:.0f} secs'.format(h, *divmod(m,60)))
Time taken to run whole notebook: 0 hr 0 min 36 secs