KafkaAdminClient
创始人
2024-06-02 03:42:59
0

KafkaAdminClient

  • Admin 原理
  • 构造/销毁 AdminClient
  • 创建主题
  • 查询消费者组位移
  • 获取 Broker 磁盘占用

KafkaAdminClient 诞生原因 :

  • 命令行运维监控困难
  • 很多命令行脚本会连接 ZK (会跳过 Kafka 权限检查)
  • 统一服务端/客户端的运维机制

Maven 依赖 :

org.apache.kafkakafka-clients2.3.0

AdminClient 2.3 的 9 大类 :

  • 主题管理:主题的创建、删除、查询
  • 权限管理:具体权限的配置与删除
  • 配置参数管理:Kafka 各种资源的参数设置、详情查询。Kafka 资源 : Broker、主题、用户、Client-id
  • 副本日志管理:副本底层日志路径的变更和详情查询
  • 分区管理:创建额外的主题分区
  • 消息删除:删除指定位移之前的分区消息
  • Delegation Token 管理:Delegation Token 的创建、更新、过期、详情查询
  • 消费者组管理:消费者组的查询、位移查询、删除
  • Preferred 领导者选举:指定主题分区的 Preferred Broker 为领导者

Admin 原理

AdminClient 是双线程

  • 前端主线程 : 将用户操作转成对应的请求,再将请求发送到后端 I/O 线程的队列中
  • 后端 I/O 线程 : 从队列中读取相应的请求,然后发送到对应的 Broker 节点上,之后把执行结果保存起来,等待前端线程的获取

AdminClient 内大量用生产者 - 消费者模式 ,将请求生成与处理解耦

在这里插入图片描述

前端主线程创建请求对象实例 (Call) 的任务 :

  • 构建对应的请求对象 : 如 : 创建主题,就创建 CreateTopicsRequest;查询消费者组位移,就创建 OffsetFetchRequest
  • 指定响应的回调逻辑 : 如 : 从 Broker 接收 CreateTopicsResponse 后要执行的动作。创建好 Call ,前端主线程把它放到新请求队列(New Call Queue)中,前端主线程就完成了。只要等待结果返回

后端 I/O 线程的工作 : 用 3 个队列来承载不同时期的请求对象 (新请求队列、待发送请求队列、处理中请求队列 )

  • 新请求队列的线程安全 : 由 Java 的 monitor 锁保证。为了确保前端主线程不会因为 monitor 锁被阻塞,后端 I/O 线程定期将新请求队列中的所有 Call 全部s送到待发送请求队列中进行处理
  • 待发送请求队列和处理中请求队列 : 由后端 I/O 线程处理,因此无需任何锁机制来保证线程安全

构造/销毁 AdminClient

import org.apache.kafka.clients.admin.AdminClient;Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:port");
props.put("request.timeout.ms", 600000);try (AdminClient client = AdminClient.create(props)) {// 执行你要做的操作……
}

创建主题

String newTopicName = "test-topic";try (AdminClient client = AdminClient.create(props)) {// 主题名称、分区数、副本数NewTopic newTopic = new NewTopic(newTopicName, 10, (short) 3);// 结果以 Java Future 封装CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));// 获取执行结果result.all().get(10, TimeUnit.SECONDS);
}

查询消费者组位移

String groupID = "test-group";
try (AdminClient client = AdminClient.create(props)) {// 获取指定消费者组的位移数据ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);// 按分区分组的位移数据Map offsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);System.out.println(offsets);
}

获取 Broker 磁盘占用

try (AdminClient client = AdminClient.create(props)) {// 获取Broker 所有分区主题的日志路径信息DescribeLogDirsResult ret = client.describeLogDirs(Collections.singletonList(targetBrokerId)); // 指定 Broker idlong size = 0L;for (Map logDirInfoMap : ret.all().get().values()) {// 累加 = 得出总磁盘占用量size += logDirInfoMap.values().stream().map(logDirInfo -> logDirInfo.replicaInfos).flatMap(topicPartitionReplicaInfoMap ->topicPartitionReplicaInfoMap.values().stream().map(replicaInfo -> replicaInfo.size)).mapToLong(Long::longValue).sum();}System.out.println(size);
}

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...