import time
time_start_notebook = time.time()
# my local library
import sys
sys.path.append("/Users/poudel/Dropbox/a00_Bhishan_Modules/")
sys.path.append("/Users/poudel/Dropbox/a00_Bhishan_Modules/bhishan")
from bhishan import bp
import numpy as np
import pandas as pd
import functools # reduce
import findspark, pyspark
%load_ext watermark
%watermark -a "Bhishan Poudel" -d -v -m
print()
%watermark -iv
The watermark extension is already loaded. To reload it, use: %reload_ext watermark Author: Bhishan Poudel Python implementation: CPython Python version : 3.7.7 IPython version : 7.16.1 Compiler : Clang 4.0.1 (tags/RELEASE_401/final) OS : Darwin Release : 19.6.0 Machine : x86_64 Processor : i386 CPU cores : 4 Architecture: 64bit json : 2.0.9 numpy : 1.17.5 findspark: 1.4.2 pyspark : 3.1.2 bhishan : 0.4 pandas : 1.0.5 sys : 3.7.7 (default, May 6 2020, 04:59:01) [Clang 4.0.1 (tags/RELEASE_401/final)]
# pyspark settings
import os
import findspark
HOME = os.path.expanduser('~')
findspark.init(HOME + "/Softwares/Spark/spark-3.1.2-bin-hadoop3.2")
# Set spark environments
os.environ['PYSPARK_PYTHON'] = f'{HOME}/opt/miniconda3/envs/spk/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = f'{HOME}/opt/miniconda3/envs/spk/bin/python'
import pyspark
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import col as _col
from pyspark.sql.functions import udf # @udf("integer") def myfunc(x,y): return x - y
from pyspark.sql import functions as F # stddev format_number date_format, dayofyear, when
from pyspark.sql.window import Window
from pyspark.sql.types import StructField, StringType, IntegerType, FloatType, StructType
from pyspark.sql.functions import (mean as _mean, min as _min,
max as _max, avg as _avg,
when as _when
)
spark = SparkSession\
.builder\
.appName("bhishan")\
.getOrCreate()
sc = spark.sparkContext
sqlContext = pyspark.SQLContext(sc) # spark_df = sqlContext.createDataFrame(pandas_df)
sc.setLogLevel("INFO")
print(pyspark.__version__)
3.1.2
# postgres configs
import os, json
data = json.load(open(os.path.expanduser('~/.config/postgres/credentials.json')))
pw,port = data['password'], data['port'] # port is 5432
dbname = 'wbemployee' # make sure pgadmin is running and given database is there.
%load_ext sql
%sql postgres://postgres:$pw@localhost:$port/$dbname
The sql extension is already loaded. To reload it, use: %reload_ext sql
%%sql
DROP TABLE IF EXISTS Departments CASCADE;
DROP TABLE IF EXISTS Employees CASCADE;
CREATE TABLE Departments (Code INTEGER PRIMARY KEY,
Name varchar(255) NOT NULL,
Budget decimal NOT NULL);
CREATE TABLE Employees (SSN INTEGER PRIMARY KEY,
Name varchar(255) NOT NULL,
LastName varchar(255) NOT NULL,
Department INTEGER NOT NULL,
FOREIGN KEY (department) REFERENCES Departments(Code));
INSERT INTO Departments(Code, Name, Budget)
VALUES(14,'IT',65000);
INSERT INTO Departments(Code, Name, Budget)
VALUES(37,'Accounting',15000);
INSERT INTO Departments(Code, Name, Budget)
VALUES(59,'Human Resources',240000);
INSERT INTO Departments(Code, Name, Budget)
VALUES(77,'Research',55000);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('123234877','Michael','Rogers',14);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('152934485','Anand','Manikutty',14);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('222364883','Carol','Smith',37);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('326587417','Joe','Stevens',37);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('332154719','Mary-Anne','Foster',14);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('332569843','George','O''Donnell',77);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('546523478','John','Doe',59);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('631231482','David','Smith',77);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('654873219','Zacary','Efron',59);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('745685214','Eric','Goldsmith',59);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('845657245','Elizabeth','Doe',14);
INSERT INTO Employees(SSN, Name, LastName, Department)
VALUES('845657246','Kumar','Swamy',14);
* postgres://postgres:***@localhost:5432/wbemployee Done. Done.
[]
%%sql
SELECT *
FROM Departments;
* postgres://postgres:***@localhost:5432/wbemployee 4 rows affected.
code | name | budget |
---|---|---|
14 | IT | 65000 |
37 | Accounting | 15000 |
59 | Human Resources | 240000 |
77 | Research | 55000 |
dfd = pd.DataFrame({'code': [14, 37, 59, 77],
'name': ['IT', 'Accounting', 'Human Resources', 'Research'],
'budget': [65000, 15000, 240000, 55000]})
dfd
code | name | budget | |
---|---|---|---|
0 | 14 | IT | 65000 |
1 | 37 | Accounting | 15000 |
2 | 59 | Human Resources | 240000 |
3 | 77 | Research | 55000 |
dfd.dtypes
code int64 name object budget int64 dtype: object
schema = StructType([
StructField('code',IntegerType(),True),
StructField('name',StringType(),True),
StructField('budget',IntegerType(),True),
])
sdfd = sqlContext.createDataFrame(dfd, schema)
sdfd.show()
+----+---------------+------+ |code| name|budget| +----+---------------+------+ | 14| IT| 65000| | 37| Accounting| 15000| | 59|Human Resources|240000| | 77| Research| 55000| +----+---------------+------+
%%sql
SELECT *
FROM Employees;
* postgres://postgres:***@localhost:5432/wbemployee 12 rows affected.
ssn | name | lastname | department |
---|---|---|---|
123234877 | Michael | Rogers | 14 |
152934485 | Anand | Manikutty | 14 |
222364883 | Carol | Smith | 37 |
326587417 | Joe | Stevens | 37 |
332154719 | Mary-Anne | Foster | 14 |
332569843 | George | O'Donnell | 77 |
546523478 | John | Doe | 59 |
631231482 | David | Smith | 77 |
654873219 | Zacary | Efron | 59 |
745685214 | Eric | Goldsmith | 59 |
845657245 | Elizabeth | Doe | 14 |
845657246 | Kumar | Swamy | 14 |
dfe = pd.DataFrame({'ssn': [123234877, 152934485, 222364883, 326587417, 332154719, 332569843, 546523478, 631231482, 654873219, 745685214, 845657245, 845657246],
'name': ['Michael', 'Anp.nand', 'Carol', 'Joe', 'Mary-Anne', 'George', 'John', 'David', 'Zacary', 'Eric', 'Elizabeth', 'Kumar'],
'lastname': ['Rogers', 'Manikutty', 'Smith', 'Stevens', 'Foster', "O'Donnell", 'Doe', 'Smith', 'Efron', 'Goldsmith', 'Doe', 'Swamy'],
'department': [14, 14, 37, 37, 14, 77, 59, 77, 59, 59, 14, 14]})
dfe
ssn | name | lastname | department | |
---|---|---|---|---|
0 | 123234877 | Michael | Rogers | 14 |
1 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 222364883 | Carol | Smith | 37 |
3 | 326587417 | Joe | Stevens | 37 |
4 | 332154719 | Mary-Anne | Foster | 14 |
5 | 332569843 | George | O'Donnell | 77 |
6 | 546523478 | John | Doe | 59 |
7 | 631231482 | David | Smith | 77 |
8 | 654873219 | Zacary | Efron | 59 |
9 | 745685214 | Eric | Goldsmith | 59 |
10 | 845657245 | Elizabeth | Doe | 14 |
11 | 845657246 | Kumar | Swamy | 14 |
dfe.dtypes
ssn int64 name object lastname object department int64 dtype: object
schema = StructType([
StructField('ssn',IntegerType(),True),
StructField('name',StringType(),True),
StructField('lastname',StringType(),True),
StructField('department',IntegerType(),True),
])
sdfe = sqlContext.createDataFrame(dfe, schema)
sdfe.show()
+---------+---------+---------+----------+ | ssn| name| lastname|department| +---------+---------+---------+----------+ |123234877| Michael| Rogers| 14| |152934485| Anp.nand|Manikutty| 14| |222364883| Carol| Smith| 37| |326587417| Joe| Stevens| 37| |332154719|Mary-Anne| Foster| 14| |332569843| George|O'Donnell| 77| |546523478| John| Doe| 59| |631231482| David| Smith| 77| |654873219| Zacary| Efron| 59| |745685214| Eric|Goldsmith| 59| |845657245|Elizabeth| Doe| 14| |845657246| Kumar| Swamy| 14| +---------+---------+---------+----------+
df = dfd.merge(dfe,left_on='code', right_on='department',suffixes=['_dept','_emp'])
df
code | name_dept | budget | ssn | name_emp | lastname | department | |
---|---|---|---|---|---|---|---|
0 | 14 | IT | 65000 | 123234877 | Michael | Rogers | 14 |
1 | 14 | IT | 65000 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 14 | IT | 65000 | 332154719 | Mary-Anne | Foster | 14 |
3 | 14 | IT | 65000 | 845657245 | Elizabeth | Doe | 14 |
4 | 14 | IT | 65000 | 845657246 | Kumar | Swamy | 14 |
5 | 37 | Accounting | 15000 | 222364883 | Carol | Smith | 37 |
6 | 37 | Accounting | 15000 | 326587417 | Joe | Stevens | 37 |
7 | 59 | Human Resources | 240000 | 546523478 | John | Doe | 59 |
8 | 59 | Human Resources | 240000 | 654873219 | Zacary | Efron | 59 |
9 | 59 | Human Resources | 240000 | 745685214 | Eric | Goldsmith | 59 |
10 | 77 | Research | 55000 | 332569843 | George | O'Donnell | 77 |
11 | 77 | Research | 55000 | 631231482 | David | Smith | 77 |
import functools
sdfd.show(2)
+----+----------+------+ |code| name|budget| +----+----------+------+ | 14| IT| 65000| | 37|Accounting| 15000| +----+----------+------+ only showing top 2 rows
sdfe.show(2)
+---------+--------+---------+----------+ | ssn| name| lastname|department| +---------+--------+---------+----------+ |123234877| Michael| Rogers| 14| |152934485|Anp.nand|Manikutty| 14| +---------+--------+---------+----------+ only showing top 2 rows
# we will merge departments.code with employee.department
# but there is a problem, both dataframes have some common columns.
cols_dept = sdfd.columns
cols_emp = sdfe.columns
cols_common = set(cols_dept) & set(cols_emp)
cols_dept, cols_emp, cols_common
(['code', 'name', 'budget'], ['ssn', 'name', 'lastname', 'department'], {'name'})
# rename common columns of first spark dataframe.
oldColumns = list(cols_common)
newColumns = [i + '_dept' for i in oldColumns]
sdfd2 = functools.reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx],
newColumns[idx]),
range(len(oldColumns)),
sdfd
)
sdfd2.show(2)
+----+----------+------+ |code| name_dept|budget| +----+----------+------+ | 14| IT| 65000| | 37|Accounting| 15000| +----+----------+------+ only showing top 2 rows
# rename common columns of second spark dataframe.
oldColumns = list(cols_common)
newColumns = [i + '_emp' for i in oldColumns]
sdfe2 = functools.reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx],
newColumns[idx]),
range(len(oldColumns)),
sdfe
)
sdfe2.show(2)
+---------+--------+---------+----------+ | ssn|name_emp| lastname|department| +---------+--------+---------+----------+ |123234877| Michael| Rogers| 14| |152934485|Anp.nand|Manikutty| 14| +---------+--------+---------+----------+ only showing top 2 rows
# now join two spark dataframes which have different column names.
sdf = sdfd2.join(sdfe2,sdfd2.code==sdfe2.department)
sdf.show()
+----+---------------+------+---------+---------+---------+----------+ |code| name_dept|budget| ssn| name_emp| lastname|department| +----+---------------+------+---------+---------+---------+----------+ | 37| Accounting| 15000|222364883| Carol| Smith| 37| | 37| Accounting| 15000|326587417| Joe| Stevens| 37| | 59|Human Resources|240000|546523478| John| Doe| 59| | 59|Human Resources|240000|654873219| Zacary| Efron| 59| | 59|Human Resources|240000|745685214| Eric|Goldsmith| 59| | 77| Research| 55000|332569843| George|O'Donnell| 77| | 77| Research| 55000|631231482| David| Smith| 77| | 14| IT| 65000|123234877| Michael| Rogers| 14| | 14| IT| 65000|152934485| Anp.nand|Manikutty| 14| | 14| IT| 65000|332154719|Mary-Anne| Foster| 14| | 14| IT| 65000|845657245|Elizabeth| Doe| 14| | 14| IT| 65000|845657246| Kumar| Swamy| 14| +----+---------------+------+---------+---------+---------+----------+
bp.show_methods(sdf,4)
# count, fillna, filter, where, groubpy, first, corr,
# missing than pandas: max, min, sum, etc.
0 | 1 | 2 | 3 | |
---|---|---|---|---|
0 | agg | drop_duplicates | localCheckpoint | sql_ctx |
1 | alias | dropna | mapInPandas | stat |
2 | approxQuantile | dtypes | na | storageLevel |
3 | cache | exceptAll | orderBy | subtract |
4 | checkpoint | explain | persist | summary |
5 | coalesce | fillna | printSchema | tail |
6 | colRegex | filter | randomSplit | take |
7 | collect | first | rdd | toDF |
8 | columns | foreach | registerTempTable | toJSON |
9 | corr | foreachPartition | repartition | toLocalIterator |
10 | count | freqItems | repartitionByRange | toPandas |
11 | cov | groupBy | replace | transform |
12 | createGlobalTempView | groupby | rollup | union |
13 | createOrReplaceGlobalTempView | head | sameSemantics | unionAll |
14 | createOrReplaceTempView | hint | sample | unionByName |
15 | createTempView | inputFiles | sampleBy | unpersist |
16 | crossJoin | intersect | schema | where |
17 | crosstab | intersectAll | select | withColumn |
18 | cube | isLocal | selectExpr | withColumnRenamed |
19 | describe | isStreaming | semanticHash | withWatermark |
20 | distinct | is_cached | show | write |
21 | drop | join | sort | writeStream |
22 | dropDuplicates | limit | sortWithinPartitions | writeTo |
Global Temporary View
Temporary views in Spark SQL are session-scoped and will disappear if the session that creates it terminates. If you want to have a temporary view that is shared among all sessions and keep alive until the Spark application terminates, you can create a global temporary view. Global temporary view is tied to a system preserved database global_temp, and we must use the qualified name to refer it, e.g. SELECT * FROM global_temp.view1
.
# Register the DataFrame as a SQL temporary view
sdfd.createOrReplaceTempView("Departments")
sdfe.createOrReplaceTempView("Employees")
spark.sql('select * from Departments limit 2').show()
+----+----------+------+ |code| name|budget| +----+----------+------+ | 14| IT| 65000| | 37|Accounting| 15000| +----+----------+------+
dfe[['lastname']]
lastname | |
---|---|
0 | Rogers |
1 | Manikutty |
2 | Smith |
3 | Stevens |
4 | Foster |
5 | O'Donnell |
6 | Doe |
7 | Smith |
8 | Efron |
9 | Goldsmith |
10 | Doe |
11 | Swamy |
%%sql
SELECT lastname
FROM Employees;
* postgres://postgres:***@localhost:5432/wbemployee 12 rows affected.
lastname |
---|
Rogers |
Manikutty |
Smith |
Stevens |
Foster |
O'Donnell |
Doe |
Smith |
Efron |
Goldsmith |
Doe |
Swamy |
sdfe[['lastname']].show(2)
+---------+ | lastname| +---------+ | Rogers| |Manikutty| +---------+ only showing top 2 rows
dfe[['lastname']].drop_duplicates()
lastname | |
---|---|
0 | Rogers |
1 | Manikutty |
2 | Smith |
3 | Stevens |
4 | Foster |
5 | O'Donnell |
6 | Doe |
8 | Efron |
9 | Goldsmith |
11 | Swamy |
%%sql
SELECT DISTINCT lastname
FROM Employees;
* postgres://postgres:***@localhost:5432/wbemployee 10 rows affected.
lastname |
---|
Doe |
O'Donnell |
Rogers |
Foster |
Smith |
Goldsmith |
Stevens |
Efron |
Swamy |
Manikutty |
sdfe[['lastname']].drop_duplicates().show()
+---------+ | lastname| +---------+ |Goldsmith| | Smith| | Foster| | Doe| | Rogers| | Efron| | Stevens| | Swamy| |O'Donnell| |Manikutty| +---------+
dfe[dfe.lastname=='Smith']
ssn | name | lastname | department | |
---|---|---|---|---|
2 | 222364883 | Carol | Smith | 37 |
7 | 631231482 | David | Smith | 77 |
%%sql
SELECT *
FROM Employees
WHERE lastname = 'Smith'
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
ssn | name | lastname | department |
---|---|---|---|
222364883 | Carol | Smith | 37 |
631231482 | David | Smith | 77 |
sdfe[sdfe.lastname=='Smith'].show()
+---------+-----+--------+----------+ | ssn| name|lastname|department| +---------+-----+--------+----------+ |222364883|Carol| Smith| 37| |631231482|David| Smith| 77| +---------+-----+--------+----------+
dfe[ (dfe.lastname=='Smith') | (dfe.lastname=='Doe')]
ssn | name | lastname | department | |
---|---|---|---|---|
2 | 222364883 | Carol | Smith | 37 |
6 | 546523478 | John | Doe | 59 |
7 | 631231482 | David | Smith | 77 |
10 | 845657245 | Elizabeth | Doe | 14 |
%%sql
SELECT *
FROM Employees
WHERE lastname = 'Smith'
OR lastname = 'Doe'
* postgres://postgres:***@localhost:5432/wbemployee 4 rows affected.
ssn | name | lastname | department |
---|---|---|---|
222364883 | Carol | Smith | 37 |
546523478 | John | Doe | 59 |
631231482 | David | Smith | 77 |
845657245 | Elizabeth | Doe | 14 |
%%sql
SELECT *
FROM Employees
WHERE lastname IN ('Smith',
'Doe')
* postgres://postgres:***@localhost:5432/wbemployee 4 rows affected.
ssn | name | lastname | department |
---|---|---|---|
222364883 | Carol | Smith | 37 |
546523478 | John | Doe | 59 |
631231482 | David | Smith | 77 |
845657245 | Elizabeth | Doe | 14 |
sdfe[ (sdfe.lastname=='Smith') | (sdfe.lastname=='Doe')].show()
+---------+---------+--------+----------+ | ssn| name|lastname|department| +---------+---------+--------+----------+ |222364883| Carol| Smith| 37| |546523478| John| Doe| 59| |631231482| David| Smith| 77| |845657245|Elizabeth| Doe| 14| +---------+---------+--------+----------+
dfe[dfe.department==14]
ssn | name | lastname | department | |
---|---|---|---|---|
0 | 123234877 | Michael | Rogers | 14 |
1 | 152934485 | Anp.nand | Manikutty | 14 |
4 | 332154719 | Mary-Anne | Foster | 14 |
10 | 845657245 | Elizabeth | Doe | 14 |
11 | 845657246 | Kumar | Swamy | 14 |
%%sql
SELECT *
FROM Employees
WHERE department= 14
* postgres://postgres:***@localhost:5432/wbemployee 5 rows affected.
ssn | name | lastname | department |
---|---|---|---|
123234877 | Michael | Rogers | 14 |
152934485 | Anand | Manikutty | 14 |
332154719 | Mary-Anne | Foster | 14 |
845657245 | Elizabeth | Doe | 14 |
845657246 | Kumar | Swamy | 14 |
sdfe[sdfe.department==14].show()
+---------+---------+---------+----------+ | ssn| name| lastname|department| +---------+---------+---------+----------+ |123234877| Michael| Rogers| 14| |152934485| Anp.nand|Manikutty| 14| |332154719|Mary-Anne| Foster| 14| |845657245|Elizabeth| Doe| 14| |845657246| Kumar| Swamy| 14| +---------+---------+---------+----------+
dfe[(dfe.department==37) | (dfe.department==77)]
ssn | name | lastname | department | |
---|---|---|---|---|
2 | 222364883 | Carol | Smith | 37 |
3 | 326587417 | Joe | Stevens | 37 |
5 | 332569843 | George | O'Donnell | 77 |
7 | 631231482 | David | Smith | 77 |
dfe[dfe.department.isin([37,77])]
ssn | name | lastname | department | |
---|---|---|---|---|
2 | 222364883 | Carol | Smith | 37 |
3 | 326587417 | Joe | Stevens | 37 |
5 | 332569843 | George | O'Donnell | 77 |
7 | 631231482 | David | Smith | 77 |
dfe.query('department in [37,77]')
ssn | name | lastname | department | |
---|---|---|---|---|
2 | 222364883 | Carol | Smith | 37 |
3 | 326587417 | Joe | Stevens | 37 |
5 | 332569843 | George | O'Donnell | 77 |
7 | 631231482 | David | Smith | 77 |
%%sql
SELECT *
FROM Employees
WHERE department=37
OR department=77
* postgres://postgres:***@localhost:5432/wbemployee 4 rows affected.
ssn | name | lastname | department |
---|---|---|---|
222364883 | Carol | Smith | 37 |
326587417 | Joe | Stevens | 37 |
332569843 | George | O'Donnell | 77 |
631231482 | David | Smith | 77 |
sdfe[(sdfe.department==37) | (sdfe.department==77)].show()
+---------+------+---------+----------+ | ssn| name| lastname|department| +---------+------+---------+----------+ |222364883| Carol| Smith| 37| |326587417| Joe| Stevens| 37| |332569843|George|O'Donnell| 77| |631231482| David| Smith| 77| +---------+------+---------+----------+
sdfe[ sdfe.department.isin(37,77)].show()
+---------+------+---------+----------+ | ssn| name| lastname|department| +---------+------+---------+----------+ |222364883| Carol| Smith| 37| |326587417| Joe| Stevens| 37| |332569843|George|O'Donnell| 77| |631231482| David| Smith| 77| +---------+------+---------+----------+
dfe[dfe.lastname.str.startswith("S")]
ssn | name | lastname | department | |
---|---|---|---|---|
2 | 222364883 | Carol | Smith | 37 |
3 | 326587417 | Joe | Stevens | 37 |
7 | 631231482 | David | Smith | 77 |
11 | 845657246 | Kumar | Swamy | 14 |
%%sql
SELECT *
FROM Employees
WHERE lastname like 'S%'
* postgres://postgres:***@localhost:5432/wbemployee 4 rows affected.
ssn | name | lastname | department |
---|---|---|---|
222364883 | Carol | Smith | 37 |
326587417 | Joe | Stevens | 37 |
631231482 | David | Smith | 77 |
845657246 | Kumar | Swamy | 14 |
sdfe.filter(_col('lastname').startswith('S')).show()
+---------+-----+--------+----------+ | ssn| name|lastname|department| +---------+-----+--------+----------+ |222364883|Carol| Smith| 37| |326587417| Joe| Stevens| 37| |631231482|David| Smith| 77| |845657246|Kumar| Swamy| 14| +---------+-----+--------+----------+
dfd
code | name | budget | |
---|---|---|---|
0 | 14 | IT | 65000 |
1 | 37 | Accounting | 15000 |
2 | 59 | Human Resources | 240000 |
3 | 77 | Research | 55000 |
dfd.budget.sum()
375000
%%sql
SELECT sum(budget)
FROM Departments;
* postgres://postgres:***@localhost:5432/wbemployee 1 rows affected.
sum |
---|
375000 |
sdfd.select(_max(_col('budget'))).show()
+-----------+ |max(budget)| +-----------+ | 240000| +-----------+
sdfd.agg({'budget':'max'}).show()
+-----------+ |max(budget)| +-----------+ | 240000| +-----------+
sdfd.agg({'budget':'max'}).collect()[0][0]
240000
dfe
ssn | name | lastname | department | |
---|---|---|---|---|
0 | 123234877 | Michael | Rogers | 14 |
1 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 222364883 | Carol | Smith | 37 |
3 | 326587417 | Joe | Stevens | 37 |
4 | 332154719 | Mary-Anne | Foster | 14 |
5 | 332569843 | George | O'Donnell | 77 |
6 | 546523478 | John | Doe | 59 |
7 | 631231482 | David | Smith | 77 |
8 | 654873219 | Zacary | Efron | 59 |
9 | 745685214 | Eric | Goldsmith | 59 |
10 | 845657245 | Elizabeth | Doe | 14 |
11 | 845657246 | Kumar | Swamy | 14 |
dfe.groupby('department')[['ssn']].count()
ssn | |
---|---|
department | |
14 | 5 |
37 | 2 |
59 | 3 |
77 | 2 |
%%sql
SELECT department,
count(*)
FROM Employees
GROUP BY department
* postgres://postgres:***@localhost:5432/wbemployee 4 rows affected.
department | count |
---|---|
14 | 5 |
59 | 3 |
77 | 2 |
37 | 2 |
sdfe.groupby('department').count().show()
+----------+-----+ |department|count| +----------+-----+ | 37| 2| | 59| 3| | 77| 2| | 14| 5| +----------+-----+
sdfe.groupBy('department').count()\
.select('department', _col('count').alias('n')).show()
+----------+---+ |department| n| +----------+---+ | 37| 2| | 59| 3| | 77| 2| | 14| 5| +----------+---+
sdfe.show(2)
+---------+--------+---------+----------+ | ssn| name| lastname|department| +---------+--------+---------+----------+ |123234877| Michael| Rogers| 14| |152934485|Anp.nand|Manikutty| 14| +---------+--------+---------+----------+ only showing top 2 rows
df
code | name_dept | budget | ssn | name_emp | lastname | department | |
---|---|---|---|---|---|---|---|
0 | 14 | IT | 65000 | 123234877 | Michael | Rogers | 14 |
1 | 14 | IT | 65000 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 14 | IT | 65000 | 332154719 | Mary-Anne | Foster | 14 |
3 | 14 | IT | 65000 | 845657245 | Elizabeth | Doe | 14 |
4 | 14 | IT | 65000 | 845657246 | Kumar | Swamy | 14 |
5 | 37 | Accounting | 15000 | 222364883 | Carol | Smith | 37 |
6 | 37 | Accounting | 15000 | 326587417 | Joe | Stevens | 37 |
7 | 59 | Human Resources | 240000 | 546523478 | John | Doe | 59 |
8 | 59 | Human Resources | 240000 | 654873219 | Zacary | Efron | 59 |
9 | 59 | Human Resources | 240000 | 745685214 | Eric | Goldsmith | 59 |
10 | 77 | Research | 55000 | 332569843 | George | O'Donnell | 77 |
11 | 77 | Research | 55000 | 631231482 | David | Smith | 77 |
%%sql
SELECT *
FROM Departments D,
Employees E
WHERE D.code = E.department
* postgres://postgres:***@localhost:5432/wbemployee 12 rows affected.
code | name | budget | ssn | name_1 | lastname | department |
---|---|---|---|---|---|---|
14 | IT | 65000 | 845657246 | Kumar | Swamy | 14 |
14 | IT | 65000 | 845657245 | Elizabeth | Doe | 14 |
14 | IT | 65000 | 332154719 | Mary-Anne | Foster | 14 |
14 | IT | 65000 | 152934485 | Anand | Manikutty | 14 |
14 | IT | 65000 | 123234877 | Michael | Rogers | 14 |
37 | Accounting | 15000 | 326587417 | Joe | Stevens | 37 |
37 | Accounting | 15000 | 222364883 | Carol | Smith | 37 |
59 | Human Resources | 240000 | 745685214 | Eric | Goldsmith | 59 |
59 | Human Resources | 240000 | 654873219 | Zacary | Efron | 59 |
59 | Human Resources | 240000 | 546523478 | John | Doe | 59 |
77 | Research | 55000 | 631231482 | David | Smith | 77 |
77 | Research | 55000 | 332569843 | George | O'Donnell | 77 |
# I have already created sdf
sdf.show()
+----+---------------+------+---------+---------+---------+----------+ |code| name_dept|budget| ssn| name_emp| lastname|department| +----+---------------+------+---------+---------+---------+----------+ | 37| Accounting| 15000|222364883| Carol| Smith| 37| | 37| Accounting| 15000|326587417| Joe| Stevens| 37| | 59|Human Resources|240000|546523478| John| Doe| 59| | 59|Human Resources|240000|654873219| Zacary| Efron| 59| | 59|Human Resources|240000|745685214| Eric|Goldsmith| 59| | 77| Research| 55000|332569843| George|O'Donnell| 77| | 77| Research| 55000|631231482| David| Smith| 77| | 14| IT| 65000|123234877| Michael| Rogers| 14| | 14| IT| 65000|152934485| Anp.nand|Manikutty| 14| | 14| IT| 65000|332154719|Mary-Anne| Foster| 14| | 14| IT| 65000|845657245|Elizabeth| Doe| 14| | 14| IT| 65000|845657246| Kumar| Swamy| 14| +----+---------------+------+---------+---------+---------+----------+
df.head(2)
code | name_dept | budget | ssn | name_emp | lastname | department | |
---|---|---|---|---|---|---|---|
0 | 14 | IT | 65000 | 123234877 | Michael | Rogers | 14 |
1 | 14 | IT | 65000 | 152934485 | Anp.nand | Manikutty | 14 |
df[['name_emp','lastname','name_dept','budget']]
# df.filter(['name_emp','lastname','name_dept','budget'])
name_emp | lastname | name_dept | budget | |
---|---|---|---|---|
0 | Michael | Rogers | IT | 65000 |
1 | Anp.nand | Manikutty | IT | 65000 |
2 | Mary-Anne | Foster | IT | 65000 |
3 | Elizabeth | Doe | IT | 65000 |
4 | Kumar | Swamy | IT | 65000 |
5 | Carol | Smith | Accounting | 15000 |
6 | Joe | Stevens | Accounting | 15000 |
7 | John | Doe | Human Resources | 240000 |
8 | Zacary | Efron | Human Resources | 240000 |
9 | Eric | Goldsmith | Human Resources | 240000 |
10 | George | O'Donnell | Research | 55000 |
11 | David | Smith | Research | 55000 |
%%sql
SELECT *
FROM Employees
LIMIT 2;
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
ssn | name | lastname | department |
---|---|---|---|
123234877 | Michael | Rogers | 14 |
152934485 | Anand | Manikutty | 14 |
%%sql
SELECT *
FROM Departments
LIMIT 2;
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
code | name | budget |
---|---|---|
14 | IT | 65000 |
37 | Accounting | 15000 |
%%sql
SELECT e.name AS name_emp,
e.lastname AS lastname,
d.name AS name_dep,
d.budget AS budget
FROM Employees e
INNER JOIN Departments d ON e.department = d.code
* postgres://postgres:***@localhost:5432/wbemployee 12 rows affected.
name_emp | lastname | name_dep | budget |
---|---|---|---|
Kumar | Swamy | IT | 65000 |
Elizabeth | Doe | IT | 65000 |
Mary-Anne | Foster | IT | 65000 |
Anand | Manikutty | IT | 65000 |
Michael | Rogers | IT | 65000 |
Joe | Stevens | Accounting | 15000 |
Carol | Smith | Accounting | 15000 |
Eric | Goldsmith | Human Resources | 240000 |
Zacary | Efron | Human Resources | 240000 |
John | Doe | Human Resources | 240000 |
David | Smith | Research | 55000 |
George | O'Donnell | Research | 55000 |
sdf.show(2)
+----+----------+------+---------+--------+--------+----------+ |code| name_dept|budget| ssn|name_emp|lastname|department| +----+----------+------+---------+--------+--------+----------+ | 37|Accounting| 15000|222364883| Carol| Smith| 37| | 37|Accounting| 15000|326587417| Joe| Stevens| 37| +----+----------+------+---------+--------+--------+----------+ only showing top 2 rows
sdf[['name_emp', 'lastname','name_dept','budget']].show()
+---------+---------+---------------+------+ | name_emp| lastname| name_dept|budget| +---------+---------+---------------+------+ | Carol| Smith| Accounting| 15000| | Joe| Stevens| Accounting| 15000| | John| Doe|Human Resources|240000| | Zacary| Efron|Human Resources|240000| | Eric|Goldsmith|Human Resources|240000| | George|O'Donnell| Research| 55000| | David| Smith| Research| 55000| | Michael| Rogers| IT| 65000| | Anp.nand|Manikutty| IT| 65000| |Mary-Anne| Foster| IT| 65000| |Elizabeth| Doe| IT| 65000| | Kumar| Swamy| IT| 65000| +---------+---------+---------------+------+
# sdf.select(['name_emp', 'lastname','name_dept','budget']).show()
(sdfe.selectExpr('name as name_emp', 'lastname', 'department').alias('E')
.join(
sdfd.selectExpr('name as name_dept','budget','code').alias('D'),
_col('E.department') == _col('D.code')
)
.drop('department','code')
.show()
)
+---------+---------+---------------+------+ | name_emp| lastname| name_dept|budget| +---------+---------+---------------+------+ | Carol| Smith| Accounting| 15000| | Joe| Stevens| Accounting| 15000| | John| Doe|Human Resources|240000| | Zacary| Efron|Human Resources|240000| | Eric|Goldsmith|Human Resources|240000| | George|O'Donnell| Research| 55000| | David| Smith| Research| 55000| | Michael| Rogers| IT| 65000| | Anp.nand|Manikutty| IT| 65000| |Mary-Anne| Foster| IT| 65000| |Elizabeth| Doe| IT| 65000| | Kumar| Swamy| IT| 65000| +---------+---------+---------------+------+
spark.sql("""
select e.name as name_emp, e.lastname as lastname,
d.name as name_dep, d.budget as budget
from Employees e inner join Departments d
on e.department = d.code
"""
).show()
+---------+---------+---------------+------+ | name_emp| lastname| name_dep|budget| +---------+---------+---------------+------+ | Carol| Smith| Accounting| 15000| | Joe| Stevens| Accounting| 15000| | John| Doe|Human Resources|240000| | Zacary| Efron|Human Resources|240000| | Eric|Goldsmith|Human Resources|240000| | George|O'Donnell| Research| 55000| | David| Smith| Research| 55000| | Michael| Rogers| IT| 65000| | Anp.nand|Manikutty| IT| 65000| |Mary-Anne| Foster| IT| 65000| |Elizabeth| Doe| IT| 65000| | Kumar| Swamy| IT| 65000| +---------+---------+---------------+------+
dfe.head(2)
ssn | name | lastname | department | |
---|---|---|---|---|
0 | 123234877 | Michael | Rogers | 14 |
1 | 152934485 | Anp.nand | Manikutty | 14 |
dfd.head(2)
code | name | budget | |
---|---|---|---|
0 | 14 | IT | 65000 |
1 | 37 | Accounting | 15000 |
df.head(1)
code | name_dept | budget | ssn | name_emp | lastname | department | |
---|---|---|---|---|---|---|---|
0 | 14 | IT | 65000 | 123234877 | Michael | Rogers | 14 |
df[df.budget > 60000][['name_emp','lastname']]
name_emp | lastname | |
---|---|---|
0 | Michael | Rogers |
1 | Anp.nand | Manikutty |
2 | Mary-Anne | Foster |
3 | Elizabeth | Doe |
4 | Kumar | Swamy |
7 | John | Doe |
8 | Zacary | Efron |
9 | Eric | Goldsmith |
%%sql
SELECT e.name AS name_emp,
e.lastname AS lastname
FROM Employees e
INNER JOIN Departments d ON e.department = d.code
WHERE d.budget > 60000
* postgres://postgres:***@localhost:5432/wbemployee 8 rows affected.
name_emp | lastname |
---|---|
Michael | Rogers |
Anand | Manikutty |
Mary-Anne | Foster |
John | Doe |
Zacary | Efron |
Eric | Goldsmith |
Elizabeth | Doe |
Kumar | Swamy |
%%sql
SELECT e.name AS name_emp,
e.lastname AS lastname
FROM Employees e
INNER JOIN Departments d ON e.department = d.code
AND d.budget > 60000
* postgres://postgres:***@localhost:5432/wbemployee 8 rows affected.
name_emp | lastname |
---|---|
Michael | Rogers |
Anand | Manikutty |
Mary-Anne | Foster |
John | Doe |
Zacary | Efron |
Eric | Goldsmith |
Elizabeth | Doe |
Kumar | Swamy |
# using subquery
%%sql
SELECT name,
lastname
FROM Employees
WHERE (department IN
(SELECT code
FROM Departments
WHERE budget > 60000 ))
* postgres://postgres:***@localhost:5432/wbemployee 8 rows affected.
name | lastname |
---|---|
Michael | Rogers |
Anand | Manikutty |
Mary-Anne | Foster |
John | Doe |
Zacary | Efron |
Eric | Goldsmith |
Elizabeth | Doe |
Kumar | Swamy |
%%sql
SELECT name,
lastname
FROM Employees e
WHERE EXISTS
(SELECT *
FROM Departments d
WHERE budget > 60000
AND e.department = d.code )
* postgres://postgres:***@localhost:5432/wbemployee 8 rows affected.
name | lastname |
---|---|
Michael | Rogers |
Anand | Manikutty |
Mary-Anne | Foster |
John | Doe |
Zacary | Efron |
Eric | Goldsmith |
Elizabeth | Doe |
Kumar | Swamy |
%%sql
SELECT *
FROM Employees
LIMIT 1;
* postgres://postgres:***@localhost:5432/wbemployee 1 rows affected.
ssn | name | lastname | department |
---|---|---|---|
123234877 | Michael | Rogers | 14 |
sdf[sdf.budget > 60000][['name_emp','lastname']].show()
+---------+---------+ | name_emp| lastname| +---------+---------+ | John| Doe| | Zacary| Efron| | Eric|Goldsmith| | Michael| Rogers| | Anp.nand|Manikutty| |Mary-Anne| Foster| |Elizabeth| Doe| | Kumar| Swamy| +---------+---------+
dfd
code | name | budget | |
---|---|---|---|
0 | 14 | IT | 65000 |
1 | 37 | Accounting | 15000 |
2 | 59 | Human Resources | 240000 |
3 | 77 | Research | 55000 |
dfd.budget.mean()
93750.0
dfd[dfd.budget > dfd.budget.mean()]
code | name | budget | |
---|---|---|---|
2 | 59 | Human Resources | 240000 |
%%sql
SELECT *
FROM Departments
WHERE budget >
(SELECT avg(budget)
FROM Departments)
* postgres://postgres:***@localhost:5432/wbemployee 1 rows affected.
code | name | budget |
---|---|---|
59 | Human Resources | 240000 |
sdfd.agg({'budget':'mean'}).show()
+-----------+ |avg(budget)| +-----------+ | 93750.0| +-----------+
# fails: dfd[dfd.budget > sdfd.agg({'budget':'mean'})]
dfd[dfd.budget > sdfd.agg({'budget':'mean'}).collect()[0][0]]
code | name | budget | |
---|---|---|---|
2 | 59 | Human Resources | 240000 |
# dept + emp is df
df
code | name_dept | budget | ssn | name_emp | lastname | department | |
---|---|---|---|---|---|---|---|
0 | 14 | IT | 65000 | 123234877 | Michael | Rogers | 14 |
1 | 14 | IT | 65000 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 14 | IT | 65000 | 332154719 | Mary-Anne | Foster | 14 |
3 | 14 | IT | 65000 | 845657245 | Elizabeth | Doe | 14 |
4 | 14 | IT | 65000 | 845657246 | Kumar | Swamy | 14 |
5 | 37 | Accounting | 15000 | 222364883 | Carol | Smith | 37 |
6 | 37 | Accounting | 15000 | 326587417 | Joe | Stevens | 37 |
7 | 59 | Human Resources | 240000 | 546523478 | John | Doe | 59 |
8 | 59 | Human Resources | 240000 | 654873219 | Zacary | Efron | 59 |
9 | 59 | Human Resources | 240000 | 745685214 | Eric | Goldsmith | 59 |
10 | 77 | Research | 55000 | 332569843 | George | O'Donnell | 77 |
11 | 77 | Research | 55000 | 631231482 | David | Smith | 77 |
# df.groupby('department').count() # gives all columns
df.groupby('name_dept')[['code']]\
.count()\
.loc[lambda dfx: dfx['code'] > 2]\
.reset_index().drop('code',axis=1)
name_dept | |
---|---|
0 | Human Resources |
1 | IT |
df.groupby('name_dept')\
.filter(lambda dfx: dfx['code'].count() > 2)\
.drop_duplicates('name_dept')\
[['name_dept']]
name_dept | |
---|---|
0 | IT |
7 | Human Resources |
df.groupby('department')\
.filter(lambda dfx: all(dfx.count()> 2))\
[['name_dept']]\
.drop_duplicates()
name_dept | |
---|---|
0 | IT |
7 | Human Resources |
df [ df.groupby('department')['code'].transform('count') > 2 ][['name_dept']]\
.drop_duplicates()
name_dept | |
---|---|
0 | IT |
7 | Human Resources |
%%sql
SELECT *
FROM Departments
LIMIT 2
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
code | name | budget |
---|---|---|
14 | IT | 65000 |
37 | Accounting | 15000 |
%%sql
SELECT *
FROM Employees
LIMIT 2;
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
ssn | name | lastname | department |
---|---|---|---|
123234877 | Michael | Rogers | 14 |
152934485 | Anand | Manikutty | 14 |
%%sql
SELECT d.name
FROM Departments d
INNER JOIN Employees e ON d.code = e.department
GROUP BY d.name
HAVING count(*) > 2
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
name |
---|
Human Resources |
IT |
%%sql
SELECT name
FROM Departments
WHERE code IN
(SELECT department
FROM Employees
GROUP BY department
HAVING count(*) > 2)
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
name |
---|
IT |
Human Resources |
# spark does not have transform, i need to create new column.
sdf.show()
+----+---------------+------+---------+---------+---------+----------+ |code| name_dept|budget| ssn| name_emp| lastname|department| +----+---------------+------+---------+---------+---------+----------+ | 37| Accounting| 15000|222364883| Carol| Smith| 37| | 37| Accounting| 15000|326587417| Joe| Stevens| 37| | 59|Human Resources|240000|546523478| John| Doe| 59| | 59|Human Resources|240000|654873219| Zacary| Efron| 59| | 59|Human Resources|240000|745685214| Eric|Goldsmith| 59| | 77| Research| 55000|332569843| George|O'Donnell| 77| | 77| Research| 55000|631231482| David| Smith| 77| | 14| IT| 65000|123234877| Michael| Rogers| 14| | 14| IT| 65000|152934485| Anp.nand|Manikutty| 14| | 14| IT| 65000|332154719|Mary-Anne| Foster| 14| | 14| IT| 65000|845657245|Elizabeth| Doe| 14| | 14| IT| 65000|845657246| Kumar| Swamy| 14| +----+---------------+------+---------+---------+---------+----------+
from pyspark.sql.window import Window
window = Window().partitionBy('department')
sdf = sdf.withColumn('dept_count', F.count('code').over(window))
sdf.show()
+----+---------------+------+---------+---------+---------+----------+----------+ |code| name_dept|budget| ssn| name_emp| lastname|department|dept_count| +----+---------------+------+---------+---------+---------+----------+----------+ | 37| Accounting| 15000|222364883| Carol| Smith| 37| 2| | 37| Accounting| 15000|326587417| Joe| Stevens| 37| 2| | 59|Human Resources|240000|546523478| John| Doe| 59| 3| | 59|Human Resources|240000|654873219| Zacary| Efron| 59| 3| | 59|Human Resources|240000|745685214| Eric|Goldsmith| 59| 3| | 77| Research| 55000|332569843| George|O'Donnell| 77| 2| | 77| Research| 55000|631231482| David| Smith| 77| 2| | 14| IT| 65000|123234877| Michael| Rogers| 14| 5| | 14| IT| 65000|152934485| Anp.nand|Manikutty| 14| 5| | 14| IT| 65000|332154719|Mary-Anne| Foster| 14| 5| | 14| IT| 65000|845657245|Elizabeth| Doe| 14| 5| | 14| IT| 65000|845657246| Kumar| Swamy| 14| 5| +----+---------------+------+---------+---------+---------+----------+----------+
sdf.filter(sdf.dept_count > 2).select('name_dept').drop_duplicates().show()
+---------------+ | name_dept| +---------------+ | IT| |Human Resources| +---------------+
sdf.groupby('name_dept').count().filter(F.col('count') > 2).drop('count').show()
+---------------+ | name_dept| +---------------+ | IT| |Human Resources| +---------------+
df
code | name_dept | budget | ssn | name_emp | lastname | department | |
---|---|---|---|---|---|---|---|
0 | 14 | IT | 65000 | 123234877 | Michael | Rogers | 14 |
1 | 14 | IT | 65000 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 14 | IT | 65000 | 332154719 | Mary-Anne | Foster | 14 |
3 | 14 | IT | 65000 | 845657245 | Elizabeth | Doe | 14 |
4 | 14 | IT | 65000 | 845657246 | Kumar | Swamy | 14 |
5 | 37 | Accounting | 15000 | 222364883 | Carol | Smith | 37 |
6 | 37 | Accounting | 15000 | 326587417 | Joe | Stevens | 37 |
7 | 59 | Human Resources | 240000 | 546523478 | John | Doe | 59 |
8 | 59 | Human Resources | 240000 | 654873219 | Zacary | Efron | 59 |
9 | 59 | Human Resources | 240000 | 745685214 | Eric | Goldsmith | 59 |
10 | 77 | Research | 55000 | 332569843 | George | O'Donnell | 77 |
11 | 77 | Research | 55000 | 631231482 | David | Smith | 77 |
# df.groupby('department')['budget'].mean()
# df.drop_duplicates('department')
df['budget'].drop_duplicates().sort_values()
5 15000 10 55000 0 65000 7 240000 Name: budget, dtype: int64
df['budget'].drop_duplicates().sort_values().iloc[1]
55000
sec_lowest_budget = df['budget'].drop_duplicates().nsmallest(2).tail(1).to_numpy()[0]
sec_lowest_budget
55000
df.loc[df.budget==sec_lowest_budget, ['name_emp','lastname']]
name_emp | lastname | |
---|---|---|
10 | George | O'Donnell |
11 | David | Smith |
%%sql
SELECT *
FROM Employees
LIMIT 2;
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
ssn | name | lastname | department |
---|---|---|---|
123234877 | Michael | Rogers | 14 |
152934485 | Anand | Manikutty | 14 |
%%sql
SELECT *
FROM Departments
LIMIT 2;
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
code | name | budget |
---|---|---|
14 | IT | 65000 |
37 | Accounting | 15000 |
%%sql
SELECT min(budget)
FROM Departments
WHERE budget !=
(SELECT min(budget)
FROM Departments)
* postgres://postgres:***@localhost:5432/wbemployee 1 rows affected.
min |
---|
55000 |
%%sql
SELECT code
FROM Departments
WHERE budget =
(SELECT min(budget)
FROM Departments
WHERE budget !=
(SELECT min(budget)
FROM Departments) )
* postgres://postgres:***@localhost:5432/wbemployee 1 rows affected.
code |
---|
77 |
%%sql
SELECT name,
lastname
FROM Employees
WHERE department =
(SELECT code
FROM Departments
WHERE budget =
(SELECT min(budget)
FROM Departments
WHERE budget !=
(SELECT min(budget)
FROM Departments) ) )
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
name | lastname |
---|---|
George | O'Donnell |
David | Smith |
%%sql
SELECT *
FROM Departments d
ORDER BY d.budget
LIMIT 2
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
code | name | budget |
---|---|---|
37 | Accounting | 15000 |
77 | Research | 55000 |
%%sql
SELECT sub.code
FROM
(SELECT *
FROM Departments d
ORDER BY d.budget
LIMIT 2) sub
ORDER BY budget DESC
LIMIT 1
* postgres://postgres:***@localhost:5432/wbemployee 1 rows affected.
code |
---|
77 |
%%sql
SELECT e.name,
e.lastname
FROM Employees e
WHERE e.department =
(SELECT sub.code
FROM
(SELECT *
FROM Departments d
ORDER BY d.budget
LIMIT 2) sub
ORDER BY budget DESC
LIMIT 1)
* postgres://postgres:***@localhost:5432/wbemployee 2 rows affected.
name | lastname |
---|---|
George | O'Donnell |
David | Smith |
sdf.show()
+----+---------------+------+---------+---------+---------+----------+----------+ |code| name_dept|budget| ssn| name_emp| lastname|department|dept_count| +----+---------------+------+---------+---------+---------+----------+----------+ | 37| Accounting| 15000|222364883| Carol| Smith| 37| 2| | 37| Accounting| 15000|326587417| Joe| Stevens| 37| 2| | 59|Human Resources|240000|546523478| John| Doe| 59| 3| | 59|Human Resources|240000|654873219| Zacary| Efron| 59| 3| | 59|Human Resources|240000|745685214| Eric|Goldsmith| 59| 3| | 77| Research| 55000|332569843| George|O'Donnell| 77| 2| | 77| Research| 55000|631231482| David| Smith| 77| 2| | 14| IT| 65000|123234877| Michael| Rogers| 14| 5| | 14| IT| 65000|152934485| Anp.nand|Manikutty| 14| 5| | 14| IT| 65000|332154719|Mary-Anne| Foster| 14| 5| | 14| IT| 65000|845657245|Elizabeth| Doe| 14| 5| | 14| IT| 65000|845657246| Kumar| Swamy| 14| 5| +----+---------------+------+---------+---------+---------+----------+----------+
sdf.select(F.min(F.col('budget'))).show()
+-----------+ |min(budget)| +-----------+ | 15000| +-----------+
lowest_budget = sdf.select(F.min(F.col('budget'))).collect()[0][0]
lowest_budget
15000
sdf2 = sdf[sdf.budget != lowest_budget]
sdf2.show()
+----+---------------+------+---------+---------+---------+----------+----------+ |code| name_dept|budget| ssn| name_emp| lastname|department|dept_count| +----+---------------+------+---------+---------+---------+----------+----------+ | 59|Human Resources|240000|546523478| John| Doe| 59| 3| | 59|Human Resources|240000|654873219| Zacary| Efron| 59| 3| | 59|Human Resources|240000|745685214| Eric|Goldsmith| 59| 3| | 77| Research| 55000|332569843| George|O'Donnell| 77| 2| | 77| Research| 55000|631231482| David| Smith| 77| 2| | 14| IT| 65000|123234877| Michael| Rogers| 14| 5| | 14| IT| 65000|152934485| Anp.nand|Manikutty| 14| 5| | 14| IT| 65000|332154719|Mary-Anne| Foster| 14| 5| | 14| IT| 65000|845657245|Elizabeth| Doe| 14| 5| | 14| IT| 65000|845657246| Kumar| Swamy| 14| 5| +----+---------------+------+---------+---------+---------+----------+----------+
sec_lowest_budget = sdf2.select(F.min(F.col('budget'))).collect()[0][0]
sec_lowest_budget
55000
sdf2[sdf2.budget == sec_lowest_budget].select('name_emp','lastname').show()
+--------+---------+ |name_emp| lastname| +--------+---------+ | George|O'Donnell| | David| Smith| +--------+---------+
sdf.drop_duplicates(subset=['budget']).orderBy('budget').show()
+----+---------------+------+---------+--------+---------+----------+----------+ |code| name_dept|budget| ssn|name_emp| lastname|department|dept_count| +----+---------------+------+---------+--------+---------+----------+----------+ | 37| Accounting| 15000|222364883| Carol| Smith| 37| 2| | 77| Research| 55000|332569843| George|O'Donnell| 77| 2| | 14| IT| 65000|123234877| Michael| Rogers| 14| 5| | 59|Human Resources|240000|546523478| John| Doe| 59| 3| +----+---------------+------+---------+--------+---------+----------+----------+
sdf.drop_duplicates(subset=['budget']).orderBy('budget')[['budget']].show()
+------+ |budget| +------+ | 15000| | 55000| | 65000| |240000| +------+
sec_lowest_budget = sdf.drop_duplicates(subset=['budget']).orderBy('budget')[['budget']].collect()[1][0]
sec_lowest_budget
55000
dfd.head(2)
code | name | budget | |
---|---|---|---|
0 | 14 | IT | 65000 |
1 | 37 | Accounting | 15000 |
df.head(2)
code | name_dept | budget | ssn | name_emp | lastname | department | |
---|---|---|---|---|---|---|---|
0 | 14 | IT | 65000 | 123234877 | Michael | Rogers | 14 |
1 | 14 | IT | 65000 | 152934485 | Anp.nand | Manikutty | 14 |
df.loc[len(df)] = [11,'Quality Assurance',40000,847219811,'Mary','Moore',11]
df
code | name_dept | budget | ssn | name_emp | lastname | department | |
---|---|---|---|---|---|---|---|
0 | 14 | IT | 65000 | 123234877 | Michael | Rogers | 14 |
1 | 14 | IT | 65000 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 14 | IT | 65000 | 332154719 | Mary-Anne | Foster | 14 |
3 | 14 | IT | 65000 | 845657245 | Elizabeth | Doe | 14 |
4 | 14 | IT | 65000 | 845657246 | Kumar | Swamy | 14 |
5 | 37 | Accounting | 15000 | 222364883 | Carol | Smith | 37 |
6 | 37 | Accounting | 15000 | 326587417 | Joe | Stevens | 37 |
7 | 59 | Human Resources | 240000 | 546523478 | John | Doe | 59 |
8 | 59 | Human Resources | 240000 | 654873219 | Zacary | Efron | 59 |
9 | 59 | Human Resources | 240000 | 745685214 | Eric | Goldsmith | 59 |
10 | 77 | Research | 55000 | 332569843 | George | O'Donnell | 77 |
11 | 77 | Research | 55000 | 631231482 | David | Smith | 77 |
12 | 11 | Quality Assurance | 40000 | 847219811 | Mary | Moore | 11 |
"""
INSERT INTO Departments
VALUES ( 11 , 'Quality Assurance' , 40000);
INSERT INTO Employees
VALUES ( '847219811' , 'Mary' , 'Moore' , 11);
""";
# pyspark
"""
spark df and rdd are immutable, we can not insert row to them,
but we can create new dataframe and union to old df and create new immutable dataframe.
""";
sdf.show(2)
+----+----------+------+---------+--------+--------+----------+----------+ |code| name_dept|budget| ssn|name_emp|lastname|department|dept_count| +----+----------+------+---------+--------+--------+----------+----------+ | 37|Accounting| 15000|222364883| Carol| Smith| 37| 2| | 37|Accounting| 15000|326587417| Joe| Stevens| 37| 2| +----+----------+------+---------+--------+--------+----------+----------+ only showing top 2 rows
sdf.columns
['code', 'name_dept', 'budget', 'ssn', 'name_emp', 'lastname', 'department', 'dept_count']
sdf.printSchema()
root |-- code: integer (nullable = true) |-- name_dept: string (nullable = true) |-- budget: integer (nullable = true) |-- ssn: integer (nullable = true) |-- name_emp: string (nullable = true) |-- lastname: string (nullable = true) |-- department: integer (nullable = true) |-- dept_count: long (nullable = false)
from pyspark.sql import Row
newRow = spark.createDataFrame([Row(
code=11,
name_dept='Quality Assurance',
budget=40000,
ssn=847219811,
name_emp='Mary',
lastname='Moore',
department=11
)])
newRow.show()
+----+-----------------+------+---------+--------+--------+----------+ |code| name_dept|budget| ssn|name_emp|lastname|department| +----+-----------------+------+---------+--------+--------+----------+ | 11|Quality Assurance| 40000|847219811| Mary| Moore| 11| +----+-----------------+------+---------+--------+--------+----------+
sdf.drop('dept_count').show()
+----+---------------+------+---------+---------+---------+----------+ |code| name_dept|budget| ssn| name_emp| lastname|department| +----+---------------+------+---------+---------+---------+----------+ | 37| Accounting| 15000|222364883| Carol| Smith| 37| | 37| Accounting| 15000|326587417| Joe| Stevens| 37| | 59|Human Resources|240000|546523478| John| Doe| 59| | 59|Human Resources|240000|654873219| Zacary| Efron| 59| | 59|Human Resources|240000|745685214| Eric|Goldsmith| 59| | 77| Research| 55000|332569843| George|O'Donnell| 77| | 77| Research| 55000|631231482| David| Smith| 77| | 14| IT| 65000|123234877| Michael| Rogers| 14| | 14| IT| 65000|152934485| Anp.nand|Manikutty| 14| | 14| IT| 65000|332154719|Mary-Anne| Foster| 14| | 14| IT| 65000|845657245|Elizabeth| Doe| 14| | 14| IT| 65000|845657246| Kumar| Swamy| 14| +----+---------------+------+---------+---------+---------+----------+
sdf.drop('dept_count').union(
newRow[['code',
'name_dept',
'budget',
'ssn',
'name_emp',
'lastname',
'department']]
).show()
+----+-----------------+------+---------+---------+---------+----------+ |code| name_dept|budget| ssn| name_emp| lastname|department| +----+-----------------+------+---------+---------+---------+----------+ | 37| Accounting| 15000|222364883| Carol| Smith| 37| | 37| Accounting| 15000|326587417| Joe| Stevens| 37| | 59| Human Resources|240000|546523478| John| Doe| 59| | 59| Human Resources|240000|654873219| Zacary| Efron| 59| | 59| Human Resources|240000|745685214| Eric|Goldsmith| 59| | 77| Research| 55000|332569843| George|O'Donnell| 77| | 77| Research| 55000|631231482| David| Smith| 77| | 14| IT| 65000|123234877| Michael| Rogers| 14| | 14| IT| 65000|152934485| Anp.nand|Manikutty| 14| | 14| IT| 65000|332154719|Mary-Anne| Foster| 14| | 14| IT| 65000|845657245|Elizabeth| Doe| 14| | 14| IT| 65000|845657246| Kumar| Swamy| 14| | 11|Quality Assurance| 40000|847219811| Mary| Moore| 11| +----+-----------------+------+---------+---------+---------+----------+
dfd.head(2)
code | name | budget | |
---|---|---|---|
0 | 14 | IT | 65000 |
1 | 37 | Accounting | 15000 |
dfd['budget'] *= 0.9
dfd.head(2)
code | name | budget | |
---|---|---|---|
0 | 14 | IT | 58500.0 |
1 | 37 | Accounting | 13500.0 |
"""
UPDATE Departments
SET Budget = Budget * 0.9;
""";
sdfd.withColumn('reduced_budget', F.col('budget')*0.9).show()
+----+---------------+------+--------------+ |code| name|budget|reduced_budget| +----+---------------+------+--------------+ | 14| IT| 65000| 58500.0| | 37| Accounting| 15000| 13500.0| | 59|Human Resources|240000| 216000.0| | 77| Research| 55000| 49500.0| +----+---------------+------+--------------+
df
code | name_dept | budget | ssn | name_emp | lastname | department | |
---|---|---|---|---|---|---|---|
0 | 14 | IT | 65000 | 123234877 | Michael | Rogers | 14 |
1 | 14 | IT | 65000 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 14 | IT | 65000 | 332154719 | Mary-Anne | Foster | 14 |
3 | 14 | IT | 65000 | 845657245 | Elizabeth | Doe | 14 |
4 | 14 | IT | 65000 | 845657246 | Kumar | Swamy | 14 |
5 | 37 | Accounting | 15000 | 222364883 | Carol | Smith | 37 |
6 | 37 | Accounting | 15000 | 326587417 | Joe | Stevens | 37 |
7 | 59 | Human Resources | 240000 | 546523478 | John | Doe | 59 |
8 | 59 | Human Resources | 240000 | 654873219 | Zacary | Efron | 59 |
9 | 59 | Human Resources | 240000 | 745685214 | Eric | Goldsmith | 59 |
10 | 77 | Research | 55000 | 332569843 | George | O'Donnell | 77 |
11 | 77 | Research | 55000 | 631231482 | David | Smith | 77 |
12 | 11 | Quality Assurance | 40000 | 847219811 | Mary | Moore | 11 |
df.loc[df.name_dept=='Research', ['code','name_dept','department']] = [14,'IT',14]
df
code | name_dept | budget | ssn | name_emp | lastname | department | |
---|---|---|---|---|---|---|---|
0 | 14 | IT | 65000 | 123234877 | Michael | Rogers | 14 |
1 | 14 | IT | 65000 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 14 | IT | 65000 | 332154719 | Mary-Anne | Foster | 14 |
3 | 14 | IT | 65000 | 845657245 | Elizabeth | Doe | 14 |
4 | 14 | IT | 65000 | 845657246 | Kumar | Swamy | 14 |
5 | 37 | Accounting | 15000 | 222364883 | Carol | Smith | 37 |
6 | 37 | Accounting | 15000 | 326587417 | Joe | Stevens | 37 |
7 | 59 | Human Resources | 240000 | 546523478 | John | Doe | 59 |
8 | 59 | Human Resources | 240000 | 654873219 | Zacary | Efron | 59 |
9 | 59 | Human Resources | 240000 | 745685214 | Eric | Goldsmith | 59 |
10 | 14 | IT | 55000 | 332569843 | George | O'Donnell | 14 |
11 | 14 | IT | 55000 | 631231482 | David | Smith | 14 |
12 | 11 | Quality Assurance | 40000 | 847219811 | Mary | Moore | 11 |
dfe2 = dfe.copy()
dfe2
ssn | name | lastname | department | |
---|---|---|---|---|
0 | 123234877 | Michael | Rogers | 14 |
1 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 222364883 | Carol | Smith | 37 |
3 | 326587417 | Joe | Stevens | 37 |
4 | 332154719 | Mary-Anne | Foster | 14 |
5 | 332569843 | George | O'Donnell | 77 |
6 | 546523478 | John | Doe | 59 |
7 | 631231482 | David | Smith | 77 |
8 | 654873219 | Zacary | Efron | 59 |
9 | 745685214 | Eric | Goldsmith | 59 |
10 | 845657245 | Elizabeth | Doe | 14 |
11 | 845657246 | Kumar | Swamy | 14 |
dfe2.loc[dfe2.department==77] = 14
dfe2
ssn | name | lastname | department | |
---|---|---|---|---|
0 | 123234877 | Michael | Rogers | 14 |
1 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 222364883 | Carol | Smith | 37 |
3 | 326587417 | Joe | Stevens | 37 |
4 | 332154719 | Mary-Anne | Foster | 14 |
5 | 14 | 14 | 14 | 14 |
6 | 546523478 | John | Doe | 59 |
7 | 14 | 14 | 14 | 14 |
8 | 654873219 | Zacary | Efron | 59 |
9 | 745685214 | Eric | Goldsmith | 59 |
10 | 845657245 | Elizabeth | Doe | 14 |
11 | 845657246 | Kumar | Swamy | 14 |
"""
UPDATE Employees
SET Department = 14 WHERE Department = 77;
""";
sdfe.withColumn("dept_changed",
F.when(F.col("department")==77, 14)\
.otherwise(F.col("department"))
).show()
+---------+---------+---------+----------+------------+ | ssn| name| lastname|department|dept_changed| +---------+---------+---------+----------+------------+ |123234877| Michael| Rogers| 14| 14| |152934485| Anp.nand|Manikutty| 14| 14| |222364883| Carol| Smith| 37| 37| |326587417| Joe| Stevens| 37| 37| |332154719|Mary-Anne| Foster| 14| 14| |332569843| George|O'Donnell| 77| 14| |546523478| John| Doe| 59| 59| |631231482| David| Smith| 77| 14| |654873219| Zacary| Efron| 59| 59| |745685214| Eric|Goldsmith| 59| 59| |845657245|Elizabeth| Doe| 14| 14| |845657246| Kumar| Swamy| 14| 14| +---------+---------+---------+----------+------------+
dfe
ssn | name | lastname | department | |
---|---|---|---|---|
0 | 123234877 | Michael | Rogers | 14 |
1 | 152934485 | Anp.nand | Manikutty | 14 |
2 | 222364883 | Carol | Smith | 37 |
3 | 326587417 | Joe | Stevens | 37 |
4 | 332154719 | Mary-Anne | Foster | 14 |
5 | 14 | 14 | 14 | 14 |
6 | 546523478 | John | Doe | 59 |
7 | 14 | 14 | 14 | 14 |
8 | 654873219 | Zacary | Efron | 59 |
9 | 745685214 | Eric | Goldsmith | 59 |
10 | 845657245 | Elizabeth | Doe | 14 |
11 | 845657246 | Kumar | Swamy | 14 |
dfe[dfe.department!=14]
ssn | name | lastname | department | |
---|---|---|---|---|
2 | 222364883 | Carol | Smith | 37 |
3 | 326587417 | Joe | Stevens | 37 |
6 | 546523478 | John | Doe | 59 |
8 | 654873219 | Zacary | Efron | 59 |
9 | 745685214 | Eric | Goldsmith | 59 |
"""
DELETE FROM Employees
WHERE Department = 14;
""";
sdfe[sdfe.department!=14].show()
+---------+------+---------+----------+ | ssn| name| lastname|department| +---------+------+---------+----------+ |222364883| Carol| Smith| 37| |326587417| Joe| Stevens| 37| |332569843|George|O'Donnell| 77| |546523478| John| Doe| 59| |631231482| David| Smith| 77| |654873219|Zacary| Efron| 59| |745685214| Eric|Goldsmith| 59| +---------+------+---------+----------+
dfd
code | name | budget | |
---|---|---|---|
0 | 14 | IT | 58500.0 |
1 | 37 | Accounting | 13500.0 |
2 | 59 | Human Resources | 216000.0 |
3 | 77 | Research | 49500.0 |
dfd[dfd.budget < 60000]
code | name | budget | |
---|---|---|---|
0 | 14 | IT | 58500.0 |
1 | 37 | Accounting | 13500.0 |
3 | 77 | Research | 49500.0 |
"""
DELETE FROM Employees
WHERE Department IN
(
SELECT Code FROM Departments
WHERE Budget >= 60000
);
""";
dfe2 = pd.DataFrame(columns=dfe.columns)
dfe2
ssn | name | lastname | department |
---|
"""
DELETE FROM Employees;
""";
schema = sdfe.schema
empty = spark.createDataFrame(sc.emptyRDD(), schema)
empty.show()
+---+----+--------+----------+ |ssn|name|lastname|department| +---+----+--------+----------+ +---+----+--------+----------+
time_taken = time.time() - time_start_notebook
h,m = divmod(time_taken,60*60)
print('Time taken: {:.0f} hr {:.0f} min {:.0f} secs'.format(h, *divmod(m,60)))
Time taken: 0 hr 0 min 26 secs