spring boot 删除kafka没有消费者的消费组
背景: 服务使用k8s 部署,服务逻辑是广播消费,即每个服务(pod)都会起一个消费组
因为每个pod都是一样的,无法固定消费组。同时每次更新或者重启pod时消费组就会增多。
时间旧了消费组就会消息堆积,消费组也会越来越多。
在项目中启动一个定时任务删除无消费者的消费组
直接看代码
package org.osc.scan.codesearch.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.SaslConfigs;
import org.osc.scan.codesearch.service.DeleteKafkaGroups;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Service
public class DeleteKafkaGroupsImpl implements DeleteKafkaGroups {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.properties.security.protocol}")
private String protocol;
@Value("${spring.kafka.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${spring.kafka.properties.sasl.jaas.config}")
private String jaasConfig;
@Override
public void deleteGroupIds() {
Map<String, Object> props = new HashMap<>(1);
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, protocol);
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
// 创建一个kafka的admin客户端,执行高级操作
AdminClient adminClient = AdminClient.create(props);
// 获取消费组
ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
try {
// 获取所有消费组
Collection<ConsumerGroupListing> consumerGroupListings = listConsumerGroupsResult.all().get();
// 遍历消费组
for (ConsumerGroupListing consumerGroupListing : consumerGroupListings) {
String groupId = consumerGroupListing.groupId();
// 根据消费组的前缀(避免删除其他服务正在使用的消费组) 和 消费组没有消费者
if (groupId.contains("LatchGroup-") && adminClient.describeConsumerGroups(Arrays.asList(groupId)).all().get().get(groupId).members().size() == 0){
log.info("找到要删除的groupID:{}", groupId);
// 执行删除操作
KafkaFuture<Void> resultFuture = adminClient.deleteConsumerGroups(Arrays.asList(groupId)).all();
try {
// 执行
resultFuture.get();
log.info("成功删除groupID:{}", groupId);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
adminClient.close();
}
}