发布日期:2022-09-28 VIP内容

在Pycharm中开发PySpark结构化流程序

开发PySpark结构化流应用程序,使用交互式开发环境非常不方便,因此通常使用PyCharm来开发流应用程序。本节讲述如何在Windows环境下应用PyCharm来开发PySpark结构化流应用程序。

本节所使用的示例代码来自使用流数据源2_Kafka

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# 创建SparkSession实例
spark = SparkSession.builder.master("local[*]").appName("Kafka Source").getOrCreate()

# 创建一个流来监听test topic的消息
dataDF = spark.readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "xueai8:9092") \
      .option("subscribe", "test") \
      .option("startingOffsets", "earliest") \
      .load()

# 查看这个DataFrame的schema
dataDF.printSchema()

# 将该流转换为String数据类型(key和value都是字节数组形式)
# kvstream = dataDF.selectExpr("CAST(key as string)", "CAST(value as string)")
kvstream = dataDF.selectExpr("CAST(value as string)", "topic", "partition", "offset")

# 将该流写出到控制台
query = kvstream.writeStream \
      .outputMode("append") \
      .format("console") \
      .start()

# 等待流程序执行结束(作为作业文件提交时启用)
query.awaitTermination()

如果在Windows的PyCharm开发环境中编写以上代码,在运行时可能会遇到以下三类错误信息:

1) pyspark.sql.utils.AnalysisException: Failed to find data source: kafka.

原因:Spark结构化流要访问Kafka,缺少Spark SQL和Kafka的集成包。

2) java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer.

原因:Spark结构化流要访问Kafka,缺少对Kafka字节数组进行序列化的类(缺少相应的jar包)。

3) java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig.

原因:缺少common-pool2-2.x.x.jar包。

解决方法:

(1) 将以下三个jar包,拷贝到Windows文件系统的某个指定地方。比如,我这里将它们拷贝到项目根目录下的jars(自己创建的)目录下:

  • spark-sql-kafka-0-10_2.12-3.1.2.jar
  • spark-streaming-kafka-0-10-assembly_2.12-3.1.2.jar
  • commons-pool2-2.11.1.jar

(2) 修改代码如下,在创建SparkSession的实例spark时,在配置中添加对以上三个jar包的引用。

# 使用Kafka数据源
from pyspark.sql import SparkSession

# 创建一个SparkSession实例spark
spark_jars = "../jars/spark-sql-kafka-0-10_2.12-3.1.2.jar, " \
                   "../jars/spark-streaming-kafka-0-10-assembly_2.12-3.1.2.jar, "\
                   "../jars/commons-pool2-2.11.1.jar"

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("kafka source") \
    .config("spark.jars", spark_jars) \
    .getOrCreate()

# 设置shuffle后的分区数为2(默认200)
spark.conf.set("spark.sql.shuffle.partitions", "2")

# 创建一个流,监听Kafka中test topic的消息
dataDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xueai8:9092") \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .load()

# 查看这个流DataFrame的schema
dataDF.printSchema()

# Kafka的topic中的数据具有固定的格式
# key(二进制), value(二进制), topic, partition, offset, timestamp, timestampType
kvstream = dataDF.selectExpr("CAST(value as string)", "topic", "partition", "offset")

# 写出到data sink
query = kvstream.writeStream.outputMode("append").format("console").start()

# 等待流程序结束
query.awaitTermination()

(3) 最终结果如下图所示: