PySpark SQL编程案例:单词计数
下面这个示例中,使用DataFrame和SQL两种方式来实现单词计数功能。
【示例】统计某个英文文本中的词频,找出出现频次最高的三个单词。实现过程和代码如下所示。
(1) 准备数据文件。请自行创建一个纯文本文件word.txt,并编辑内容如下:
good good study day day up
将该文件上传到HDFS的/data/spark/目录下。
(2) 方法一:使用关系型 API实现单词计数,代码如下:
from pyspark.sql import SparkSession
# 构建SparkSession实例
spark = SparkSession.builder \
.master("spark://localhost:7077") \
.appName("pyspark sql demo") \
.getOrCreate()
# 定义文件路径
filePath = "/data/spark/words.txt"
df = spark.read.text(filePath)
# df.printSchema()
# df.show()
# 对DataFrame进行一系列处理,产生一个包含最终结果的DataFrame
wordDF = wordDF.rdd \
.flatMap(lambda line: line.value.split(" ")) \
.map(lambda word: (word, 1)) \
.toDF(["word", "one"])
# wordDF.show()
# 获得前3个出现频率最高的词
from pyspark.sql.functions import col
top3 = wordDF \
.groupBy("word") \
.count() \
.orderBy(col("count").desc()) \
.limit(3)
# 输出结果
top3.show()
执行以上代码,输出结果如下:
+----+-----+ |word|count| +----+-----+ | day| 2| |good| 2| | up| 1| +----+-----+
(3) 方法二:使用SQL语句,代码如下:
from pyspark.sql import SparkSession
# 构建SparkSession实例
spark = SparkSession.builder \
.master("spark://localhost:7077") \
.appName("pyspark sql demo") \
.getOrCreate()
# 定义文件路径
filePath = "/data/spark/words.txt"
df = spark.read.text(filePath)
# df.printSchema()
# df.show()
# 对DataFrame进行一系列处理,产生一个包含最终结果的DataFrame
wordDF = wordDF.rdd \
.flatMap(lambda line: line.value.split(" ")) \
.map(lambda word: (word, 1)) \
.toDF(["word", "one"])
# wordDF.show()
# 注册为临时view
wordDF.createOrReplaceTempView("wc_tb")
# 执行SQL查询,分析产生结果
sql = """
select word,count(1) as count
from wc_tb
group by word
order by count desc
"""
resultDF = spark.sql(sql)
resultDF.limit(3).show()
执行以上代码,输出结果如下:
+----+-----+ |word|count| +----+-----+ | day| 2| |good| 2| | up| 1| +----+-----+