Published: 2022-07-31

PySpark SQL Analysis Case Study: E-commerce Order Data Analysis

[Example] Northwind is a sample database originally created by Microsoft. It contains the sales data of a fictitious company called "Northwind Traders", which imports and exports specialty foods from around the world. The task is to analyze this e-commerce dataset and answer the following questions:

  • (1) How many orders did each customer place?
  • (2) How many orders were shipped to each country?
  • (3) What is the total order amount per month/year?
  • (4) What is each customer's total annual sales?
  • (5) What is each customer's average order amount per year?

Answering these questions requires the orders table and the order details table. Follow the steps below.

(1) First, construct a SparkSession instance with the following code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create a SparkSession instance
spark = SparkSession.builder \
    .master("spark://localhost:7077") \
    .appName("pyspark sql demo") \
    .config("spark.hadoop.hive.exec.dynamic.partition", "true") \
    .config("spark.hadoop.hive.exec.dynamic.partition.mode","nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

(2) Load the orders dataset with the following code:

# 1. Read the source data files
filePath = "/data/spark/nw/"

# Load the orders dataset into a DataFrame
orders = spark.read \
      .option("header","true") \
      .option("inferSchema","true") \
      .csv(filePath + "NW-Orders-01.csv")

print(f"The orders dataset has {orders.count()} rows")

orders.printSchema()
orders.show(3)

Running the above code produces the following output:

The orders dataset has 830 rows
root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- ShipCountry: string (nullable = true)

+-------+----------+----------+---------+-----------+
|OrderID|CustomerID|EmployeeID|OrderDate|ShipCountry|
+-------+----------+----------+---------+-----------+
|  10248|     VINET|         5| 1996-7-2|     France|
|  10249|     TOMSP|         6| 1996-7-3|    Germany|
|  10250|     HANAR|         4| 1996-7-6|     Brazil|
+-------+----------+----------+---------+-----------+
only showing top 3 rows

(3) Load the order details dataset with the following code:

# Load the order details dataset into a DataFrame
orderDetails = spark.read \
      .option("header","true") \
      .option("inferSchema","true") \
      .csv(filePath + "NW-Order-Details.csv")

print(f"The order details dataset has {orderDetails.count()} rows")
orderDetails.printSchema()
orderDetails.show(3)

Running the above code produces the following output:

The order details dataset has 2155 rows
root
 |-- OrderID: integer (nullable = true)
 |-- ProductId: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- Qty: integer (nullable = true)
 |-- Discount: double (nullable = true)

+-------+---------+---------+---+--------+
|OrderID|ProductId|UnitPrice|Qty|Discount|
+-------+---------+---------+---+--------+
|  10248|       11|     14.0| 12|     0.0|
|  10248|       42|      9.8| 10|     0.0|
|  10248|       72|     34.8|  5|     0.0|
+-------+---------+---------+---+--------+
only showing top 3 rows

(4) Answer question 1: how many orders did each customer place? The code is as follows:

from pyspark.sql.functions import col

orderByCustomer = orders.groupBy("CustomerID").count()
orderByCustomer.sort(col("count").desc()).show(10)

Running the above code produces the following output:

+----------+-----+
|CustomerID|count|
+----------+-----+
|     SAVEA|   31|
|     ERNSH|   30|
|     QUICK|   28|
|     HUNGO|   19|
|     FOLKO|   19|
|     HILAA|   18|
|     RATTC|   18|
|     BERGS|   18|
|     BONAP|   17|
|     WARTH|   15|
+----------+-----+
only showing top 10 rows
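Conceptually, `groupBy("CustomerID").count()` is just a keyed tally followed by a descending sort. A plain-Python sketch on a few made-up `(OrderID, CustomerID)` pairs, only to illustrate the logic that Spark distributes across the cluster:

```python
from collections import Counter

# Hypothetical order rows: (OrderID, CustomerID)
rows = [
    (10248, "VINET"), (10249, "TOMSP"), (10250, "HANAR"),
    (10251, "VINET"), (10252, "VINET"), (10253, "HANAR"),
]

# groupBy("CustomerID").count() boils down to tallying keys
order_counts = Counter(customer for _, customer in rows)

# sort(col("count").desc()) corresponds to sorting the tallies
top = order_counts.most_common()
print(top)  # [('VINET', 3), ('HANAR', 2), ('TOMSP', 1)]
```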

(5) Answer question 2: how many orders were shipped to each country? The code is as follows:

orderByCountry = orders.groupBy("ShipCountry").count()
orderByCountry.sort(col("count").desc()).show(10)

Running the above code produces the following output:

+-----------+-----+
|ShipCountry|count|
+-----------+-----+
|    Germany|  122|
|        USA|  122|
|     Brazil|   82|
|     France|   77|
|         UK|   56|
|  Venezuela|   46|
|    Austria|   40|
|     Sweden|   37|
|     Canada|   30|
|     Mexico|   28|
+-----------+-----+
only showing top 10 rows

Answering the remaining three questions requires some data transformations:

  • (1) Add an OrderTotal column to the orders DataFrame. To do this, compute the actual amount of each order line, aggregate the line amounts by order ID to get each order's total, and attach that total to the orders via an equi inner join between the order details and the orders. Also check whether any columns contain nulls.
  • (2) Add a date column.
  • (3) Add month and year columns so the data can be aggregated by month.

(6) Add an OrderTotal column to the orders DataFrame. First, add a per-line subtotal (the actual amount of each order line) to the order details, with the following code:

orderDetails1 = orderDetails.select(
    "OrderID",
    (col("UnitPrice") * col("Qty") - col("UnitPrice") * col("Qty") * col("Discount")).alias("OrderPrice")
)

orderDetails1.show(10)

Running the above code produces the following output:

+-------+----------+
|OrderID|OrderPrice|
+-------+----------+
|  10248|     168.0|
|  10248|      98.0|
|  10248|     174.0|
|  10249|     167.4|
|  10249|    1696.0|
|  10250|      77.0|
|  10250|    1261.4|
|  10250|     214.2|
|  10251|     95.76|
|  10251|     222.3|
+-------+----------+
only showing top 10 rows
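Note that the column expression above is algebraically equivalent to `UnitPrice * Qty * (1 - Discount)`. A quick plain-Python check of the formula against the first few rows shown above:

```python
def line_subtotal(unit_price: float, qty: int, discount: float) -> float:
    # Equivalent to the column expression:
    # UnitPrice*Qty - UnitPrice*Qty*Discount == UnitPrice*Qty*(1 - Discount)
    return unit_price * qty * (1.0 - discount)

# Values taken from the first rows of the orderDetails output
print(line_subtotal(14.0, 12, 0.0))            # 168.0
print(round(line_subtotal(9.8, 10, 0.0), 2))   # 98.0
print(round(line_subtotal(34.8, 5, 0.0), 2))   # 174.0
```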

Then aggregate by order ID to compute each order's total amount, with the following code:

from pyspark.sql.functions import sum, bround

orderTot = orderDetails1 \
	.groupBy("OrderID") \
	.agg(sum("OrderPrice").alias("OrderTotal"))

orderTot \
	.select("OrderID", bround("OrderTotal", 2).alias("OrderTotal")) \
	.sort("OrderID") \
	.show(10)
# orderTot.sort(col("OrderTotal").desc()).show(10)

Running the above code produces the following output:

+-------+----------+
|OrderID|OrderTotal|
+-------+----------+
|  10248|     440.0|
|  10249|    1863.4|
|  10250|    1552.6|
|  10251|    654.06|
|  10252|    3597.9|
|  10253|    1444.8|
|  10254|    556.62|
|  10255|    2490.5|
|  10256|     517.8|
|  10257|    1119.9|
+-------+----------+
only showing top 10 rows
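The aggregation step is a keyed sum over the line subtotals. A plain-Python sketch using the line amounts of orders 10248 and 10249 from the previous output:

```python
from collections import defaultdict

# Line subtotals from the orderDetails1 output above: (OrderID, OrderPrice)
lines = [
    (10248, 168.0), (10248, 98.0), (10248, 174.0),
    (10249, 167.4), (10249, 1696.0),
]

# groupBy("OrderID").agg(sum("OrderPrice")) boils down to a keyed sum
totals = defaultdict(float)
for order_id, price in lines:
    totals[order_id] += price

print(round(totals[10248], 2))  # 440.0  - matches the orderTot output
print(round(totals[10249], 2))  # 1863.4 - matches the orderTot output
```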

Next, perform an equi inner join between orderTot and orders to add an order-total column (Total) to the orders table, with the following code:

orders1 = orders \
      .join(orderTot, "OrderID", "inner") \
      .select(
        orders.OrderID,
        orders.CustomerID,
        orders.OrderDate,
        orders.ShipCountry,
        orderTot.OrderTotal.alias("Total")
      )

orders1.sort("CustomerID").show()

Running the above code produces the following output:

+-------+----------+----------+-----------+------------------+
|OrderID|CustomerID| OrderDate|ShipCountry|             Total|
+-------+----------+----------+-----------+------------------+
|  11011|     ALFKI|  1998-4-7|    Germany|             933.5|
|  10692|     ALFKI| 1997-10-1|    Germany|             878.0|
|  10702|     ALFKI|1997-10-11|    Germany|             330.0|
|  10835|     ALFKI| 1998-1-13|    Germany|             845.8|
|  10643|     ALFKI| 1997-8-23|    Germany|             814.5|
|  10952|     ALFKI| 1998-3-14|    Germany|             471.2|
|  10308|     ANATR| 1996-9-16|     Mexico|              88.8|
|  10926|     ANATR|  1998-3-2|     Mexico|             514.4|
|  10759|     ANATR|1997-11-26|     Mexico|             320.0|
|  10625|     ANATR|  1997-8-6|     Mexico|            479.75|
|  10507|     ANTON| 1997-4-13|     Mexico|          749.0625|
|  10365|     ANTON|1996-11-25|     Mexico|403.20000000000005|
|  10535|     ANTON| 1997-5-11|     Mexico|           1940.85|
|  10573|     ANTON| 1997-6-17|     Mexico|            2082.0|
|  10682|     ANTON| 1997-9-23|     Mexico|             375.5|
|  10677|     ANTON| 1997-9-20|     Mexico|           813.365|
|  10856|     ANTON| 1998-1-26|     Mexico|             660.0|
|  10383|     AROUT|1996-12-14|         UK|             899.0|
|  10355|     AROUT|1996-11-13|         UK|             480.0|
|  10453|     AROUT| 1997-2-19|         UK|             407.7|
+-------+----------+----------+-----------+------------------+
only showing top 20 rows
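The equi inner join keeps an order row only when its OrderID also appears in orderTot. The lookup logic, sketched in plain Python with one sample row consistent with the outputs above:

```python
# Hypothetical sample rows echoing the outputs above
orders_rows = [(10248, "VINET", "1996-7-2", "France")]
totals = {10248: 440.0}  # OrderID -> OrderTotal

# An inner join keeps a row only when the key exists on both sides
joined = [
    (oid, cust, date, country, totals[oid])
    for (oid, cust, date, country) in orders_rows
    if oid in totals
]
print(joined[0])  # (10248, 'VINET', '1996-7-2', 'France', 440.0)
```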

Finally, check whether orders1 contains any null values in the Total column, with the following code:

orders1.filter(col("Total").isNull()).show()

Running the above code produces the following output:

+-------+----------+---------+-----------+-----+
|OrderID|CustomerID|OrderDate|ShipCountry|Total|
+-------+----------+---------+-----------+-----+
+-------+----------+---------+-----------+-----+

The result set is empty, so there are no nulls and no null handling is required.

(7) Normalize the OrderDate column into a new date-typed Date column, with the following code:

from pyspark.sql.functions import to_date

orders2 = orders1.withColumn("Date",to_date("OrderDate"))
orders2.printSchema()
orders2.show(5)

Running the above code produces the following output:

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- ShipCountry: string (nullable = true)
 |-- Total: double (nullable = true)
 |-- Date: date (nullable = true)

+-------+----------+---------+-----------+------------------+----------+
|OrderID|CustomerID|OrderDate|ShipCountry|               Total|        Date|
+-------+----------+---------+-----------+------------------+----------+
|  10248|      VINET| 1996-7-2|      France|                440.0|1996-07-02|
|  10249|      TOMSP| 1996-7-3|     Germany|               1863.4|1996-07-03|
|  10250|      HANAR| 1996-7-6|      Brazil|1552.6000000000001|1996-07-06|
|  10251|      VICTE| 1996-7-6|      France|               654.06|1996-07-06|
|  10252|      SUPRD| 1996-7-7|     Belgium|               3597.9|1996-07-07|
+-------+----------+---------+-----------+------------------+----------+
only showing top 5 rows
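`to_date` here relies on Spark parsing strings such as "1996-7-2" that have single-digit month and day parts. A plain-Python sketch of the same normalization (CPython's `strptime` also accepts non-zero-padded fields):

```python
from datetime import datetime

def normalize(order_date: str):
    # "%Y-%m-%d" accepts non-zero-padded month/day in CPython,
    # so "1996-7-2" parses to a proper date object
    return datetime.strptime(order_date, "%Y-%m-%d").date()

d = normalize("1996-7-2")
print(d.isoformat())    # 1996-07-02
print(d.month, d.year)  # 7 1996
```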

(8) Then extract the month and year parts from the OrderDate column and add two new columns, Month and Year, with the following code:

from pyspark.sql.functions import month,year

orders3 = orders2 \
	.withColumn("Month",month("OrderDate")) \
	.withColumn("Year",year("OrderDate"))
orders3.show(10)

Running the above code produces the following output:

+-------+----------+---------+-----------+------------------+----------+-----+----+
|OrderID|CustomerID|OrderDate|ShipCountry|             Total|      Date|Month|Year|
+-------+----------+---------+-----------+------------------+----------+-----+----+
|  10248|     VINET| 1996-7-2|     France|             440.0|1996-07-02|    7|1996|
|  10249|     TOMSP| 1996-7-3|    Germany|            1863.4|1996-07-03|    7|1996|
|  10250|     HANAR| 1996-7-6|     Brazil|1552.6000000000001|1996-07-06|    7|1996|
|  10251|     VICTE| 1996-7-6|     France|            654.06|1996-07-06|    7|1996|
|  10252|     SUPRD| 1996-7-7|    Belgium|            3597.9|1996-07-07|    7|1996|
|  10253|     HANAR| 1996-7-8|     Brazil|1444.8000000000002|1996-07-08|    7|1996|
|  10254|     CHOPS| 1996-7-9|Switzerland| 556.6199999999999|1996-07-09|    7|1996|
|  10255|     RICSU|1996-7-10|Switzerland|            2490.5|1996-07-10|    7|1996|
|  10256|     WELLI|1996-7-13|     Brazil|             517.8|1996-07-13|    7|1996|
|  10257|     HILAA|1996-7-14|  Venezuela|            1119.9|1996-07-14|    7|1996|
+-------+----------+---------+-----------+------------------+----------+-----+----+
only showing top 10 rows

With these columns in place, the remaining questions posed at the beginning of the example can now be answered.

(9) Now answer question 3: what is the total order amount per month/year? The code is as follows:

ordersByYM = orders3 \
	.groupBy("Year","Month") \
	.agg(sum("Total").alias("Total"))

ordersByYM.select(
    "Year",
    "Month",
    bround("Total",2).alias("Total")
).sort("Year","Month").show(20)

Running the above code produces the following output:

+----+-----+---------+
|Year|Month|    Total|
+----+-----+---------+
|1996|    7|  30741.9|
|1996|    8| 22726.88|
|1996|    9|  27691.4|
|1996|   10| 38380.12|
|1996|   11| 45694.44|
|1996|   12| 52494.33|
|1997|    1| 51612.97|
|1997|    2| 38483.64|
|1997|    3| 40918.82|
|1997|    4| 57116.71|
|1997|    5| 50270.33|
|1997|    6| 34392.08|
|1997|    7| 52744.68|
|1997|    8| 46991.78|
|1997|    9| 57723.23|
|1997|   10| 62253.63|
|1997|   11| 51294.81|
|1997|   12| 67920.23|
|1998|    1|107049.96|
|1998|    2| 85240.83|
+----+-----+---------+
only showing top 20 rows

(10) Answer question 4: what is each customer's total annual sales? The code is as follows:

ordersByCY = orders3 \
	.groupBy("CustomerID","Year") \
	.agg(sum("Total").alias("Total"))

ordersByCY.sort("CustomerID","Year").show(20)

Running the above code produces the following output:

+----------+----+------------------+
|CustomerID|Year|             Total|
+----------+----+------------------+
|     ALFKI|1997|            2022.5|
|     ALFKI|1998|            2250.5|
|     ANATR|1996|              88.8|
|     ANATR|1997|            799.75|
|     ANATR|1998|             514.4|
|     ANTON|1996|403.20000000000005|
|     ANTON|1997|         5960.7775|
|     ANTON|1998|             660.0|
|     AROUT|1996|            1379.0|
|     AROUT|1997| 6406.900000000001|
|     AROUT|1998|           5604.75|
|     BERGS|1996|            4324.4|
|     BERGS|1997|13849.015000000001|
|     BERGS|1998|         6754.1625|
|     BLAUS|1997|            1079.8|
|     BLAUS|1998|            2160.0|
|     BLONP|1996|            9986.2|
|     BLONP|1997|           7817.88|
|     BLONP|1998|             730.0|
|     BOLID|1996|             982.0|
+----------+----+------------------+
only showing top 20 rows

(11) Answer question 5: what is each customer's average order amount per year? The code is as follows:

from pyspark.sql.functions import avg

ordersByCY = orders3 \
	.groupBy("CustomerID","Year") \
	.agg(avg("Total").alias("Avg"))

ordersByCY.select(
    "CustomerID",
    "Year",
    bround("Avg",2)
).sort("CustomerID","Year").show(20)

Running the above code produces the following output:

+----------+----+--------------+
|CustomerID|Year|bround(Avg, 2)|
+----------+----+--------------+
|     ALFKI|1997|        674.17|
|     ALFKI|1998|        750.17|
|     ANATR|1996|          88.8|
|     ANATR|1997|        399.88|
|     ANATR|1998|         514.4|
|     ANTON|1996|         403.2|
|     ANTON|1997|       1192.16|
|     ANTON|1998|         660.0|
|     AROUT|1996|         689.5|
|     AROUT|1997|        915.27|
|     AROUT|1998|       1401.19|
|     BERGS|1996|       1441.47|
|     BERGS|1997|        1384.9|
|     BERGS|1998|       1350.83|
|     BLAUS|1997|        269.95|
|     BLAUS|1998|         720.0|
|     BLONP|1996|       3328.73|
|     BLONP|1997|       1116.84|
|     BLONP|1998|         730.0|
|     BOLID|1996|         982.0|
+----------+----+--------------+
only showing top 20 rows
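As a sanity check, each yearly average equals the yearly total divided by the number of orders that year. Using ALFKI's three 1997 order totals visible in the orders1 output from step (6) (878.0, 330.0 and 814.5):

```python
# ALFKI's 1997 order totals, read off the orders1 output above
alfki_1997 = [878.0, 330.0, 814.5]

total = sum(alfki_1997)          # what sum("Total") computes per group
avg = total / len(alfki_1997)    # what avg("Total") computes per group

print(round(total, 2))  # 2022.5 - matches question 4's output
print(round(avg, 2))    # 674.17 - matches question 5's output
```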