PySpark SQL Programming Example: Analyzing a User Dataset

[Example] The following example analyzes the contents of the people.txt file that ships with the PySpark installation, but specifies the schema programmatically.

First, load people.txt into an RDD:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Build a SparkSession instance
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# Path to the data file
file = "/data/spark/resources/people.txt"

# Create an RDD from the text file
peopleRDD = spark.sparkContext.textFile(file)

# Specify a schema
fields = [
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
]
schema = StructType(fields)

# Convert RDD[String] to RDD[(String, Int)]
rowRDD = peopleRDD \
      .map(lambda line: line.split(",")) \
      .map(lambda arr:(arr[0], int(arr[1])))

# Apply the schema to the RDD
peopleDF = spark.createDataFrame(rowRDD, schema)

# Print the schema and contents
peopleDF.printSchema()
peopleDF.show()

Running the code above produces the following output:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
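The two `map()` calls above do the per-line parsing. As a plain-Python sketch of that step (not PySpark API; it assumes each line of people.txt has the form `Michael, 29`, as in the sample file), the transformation from a raw line to a `(name, age)` tuple works like this:

```python
# Sketch of the parsing done by the two map() calls above,
# assuming lines of the form "Michael, 29" (name, comma, age).
def parse_line(line):
    # First map: split the line on the comma
    arr = line.split(",")
    # Second map: keep the name and cast the age to int
    # (int() tolerates the leading space left after the split)
    return (arr[0], int(arr[1]))
```

For example, `parse_line("Michael, 29")` returns `("Michael", 29)`.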

Next, register a temporary view and query it with SQL:

# Create a temporary view from the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL can be run over temporary views created from DataFrames
sqlStr = "SELECT name, age FROM people WHERE age BETWEEN 13 AND 19"
results = spark.sql(sqlStr)

results.show()

Running the code above prints:

+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+
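Note that SQL's BETWEEN predicate is inclusive on both ends. A minimal plain-Python sketch of the same filter, run over the three sample rows (a stand-in list of tuples, not the actual DataFrame):

```python
# The sample rows from people.txt as (name, age) tuples
people = [("Michael", 29), ("Andy", 30), ("Justin", 19)]

# Equivalent of: SELECT name, age FROM people WHERE age BETWEEN 13 AND 19
# BETWEEN is inclusive on both bounds: 13 <= age <= 19
teenagers = [(name, age) for (name, age) in people if 13 <= age <= 19]
# teenagers == [("Justin", 19)]
```

Only Justin (age 19) satisfies the predicate, which matches the SQL result shown above.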
