自定义数据分区
默认情况下,Flink应用程序是使用key的哈希分区或随机分区。但在有的情况下,我们需要自定义分区规则,这就需要自己来定义分区器(分区程序)。
【示例】(Scala实现)使用自定义分区程序,按年龄对数据流元素划分分区。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala项目模板。(Flink项目创建过程,请参见2.2节)
2、设置依赖。在pom.xml中添加如下依赖(根据项目模板创建的话,这个依赖会自动添加,此步可省略):
Scala Maven依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
3、创建一个case class类,用来表示流中的数据类型。代码如下:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 自定义分区
*/
object PartitionDemo1 {
// 定义case class类,表示流数据类型
case class Person(name:String, age:Int)
def main(args: Array[String]): Unit = {
// 设置批处理执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 触发流程序开始执行
env.execute("stream demo")
}
}
4、自定义分区类,实现了Partitioner接口,按年龄(age字段)划分分区。这里我们按年龄把流数据分为三个分区:20岁以下的、20~30岁以及30岁以上的。代码如下:
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 自定义分区
*/
object PartitionDemo1 {
// 定义case class类,表示流数据类型
case class Person(name:String, age:Int)
// 自定义分区器,以年龄为key
class AgePartitioner extends Partitioner[Int] {
// 重写partition方法
override def partition(key: Int, i: Int): Int = {
key match {
case age if age<20 => 0
case age if age>30 => 2
case _ => 1
}
}
}
def main(args: Array[String]): Unit = {
// 设置批处理执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 触发流程序开始执行
env.execute("stream demo")
}
}
5、测试自定义分区逻辑。编辑流处理代码如下:
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 自定义分区
*/
object PartitionDemo1 {
// 定义case class类,表示流数据类型
case class Person(name:String, age:Int)
// 自定义分区器,以年龄为key
class AgePartitioner extends Partitioner[Int] {
// 重写partition方法
override def partition(k: Int, i: Int): Int = {
k match {
case age if age<20 => 0
case age if age>30 => 2
case _ => 1
}
}
}
def main(args: Array[String]): Unit = {
// 设置批处理执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 读取数据源,构造DataStream
val peoples = List(
Person("张三", 21),
Person("李四", 16),
Person("王老五", 35),
Person("张三2", 22),
Person("李四2", 17),
Person("王老五2", 36)
)
val personDS = env.fromCollection(peoples)
// 应用自定义分区器,按年龄字段进行分组
// 注:通过字段位置指定key只对元组数据类型有效
val adults = personDS.partitionCustom(new AgePartitioner, "age")
// 将结果输出到控制台
adults.print
// 触发流程序开始执行
env.execute("stream demo")
}
}
6、执行以上程序,输出结果如下:
分区之前分区数:8 ...... 2> Person(张三,21) 1> Person(李四,16) 3> Person(王老五,35) 1> Person(李四2,17) 2> Person(张三2,22) 3> Person(王老五2,36)
从上面的输出结果可以看到,数据在三个分区中分别并行地被处理。
注意:输出结果前面的整数是分区号。
对于简单的自定义分区器,也可以直接使用匿名内部类,以简化代码。如下所示:
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 自定义分区
* 对于简单的自定义分区器,也可以直接使用匿名内部类,以简化代码。
*/
object PartitionDemo2 {
// 定义case class类,表示流数据类型
case class Person(name:String, age:Int)
def main(args: Array[String]): Unit = {
// 设置批处理执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 读取数据源,构造DataStream
val peoples = List(
Person("张三", 21),
Person("李四", 16),
Person("王老五", 35),
Person("张三2", 22),
Person("李四2", 17),
Person("王老五2", 36)
)
val personDS = env.fromCollection(peoples)
// 应用自定义分区器,按年龄字段进行分组
// 注:通过字段位置指定key只对元组数据类型有效
val adults = personDS.partitionCustom(new Partitioner[Int] {
// 重写partition方法
override def partition(k: Int, i: Int): Int = {
k match {
case age if age<20 => 0
case age if age>30 => 2
case _ => 1
}
}
}, "age") // 第二个分组字段参数,也可以使用_.age或person => person.age
// 将结果输出到控制台
adults.print
// 触发流程序开始执行
env.execute("stream demo")
}
}
【示例】(Java实现)使用自定义分区程序,按年龄对数据流元素划分分区。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
2、设置依赖。在pom.xml中添加如下依赖(根据项目模板创建的话,这个依赖会自动添加,此步可省略):
Java Maven依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
3、创建一个POJO类,用来表示流中的数据。代码如下:
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class PartitionDemo1 {
// POJO类,表示人员信息实体
public static class Person {
public String name; // 存储姓名
public Integer age; // 存储年龄
// 空构造器
public Person() {}
// 构造器,初始化属性
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
// 用于调试时输出信息
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
}
}
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 触发流程序开始执行
env.execute("stream demo");
}
}
4、自定义分区类,实现了Partitioner接口,按年龄(age字段)划分分区。这里我们按年龄把流数据分为三个分区:20岁以下的、20~30岁以及30岁以上的。代码如下:
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class PartitionDemo1 {
// POJO类,表示人员信息实体
public static class Person {
public String name; // 存储姓名
public Integer age; // 存储年龄
// 空构造器
public Person() {};
// 构造器,初始化属性
public Person(String name, Integer age) {
this.name = name;
this.age = age;
};
// 用于调试时输出信息
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
};
}
// 自定义分区器,以年龄为key
public static class AgePartitioner implements Partitioner {
@Override
public int partition(Integer key, int numPartitions) {
if(key < 20){
return 0;
}else if(key > 30){
return 2;
}else{
return 1;
}
}
}
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 触发流程序开始执行
env.execute("stream demo");
}
}
5、测试自定义分区逻辑。编辑流处理代码如下:
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class PartitionDemo1 {
// POJO类,表示人员信息实体
public static class Person {
public String name; // 存储姓名
public Integer age; // 存储年龄
// 空构造器
public Person() {};
// 构造器,初始化属性
public Person(String name, Integer age) {
this.name = name;
this.age = age;
};
// 用于调试时输出信息
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
};
}
// 自定义分区器,以年龄为key
public static class AgePartitioner implements Partitioner {
@Override
public int partition(Integer key, int numPartitions) {
if(key < 20){
return 0;
}else if(key > 30){
return 2;
}else{
return 1;
}
}
}
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据源,构造DataStream
DataStream<Person> personDS = env.fromElements(
new Person("张三", 21),
new Person("李四", 16),
new Person("王老五", 35),
new Person("张三2", 22),
new Person("李四2", 17),
new Person("王老五2", 36));
// 应用自定义分区器,按年龄字段进行分组
// 注:通过字段位置指定key只对元组数据类型有效
DataStream<Person> adults = personDS.partitionCustom(new AgePartitioner(), p -> p.age);
// 将结果输出到控制台
adults.print();
// 触发流程序开始执行
env.execute("stream demo");
}
}
6、执行以上程序,输出结果如下:
1> 李四: age 16 2> 张三: age 21 3> 王老五: age 35 1> 李四2: age 17 3> 王老五2: age 36 2> 张三2: age 22
从上面的输出结果可以看到,数据在三个分区中分别并行地被处理。
注意:输出结果前面的整数是分区号。