import time
import os
# Record the wall-clock start; the final cell subtracts it to report total run time.
time_start_notebook = time.time()
# my local library
import sys
# NOTE(review): absolute paths tied to one user's machine; the `bhishan`
# import that follows presumably relies on them — confirm before running elsewhere.
sys.path.append("/Users/poudel/Dropbox/a00_Bhishan_Modules/")
sys.path.append("/Users/poudel/Dropbox/a00_Bhishan_Modules/bhishan")
from bhishan import bp
import numpy as np
import pandas as pd
# Jupyter notebook display limits for pandas: up to 200 columns and 100 rows.
for _option, _limit in (('display.max_columns', 200), ('display.max_rows', 100)):
    pd.set_option(_option, _limit)
from datetime import datetime
from dateutil.relativedelta import relativedelta
# last_year = datetime.now() - relativedelta(years=1)
import sqlite3
# Version of the SQLite library that the sqlite3 module is linked against.
sqlite3.sqlite_version # sqlite database version 3.29
# NOTE(review): `sqlite3.version` is reported as deprecated in newer Python
# releases — confirm before upgrading the interpreter.
sqlite3.version # python module version
'2.6.0'
# pyspark settings
import os
import findspark
# Locate the Spark install under the user's home directory.
HOME = os.path.expanduser('~')
findspark.init(HOME + "/Softwares/Spark/spark-3.1.2-bin-hadoop3.2")
# Both PySpark interpreter variables are set to the same conda-env Python.
for _env_key in ('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'):
    os.environ[_env_key] = f'{HOME}/opt/miniconda3/envs/spk/bin/python'
import pyspark
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import col as _col
from pyspark.sql.functions import udf # @udf("integer") def myfunc(x,y): return x - y
from pyspark.sql import functions as F # stddev format_number date_format, dayofyear, when
from pyspark.sql.window import Window
from pyspark.sql.types import StructField, StringType, IntegerType, FloatType, StructType
from pyspark.sql.functions import (mean as _mean, min as _min,
max as _max, avg as _avg,
when as _when
)
# Build (or reuse) the Spark session named "bhishan" and expose the usual handles.
spark = (
    SparkSession.builder
    .appName("bhishan")
    .getOrCreate()
)
sc = spark.sparkContext
sqlContext = pyspark.SQLContext(sc) # spark_df = sqlContext.createDataFrame(pandas_df)
sc.setLogLevel("INFO")
print(pyspark.__version__)
3.1.2
from pyspark import SparkConf, SparkContext, SQLContext
# NOTE(review): a session was already built earlier in this notebook;
# getOrCreate() presumably hands back that same session, which would make
# this cell a repeat — confirm.
_builder = pyspark.sql.SparkSession.builder.appName('app')
spark = _builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc) # spark_df = sqlContext.createDataFrame(pandas_df)
sc.setLogLevel("INFO")
def f(q,n=None):
    """Run SQL query `q` on the module-level sqlite connection `conn`.

    The result is read into a pandas DataFrame and `n` is forwarded to
    `DataFrame.head`, whose return value is handed back to the caller.
    """
    frame = pd.read_sql(q, conn)
    return frame.head(n)
def s(q,n=5):
    """Run SQL query `q` through the module-level `spark` session and display it.

    Prints the row and column counts of the result, then shows its first
    `n` rows. Returns whatever `DataFrame.show` returns.
    """
    result = spark.sql(q)
    n_rows = result.count()
    n_cols = len(result.columns)
    print(f'nrows: {n_rows}, ncols: {n_cols}')
    return result.show(n)
# Always start from a fresh database file: delete any previous copy, then
# create and open a new one.
dbname = 'wbmovie.db'
try:
    os.remove(dbname)
except FileNotFoundError:
    pass  # first run: there is no old database to delete
conn = sqlite3.connect(dbname)
cur = conn.cursor()
# Create the Movies table: integer primary key `Code`, required `Title`,
# optional `Rating`.
cur.execute(
"""
CREATE TABLE Movies (
Code INTEGER PRIMARY KEY NOT NULL,
Title TEXT NOT NULL,
Rating TEXT
)
""")
<sqlite3.Cursor at 0x7f8eb6ad3030>
# Create the MovieTheaters table: integer primary key `Code`, required `Name`,
# and a nullable `Movie` column declared as a reference to Movies(Code).
# NOTE(review): SQLite reportedly enforces foreign keys only when
# `PRAGMA foreign_keys` is switched on; nothing visible here enables it — confirm.
cur.execute(
"""
CREATE TABLE MovieTheaters (
Code INTEGER PRIMARY KEY NOT NULL,
Name TEXT NOT NULL,
Movie INTEGER
CONSTRAINT fk_Movies_Code REFERENCES Movies(Code)
)
""")
<sqlite3.Cursor at 0x7f8eb6ad3030>
# Seed rows for both tables, one INSERT statement per line. `data` is the
# list of lines, including the blank first and last entries left by split.
data = """
INSERT INTO Movies(Code,Title,Rating) VALUES(9,'Citizen King','G');
INSERT INTO Movies(Code,Title,Rating) VALUES(1,'Citizen Kane','PG');
INSERT INTO Movies(Code,Title,Rating) VALUES(2,'Singin'' in the Rain','G');
INSERT INTO Movies(Code,Title,Rating) VALUES(3,'The Wizard of Oz','G');
INSERT INTO Movies(Code,Title,Rating) VALUES(4,'The Quiet Man',NULL);
INSERT INTO Movies(Code,Title,Rating) VALUES(5,'North by Northwest',NULL);
INSERT INTO Movies(Code,Title,Rating) VALUES(6,'The Last Tango in Paris','NC-17');
INSERT INTO Movies(Code,Title,Rating) VALUES(7,'Some Like it Hot','PG-13');
INSERT INTO Movies(Code,Title,Rating) VALUES(8,'A Night at the Opera',NULL);
INSERT INTO MovieTheaters(Code,Name,Movie) VALUES(1,'Odeon',5);
INSERT INTO MovieTheaters(Code,Name,Movie) VALUES(2,'Imperial',1);
INSERT INTO MovieTheaters(Code,Name,Movie) VALUES(3,'Majestic',NULL);
INSERT INTO MovieTheaters(Code,Name,Movie) VALUES(4,'Royale',6);
INSERT INTO MovieTheaters(Code,Name,Movie) VALUES(5,'Paraiso',3);
INSERT INTO MovieTheaters(Code,Name,Movie) VALUES(6,'Nickelodeon',NULL);
""".split('\n')
# Strip first, then test: blank and whitespace-only lines are both skipped
# instead of being sent to the database as empty statements.
for line in data:
    statement = line.strip()
    if statement:
        cur.execute(statement)
conn.commit()
# Read the table back and print it to confirm the inserts.
pd.read_sql("select * from Movies", conn).pipe(print)
Code Title Rating 0 1 Citizen Kane PG 1 2 Singin' in the Rain G 2 3 The Wizard of Oz G 3 4 The Quiet Man None 4 5 North by Northwest None 5 6 The Last Tango in Paris NC-17 6 7 Some Like it Hot PG-13 7 8 A Night at the Opera None 8 9 Citizen King G
# Pandas copy of the Movies table, written one (Code, Title, Rating) row at a time.
dfm = pd.DataFrame(
    [
        (1, 'Citizen Kane', 'PG'),
        (2, "Singin' in the Rain", 'G'),
        (3, 'The Wizard of Oz', 'G'),
        (4, 'The Quiet Man', None),
        (5, 'North by Northwest', None),
        (6, 'The Last Tango in Paris', 'NC-17'),
        (7, 'Some Like it Hot', 'PG-13'),
        (8, 'A Night at the Opera', None),
        (9, 'Citizen King', 'G'),
    ],
    columns=['Code', 'Title', 'Rating'],
)
dfm
Code | Title | Rating | |
---|---|---|---|
0 | 1 | Citizen Kane | PG |
1 | 2 | Singin' in the Rain | G |
2 | 3 | The Wizard of Oz | G |
3 | 4 | The Quiet Man | None |
4 | 5 | North by Northwest | None |
5 | 6 | The Last Tango in Paris | NC-17 |
6 | 7 | Some Like it Hot | PG-13 |
7 | 8 | A Night at the Opera | None |
8 | 9 | Citizen King | G |
dfm.dtypes
Code int64 Title object Rating object dtype: object
# Spark schema mirroring the Movies columns; every field is declared nullable.
schema = StructType([
    StructField(_name, _dtype, True)
    for _name, _dtype in (
        ('Code', IntegerType()),
        ('Title', StringType()),
        ('Rating', StringType()),
    )
])
# Convert the pandas frame to a Spark DataFrame under that schema and display it.
sdfm = sqlContext.createDataFrame(dfm, schema)
sdfm.show()
+----+--------------------+------+ |Code| Title|Rating| +----+--------------------+------+ | 1| Citizen Kane| PG| | 2| Singin' in the Rain| G| | 3| The Wizard of Oz| G| | 4| The Quiet Man| null| | 5| North by Northwest| null| | 6|The Last Tango in...| NC-17| | 7| Some Like it Hot| PG-13| | 8|A Night at the Opera| null| | 9| Citizen King| G| +----+--------------------+------+
pd.read_sql("select * from MovieTheaters", conn).pipe(print)
Code Name Movie 0 1 Odeon 5.0 1 2 Imperial 1.0 2 3 Majestic NaN 3 4 Royale 6.0 4 5 Paraiso 3.0 5 6 Nickelodeon NaN
# Pandas copy of the MovieTheaters table, written one (Code, Name, Movie) row
# at a time; theaters without a movie carry NaN.
dft = pd.DataFrame(
    [
        (1, 'Odeon', 5.0),
        (2, 'Imperial', 1.0),
        (3, 'Majestic', np.nan),
        (4, 'Royale', 6.0),
        (5, 'Paraiso', 3.0),
        (6, 'Nickelodeon', np.nan),
    ],
    columns=['Code', 'Name', 'Movie'],
)
dft
Code | Name | Movie | |
---|---|---|---|
0 | 1 | Odeon | 5.0 |
1 | 2 | Imperial | 1.0 |
2 | 3 | Majestic | NaN |
3 | 4 | Royale | 6.0 |
4 | 5 | Paraiso | 3.0 |
5 | 6 | Nickelodeon | NaN |
dft.dtypes
Code int64 Name object Movie float64 dtype: object
# Spark schema mirroring the MovieTheaters columns; `Movie` is a float column
# because the pandas source holds NaN for theaters without a movie.
schema = StructType([
    StructField(_name, _dtype, True)
    for _name, _dtype in (
        ('Code', IntegerType()),
        ('Name', StringType()),
        ('Movie', FloatType()),
    )
])
# Convert the pandas frame to a Spark DataFrame under that schema and display it.
sdft = sqlContext.createDataFrame(dft, schema)
sdft.show()
+----+-----------+-----+ |Code| Name|Movie| +----+-----------+-----+ | 1| Odeon| 5.0| | 2| Imperial| 1.0| | 3| Majestic| NaN| | 4| Royale| 6.0| | 5| Paraiso| 3.0| | 6|Nickelodeon| NaN| +----+-----------+-----+
# sdft.printSchema()
# Pair each movie with the theater showing it (movie Code == theater Movie);
# the clashing `Code` columns come out as Code_m / Code_t.
_merge_options = dict(left_on='Code', right_on='Movie', suffixes=['_m', '_t'])
df = dfm.merge(dft, **_merge_options)
df
Code_m | Title | Rating | Code_t | Name | Movie | |
---|---|---|---|---|---|---|
0 | 1 | Citizen Kane | PG | 2 | Imperial | 1.0 |
1 | 3 | The Wizard of Oz | G | 5 | Paraiso | 3.0 |
2 | 5 | North by Northwest | None | 1 | Odeon | 5.0 |
3 | 6 | The Last Tango in Paris | NC-17 | 4 | Royale | 6.0 |
# Column names of each Spark frame, plus the names the two frames share.
cols1, cols2 = sdft.columns, sdfm.columns
cols_common = set(cols1).intersection(cols2)
cols1, cols2, cols_common
(['Code', 'Name', 'Movie'], ['Code', 'Title', 'Rating'], {'Code'})
# rename common columns of first spark dataframe.
sdfm2 = sdfm.withColumnRenamed('Code','Code_m')
sdfm2.show(2)
+------+-------------------+------+ |Code_m| Title|Rating| +------+-------------------+------+ | 1| Citizen Kane| PG| | 2|Singin' in the Rain| G| +------+-------------------+------+ only showing top 2 rows
# rename common columns of second spark dataframe.
sdft2 = sdft.withColumnRenamed('Code','Code_t')
sdft2.show(2)
+------+--------+-----+ |Code_t| Name|Movie| +------+--------+-----+ | 1| Odeon| 5.0| | 2|Imperial| 1.0| +------+--------+-----+ only showing top 2 rows
# Join the renamed frames, which no longer share a column name: a movie row
# pairs with a theater row when Code_m equals Movie.
_same_movie = sdfm2['Code_m'] == sdft2['Movie']
sdf = sdfm2.join(sdft2, _same_movie)
sdf.show()
+------+--------------------+------+------+--------+-----+ |Code_m| Title|Rating|Code_t| Name|Movie| +------+--------------------+------+------+--------+-----+ | 5| North by Northwest| null| 1| Odeon| 5.0| | 3| The Wizard of Oz| G| 5| Paraiso| 3.0| | 1| Citizen Kane| PG| 2|Imperial| 1.0| | 6|The Last Tango in...| NC-17| 4| Royale| 6.0| +------+--------------------+------+------+--------+-----+
bp.show_methods(sdf,4)
# count, fillna, filter, where, groupby, first, corr,
# missing compared to pandas: max, min, sum, etc.
0 | 1 | 2 | 3 | |
---|---|---|---|---|
0 | agg | drop_duplicates | localCheckpoint | sql_ctx |
1 | alias | dropna | mapInPandas | stat |
2 | approxQuantile | dtypes | na | storageLevel |
3 | cache | exceptAll | orderBy | subtract |
4 | checkpoint | explain | persist | summary |
5 | coalesce | fillna | printSchema | tail |
6 | colRegex | filter | randomSplit | take |
7 | collect | first | rdd | toDF |
8 | columns | foreach | registerTempTable | toJSON |
9 | corr | foreachPartition | repartition | toLocalIterator |
10 | count | freqItems | repartitionByRange | toPandas |
11 | cov | groupBy | replace | transform |
12 | createGlobalTempView | groupby | rollup | union |
13 | createOrReplaceGlobalTempView | head | sameSemantics | unionAll |
14 | createOrReplaceTempView | hint | sample | unionByName |
15 | createTempView | inputFiles | sampleBy | unpersist |
16 | crossJoin | intersect | schema | where |
17 | crosstab | intersectAll | select | withColumn |
18 | cube | isLocal | selectExpr | withColumnRenamed |
19 | describe | isStreaming | semanticHash | withWatermark |
20 | distinct | is_cached | show | write |
21 | drop | join | sort | writeStream |
22 | dropDuplicates | limit | sortWithinPartitions | writeTo |
Global Temporary View
Temporary views in Spark SQL are session-scoped and disappear when the session that created them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. A global temporary view is tied to the system-preserved database global_temp, and it must be referred to by its qualified name, e.g. SELECT * FROM global_temp.view1.
# Register each DataFrame as a SQL temporary view under the given name
for _view_name, _frame in (("Movies", sdfm), ("MovieTheaters", sdft), ("sdf", sdf)):
    _frame.createOrReplaceTempView(_view_name)
# Quick check that the Movies view can be queried.
spark.sql('select * from Movies limit 2').show()
+----+-------------------+------+ |Code| Title|Rating| +----+-------------------+------+ | 1| Citizen Kane| PG| | 2|Singin' in the Rain| G| +----+-------------------+------+
# dfm['Title']
# sdfm.select('Title').show()
q = """select title from Movies""";
# f(q)
# s(q)
dfm['Rating'].unique()
array(['PG', 'G', None, 'NC-17', 'PG-13'], dtype=object)
sdfm.select('Rating').distinct().show()
+------+ |Rating| +------+ | null| | PG| | NC-17| | G| | PG-13| +------+
q = "select distinct rating from Movies"
# f(q)
# s(q)
dfm[dfm.Rating.isnull()]
Code | Title | Rating | |
---|---|---|---|
3 | 4 | The Quiet Man | None |
4 | 5 | North by Northwest | None |
7 | 8 | A Night at the Opera | None |
sdfm.filter(sdfm.Rating.isNull()).show()
+----+--------------------+------+ |Code| Title|Rating| +----+--------------------+------+ | 4| The Quiet Man| null| | 5| North by Northwest| null| | 8|A Night at the Opera| null| +----+--------------------+------+
q = "select * from Movies where Rating is null"
# f(q)
s(q)
nrows: 3, ncols: 3 +----+--------------------+------+ |Code| Title|Rating| +----+--------------------+------+ | 4| The Quiet Man| null| | 5| North by Northwest| null| | 8|A Night at the Opera| null| +----+--------------------+------+
dft[dft.Movie.isnull()]
Code | Name | Movie | |
---|---|---|---|
2 | 3 | Majestic | NaN |
5 | 6 | Nickelodeon | NaN |
# The pandas NaN values in `Movie` appear in the Spark frame as NaN (see the
# sdft.show() output), not as null, so both null checks below match no rows.
# The isnan-based queries that follow are the ones that find these rows.
sdft.filter(sdft.Movie.isNull()).show() # fails, gives empty result
sdft.where(F.isnull('Movie')).show() # gives empty result
+----+----+-----+ |Code|Name|Movie| +----+----+-----+ +----+----+-----+ +----+----+-----+ |Code|Name|Movie| +----+----+-----+ +----+----+-----+
spark.sql("""select * from MovieTheaters where Movie = 'NaN' """).show()
+----+-----------+-----+ |Code| Name|Movie| +----+-----------+-----+ | 3| Majestic| NaN| | 6|Nickelodeon| NaN| +----+-----------+-----+
spark.sql("select * from MovieTheaters where isnan(Movie)").show()
+----+-----------+-----+ |Code| Name|Movie| +----+-----------+-----+ | 3| Majestic| NaN| | 6|Nickelodeon| NaN| +----+-----------+-----+
q = 'select * from MovieTheaters where Movie is null'
f(q)
Code | Name | Movie | |
---|---|---|---|
0 | 3 | Majestic | None |
1 | 6 | Nickelodeon | None |
s(q)
nrows: 0, ncols: 3 +----+----+-----+ |Code|Name|Movie| +----+----+-----+ +----+----+-----+
dft.head(1)
Code | Name | Movie | |
---|---|---|---|
0 | 1 | Odeon | 5.0 |
dfm.head(1)
Code | Title | Rating | |
---|---|---|---|
0 | 1 | Citizen Kane | PG |
dft.merge(dfm,left_on='Movie',right_on='Code',how='left',suffixes=['_t','_m'])
# The left join also keeps the theaters whose Movie is null (NaN).
Code_t | Name | Movie | Code_m | Title | Rating | |
---|---|---|---|---|---|---|
0 | 1 | Odeon | 5.0 | 5.0 | North by Northwest | None |
1 | 2 | Imperial | 1.0 | 1.0 | Citizen Kane | PG |
2 | 3 | Majestic | NaN | NaN | NaN | NaN |
3 | 4 | Royale | 6.0 | 6.0 | The Last Tango in Paris | NC-17 |
4 | 5 | Paraiso | 3.0 | 3.0 | The Wizard of Oz | G |
5 | 6 | Nickelodeon | NaN | NaN | NaN | NaN |
# Left join from the theater side: every theater row is kept, and the movie
# columns are null for theaters with no matching movie.
_theaters_with_movies = sdft2.join(
    sdfm2,
    sdfm2.Code_m == sdft2.Movie,
    how='left',
)
_theaters_with_movies.show()
+------+-----------+-----+------+--------------------+------+ |Code_t| Name|Movie|Code_m| Title|Rating| +------+-----------+-----+------+--------------------+------+ | 1| Odeon| 5.0| 5| North by Northwest| null| | 5| Paraiso| 3.0| 3| The Wizard of Oz| G| | 2| Imperial| 1.0| 1| Citizen Kane| PG| | 4| Royale| 6.0| 6|The Last Tango in...| NC-17| | 3| Majestic| NaN| null| null| null| | 6|Nickelodeon| NaN| null| null| null| +------+-----------+-----+------+--------------------+------+
# SQL left join: keep every theater and attach its movie where
# m.code = t.movie. Run on the sqlite connection through f(), first 6 rows.
q = """
select *
from MovieTheaters t
left join Movies m
on m.code = t.movie
"""
f(q,6)
Code | Name | Movie | Code | Title | Rating | |
---|---|---|---|---|---|---|
0 | 1 | Odeon | 5.0 | 5.0 | North by Northwest | None |
1 | 2 | Imperial | 1.0 | 1.0 | Citizen Kane | PG |
2 | 3 | Majestic | NaN | NaN | None | None |
3 | 4 | Royale | 6.0 | 6.0 | The Last Tango in Paris | NC-17 |
4 | 5 | Paraiso | 3.0 | 3.0 | The Wizard of Oz | G |
5 | 6 | Nickelodeon | NaN | NaN | None | None |
# SQL right join: keep every movie and attach the theater showing it, if any.
# Run on the Spark temp views through s(), showing up to 9 rows.
q = """
select *
from MovieTheaters t
right join Movies m
on m.code=t.movie
"""
s(q,9)
nrows: 9, ncols: 6 +----+--------+-----+----+--------------------+------+ |Code| Name|Movie|Code| Title|Rating| +----+--------+-----+----+--------------------+------+ |null| null| null| 9| Citizen King| G| | 1| Odeon| 5.0| 5| North by Northwest| null| |null| null| null| 7| Some Like it Hot| PG-13| |null| null| null| 2| Singin' in the Rain| G| | 5| Paraiso| 3.0| 3| The Wizard of Oz| G| | 2|Imperial| 1.0| 1| Citizen Kane| PG| | 4| Royale| 6.0| 6|The Last Tango in...| NC-17| |null| null| null| 8|A Night at the Opera| null| |null| null| null| 4| The Quiet Man| null| +----+--------+-----+----+--------------------+------+
dfm
Code | Title | Rating | |
---|---|---|---|
0 | 1 | Citizen Kane | PG |
1 | 2 | Singin' in the Rain | G |
2 | 3 | The Wizard of Oz | G |
3 | 4 | The Quiet Man | None |
4 | 5 | North by Northwest | None |
5 | 6 | The Last Tango in Paris | NC-17 |
6 | 7 | Some Like it Hot | PG-13 |
7 | 8 | A Night at the Opera | None |
8 | 9 | Citizen King | G |
dft
Code | Name | Movie | |
---|---|---|---|
0 | 1 | Odeon | 5.0 |
1 | 2 | Imperial | 1.0 |
2 | 3 | Majestic | NaN |
3 | 4 | Royale | 6.0 |
4 | 5 | Paraiso | 3.0 |
5 | 6 | Nickelodeon | NaN |
# Only movies 1, 3, 5 and 6 are showing in theaters.
# Movies 2, 4 and 7-9 are not being shown in any theater, e.g.:
# 2 Singin' in the Rain G
# 4 The Quiet Man None
# Find them with a left join from Movies that keeps only the rows with no
# matching theater. Run on the Spark temp views through s().
q = """
select *
from Movies m
left join MovieTheaters t
on t.movie = m.code
where t.movie is null
"""
s(q,6)
nrows: 5, ncols: 6 +----+--------------------+------+----+----+-----+ |Code| Title|Rating|Code|Name|Movie| +----+--------------------+------+----+----+-----+ | 9| Citizen King| G|null|null| null| | 7| Some Like it Hot| PG-13|null|null| null| | 2| Singin' in the Rain| G|null|null| null| | 8|A Night at the Opera| null|null|null| null| | 4| The Quiet Man| null|null|null| null| +----+--------------------+------+----+----+-----+
# Titles of movies whose code is not among the movies listed by any theater.
# Run on the sqlite connection through f().
# NOTE(review): the inner `movie is not null` filter presumably guards against
# NOT IN's behaviour when the list contains NULL — confirm.
q = """
select title
from Movies m
where code not in
(
select movie from MovieTheaters where movie is not null
)
"""
f(q,9)
Title | |
---|---|
0 | Singin' in the Rain |
1 | The Quiet Man |
2 | Some Like it Hot |
3 | A Night at the Opera |
4 | Citizen King |
# Add an unrated movie without supplying Code; in the recorded output the new
# row received Code 10.
q = "INSERT INTO Movies(Title,Rating) VALUES('One, Two, Three',NULL);"
cur.execute(q)
conn.commit()
# Read the table back to confirm the insert.
pd.read_sql('select * from Movies',conn)
Code | Title | Rating | |
---|---|---|---|
0 | 1 | Citizen Kane | PG |
1 | 2 | Singin' in the Rain | G |
2 | 3 | The Wizard of Oz | G |
3 | 4 | The Quiet Man | None |
4 | 5 | North by Northwest | None |
5 | 6 | The Last Tango in Paris | NC-17 |
6 | 7 | Some Like it Hot | PG-13 |
7 | 8 | A Night at the Opera | None |
8 | 9 | Citizen King | G |
9 | 10 | One, Two, Three | None |
# Give every movie that has no rating the rating 'G'.
q = "UPDATE Movies SET Rating='G' WHERE Rating IS NULL;"
cur.execute(q)
conn.commit()
# Read the table back to confirm the update.
pd.read_sql('select * from Movies',conn)
Code | Title | Rating | |
---|---|---|---|
0 | 1 | Citizen Kane | PG |
1 | 2 | Singin' in the Rain | G |
2 | 3 | The Wizard of Oz | G |
3 | 4 | The Quiet Man | G |
4 | 5 | North by Northwest | G |
5 | 6 | The Last Tango in Paris | NC-17 |
6 | 7 | Some Like it Hot | PG-13 |
7 | 8 | A Night at the Opera | G |
8 | 9 | Citizen King | G |
9 | 10 | One, Two, Three | G |
# Remove the theaters that are showing a movie rated NC-17.
q = """
DELETE FROM MovieTheaters WHERE Movie IN
(SELECT Code FROM Movies WHERE Rating = 'NC-17');
""".strip()
cur.execute(q)
conn.commit()
# Read the table back to confirm the delete.
pd.read_sql('select * from MovieTheaters',conn)
Code | Name | Movie | |
---|---|---|---|
0 | 1 | Odeon | 5.0 |
1 | 2 | Imperial | 1.0 |
2 | 3 | Majestic | NaN |
3 | 5 | Paraiso | 3.0 |
4 | 6 | Nickelodeon | NaN |
# Report the notebook's total wall-clock run time.
time_taken = time.time() - time_start_notebook
# Round to whole seconds BEFORE splitting into hours/minutes/seconds. Splitting
# the raw float let the `{:.0f}` format round e.g. 59.6 s up to "0 min 60 secs".
# `h` is whole hours; `m` is the leftover seconds within the hour.
h,m = divmod(round(time_taken),60*60)
print('Time taken: {:.0f} hr {:.0f} min {:.0f} secs'.format(h, *divmod(m,60)))
Time taken: 0 hr 0 min 29 secs