当前位置:首页 > java > 正文内容

spring boot 删除kafka没有消费者的消费组

root3个月前 (03-21)java1392

背景: 服务使用k8s 部署,服务逻辑是广播消费,即每个服务(pod)都会起一个消费组
因为每个pod都是一样的,无法固定消费组。同时每次更新或者重启pod时消费组就会增多。
时间旧了消费组就会消息堆积,消费组也会越来越多。

在项目中启动一个定时任务删除无消费者的消费组

直接看代码

package org.osc.scan.codesearch.service.impl;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.SaslConfigs;
import org.osc.scan.codesearch.service.DeleteKafkaGroups;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@Service
public class DeleteKafkaGroupsImpl implements DeleteKafkaGroups {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.properties.security.protocol}")
    private String protocol;

    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String jaasConfig;

    @Override
    public void deleteGroupIds() {

        Map<String, Object> props = new HashMap<>(1);
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, protocol);
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        // 创建一个kafka的admin客户端,执行高级操作
        AdminClient adminClient = AdminClient.create(props);
        // 获取消费组
        ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
        try {
            // 获取所有消费组
            Collection<ConsumerGroupListing> consumerGroupListings = listConsumerGroupsResult.all().get();
            // 遍历消费组
            for (ConsumerGroupListing consumerGroupListing : consumerGroupListings) {
                String groupId = consumerGroupListing.groupId();
                // 根据消费组的前缀(避免删除其他服务正在使用的消费组) 和 消费组没有消费者
                if (groupId.contains("LatchGroup-") && adminClient.describeConsumerGroups(Arrays.asList(groupId)).all().get().get(groupId).members().size() == 0){
                    log.info("找到要删除的groupID:{}", groupId);
                    // 执行删除操作
                    KafkaFuture<Void> resultFuture = adminClient.deleteConsumerGroups(Arrays.asList(groupId)).all();
                    try {
                    // 执行
                        resultFuture.get();
                        log.info("成功删除groupID:{}", groupId);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        adminClient.close();


    }
}


扫描二维码推送至手机访问。

版权声明:本文由一叶知秋发布,如需转载请注明出处。

本文链接:http://zhiqiu.top/?id=258

分享给朋友:
返回列表

上一篇:java pom 引入的依赖中 标签

没有最新的文章了...

相关文章

spring程序开发步骤

1、导入Spring开发的基本包坐标2、编写Dao接口和实现类3、创建Spring核心配置文件4、在Spring配置文件中配置xxDaoImpl5、使用Spring的API获取Bean实例...

java mybatis 语法之 foreach 对列表的处理

<delete id="batchDeleteEmps" parameterType="int"> delete from emp wh...

java @Bean 注解

Spring的@Bean注解用于告诉方法,产生一个Bean对象,然后这个Bean对象交给Spring管理。产生这个Bean对象的方法Spring只会调用一次,随后这个Spring将会将这个Bean对象放在自己的IOC容器中。SpringIO...

java mybatis Parameter index out of range (5 > number of parameters, which is 4

java mybatis Parameter index out of range (5 > number of parameters, which is 4

该报错在修改mapper的xml之后出现的发现是因为注释的问题导致的在xml中注释已经要谨慎。...

java class 中的getField和getDeclaredField 通过字段名获取字段方法的区别

getField和getDeclaredField的区别这两个方法都是用于获取字段getField 只能获取public的,包括从父类继承来的字段。getDeclaredField 可以获取本类所有的字段,包括private的,但...

maven的简单使用

maven是什么?Maven简化和标准化项目建设过程。处理编译,分配,文档,团队协作和其他任务的无缝连接。 Maven增加可重用性并负责建立相关的任务。简单的说就是用来引入包的maven的目标:项目是可重复使用,易维护,更容易理解的一个综合...