PySpark SQL Analysis Case Study: E-commerce Order Data Analysis
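[Example] Northwind is a sample database originally created by Microsoft. It contains the sales data of a fictitious company, "Northwind Traders", which imports and exports specialty foods from around the world. Analyze this e-commerce dataset to answer the following questions:
- (1) How many orders has each customer placed?
- (2) How many orders are there for each country?
- (3) What is the total order amount for each month of each year?
- (4) What are each customer's total sales per year?
- (5) What is each customer's average order amount per year?
Answering these questions requires two of the dataset's tables: orders and order details. Proceed with the following steps.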
(1) First, create a SparkSession instance:
from pyspark.sql import SparkSession
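from pyspark.sql.functions import *  # note: this also shadows Python built-ins such as sum and max
# Create a SparkSession instance.
# "spark://localhost:7077" assumes a standalone Spark master running on this host;
# .master("local[*]") runs Spark in a single local process instead.
spark = SparkSession.builder \
    .master("spark://localhost:7077") \
    .appName("pyspark sql demo") \
    .config("spark.hadoop.hive.exec.dynamic.partition", "true") \
    .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()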
(2) Load the orders dataset:
# Directory that holds the source CSV files
filePath = "/data/spark/nw/"
# Load the orders CSV into a DataFrame (first line is the header; column types are inferred)
orders = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(filePath + "NW-Orders-01.csv")
print(f"The orders table has {orders.count()} rows")
orders.printSchema()
orders.show(3)
Running the code above produces the following output:
The orders table has 830 rows
root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- ShipCountry: string (nullable = true)

+-------+----------+----------+---------+-----------+
|OrderID|CustomerID|EmployeeID|OrderDate|ShipCountry|
+-------+----------+----------+---------+-----------+
|  10248|     VINET|         5| 1996-7-2|     France|
|  10249|     TOMSP|         6| 1996-7-3|    Germany|
|  10250|     HANAR|         4| 1996-7-6|     Brazil|
+-------+----------+----------+---------+-----------+
only showing top 3 rows
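With `inferSchema` enabled, Spark makes one extra pass over the data to guess each column's type, which is why `OrderDate` comes back as a plain string in the schema above: a value such as `1996-7-2` is not a number. The plain-Python sketch below is a deliberately simplified analogue of that idea (try integer, then double, otherwise string), applied to the header and three rows reconstructed from the output above; Spark's real inference covers more types, and the `infer_type` helper is hypothetical. For production jobs, passing an explicit schema with `spark.read.schema(...)` avoids the inference pass and pins the column types.

```python
import csv
import io

# Header and first three rows of NW-Orders-01.csv, reconstructed from the output above.
SAMPLE = """OrderID,CustomerID,EmployeeID,OrderDate,ShipCountry
10248,VINET,5,1996-7-2,France
10249,TOMSP,6,1996-7-3,Germany
10250,HANAR,4,1996-7-6,Brazil
"""


def infer_type(values):
    """Hypothetical, simplified inference: the narrowest of integer/double/string
    that every value in the column can be parsed as."""
    for type_name, parse in (("integer", int), ("double", float)):
        try:
            for v in values:
                parse(v)
        except ValueError:
            continue  # some value failed; try the next, wider type
        return type_name
    return "string"


# header=true: the first line supplies the column names
rows = list(csv.DictReader(io.StringIO(SAMPLE)))
schema = {name: infer_type([r[name] for r in rows]) for name in rows[0]}
for name, type_name in schema.items():
    print(f" |-- {name}: {type_name}")
```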
(3) Load the order details dataset:
# Load the order details CSV into a DataFrame
orderDetails = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(filePath + "NW-Order-Details.csv")
print(f"The order details table has {orderDetails.count()} rows")
orderDetails.printSchema()
orderDetails.show(3)
Running the code above produces the following output:
The order details table has 2155 rows
root
 |-- OrderID: integer (nullable = true)
 |-- ProductId: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- Qty: integer (nullable = true)
 |-- Discount: double (nullable = true)

+-------+---------+---------+---+--------+
|OrderID|ProductId|UnitPrice|Qty|Discount|
+-------+---------+---------+---+--------+
|  10248|       11|     14.0| 12|     0.0|
|  10248|       42|      9.8| 10|     0.0|
|  10248|       72|     34.8|  5|     0.0|
+-------+---------+---------+---+--------+
only showing top 3 rows
(4) Answer question 1: how many orders has each customer placed?
from pyspark.sql.functions import col
orderByCustomer = orders.groupBy("CustomerID").count()
orderByCustomer.sort(col("count").desc()).show(10)
Running the code above produces the following output:
+----------+-----+
|CustomerID|count|
+----------+-----+
|     SAVEA|   31|
|     ERNSH|   30|
|     QUICK|   28|
|     HUNGO|   19|
|     FOLKO|   19|
|     HILAA|   18|
|     RATTC|   18|
|     BERGS|   18|
|     BONAP|   17|
|     WARTH|   15|
+----------+-----+
only showing top 10 rows
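`groupBy("CustomerID").count()` amounts to keeping one counter per distinct key, and `sort(col("count").desc())` then ranks the keys by that counter. A plain-Python sketch of the same idea with `collections.Counter`, on a small hypothetical list of customer IDs (not the real data):

```python
from collections import Counter

# Hypothetical CustomerID column: one entry per order row.
customer_ids = ["VINET", "TOMSP", "HANAR", "VINET", "HANAR", "VINET"]

# groupBy("CustomerID").count(): one counter per distinct key.
order_by_customer = Counter(customer_ids)

# sort(col("count").desc()).show(3): keys ranked by count, largest first.
top3 = order_by_customer.most_common(3)
for customer, n in top3:
    print(customer, n)
```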
(5) Answer question 2: how many orders are there for each country?
orderByCountry = orders.groupBy("ShipCountry").count()
orderByCountry.sort(col("count").desc()).show(10)
Running the code above produces the following output:
+-----------+-----+
|ShipCountry|count|
+-----------+-----+
|    Germany|  122|
|        USA|  122|
|     Brazil|   82|
|     France|   77|
|         UK|   56|
|  Venezuela|   46|
|    Austria|   40|
|     Sweden|   37|
|     Canada|   30|
|     Mexico|   28|
+-----------+-----+
only showing top 10 rows
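Germany and USA tie at 122 orders in the output above. Sorting only on `count` does not guarantee any particular order among tied rows in Spark, so their positions may differ between runs; adding a secondary key, for example `orderByCountry.sort(col("count").desc(), col("ShipCountry"))`, makes the ranking deterministic. The plain-Python sketch below shows the same two-key ordering, using counts taken from the output above and deliberately listed out of order:

```python
# (ShipCountry, count) pairs from the output above, deliberately listed out of order.
order_by_country = [("USA", 122), ("France", 77), ("Germany", 122), ("Brazil", 82)]

# Primary key: count descending; secondary key: country name ascending,
# analogous to sort(col("count").desc(), col("ShipCountry")).
ranked = sorted(order_by_country, key=lambda kv: (-kv[1], kv[0]))
for country, n in ranked:
    print(f"{country:>11}|{n:>5}")
```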
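The remaining three questions require some transformations of the data:
- (1) Add an order total column to the orders DataFrame. To do so, compute the net amount of each order detail line, sum those amounts per OrderID to get each order's total, and attach the totals to orders with an inner equi-join on OrderID. Then check the result for null values.
- (2) Add a Date column.
- (3) Add Month and Year columns so that the data can be aggregated by month and year.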
(6) Add the order total column to the orders DataFrame. First, compute a subtotal for every row of order details, that is, the net amount of each order line after its discount:
orderDetails1 = orderDetails.select(
    "OrderID",
    # Net line amount = UnitPrice * Qty minus the discount on that amount
    (col("UnitPrice") * col("Qty") - col("UnitPrice") * col("Qty") * col("Discount")).alias("OrderPrice")
)
orderDetails1.show(10)
Running the code above produces the following output:
+-------+----------+
|OrderID|OrderPrice|
+-------+----------+
|  10248|     168.0|
|  10248|      98.0|
|  10248|     174.0|
|  10249|     167.4|
|  10249|    1696.0|
|  10250|      77.0|
|  10250|    1261.4|
|  10250|     214.2|
|  10251|     95.76|
|  10251|     222.3|
+-------+----------+
only showing top 10 rows
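The `OrderPrice` expression is the list amount minus the discount on it, which is algebraically `UnitPrice * Qty * (1 - Discount)`; in PySpark it could equally be written as `col("UnitPrice") * col("Qty") * (1 - col("Discount"))`. A plain-Python sketch of the formula, checked against the three detail rows of order 10248 shown earlier. The discounted line at the end is a hypothetical example, since none of the visible detail rows carries a non-zero discount:

```python
def line_amount(unit_price: float, qty: int, discount: float) -> float:
    """Net amount of one order line: list amount minus the discount on it."""
    return unit_price * qty * (1 - discount)


# (UnitPrice, Qty, Discount) of the three detail rows of order 10248 shown earlier.
lines_10248 = [(14.0, 12, 0.0), (9.8, 10, 0.0), (34.8, 5, 0.0)]
amounts = [round(line_amount(*line), 2) for line in lines_10248]
print(amounts)

# Hypothetical discounted line: 10 units at 20.0 with a 25% discount.
print(line_amount(20.0, 10, 0.25))
```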
Next, sum the line amounts per OrderID to get the total of each order:
from pyspark.sql.functions import sum, bround
# Sum the line amounts of each order
orderTot = orderDetails1 \
    .groupBy("OrderID") \
    .agg(sum("OrderPrice").alias("OrderTotal"))
# Display the totals rounded to 2 decimal places (bround rounds half to even)
orderTot \
    .select("OrderID", bround("OrderTotal", 2).alias("OrderTotal")) \
    .sort("OrderID") \
    .show(10)
# To list the largest orders first instead:
# orderTot.sort(col("OrderTotal").desc()).show(10)
Running the code above produces the following output:
+-------+----------+
|OrderID|OrderTotal|
+-------+----------+
|  10248|     440.0|
|  10249|    1863.4|
|  10250|    1552.6|
|  10251|    654.06|
|  10252|    3597.9|
|  10253|    1444.8|
|  10254|    556.62|
|  10255|    2490.5|
|  10256|     517.8|
|  10257|    1119.9|
+-------+----------+
only showing top 10 rows
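Grouping by `OrderID` and summing keeps one running total per order. The plain-Python sketch below reproduces the first three totals from the `OrderPrice` values shown earlier (order 10251 is left out because one of its lines lies beyond the first 10 rows), and uses Python's `decimal` module to contrast HALF_EVEN rounding, which Spark documents for `bround`, with the HALF_UP rounding Spark documents for `round`. The `round_to` helper is hypothetical.

```python
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_EVEN, ROUND_HALF_UP

# (OrderID, OrderPrice) rows from the orderDetails1 output above.
details = [
    (10248, 168.0), (10248, 98.0), (10248, 174.0),
    (10249, 167.4), (10249, 1696.0),
    (10250, 77.0), (10250, 1261.4), (10250, 214.2),
]

# groupBy("OrderID").agg(sum("OrderPrice")): one running total per key.
order_tot = defaultdict(float)
for order_id, price in details:
    order_tot[order_id] += price


def round_to(x: float, scale: int, mode: str) -> Decimal:
    """Round the shortest decimal form of x to `scale` places with the given mode."""
    return Decimal(repr(x)).quantize(Decimal(10) ** -scale, rounding=mode)


for order_id in sorted(order_tot):
    print(order_id, round_to(order_tot[order_id], 2, ROUND_HALF_EVEN))

# Ties are where the two modes differ: 2.5 -> 2 (HALF_EVEN) vs 3 (HALF_UP).
print(round_to(2.5, 0, ROUND_HALF_EVEN), round_to(2.5, 0, ROUND_HALF_UP))
```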
Next, perform an inner equi-join of orders and orderTot on OrderID to add an order total column (Total) to the orders data:
orders1 = orders \
    .join(orderTot, "OrderID", "inner") \
    .select(
        orders.OrderID,
        orders.CustomerID,
        orders.OrderDate,
        orders.ShipCountry,
        orderTot.OrderTotal.alias("Total")
    )
orders1.sort("CustomerID").show()
Running the code above produces the following output:
+-------+----------+----------+-----------+------------------+
|OrderID|CustomerID| OrderDate|ShipCountry|             Total|
+-------+----------+----------+-----------+------------------+
|  11011|     ALFKI|  1998-4-7|    Germany|             933.5|
|  10692|     ALFKI| 1997-10-1|    Germany|             878.0|
|  10702|     ALFKI|1997-10-11|    Germany|             330.0|
|  10835|     ALFKI| 1998-1-13|    Germany|             845.8|
|  10643|     ALFKI| 1997-8-23|    Germany|             814.5|
|  10952|     ALFKI| 1998-3-14|    Germany|             471.2|
|  10308|     ANATR| 1996-9-16|     Mexico|              88.8|
|  10926|     ANATR|  1998-3-2|     Mexico|             514.4|
|  10759|     ANATR|1997-11-26|     Mexico|             320.0|
|  10625|     ANATR|  1997-8-6|     Mexico|            479.75|
|  10507|     ANTON| 1997-4-13|     Mexico|          749.0625|
|  10365|     ANTON|1996-11-25|     Mexico|403.20000000000005|
|  10535|     ANTON| 1997-5-11|     Mexico|           1940.85|
|  10573|     ANTON| 1997-6-17|     Mexico|            2082.0|
|  10682|     ANTON| 1997-9-23|     Mexico|             375.5|
|  10677|     ANTON| 1997-9-20|     Mexico|           813.365|
|  10856|     ANTON| 1998-1-26|     Mexico|             660.0|
|  10383|     AROUT|1996-12-14|         UK|             899.0|
|  10355|     AROUT|1996-11-13|         UK|             480.0|
|  10453|     AROUT| 1997-2-19|         UK|             407.7|
+-------+----------+----------+-----------+------------------+
only showing top 20 rows
Finally, check orders1 for rows whose Total is null:
orders1.filter(col("Total").isNull()).show()
Running the code above produces the following output:
+-------+----------+---------+-----------+-----+
|OrderID|CustomerID|OrderDate|ShipCountry|Total|
+-------+----------+---------+-----------+-----+
+-------+----------+---------+-----------+-----+
The result is empty: no order has a null Total, so no null handling is needed.
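One caveat about this check: `orders1` comes from an inner join, and an inner join drops an order that has no matching row in `orderTot` instead of giving it a null `Total`. An empty result therefore shows that no total is null, but not that every order has detail lines. A left anti join, `orders.join(orderTot, "OrderID", "left_anti")`, returns exactly the orders the inner join would drop, and comparing `orders1.count()` with `orders.count()` is a quicker check. The plain-Python sketch below illustrates both joins with dictionaries; order 99999 and customer `HYPOT` are hypothetical.

```python
# Hypothetical sample: order 99999 has no detail lines, hence no computed total.
orders = {10248: "VINET", 10249: "TOMSP", 99999: "HYPOT"}
order_tot = {10248: 440.0, 10249: 1863.4}

# Inner join on OrderID (how orders1 is built): only keys present on both sides survive.
inner = {oid: (cust, order_tot[oid]) for oid, cust in orders.items() if oid in order_tot}

# Left anti join: orders with no match in order_tot, i.e. the rows the inner join dropped.
anti = sorted(oid for oid in orders if oid not in order_tot)

print(inner)
print(anti)
# The null check only ever sees rows that survived the inner join.
print("null totals:", [oid for oid, (_, total) in inner.items() if total is None])
```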
(7) Normalize the OrderDate column by converting it into a new Date column of date type:
from pyspark.sql.functions import to_date
# Parse the OrderDate string (e.g. "1996-7-2") into a date value
orders2 = orders1.withColumn("Date", to_date("OrderDate"))
orders2.printSchema()
orders2.show(5)
Running the code above produces the following output:
root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- ShipCountry: string (nullable = true)
 |-- Total: double (nullable = true)
 |-- Date: date (nullable = true)

+-------+----------+---------+-----------+------------------+----------+
|OrderID|CustomerID|OrderDate|ShipCountry|             Total|      Date|
+-------+----------+---------+-----------+------------------+----------+
|  10248|     VINET| 1996-7-2|     France|             440.0|1996-07-02|
|  10249|     TOMSP| 1996-7-3|    Germany|            1863.4|1996-07-03|
|  10250|     HANAR| 1996-7-6|     Brazil|1552.6000000000001|1996-07-06|
|  10251|     VICTE| 1996-7-6|     France|            654.06|1996-07-06|
|  10252|     SUPRD| 1996-7-7|    Belgium|            3597.9|1996-07-07|
+-------+----------+---------+-----------+------------------+----------+
only showing top 5 rows
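`OrderDate` stores dates without zero padding (`1996-7-2`, `1997-10-11`). Called without a format, `to_date` follows the rules for casting a string to a date, which accept one- or two-digit months and days, so every value shown here parses. Passing a pattern explicitly, for example `to_date("OrderDate", "yyyy-M-d")`, would document the expected layout, and `orders2.filter(col("Date").isNull()).count()` confirms that no value failed to parse. The same parsing in plain Python uses `datetime.strptime`, whose `%m` and `%d` directives also accept single digits:

```python
from datetime import datetime

# OrderDate strings as they appear in the output above: no zero padding.
raw_dates = ["1996-7-2", "1997-10-1", "1997-10-11", "1998-4-7"]

# %m and %d accept one or two digits, so padded and unpadded values both parse.
parsed = [datetime.strptime(s, "%Y-%m-%d").date() for s in raw_dates]

# isoformat() gives the normalized yyyy-MM-dd form shown in the Date column.
print([d.isoformat() for d in parsed])
```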
(8) Next, extract the month and the year from the parsed Date column into two new columns, Month and Year:
from pyspark.sql.functions import month, year
# Use the Date column rather than the OrderDate string, so no implicit cast is needed
orders3 = orders2 \
    .withColumn("Month", month("Date")) \
    .withColumn("Year", year("Date"))
orders3.show(10)
Running the code above produces the following output:
+-------+----------+---------+-----------+------------------+----------+-----+----+
|OrderID|CustomerID|OrderDate|ShipCountry|             Total|      Date|Month|Year|
+-------+----------+---------+-----------+------------------+----------+-----+----+
|  10248|     VINET| 1996-7-2|     France|             440.0|1996-07-02|    7|1996|
|  10249|     TOMSP| 1996-7-3|    Germany|            1863.4|1996-07-03|    7|1996|
|  10250|     HANAR| 1996-7-6|     Brazil|1552.6000000000001|1996-07-06|    7|1996|
|  10251|     VICTE| 1996-7-6|     France|            654.06|1996-07-06|    7|1996|
|  10252|     SUPRD| 1996-7-7|    Belgium|            3597.9|1996-07-07|    7|1996|
|  10253|     HANAR| 1996-7-8|     Brazil|1444.8000000000002|1996-07-08|    7|1996|
|  10254|     CHOPS| 1996-7-9|Switzerland| 556.6199999999999|1996-07-09|    7|1996|
|  10255|     RICSU|1996-7-10|Switzerland|            2490.5|1996-07-10|    7|1996|
|  10256|     WELLI|1996-7-13|     Brazil|             517.8|1996-07-13|    7|1996|
|  10257|     HILAA|1996-7-14|  Venezuela|            1119.9|1996-07-14|    7|1996|
+-------+----------+---------+-----------+------------------+----------+-----+----+
only showing top 10 rows
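`month()` and `year()` return integers, which is what makes a later `sort("Year", "Month")` chronological. Had the month been kept as text, for instance by slicing it out of the `OrderDate` string, the sort would be lexicographic and `"10"` would come before `"7"`. A small plain-Python illustration with hypothetical keys:

```python
# Hypothetical (Year, Month) keys as integers, the type month() and year() return.
keys_int = [(1996, 7), (1996, 10), (1996, 12), (1996, 9)]

# The same keys as strings, e.g. if Month had been cut out of the raw OrderDate text.
keys_str = [(str(y), str(m)) for y, m in keys_int]

print(sorted(keys_int))  # chronological: 7, 9, 10, 12
print(sorted(keys_str))  # lexicographic: "10", "12", "7", "9"
```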
With the required columns in place, we can now answer the remaining questions posed at the start of the example.
(9) Answer question 3: what is the total order amount for each month of each year?
# Total order amount per (Year, Month)
ordersByYM = orders3 \
    .groupBy("Year", "Month") \
    .agg(sum("Total").alias("Total"))
ordersByYM.select(
    "Year",
    "Month",
    bround("Total", 2).alias("Total")
).sort("Year", "Month").show(20)
Running the code above produces the following output:
+----+-----+---------+
|Year|Month|    Total|
+----+-----+---------+
|1996|    7|  30741.9|
|1996|    8| 22726.88|
|1996|    9|  27691.4|
|1996|   10| 38380.12|
|1996|   11| 45694.44|
|1996|   12| 52494.33|
|1997|    1| 51612.97|
|1997|    2| 38483.64|
|1997|    3| 40918.82|
|1997|    4| 57116.71|
|1997|    5| 50270.33|
|1997|    6| 34392.08|
|1997|    7| 52744.68|
|1997|    8| 46991.78|
|1997|    9| 57723.23|
|1997|   10| 62253.63|
|1997|   11| 51294.81|
|1997|   12| 67920.23|
|1998|    1|107049.96|
|1998|    2| 85240.83|
+----+-----+---------+
only showing top 20 rows
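This step sums the order amounts. If the question is read as "how many orders", the count can be computed in the same aggregation, for example `.agg(count("OrderID").alias("Orders"), sum("Total").alias("Total"))` with `count` imported from `pyspark.sql.functions`. The plain-Python sketch below does both in one pass over the ten July 1996 rows shown in the `orders3` output; since that is only a slice of the month, its total is not the full-month figure of 30741.9.

```python
from collections import defaultdict
from datetime import date

# (Date, Total) for the ten orders shown in the orders3 output, all from July 1996.
rows = [
    (date(1996, 7, 2), 440.0), (date(1996, 7, 3), 1863.4),
    (date(1996, 7, 6), 1552.6), (date(1996, 7, 6), 654.06),
    (date(1996, 7, 7), 3597.9), (date(1996, 7, 8), 1444.8),
    (date(1996, 7, 9), 556.62), (date(1996, 7, 10), 2490.5),
    (date(1996, 7, 13), 517.8), (date(1996, 7, 14), 1119.9),
]

# One pass per (Year, Month) key: [number of orders, sum of their totals].
stats = defaultdict(lambda: [0, 0.0])
for d, total in rows:
    s = stats[(d.year, d.month)]
    s[0] += 1
    s[1] += total

for (y, m), (n, total) in sorted(stats.items()):
    print(y, m, n, round(total, 2))
```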
(10) Answer question 4: what are each customer's total sales per year?
ordersByCY = orders3 \
    .groupBy("CustomerID", "Year") \
    .agg(sum("Total").alias("Total"))
ordersByCY.sort("CustomerID", "Year").show(20)
Running the code above produces the following output:
+----------+----+------------------+
|CustomerID|Year|             Total|
+----------+----+------------------+
|     ALFKI|1997|            2022.5|
|     ALFKI|1998|            2250.5|
|     ANATR|1996|              88.8|
|     ANATR|1997|            799.75|
|     ANATR|1998|             514.4|
|     ANTON|1996|403.20000000000005|
|     ANTON|1997|         5960.7775|
|     ANTON|1998|             660.0|
|     AROUT|1996|            1379.0|
|     AROUT|1997| 6406.900000000001|
|     AROUT|1998|           5604.75|
|     BERGS|1996|            4324.4|
|     BERGS|1997|13849.015000000001|
|     BERGS|1998|         6754.1625|
|     BLAUS|1997|            1079.8|
|     BLAUS|1998|            2160.0|
|     BLONP|1996|            9986.2|
|     BLONP|1997|           7817.88|
|     BLONP|1998|             730.0|
|     BOLID|1996|             982.0|
+----------+----+------------------+
only showing top 20 rows
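Grouping by two columns uses the (CustomerID, Year) pair as the key. The plain-Python sketch below recomputes the first five rows of the result from the individual ALFKI and ANATR orders listed in the `orders1` output (all of their orders appear there, because that output is sorted by CustomerID). If a wide layout with one column per year is preferred, PySpark's `pivot` can produce it, for example `orders3.groupBy("CustomerID").pivot("Year").sum("Total")`.

```python
from collections import defaultdict

# (CustomerID, Year, Total) for every ALFKI and ANATR order in the orders1 output;
# the year is taken from each order's OrderDate.
orders = [
    ("ALFKI", 1998, 933.5), ("ALFKI", 1997, 878.0), ("ALFKI", 1997, 330.0),
    ("ALFKI", 1998, 845.8), ("ALFKI", 1997, 814.5), ("ALFKI", 1998, 471.2),
    ("ANATR", 1996, 88.8), ("ANATR", 1998, 514.4), ("ANATR", 1997, 320.0),
    ("ANATR", 1997, 479.75),
]

# groupBy("CustomerID", "Year").agg(sum("Total")): the composite key is a tuple.
totals = defaultdict(float)
for customer, year, total in orders:
    totals[(customer, year)] += total

for customer, year in sorted(totals):
    print(customer, year, round(totals[(customer, year)], 2))
```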
(11) Answer question 5: what is each customer's average order amount per year?
from pyspark.sql.functions import avg
avgByCY = orders3 \
    .groupBy("CustomerID", "Year") \
    .agg(avg("Total").alias("Avg"))
avgByCY.select(
    "CustomerID",
    "Year",
    bround("Avg", 2)
).sort("CustomerID", "Year").show(20)
Running the code above produces the following output:
+----------+----+--------------+
|CustomerID|Year|bround(Avg, 2)|
+----------+----+--------------+
|     ALFKI|1997|        674.17|
|     ALFKI|1998|        750.17|
|     ANATR|1996|          88.8|
|     ANATR|1997|        399.88|
|     ANATR|1998|         514.4|
|     ANTON|1996|         403.2|
|     ANTON|1997|       1192.16|
|     ANTON|1998|         660.0|
|     AROUT|1996|         689.5|
|     AROUT|1997|        915.27|
|     AROUT|1998|       1401.19|
|     BERGS|1996|       1441.47|
|     BERGS|1997|        1384.9|
|     BERGS|1998|       1350.83|
|     BLAUS|1997|        269.95|
|     BLAUS|1998|         720.0|
|     BLONP|1996|       3328.73|
|     BLONP|1997|       1116.84|
|     BLONP|1998|         730.0|
|     BOLID|1996|         982.0|
+----------+----+--------------+
only showing top 20 rows
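`avg("Total")` is the group's sum divided by its number of rows, and `bround(..., 2)` then rounds half to even. The plain-Python sketch below recomputes the ALFKI and ANATR averages from the same individual orders; ANATR's 1997 average, 399.875, is an exact tie at two decimals and goes to the even neighbour 399.88. Adding `.alias("Avg")` to the `bround` expression would also replace the generated column name `bround(Avg, 2)` with a cleaner header.

```python
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_EVEN

# (CustomerID, Year, Total) for every ALFKI and ANATR order in the orders1 output.
orders = [
    ("ALFKI", 1998, 933.5), ("ALFKI", 1997, 878.0), ("ALFKI", 1997, 330.0),
    ("ALFKI", 1998, 845.8), ("ALFKI", 1997, 814.5), ("ALFKI", 1998, 471.2),
    ("ANATR", 1996, 88.8), ("ANATR", 1998, 514.4), ("ANATR", 1997, 320.0),
    ("ANATR", 1997, 479.75),
]

# Accumulate [sum, count] per (CustomerID, Year); avg = sum / count.
acc = defaultdict(lambda: [0.0, 0])
for customer, year, total in orders:
    acc[(customer, year)][0] += total
    acc[(customer, year)][1] += 1


def bround2(x: float) -> Decimal:
    """Round to 2 decimal places, half to even (the mode Spark documents for bround)."""
    return Decimal(repr(x)).quantize(Decimal("0.01"), rounding=ROUND_HALF_EVEN)


avg_by_cy = {key: bround2(s / n) for key, (s, n) in acc.items()}
for key in sorted(avg_by_cy):
    print(*key, avg_by_cy[key])
```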