kafka删除topic消息的四种方式[通俗易懂]

kafka删除topic消息的四种方式[通俗易懂]方法一:快速配置删除法(简单粗暴,如果这个主题有程序还在消费都,此时KAFKA就gameover)1.kafka启动之前,在server.properties配置delete.topic.enable=true2.执行命令bin/kafka-topics.sh–delete–topictest–zookeeperzk:2181或者使用kafka-manager集群管理工具删除注意:如果kafka启动之前没有配置delete.topic.enable=true,topic只会标记

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

方法一:快速配置删除法(简单粗暴,如果这个主题有程序还在消费者,此时KAFKA就game over)

1.kafka启动之前,在server.properties配置delete.topic.enable=true

2.执行命令bin/kafka-topics.sh –delete –topic test –zookeeper zk:2181或者使用kafka-manager集群管理工具删除

注意:如果kafka启动之前没有配置delete.topic.enable=true,topic只会标记为marked for deletion,加上配置,重启kafka,之前的topick就真正删除了。

 

方法二:设置删除策略(简单粗暴,如果这个消息有程序还在消费者,此时KAFKA就game over)

1.kafka启动之前,在server.properties配置

#日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete

# 注意:下面有两种配置,一种是基于时间的策略,另种是基于日志文件大小的策略,两种策略同是配置的话,只要满足其中种策略,则触发Log删除的操作。删除操作总是先删除最旧的日志
# 消息在Kafka中保存的时间,168小时之前的1og, 可以被删除掉,根据policy处理数据。
log.retention.hours=4

# 当剩余空间低于log.retention.bytes字节,则开始删除1og
log.retention.bytes=37580963840

# 每隔300000ms, logcleaner线程将检查一次,看是否符合上述保留策略的消息可以被删除
log.retention.check.interval.ms=1000

 

方法三:手动删除法(不推荐)(简单粗暴,如果这个消息有程序还在消费者,此时KAFKA就game over)

前提:不允许更改server.properties配置

1.删除zk下面topic(test)

启动bin/zkCli.sh
ls /brokers/topics
rmr /brokers/topics/test
ls /brokers/topics
查topic是否删除:bin/kafka-topics.sh –list –zookeeper zk:2181

2.删除各broker下topic数据,默认目录为/tmp/kafka-logs  

 

方法四:偏移量(看起来你最友好,会程序的你推荐)

package com.censoft.kafkaAdmin;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

import java.sql.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author zy Zhang
 * @version : 1.0
 * @Description
 * @since 2020/7/13 16:02
 */
public class DeleteReordsByOffset {
    public static void main(String[] args) throws ClassNotFoundException {
        // 1.创建kafkaAdminClient
        Properties properties = new Properties();
        properties.put("bootstrap.servers","192.168.27.111:9092");
        AdminClient kafkaAdminClient = KafkaAdminClient.create(properties);
        // 2.从数据库获取需要删除的消息
        Class.forName("com.mysql.jdbc.Driver");
        Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
        String url  = "jdbc:mysql://localhost:3306/test?useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF-8";
        String user = "root";
        String password = "123456";
        Connection conn = null;
        Statement statement = null;
        ResultSet res = null;
        String sql = "SELECT Topic, KafkaPartition, UntilOffset FROM Kafka_Offset;";
        try {
            conn = DriverManager.getConnection(url, user, password);
            statement = conn.createStatement();
            res = statement.executeQuery(sql);
            if (res != null) {
                while (res.next()) {
                    String topic = res.getString("Topic");
                    Integer partition = res.getInt("KafkaPartition");
                    Long offset = res.getLong("UntilOffset");
                    TopicPartition topicPartition = new TopicPartition(topic, partition);
                    RecordsToDelete recordsToDelete1 = RecordsToDelete.beforeOffset(offset);
                    recordsToDelete.put(topicPartition, recordsToDelete1);
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        // 3.执行删除
        DeleteRecordsResult result = kafkaAdminClient.deleteRecords(recordsToDelete);
        Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
        try {
            for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
                System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        kafkaAdminClient.close();
    }
}

2020-11-27 补充说明:

		目前发现通过这种方法起到的效果是:
		topic的起始偏移量会被定位到传入的recordsToDelete指定的位置
		但是并没有将磁盘中存储的数据删除
		如果我找到在磁盘删除的方法会继续更新,看下面

2020-11-30 补充说明:
我重新进行了测试,发现使用kafka-delete-records.sh或者kafkaAdminClient.deleteRecords()方法还有其他约束条件:
首先就是log文件自身有大小设置,对应配置文件中log.segment.bytes,在没有达到这个大小的时候是不会创建下一个log文件的。

eg: test-0下有三个log文件
	00000000000000000000.log, 00000000000000000036.log, 00000000000000000136.log
我们修改起始偏移量=100
	那么deleteLogStartOffsetBreachedSegments运行时会判定00000000000000000000.log是可以删除的

在原先测试时,log.segment.bytes=1G了,这造成了很难观测到数据从硬盘删除
本次测试,我将log.segment.bytes修改为了1M

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/181323.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Tomcat配置SSL证书(PFX证书)

    Tomcat配置SSL证书(PFX证书)Symantec提供免费版SSL,可快速免费申请一、什么是SSL(证书)?    SSL证书服务(AlibabaCloudSSLCertificatesService)由阿里云联合多家国内外数字证书管理和颁发的权威机构、在阿里云平台上直接提供的服务器数字证书。您可以在阿里云平台上直接购买、或者免费获取所需类型的数字证书,并一键部署在阿里云…

    2022年5月2日
    47
  • 单片机控制步进电机程序c语言正反转停止,51单片机控制步进电机正反转并实现调速的程序设计…

    单片机控制步进电机程序c语言正反转停止,51单片机控制步进电机正反转并实现调速的程序设计…这是一款51单片机控制步进电机正反转的程序,同时还能实现调速。#include”reg51.h“#include“intrins.h”#defineucharunsignedchar#defineuintunsignedint#definedelayNOP();{_nop_();_nop_();_nop_();_nop_();};unsignedcharcodeFFW[8]…

    2022年5月15日
    43
  • matlab图像处理基础「建议收藏」

    matlab图像处理基础「建议收藏」1、MATLAB中图象数据的读取A、imread   imread函数用于读入各种图象文件,其一般的用法为          [X,MAP]=imread(‘filename’,‘fmt’)其中,X,MAP分别为读出的图象数据和颜色表数据,fmt为图象的格式,filename为读取的图象文件(可以加上文件的路径)。例:[X,MAP]=imread(’flowers.t

    2022年10月4日
    0
  • scrapy框架中ROBOTSTXT_OBEY = True的说明

    scrapy框架中ROBOTSTXT_OBEY = True的说明在scrapy中创建项目以后,在settings文件中有这样的一条默认开启的语句:#Obeyrobots.txtrulesROBOTSTXT_OBEY=True默认为True,就是要遵守robots.txt的规则,那么robots.txt是个啥?通俗来说,robots.txt是遵循Robot协议的一个文件,它保存在网站的服务器中,它的作用是,告诉搜索引擎爬虫,本网站哪些目…

    2022年4月28日
    37
  • java创建线程池的四种方式_线程池对象的创建方式

    java创建线程池的四种方式_线程池对象的创建方式Java通过Executors提供四种线程池,分别为:newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。newFixedThreadPool创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。newScheduledThreadPool创建一个定长线程池,支持定时及周期性任务执行。newSingl…

    2022年9月27日
    0
  • 多模态融合注记_超融合泛用

    多模态融合注记_超融合泛用多模态机器学习MultiModalMachineLearning(MMML),旨在通过机器学习并处理理解多种模态信息。包括多模态表示学习MultimodalRepresentation,模态转

    2022年8月5日
    3

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号