Kafka - 08 Kafka Broker Workflow | Adding Nodes | Decommissioning Nodes
创始人
2024-02-21 14:38:51

Table of Contents

      • 1. Kafka Broker Workflow
      • 2. Adding a Kafka Node
        • 1. Add a new Kafka node
        • 2. Rebalance existing partitions
      • 3. Decommissioning a Kafka Node

1. Kafka Broker Workflow

(Figure: Kafka broker workflow diagram)

How the data in ZooKeeper changes as Kafka brokers come online and go offline:


[zk: localhost:2181(CONNECTED) 9] ls /
[zookeeper, kafka_cluster]
[zk: localhost:2181(CONNECTED) 10] ls /kafka_cluster
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 11] ls /kafka_cluster/brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 12] get /kafka_cluster/controller
{"version":1,"brokerid":0,"timestamp":"1669535410717"}
[zk: localhost:2181(CONNECTED) 13] get /kafka_cluster/brokers/topics/test2/partitions/0/state
{"controller_epoch":9,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2,1]}

Stop the Kafka broker on node kafka-01 and run the same three commands again:


[zk: localhost:2181(CONNECTED) 14] ls /kafka_cluster
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 15] ls /kafka_cluster/brokers/ids
[1, 2]
[zk: localhost:2181(CONNECTED) 16] get /kafka_cluster/controller
{"version":1,"brokerid":2,"timestamp":"1669542009693"}
[zk: localhost:2181(CONNECTED) 17] get /kafka_cluster/brokers/topics/test2/partitions/0/state
{"controller_epoch":9,"leader":2,"version":1,"leader_epoch":1,"isr":[2,1]}

Restart the Kafka broker on kafka-01 and run the three commands once more:


[zk: localhost:2181(CONNECTED) 18] ls /kafka_cluster/brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 19] get /kafka_cluster/controller
{"version":1,"brokerid":2,"timestamp":"1669542009693"}
[zk: localhost:2181(CONNECTED) 20] get /kafka_cluster/brokers/topics/test2/partitions/0/state
{"controller_epoch":10,"leader":0,"version":1,"leader_epoch":2,"isr":[2,1,0]}

2. Adding a Kafka Node

1. Add a new Kafka node

① Shut down the hadoop-103 VM, then right-click → Manage → Clone to create a new node named hadoop-104.

② Change hadoop-104's IP address: vi /etc/sysconfig/network-scripts/ifcfg-ens33

IPADDR=192.168.38.26

③ Change the hostname: vi /etc/hostname

hadoop104

④ Map the hostname to the IP: vi /etc/hosts

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.38.23 hadoop101
192.168.38.24 hadoop102
192.168.38.25 hadoop103
192.168.38.26 hadoop104

⑤ Edit Kafka's server.properties, paying particular attention to the following settings:

broker.id=3
listeners=PLAINTEXT://192.168.38.26:9092
advertised.listeners=PLAINTEXT://192.168.38.26:9092
# the existing 3-node ZooKeeper ensemble is reused
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster
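Because hadoop-104 is a clone of hadoop-103, the most common mistake is a leftover duplicate broker.id. A small sanity check, assuming the four server.properties files have been gathered locally (the file contents below are hypothetical stand-ins built from the IPs used in this article):

```python
def parse_properties(text: str) -> dict:
    """Parse Java-style .properties text into a dict, skipping comments and blanks."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

# Stand-in contents for each node's server.properties.
configs = {
    "hadoop101": "broker.id=0\nlisteners=PLAINTEXT://192.168.38.23:9092",
    "hadoop102": "broker.id=1\nlisteners=PLAINTEXT://192.168.38.24:9092",
    "hadoop103": "broker.id=2\nlisteners=PLAINTEXT://192.168.38.25:9092",
    "hadoop104": "broker.id=3\nlisteners=PLAINTEXT://192.168.38.26:9092",
}

ids = [parse_properties(text)["broker.id"] for text in configs.values()]
assert len(ids) == len(set(ids)), "duplicate broker.id in cluster!"
print("broker.ids:", ids)
```

If the clone's broker.id had been left at 2, the assertion would fire before the duplicate broker ever joined the cluster.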

⑥ In the install directory /opt/kafka/kafka_2.12-2.2.1, delete the log directory copied over from the clone and recreate it (the cloned directory still contains the old broker's data and metadata):

[root@hadoop104 kafka_2.12-2.2.1]# rm -rf logs/
[root@hadoop104 kafka_2.12-2.2.1]# mkdir logs

⑦ Start Kafka on the hadoop-104 node:

[root@hadoop104 kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server.properties

2. Rebalance existing partitions

① We previously created the topic test2; check its details:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
Topic:test2     PartitionCount:3   ReplicationFactor:3   Configs:segment.bytes=1073741824
Topic: test2    Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 1,0,2
Topic: test2    Partition: 1    Leader: 1       Replicas: 2,1,0 Isr: 1,0,2
Topic: test2    Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

As the output shows, the existing test2 topic is still stored only on hadoop101, hadoop102 and hadoop103; none of it lives on the new hadoop104 node. Only newly created topics would be placed on hadoop104. How do we fix this?

② Create a file listing the topics to rebalance. A cluster usually holds many topics, so you must name the topic whose data should be rebalanced:

[root@hadoop101 kafka_2.12-2.2.1]# vi topics-to-move.json
{"topics": [{"topic": "test2"}],"version": 1
}

If there are multiple topics, add each of them to the topics list.
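The file can also be generated rather than typed by hand. A small sketch that emits the same JSON shape for any list of topic names:

```python
import json

def topics_to_move(topic_names):
    """Build the JSON body expected by --topics-to-move-json-file."""
    return {"version": 1, "topics": [{"topic": t} for t in topic_names]}

doc = topics_to_move(["test2"])
print(json.dumps(doc))

# Several topics can be rebalanced in one pass:
print(json.dumps(topics_to_move(["test2", "test3"])))
```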

③ Generate a reassignment plan:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
Missing required argument "[zookeeper]"

The command fails because the zookeeper argument is missing; add the ZooKeeper connection string and run it again:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

Current partition replica assignment: how the replicas are distributed right now.

Proposed partition reassignment configuration: the suggested new distribution; this JSON is what goes into the reassignment plan file.
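Conceptually, --generate spreads each partition's replicas round-robin over the supplied broker list. A simplified Python sketch of that idea (not Kafka's exact algorithm, which also randomizes starting positions to balance leaders); for this input it happens to reproduce the proposal above:

```python
def propose_assignment(topic, num_partitions, replication_factor, brokers):
    """Simplified round-robin placement: partition p's first replica sits on
    brokers[p % n], and the remaining replicas follow on the next brokers."""
    n = len(brokers)
    plan = []
    for p in range(num_partitions):
        replicas = [brokers[(p + i) % n] for i in range(replication_factor)]
        plan.append({"topic": topic, "partition": p, "replicas": replicas})
    return {"version": 1, "partitions": plan}

plan = propose_assignment("test2", num_partitions=3, replication_factor=3,
                          brokers=[0, 1, 2, 3])
for entry in plan["partitions"]:
    print(entry)  # the new broker 3 now appears in some replica lists
```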

④ Create the reassignment plan file (spreading replicas across brokers 0, 1, 2 and 3):

[root@hadoop101 kafka_2.12-2.2.1]# vi increase-replication-factor.json
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

⑤ Execute the reassignment plan:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

⑥ Verify the reassignment:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster  --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition test2-2 completed successfully
Reassignment of partition test2-1 completed successfully
Reassignment of partition test2-0 completed successfully
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
Topic:test2  PartitionCount:3   ReplicationFactor:3    Configs:segment.bytes=1073741824
Topic: test2    Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 1,0,2
Topic: test2    Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
Topic: test2    Partition: 2    Leader: 2       Replicas: 2,3,0 Isr: 0,2,3
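Beyond --verify, you can cross-check that the described assignment matches the plan file. A sketch comparing the plan JSON against the Replicas column of the describe output above:

```python
import json

# Contents of increase-replication-factor.json (log_dirs omitted for brevity).
plan = json.loads('{"version":1,"partitions":['
                  '{"topic":"test2","partition":2,"replicas":[2,3,0]},'
                  '{"topic":"test2","partition":1,"replicas":[1,2,3]},'
                  '{"topic":"test2","partition":0,"replicas":[0,1,2]}]}')

# Replicas column from the kafka-topics.sh --describe output.
described = {0: [0, 1, 2], 1: [1, 2, 3], 2: [2, 3, 0]}

matches = all(described[e["partition"]] == e["replicas"]
              for e in plan["partitions"])
print("assignment matches the plan:", matches)
```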

3. Decommissioning a Kafka Node

We now have four nodes: hadoop101, hadoop102, hadoop103 and hadoop104, and we want to retire hadoop104 from the cluster. We cannot simply shut it down, because part of the test2 topic's data already lives on hadoop104. So how do we decommission it safely?

First generate a reassignment plan that excludes the node being retired, then run the same rebalance procedure used when adding a node.

① Create the topics-to-rebalance file:

[root@hadoop101 kafka_2.12-2.2.1]# vi topics-to-move.json
{"topics": [{"topic": "test2"}],"version": 1
}

② Generate the plan. When adding the node we passed --broker-list "0,1,2,3"; to decommission, pass --broker-list "0,1,2" instead, dropping the 3, so that test2's data will no longer be placed on hadoop104 (broker.id=3).

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}

③ Create the reassignment plan file (all replicas stored on brokers 0, 1 and 2):

[root@hadoop101 kafka_2.12-2.2.1]# vi increase-replication-factor.json
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}

④ Execute the reassignment plan:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092  --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"test2","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"test2","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

⑤ Verify the reassignment:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092  --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_cluster --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition test2-2 completed successfully
Reassignment of partition test2-1 completed successfully
Reassignment of partition test2-0 completed successfully
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic test2
Topic:test2  PartitionCount:3   ReplicationFactor:3     Configs:segment.bytes=1073741824
Topic: test2    Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 1,2,0
Topic: test2    Partition: 1    Leader: 1       Replicas: 0,2,1 Isr: 1,2,0
Topic: test2    Partition: 2    Leader: 2       Replicas: 1,0,2 Isr: 2,1,0
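Before stopping the broker, a final programmatic check that the retiring node holds nothing, using the Replicas and Isr columns from the describe output above:

```python
# Replicas / Isr per partition, copied from kafka-topics.sh --describe on test2.
partitions = {
    0: {"replicas": [2, 1, 0], "isr": [1, 2, 0]},
    1: {"replicas": [0, 2, 1], "isr": [1, 2, 0]},
    2: {"replicas": [1, 0, 2], "isr": [2, 1, 0]},
}

retiring = 3
safe = all(retiring not in p["replicas"] and retiring not in p["isr"]
           for p in partitions.values())
print("safe to stop broker", retiring, ":", safe)  # True: broker 3 holds no data
```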

⑥ Run the stop script on hadoop104 to take it out of the cluster:

[root@hadoop104 kafka_2.12-2.2.1]# bin/kafka-server-stop.sh
