当前位置: 首页 > 知识库问答 >
问题:

当主题中没有更多记录时,如何在Kafka Consumer中刷新数据批处理

桂学
2023-03-14

考虑从主题接收数据的Kafka消费者,将其缓冲成PravaReDealStand,当100K记录被批处理时,它将插入查询向DB发出。

直到数据仍在输入之前,这种方法都能很好地工作。但是,例如,当缓冲了20K条记录并且没有更多的记录传入时,它仍然会等待更多的80K条记录,直到in刷新语句。但如果一段时间后停止运行,我想冲洗这些20K。我该怎么做?我看不出有什么办法能抓住它。

例如,在PHP中,使用基于库的php-rdkafka扩展,当分区结束时,我会得到RD_KAFKA_RESP_ERR__PARTITION_EOF,所以当这种情况发生时,很容易钩住缓冲区刷新。

我试图简化代码只剩下重要部分

public class TestConsumer {

    private final Connection connection;
    private final CountDownLatch shutdownLatch;
    private final KafkaConsumer<String, Message> consumer;
    private int processedCount = 0;

    public TestConsumer(Connection connection) {
        this.connection = connection;
        this.consumer = new KafkaConsumer<>(getConfig(), new StringDeserializer(), new ProtoDeserializer<>(Message.parser()));
        this.shutdownLatch = new CountDownLatch(1);
    }

    public void execute() {
        PreparedStatement statement;
        try {
            statement = getPreparedStatement();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            commit(statement);

            consumer.wakeup();
        }));

        consumer.subscribe(Collections.singletonList("source.topic"));

        try {
            while (true) {
                ConsumerRecords<String, Message> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));

                records.forEach(record -> {
                    Message message = record.value();
                    try {
                        fillBatch(statement, message);
                        statement.addBatch();
                    } catch (SQLException e) {
                        throw new RuntimeException(e);
                    }
                });

                processedCount += records.count();

                if (processedCount > 100000) {
                    commit(statement);
                }
            }
        } catch (WakeupException e) {
            // ignore, we're closing
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }

    private void commit(PreparedStatement statement) {
        try {
            statement.executeBatch();
            consumer.commitSync();
            processedCount = 0;
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }


    protected void fillBatch(PreparedStatement statement, Message message) throws SQLException {
        try {
            statement.setTimestamp(1, new Timestamp(message.getTime() * 1000L));
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

共有1个答案

祝叶五
2023-03-14

我理解你的问题是这样的:

>

  • 您想要使用来自Kafka的消息

    把它们堆在记忆里100K记录

    批量提交到数据库

    但您只想等待t秒(假设为10秒)

    这可以通过使用Kafka内置的消费者批处理以一种高效可靠的方式实现。。假设您可以以某种方式预测消息的平均大小(以字节为单位)。

    在Kafka消费者配置中,您可以设置以下内容:

    fetch。最小字节数=

    fetch。麦克斯,等等。ms=

    max.partition。取来字节=

    max.poll。记录=

    fetch。最大字节数=

    这样,您可以获得100K记录,如果它们符合定义的字节大小,但它将等待可配置的毫秒数。

    一旦民意调查返回记录,您可以一次保存并重复。

  •  类似资料:
    • 问题内容: 我需要在一个请求中进行多次更新。 在我有: 因此需要进行更改。 这是我的序列化器代码: 我试图添加: 和 但这不起作用。如何更改此代码以进行多次更新。我的json请求 问题答案: 这是您请求的CreateMixins或UpdateMixins的示例。 ======================查看========================== ====== ==========

    • 这是我正在接收的数据,如果阵列中不存在id,则希望添加新记录,如果阵列中没有id,则还希望更新记录 我的模型:课堂课程 课堂主题 类用户 这是通过PostMan从Rails API接收的数组[{"title":"Topic Name","path_url":"https://resource-asws-path-url" }, { "id": 2311,"title":"Topic Name","

    • 如何刷新__consumer_offsets主题? 我刚刚设置了offsets.retention.minutes=1,重新启动了代理,检查了offsets.retention.minutes=1的日志,但是__consumer_offsets主题的50个分区的大小仍然相同。 为什么?

    • 问题内容: 有一个基于任何数组的。数组中的元素可能会更改。如何获得角度控制器来刷新阵列? module.js index.html 从控制台: 如何刷新以使其中的所有元素都是选项? 问题答案: Angular使用watchers,并且只有在摘要循环开始后才会更新UI 。 通常,您将通过UI中的某个事件或调用$ http服务 将数组添加到数组中,而这些将为您启动$ digest() 。 由于您只是直

    • 问题内容: 现在,我在框架中有一个中央模块,该模块使用模块产生多个进程。由于使用,因此存在模块级的多处理感知日志。根据文档,此记录器具有进程共享的锁,因此你不会通过同时写入多个进程来乱码内容(或任何文件句柄)。 我现在遇到的问题是框架中的其他模块不支持多处理。以我的方式看,我需要使这个中央模块上的所有依赖项都使用支持多处理的日志记录。在框架内这很烦人,更不用说框架的所有客户了。有我没有想到的替代方

    • 我的系统有三个数据源,都以名为datasourceA、datasourceB、datasourceC的bean的形式公开。我试图将spring batch的数据源设置为datasourceB,但我遇到了一些问题。 我的Spring班 在这个设置中,我在启动时会遇到这个错误 我无法将我的任何数据源设置为@Prime,因为我的Spring批处理编写器使用所有3个数据源进行读取和写入。我正在使用JPA存