当前位置: 首页 > 知识库问答 >
问题:

使用apache camel和Kafka时如何处理背压?

汪晨
2023-03-14

我正试图编写一个应用程序,将与Kafka集成使用骆驼。(版本-3.4.2)

我从这个问题的答案中借用了一种方法。

我有一条路线可以监听Kafka主题的信息。通过使用一个简单的执行器,该消息的处理与消耗是分离的。每个处理都作为任务提交给该执行者。消息的顺序并不重要,唯一需要考虑的因素是消息处理的速度和效率。我已禁用自动提交,并在任务提交给执行者后手动提交消息。丢失当前正在处理的消息(由于崩溃/关闭)是可以的,但Kafka中从未提交处理的消息不应丢失(由于提交了偏移量)。现在来回答问题,

  1. 如何有效地处理负载?例如,有1000条消息,但我一次只能并行处理100条

现在,我的解决方案是阻止消费者轮询线程,并尝试持续提交作业。但暂停投票会是一个更好的办法,但我找不到任何办法来实现这一点。


public static void main(String[] args) throws Exception {
        String consumerId = System.getProperty("consumerId", "1");
        ExecutorService executor = new ThreadPoolExecutor(100, 100, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
        LOGGER.info("Consumer {} starting....", consumerId);

        Main main = new Main();
        main.init();

        CamelContext context = main.getCamelContext();
        ComponentsBuilderFactory.kafka().brokers("localhost:9092").metadataMaxAgeMs(120000).groupId("consumer")
                .autoOffsetReset("earliest").autoCommitEnable(false).allowManualCommit(true).maxPollRecords(100)
                .register(context, "kafka");

        ConsumerBean bean = new ConsumerBean();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("kafka:test").process(exchange -> {
                    LOGGER.info("Consumer {} - Exhange is {}", consumerId, exchange.getIn().getHeaders());
                    processTask(exchange);
                    commitOffset(exchange);
                });
            }

            private void processTask(Exchange exchange) throws InterruptedException {
                try {
                    executor.submit(() -> bean.execute(exchange.getIn().getBody(String.class)));
                } catch (Exception e) {
                    LOGGER.error("Exception occured {}", e.getMessage());
                    Thread.sleep(1000);
                    processTask(exchange);
                }
            }

            private void commitOffset(Exchange exchange) {
                boolean lastOne = exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
                if (lastOne) {
                    KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
                            KafkaManualCommit.class);
                    if (manual != null) {
                        LOGGER.info("manually committing the offset for batch");
                        manual.commitSync();
                    }
                } else {
                    LOGGER.info("NOT time to commit the offset yet");
                }
            }
        });

        main.run();
    }

共有1个答案

莫乐
2023-03-14

为此,可以使用throttleEIP。

from("your uri here")
.throttle(maxRequestCount)
.timePeriodMillis(inTimePeriodMs)
.to(yourProcessorUri)
.end()

请看这里的原始留档。

 类似资料:
  • 我这么说是因为来自avro的POJO一代不是那么直截了当的。在此基础上,它需要maven插件和一个.avsc文件。 例如,我在我的Kafka制作人上创建了一个POJO,名为User: 我连载它,并发送到我的用户主题在Kafka。然后我有一个消费者,它本身有一个POJO用户,并反序列化消息。是空间问题吗?这样序列化和反序列化不是也更快吗?更不用说维护模式注册表的开销了。

  • 我有一份flink的工作,从Kafka读取数据,执行某些聚合,并将结果写入elasticsearch索引。我看到震源上有很高的背压。高背压导致数据从Kafka缓慢读取,我看到数据在网络堆栈中排队(netstat RecvQ显示源Kafka连接中有上万字节的数据,数据最终被读取),这反过来会导致数据在延迟后沉入elasticsearch,并且延迟持续增加。 源每分钟产生约17500条记录,Flink

  • 这个问题似乎不是关于特定的编程问题、软件算法或主要由程序员使用的软件工具。如果您认为这个问题在另一个Stack Exchange网站上是主题,您可以留下评论来解释这个问题在哪里可以得到回答。 我们构建了一个定制的Kafka Connect sink,它反过来调用一个远程REST API。我如何将背压传播到Kafka Connect基础设施,以便在远程系统比内部使用者向put()传递消息慢的情况下,

  • 我有一个注册了回拨的服务,现在我想将其公开为,具有某些要求/限制: 接收回调的线程不应该被阻塞(工作应该交给观察者指定的不同线程/调度程序) 不应该有任何异常抛出由于消费者是慢下来流 多个消费者可以相互独立订阅 消费者可以选择缓冲所有的物品,这样它们就不会丢失,但是它们不应该在生产者类中被缓冲 以下是我目前的情况 我不确定这是否符合我的要求。在javadoc上有一条关于的注释,我不明白: 请注意,

  • 当使用int-kafka: out站通道适配器生成到kafka时,似乎没有可用的错误通道。在这种情况下,如何处理reties次后无法向kafka生成消息? 任何可能导致kafka失败的错误。(以下代码只是来自Internet的代码片段,只是想知道如何向其添加错误句柄)

  • 问题内容: 我试图用一个批量一些 KTable 值,并送他们。似乎30秒钟超出了使用者超时间隔,在此间隔之后,Kafka认为该使用者已失效并释放了分区。 我尝试提高 轮询 和 提交间隔 的频率来避免这种情况: 不幸的是,这些错误仍在发生: (很多) 其次是: 显然,我需要更频繁地将心跳发送回服务器。怎么样? 我的拓扑是: 该 KTable 是关键,每30秒分组值。在 Processor.init(