spring boot 删除kafka没有消费者的消费组
背景: 服务使用k8s 部署,服务逻辑是广播消费,即每个服务(pod)都会起一个消费组
因为每个pod都是一样的,无法固定消费组。同时每次更新或者重启pod时消费组就会增多。
时间旧了消费组就会消息堆积,消费组也会越来越多。
在项目中启动一个定时任务删除无消费者的消费组
直接看代码
package org.osc.scan.codesearch.service.impl; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.ListConsumerGroupsResult; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.config.SaslConfigs; import org.osc.scan.codesearch.service.DeleteKafkaGroups; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Map; @Slf4j @Service public class DeleteKafkaGroupsImpl implements DeleteKafkaGroups { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.properties.security.protocol}") private String protocol; @Value("${spring.kafka.properties.sasl.mechanism}") private String saslMechanism; @Value("${spring.kafka.properties.sasl.jaas.config}") private String jaasConfig; @Override public void deleteGroupIds() { Map<String, Object> props = new HashMap<>(1); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers); props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, protocol); props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); // 创建一个kafka的admin客户端,执行高级操作 AdminClient adminClient = AdminClient.create(props); // 获取消费组 ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups(); try { // 获取所有消费组 Collection<ConsumerGroupListing> consumerGroupListings = listConsumerGroupsResult.all().get(); // 遍历消费组 for (ConsumerGroupListing consumerGroupListing : consumerGroupListings) { String groupId = consumerGroupListing.groupId(); // 根据消费组的前缀(避免删除其他服务正在使用的消费组) 和 消费组没有消费者 if (groupId.contains("LatchGroup-") && adminClient.describeConsumerGroups(Arrays.asList(groupId)).all().get().get(groupId).members().size() == 0){ log.info("找到要删除的groupID:{}", groupId); // 执行删除操作 KafkaFuture<Void> resultFuture = adminClient.deleteConsumerGroups(Arrays.asList(groupId)).all(); try { // 执行 resultFuture.get(); log.info("成功删除groupID:{}", groupId); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } } } catch (Exception e) { e.printStackTrace(); } adminClient.close(); } }