发布日期:2022-09-28
VIP内容
在Pycharm中开发PySpark结构化流程序
开发PySpark结构化流应用程序,使用交互式开发环境非常不方便,因此通常使用PyCharm来开发流应用程序。本节讲述如何在Windows环境下应用PyCharm来开发PySpark结构化流应用程序。
本节所使用的示例代码来自使用流数据源2_Kafka。
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# 创建SparkSession实例
spark = SparkSession.builder.master("local[*]").appName("Kafka Source").getOrCreate()
# 创建一个流来监听test topic的消息
dataDF = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xueai8:9092") \
.option("subscribe", "test") \
.option("startingOffsets", "earliest") \
.load()
# 查看这个DataFrame的schema
dataDF.printSchema()
# 将该流转换为String数据类型(key和value都是字节数组形式)
# kvstream = dataDF.selectExpr("CAST(key as string)", "CAST(value as string)")
kvstream = dataDF.selectExpr("CAST(value as string)", "topic", "partition", "offset")
# 将该流写出到控制台
query = kvstream.writeStream \
.outputMode("append") \
.format("console") \
.start()
# 等待流程序执行结束(作为作业文件提交时启用)
query.awaitTermination()
如果在Windows的PyCharm开发环境中编写以上代码,在运行时可能会遇到以下三类错误信息:
1) pyspark.sql.utils.AnalysisException: Failed to find data source: kafka.
原因:Spark结构化流要访问Kafka,缺少Spark SQL和Kafka的集成包。
2) java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer.
原因:Spark结构化流要访问Kafka,缺少对Kafka字节数组进行序列化的类(缺少相应的jar包)。
3) java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig.
原因:缺少common-pool2-2.x.x.jar包。
解决方法:
(1) 将以下三个jar包,拷贝到Windows文件系统的某个指定地方。比如,我这里将它们拷贝到项目根目录下的jars(自己创建的)目录下:
- spark-sql-kafka-0-10_2.12-3.1.2.jar
- spark-streaming-kafka-0-10-assembly_2.12-3.1.2.jar
- commons-pool2-2.11.1.jar
(2) 修改代码如下,在创建SparkSession的实例spark时,在配置中添加对以上三个jar包的引用。
# 使用Kafka数据源
from pyspark.sql import SparkSession
# 创建一个SparkSession实例spark
spark_jars = "../jars/spark-sql-kafka-0-10_2.12-3.1.2.jar, " \
"../jars/spark-streaming-kafka-0-10-assembly_2.12-3.1.2.jar, "\
"../jars/commons-pool2-2.11.1.jar"
spark = SparkSession.builder \
.master("local[2]") \
.appName("kafka source") \
.config("spark.jars", spark_jars) \
.getOrCreate()
# 设置shuffle后的分区数为2(默认200)
spark.conf.set("spark.sql.shuffle.partitions", "2")
# 创建一个流,监听Kafka中test topic的消息
dataDF = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xueai8:9092") \
.option("subscribe", "test") \
.option("startingOffsets", "earliest") \
.load()
# 查看这个流DataFrame的schema
dataDF.printSchema()
# Kafka的topic中的数据具有固定的格式
# key(二进制), value(二进制), topic, partition, offset, timestamp, timestampType
kvstream = dataDF.selectExpr("CAST(value as string)", "topic", "partition", "offset")
# 写出到data sink
query = kvstream.writeStream.outputMode("append").format("console").start()
# 等待流程序结束
query.awaitTermination()
(3) 最终结果如下图所示: