发布日期:2021-12-06 VIP内容

Kafka故障转移示例演示

在本示例中,我们将运行多个Kafka节点。所以本地开发机器至少需要16G的内存。如果内存小于16G,则可以只运行两台服务器。

我们将创建一个复制的主题。然后,我们将演示消费者故障转移和Broker故障转移,以及Kafka消费者的负载平衡。我们将展示在多个消费者组中,Kafka像发布/订阅系统一样工作。但是,当我们把所有的消费者放在同一个消费者组时,Kafka将负载共享消息给在同一组的消费者。

接下来,请按以下步骤操作。

创建三个新的Kafka server-n.properties文件

首先,复制现有的Kafka server.properties到server-0.properties、server-1.properties和server-2.properties。然后使用文本编辑器分别打开进行修改。

修改server-0.properties:

broker.id=0
port=9092
log.dirs=./logs/kafka-0
......

修改server-1.properties:

broker.id=1
port=9093
log.dirs=./logs/kafka-1
......

修改server-2.properties:

broker.id=2
port=9094
log.dirs=./logs/kafka-2
......

为三个Kafka服务器创建启动脚本

启动脚本将使用相应的属性文件运行kafka-server-start.sh。

创建并编辑start-1st-server.sh:

#!/usr/bin/env bash
CONFIG=`pwd`/config

cd ~/bigdata/kafka_2.12-2.4.1

## Run Kafka
./bin/kafka-server-start.sh "$CONFIG/server-0.properties"

创建并编辑start-2nd-server.sh:

#!/usr/bin/env bash
CONFIG=`pwd`/config

cd ~/bigdata/kafka_2.12-2.4.1

## Run Kafka
./bin/kafka-server-start.sh "$CONFIG/server-1.properties"

创建并编辑start-3rd-server.sh:

#!/usr/bin/env bash
CONFIG=`pwd`/config

cd ~/bigdata/kafka_2.12-2.4.1

## Run Kafka
./bin/kafka-server-start.sh "$CONFIG/server-2.properties"

注意,我们传递了在上一步中创建的Kafka服务器属性文件。

现在在独立的终端/shell中运行这三个脚本程序。

# 在第一个终端窗口
$ ./start-1st-server.sh

# 在第二个终端窗口
$ ./start-2nd-server.sh

# 在第三个终端窗口
$ ./start-3rd-server.sh

给服务器一分钟时间启动并连接到ZooKeeper。

创建Kafka复制主题my-failsafe-topic

接下来,我们将创建一个控制台生产者和控制台消费者都可以使用的复制主题。

创建并编辑create-replicated-topic.sh脚本:

#!/usr/bin/env bash

cd ~/bigdata/kafka_2.12-2.4.1

./bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 3 \
    --partitions 13 \
    --topic my-failsafe-topic

注意,复制因子被设置为3,主题名称是my-failsafe-topic,它有13个分区。

然后我们只需要运行脚本来创建主题。

$ cd ~/bigdata/kafka_2.12-2.4.1

$ ./create-replicated-topic.sh

启动使用复制主题的Kafka消费者

接下来,创建一个启动消费者的脚本,然后使用该脚本启动消费者。

创建并编辑start-consumer-console-replicated.sh:

#!/usr/bin/env bash
cd ~/bigdata/kafka_2.12-2.4.1

./bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9094,localhost:9092 \
    --topic my-failsafe-topic \
    --from-beginning

注意,Kafka服务器列表被传递给--bootstrap-server参数。只是,前面运行的三个服务器中有两个传递了。尽管只需要一个Broker,但消费者客户端将只从一个服务器了解另一个Broker。通常,我们会列出多个代理,以防出现中断,以便客户端机能够连接。

现在我们运行这个脚本来启动消费者。

cd ~/bigdata/kafka_2.12-2.4.1

$ ./start-consumer-console-replicated.sh

启动使用复制主题的Kafka生产者

接下来,我们创建一个启动生产者的脚本。然后使用创建的脚本启动生产者。

创建并编辑start-consumer-producer-replicated.sh:

#!/usr/bin/env bash
cd ~/bigdata/kafka_2.12-2.4.1

./bin/kafka-console-producer.sh \
--broker-list localhost:9092,localhost:9093 \
--topic my-failsafe-topic

注意,我们启动了Kafka producer,并通过参数--broker-list传递给它一个Kafka Brokers列表。

现在使用该启动脚本来启动生产者,如下所示。

$ cd ~/bigdata/kafka_2.12-2.4.1

$ ./start-consumer-producer-replicated.sh

现在发送消息

现在从生产者向Kafka发送一些消息,并观察被消费者消费的这些消息。

Producer控制台:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-producer-replicated.sh

Hi Mom
How are you?
How are things going?
Good!

Consumer控制台:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console-replicated.sh

Hi Mom
How are you?
How are things going?
Good!

现在启动两个更多的消费者和发送更多的消息

现在在它们自己的终端窗口中启动另外两个消费者,并从生产者发送更多消息。

Producer控制台:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-producer-replicated.sh

Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3

第1个Consumer控制台:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console-replicated.sh

Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3

在新的终端的第2个Consumer控制台:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console-replicated.sh

Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3

在新的终端的第3个Consumer控制台:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console-replicated.sh

Hi Mom
How are you?
How are things going?
Good!
message 1
message 2
message 3

请注意,消息被发送到所有消费者,因为每个消费者都在不同的消费者组中。

把消费者改在自己的消费者群体中

停止之前运行的生产者和消费者,但让Kafka和ZooKeeper保持运行。

现在让我们修改start-consumer-console-replicate.sh脚本来添加一个Kafka消费者组。我们想把所有的消费者都放在同一个消费者组中。通过这种方式,消费者将共享消息,因为消费者组中的每个消费者将获得其分区的份额。

修改start-consumer-console-replicated.sh脚本文件:

#!/usr/bin/env bash
cd ~/bigdata/kafka_2.12-2.4.1

./bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9094,localhost:9092 \
    --topic my-failsafe-topic \
    --consumer-property group.id=mygroup

注意,脚本与前面相同,只是我们添加了--consumer-property group.id=mygroup,这将把与此脚本一起运行的每个消费者都放入mygroup消费者组。

现在我们只运行生产者和三个消费者。

运行start-consumer-console-replicated.sh脚本三次:

$ cd ~/bigdata/kafka_2.12-2.4.1

$ ./start-consumer-console-replicated.sh

运行生产者脚本:

$ cd ~/bigdata/kafka_2.12-2.4.1

$ ./start-consumer-producer-replicated.sh

现在从Kafka生产者控制台发送7条消息。

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-producer-replicated.sh

m1
m2
m3
m4
m5
m6
m7

请注意,消息在消费者之间是均匀分布的。

第一个Kafka消费者得到m3, m5:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console-replicated.sh
m3
m5

注意到,第一个消费者得到消息m3和m5。

第二个Kafka消费者得到m2, m6:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console-replicated.sh
m2
m6

注意到,第二个消费者得到消息m2和m6。

第三个Kafka消费者得到m1, m4和m7:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console-replicated.sh
m1
m4
m7

注意到,第三个消费者得到消息m1、m4和m7。

请注意,组中的每个使用者都得到了共享了消息。

Kafka消费者故障转移

接下来,让我们通过杀死一个消费者并发送另外7条消息来演示消费者故障转移。Kafka应该将工作分配给正在运行的消费者。

首先,杀掉第三个消费者(在消费者终端中按下CTRL+C)。

现在用Kafka console-producer脚本发送7条消息。

生产者控制台-发送7个消息,m8到m14

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-producer-replicated.sh

m1
...
m8
m9
m10
m11
m12
m13
m14

注意,消息均匀地分布在剩余的使用者之间。

第1个Kafka消费者获得m8, m9, m11, m14

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console-replicated.sh
m3
m5
m8
m9
m11
m14

第一个消费者获取了m8、m9、m11和m14。

第2个Kafka消费者获得m10, m12, m13

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console-replicated.sh
m2
m6
m10
m12
m13

第二个消费者得到了m10、m12和m13。

我们杀死了一个消费者,发送了另外7条消息,Kafka将负载分散到剩下的消费者。Kafka消费者故障转移工作了!

创建Kafka描述主题脚本

可以使用Kafka-topics.sh脚本查看Kafka主题在Kafka Brokers中的布局。---describe将显示分区、ISR和Broker分区领导。

创建并编辑describe-topics.sh:

#!/usr/bin/env bash

cd ~/bigdata/kafka_2.12-2.4.1

# 现有主题列表
./bin/kafka-topics.sh --describe \
    --topic my-failsafe-topic \
    --zookeeper localhost:2181

让我们运行kafka-topics.sh --describe,并查看my-failsafe-topic的拓扑结构。

我们将列出哪个Broker拥有哪个分区(哪个分区的leader),以及每个分区的副本和ISR。ISR是最新的副本。记住一共有13个分区。

Kafka主题分区所有权的拓扑如下:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./describe-topics.sh

Topic: my-failsafe-topic    PartitionCount: 13    ReplicationFactor: 3    Configs:
    Topic: my-failsafe-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-failsafe-topic    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-failsafe-topic    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: my-failsafe-topic    Partition: 3    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: my-failsafe-topic    Partition: 4    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-failsafe-topic    Partition: 5    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: my-failsafe-topic    Partition: 6    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-failsafe-topic    Partition: 7    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-failsafe-topic    Partition: 8    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: my-failsafe-topic    Partition: 9    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: my-failsafe-topic    Partition: 10    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-failsafe-topic    Partition: 11    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: my-failsafe-topic    Partition: 12    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1

请注意,每个Broker如何作为leader和follower共享分区。此外,看看Kafka如何在每个代理上复制分区。

测试Broker故障转移

让我们杀掉第一个Broker服务器,然后测试Broker的故障转移。

杀掉第一个Broker:

$ kill `ps aux | grep java | grep server-0.properties | tr -s " " | cut -d " " -f2`

我们可以通过在Broker终端中按CTRL-C或运行上述命令来停止第一个Broker。

现在,第一个Kafka Broker已经停止,让我们用Kafka的主题描述,看看被选出的新的领导人!

再次运行describe-topics.sh查看领导层的变化:

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./describe-topics.sh

Topic:my-failsafe-topic    PartitionCount:13    ReplicationFactor:3    Configs:
    Topic: my-failsafe-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 1    Leader: 1    Replicas: 0,1,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 3    Leader: 2    Replicas: 2,1,0    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 4    Leader: 2    Replicas: 0,2,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 5    Leader: 1    Replicas: 1,0,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 6    Leader: 2    Replicas: 2,0,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 7    Leader: 1    Replicas: 0,1,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 8    Leader: 1    Replicas: 1,2,0    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 9    Leader: 2    Replicas: 2,1,0    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 10    Leader: 2    Replicas: 0,2,1    Isr: 2,1
    Topic: my-failsafe-topic    Partition: 11    Leader: 1    Replicas: 1,0,2    Isr: 1,2
    Topic: my-failsafe-topic    Partition: 12    Leader: 2    Replicas: 2,0,1    Isr: 2,1

注意Kafka是如何在第二个和第三个Kafka Broker中传播领导权的。

演示Broker故障转移有效

让我们通过从生产者控制台发送另外两条消息来证明故障转移是有效的。然后注意到消费者是否仍然获得消息。

发送消息m15和m16。

生产者控制台-发送m15和m16

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-producer-replicated.sh

m1
...
m15
m16

请注意,消息均匀地分布在剩余的活动消费者之间。

第1个Kafka消费者荀m16

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console-replicated.sh
m3
m5
m8
m9
m11
m14
...
m16

第2个Kafka消费者荀m15

$ cd ~/bigdata/kafka_2.12-2.4.1
$ ./start-consumer-console-replicated.sh
m2
m6
m10
m12
m13
...
m15

Kafka Broker故障转移工作正常!

Kafka集群故障转移常见问题

问:为什么三个消费者一开始没有负载共享消息?

答:他们一开始没有负载共享,因为他们属于不同的消费者组。每个消费者组都订阅一个主题,并在该主题中为每个分区维护自己的偏移量。

问:我们如何演示消费者故障转移?

答:我们关闭了一个消费者。然后我们发送了更多的信息。我们观察到Kafka向剩余的集群传播消息。

问:我们如何演示生产者故障转移?

答:我们没有演示。我们演示了Kafka Broker的故障转移,关闭一个Broker,然后使用生产者控制台再发送两个消息。然后我们看到生产者使用了剩余的Kafka Broker。然后,这些Kafka Broker将消息传递给活跃的消费者。

问:我们使用什么工具和选项来显示分区和ISR的所有权?

答:我们使用带有--describe选项的kafka-topics.sh。