KafkaAdminClient 诞生原因 :
Maven 依赖 :
org.apache.kafka kafka-clients 2.3.0
AdminClient 2.3 的 9 大类 :
AdminClient 是双线程
AdminClient 内大量用生产者 - 消费者模式 ,将请求生成与处理解耦
前端主线程创建请求对象实例 (Call) 的任务 :
后端 I/O 线程的工作 : 用 3 个队列来承载不同时期的请求对象 (新请求队列、待发送请求队列、处理中请求队列 )
import org.apache.kafka.clients.admin.AdminClient;Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:port");
props.put("request.timeout.ms", 600000);try (AdminClient client = AdminClient.create(props)) {// 执行你要做的操作……
}
String newTopicName = "test-topic";try (AdminClient client = AdminClient.create(props)) {// 主题名称、分区数、副本数NewTopic newTopic = new NewTopic(newTopicName, 10, (short) 3);// 结果以 Java Future 封装CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));// 获取执行结果result.all().get(10, TimeUnit.SECONDS);
}
String groupID = "test-group";
try (AdminClient client = AdminClient.create(props)) {// 获取指定消费者组的位移数据ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);// 按分区分组的位移数据Map offsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);System.out.println(offsets);
}
try (AdminClient client = AdminClient.create(props)) {// 获取Broker 所有分区主题的日志路径信息DescribeLogDirsResult ret = client.describeLogDirs(Collections.singletonList(targetBrokerId)); // 指定 Broker idlong size = 0L;for (Map logDirInfoMap : ret.all().get().values()) {// 累加 = 得出总磁盘占用量size += logDirInfoMap.values().stream().map(logDirInfo -> logDirInfo.replicaInfos).flatMap(topicPartitionReplicaInfoMap ->topicPartitionReplicaInfoMap.values().stream().map(replicaInfo -> replicaInfo.size)).mapToLong(Long::longValue).sum();}System.out.println(size);
}