PySpark SQL编程模型

所有的PySpark SQL应用程序都以特定的步骤来工作。这些工作步骤如下图所示:

也就是说,每个PySpark SQL应用程序都由相同的基本部分组成:

  • 从数据源加载数据,构造DataFrame;
  • 对DataFrame执行转换(transformation)操作;
  • 将最终的DataFrame存储到指定位置。

下面我们实现一个完整的的PySpark SQL应用程序。

【示例】加载json文件中的人员信息,并进行统计。请按以下步骤执行。

1)准备数据源文件。

Spark安装目录中自带了一个people.json文件,位于“examples/src/main/resources/”目录下。其内容如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

我们将这个people.json文件,拷贝到/home/hduser/data/spark/resources/目录下。

2)创建一个Jupyter Notebook文件,编辑代码如下:

from pyspark.sql import SparkSession

# 1)构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://xueai8:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 2)加载数据源,构造DataFrame
input = "file:///home/hduser/data/spark/resources/people.json"
df = spark.read.json(input)

# 3)执行转换操作
from pyspark.sql.functions import *

# 找出年龄超过21岁的人
resultDF = df.where(col("age") > 21)

# 显示DataFrame数据  
resultDF.show()   

# 4)将结果保存到csv文件中
output = "/data/spark/people-output"
resultDF.write.format("csv").save(output)

# 5)停止SparkSession
spark.stop()

3)执行过程和执行结果如下:

在本例中,我们使用了本地文件系统。大家可自行修改代码,使用HDFS文件系统。


《Spark原理深入与编程实战》