Spark SQL编程案例1_电商数据分析
本节通过几个案例的学习,掌握使用Spark SQL进行大数据分析的复杂用法。
【示例】电商数据分析示例。使用nw数据集,回答以下问题:
- 每个客户下了多少订单?
- 每个国家的订单有多少?
- 每月/年有多少订单?
- 每个客户的年销售总额是多少?
- 客户每年的平均订单是多少?
实现代码如下所示:
def main(args: Array[String]): Unit = {
// 0) 创建SparkSession的实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("Spark Basic Example")
.getOrCreate()
// 1、读取源数据文件
val filePath = "src/main/resources/"
// 加载订单数据集到DataFrame中
val orders = spark.read
.option("header","true")
.option("inferSchema","true")
.csv(filePath + "NW-Orders-01.csv")
println("订单有" + orders.count() + "行")
orders.printSchema()
orders.show(3)
// 加载订单明细数据集到DataFrame中
val orderDetails = spark.read
.option("header","true")
.option("inferSchema","true")
.csv(filePath + "NW-Order-Details.csv")
println("订单明细有" + orderDetails.count() + "行")
orderDetails.printSchema()
orderDetails.show(3)
// 2、每个客户下了多少订单?
val orderByCustomer = orders.groupBy("CustomerID").count()
orderByCustomer.sort(col("count").desc).show(3)
// 3、每个国家的订单有多少?
val orderByCountry = orders.groupBy("ShipCountry").count()
orderByCountry.sort(col("count").desc).show(3)
//# 对于后面三个问题,需要对数据进行一些转换
//# 1. 向Orders DataFrame增加一个OrderTotal列
//# 1.1. 计算每个订单明细的实际金额
//# 1.2. 根据order id统计每张订单的总金额
//# 1.3. 对order details & orders进行等值内连接,增加订单总金额
//# 1.4. 检查是否有任何null列
//# 2. 增加一个date列
//# 3. 增加month和year
//# 1.1. 向order details中增加每行的小计(每个订单明细的实际金额)
import spark.implicits._
val orderDetails1 = orderDetails
.select($"OrderID",(($"UnitPrice" * $"Qty") - ($"UnitPrice" * $"Qty") * $"Discount").as("OrderPrice"))
orderDetails1.show(5)
//# 1.2. 根据order id统计每张订单的总金额
// val orderTot = orderDetails1.groupBy("OrderID").sum("OrderPrice").alias("OrderTotal")
val orderTot = orderDetails1.groupBy("OrderID").agg(sum("OrderPrice").as("OrderTotal"))
// orderTot.sort("OrderID").show(5)
// orderTot.sort($"OrderTotal".desc).show(5)
orderTot.select($"OrderID",bround($"OrderTotal",2)).sort("OrderID").show(5)
//# 1.3. 对order details & orders进行等值内连接,增加订单总金额
val orders1 = orders
.join(orderTot, orders("OrderID").equalTo(orderTot("OrderID")), "inner")
.select(
orders("OrderID"),
orders("CustomerID"),
orders("OrderDate"),
orders("ShipCountry").alias("ShipCountry"),
orderTot("OrderTotal").alias("Total")
)
orders1.sort("CustomerID").show()
// # 1.4. 检查是否有任何null列
orders1.filter(orders1("Total").isNull).show()
// # 2. 增加一个date列
val orders2 = orders1.withColumn("Date",to_date(orders1("OrderDate")))
orders2.printSchema()
orders2.show(2)
// # 3. 增加month和year
val orders3 = orders2.withColumn("Month",month($"OrderDate")).withColumn("Year",year($"OrderDate"))
orders3.show(2)
// Q 4. 每月/年有多少订单金额?
val ordersByYM = orders3.groupBy("Year","Month").agg(sum("Total").as("Total"))
ordersByYM.select($"Year",$"Month",bround($"Total",2) as "Total").sort($"Year",$"Month").show()
// Q 5. 每个客户的年销售总额是多少?
var ordersByCY = orders3.groupBy("CustomerID","Year").agg(sum("Total").as("Total"))
ordersByCY.sort($"CustomerID",$"Year").show()
// Q 6. 客户每年的平均订单是多少?
ordersByCY = orders3.groupBy("CustomerID","Year").agg(avg("Total").as("Avg"))
// ordersByCY.show(5)
ordersByCY.select($"CustomerID",$"Year",bround($"Avg",2)).sort($"CustomerID",$"Year").show()
// Q 7. 客户的平均订单金额是多少?
val ordersCA = orders3.groupBy("CustomerID").agg(avg("Total").as("C-Avg"), sum("Total").as("C-Sum"))
ordersCA.sort(col("C-Avg").desc).show()
}