当前位置: 首页 > 知识库问答 >
问题:

Kafka

袁文景
2023-03-14

我无法批量阅读Kafka骆驼消费者,尽管遵循了这里发布的一个例子。我需要对我的生产者进行更改,还是我的消费者配置最有可能出现问题?

所讨论的应用程序利用kafka camel组件接收来自restendpoint的消息,验证它们,并将它们放在主题上。然后,我有一个单独的服务,从主题中使用它们,并将它们保存在时间序列数据库中。

消息是一次一个地产生和消费的,但是数据库希望消息是批量消费和提交的,以获得最佳性能。在不接触生产者的情况下,我尝试调整消费者以匹配这个问题答案中的示例:

如何从骆驼进行交易投票Kafka?

我不确定消息会如何显示,所以现在我只是记录它们:

    from(kafkaReadingConsumerEndpoint).routeId("rawReadingsConsumer").process(exchange -> {
        // simple approach to generating errors
        String body = exchange.getIn().getBody(String.class);
        if (body.startsWith("error")) {
            throw new RuntimeException("can't handle the message");
        }
        log.info("BODY:{}", body);
    }).process(kafkaOffsetManager);

但这些消息似乎仍然是一次只收到一条,而没有批量读取。

我的消费者配置如下:

  kafka:
    host: myhost
    port: myport
    consumer:
      seekTo: beginning
      maxPartitionFetchBytes: 55000
      maxPollRecords: 50
      consumerCount: 1
      autoOffsetReset: earliest
      autoCommitEnable: false
      allowManualCommit: true
      breakOnFirstError: true

我的配置是否需要工作,或者是否需要对制作人进行更改才能正确工作?

共有1个答案

井轶
2023-03-14

在最低层,Kafka消费者#投票方法将返回一个迭代

我对Camel没有深入的经验,但为了获得“一批”记录,您需要一些中间收集来“排队”数据,最终将这些数据发送到下游的某个“收集消费者”流程。然后你需要一些“开关”处理器,上面写着“等待,处理这批”或“继续填充这批”。

就数据库而言,这个过程正是Kafka Connect JDBC Sink对batch.sizeconfig所做的。

 类似资料:
  • 问题内容: 我已经学习apache kafka一个月了。但是,我现在陷入了困境。我的用例是,我有两个或多个在不同计算机上运行的使用者进程。我进行了一些测试,在其中我在kafka服务器中发布了10,000条消息。然后,在处理这些消息时,我杀死了一个使用者进程并重新启动了它。消费者正在将处理后的消息写入文件中。因此,使用结束后,文件显示了超过1万条消息。因此,某些消息是重复的。 在使用者过程中,我已禁

  • 问题内容: 我正在尝试使用Avro来读取和写入Kafka的邮件。有没有人有使用Avro二进制编码器对将放入消息队列中的数据进行编码/解码的示例? 我需要的是Avro而不是Kafka。或者,也许我应该考虑其他解决方案?基本上,我试图在空间方面找到一种更有效的JSON解决方案。刚刚提到了Avro,因为它可以比JSON紧凑。 问题答案: 我终于想起要询问Kafka邮件列表,并得到以下答复,效果很好。 是

  • 问题内容: 想要使用高级消费者API实现延迟的消费者 大意: 按键生成消息(每个消息包含创建时间戳记),以确保每个分区按生成时间对消息进行排序。 auto.commit.enable = false(将在每个消息处理之后显式提交) 消费一条消息 检查消息时间戳,并检查是否经过了足够的时间 处理消息(此操作将永不失败) 提交1个偏移 有关此实现的一些担忧: 提交每个偏移量可能会使ZK变慢 Consu

  • 问题内容: 我编写了一个单一的Kafka使用者(使用Spring Kafka),该使用者从单个主题中读取内容,并且是使用者组的一部分。消耗完一条消息后,它将执行所有下游操作,并移至下一个消息偏移。我将其打包为WAR文件,并且我的部署管道将其推送到单个实例。使用部署管道,我可以将该工件部署到部署池中的多个实例。 但是,当我希望多个消费者作为基础架构的一部分时,我无法理解以下内容: 实际上,我可以在部

  • 问题内容: 在搜索如何通过API创建Kafka主题时,我在Scala中找到了以下示例: 最后一个arg 显然是Scala对象。我不清楚如何使该示例在Java中工作。 这篇文章如何在Clojure中创建Scala对象的问题在Clojure中提出了相同的问题,答案是: 我认为Java中的翻译成: 但是,当我尝试使用该方法(或其他任何数量的变体)时,它们都无法编译。 编译错误是: 我正在使用kafka_

  • 问题内容: 我有一个kafka stream应用程序,等待有关topic的记录被发布。它将接收json数据,并根据我想将该流推送到不同主题的键的值来确定。 这是我的流应用程序代码: 在此代码中,我要检查操作类型,然后根据需要将流推送到相关主题中。 我该如何实现? 编辑: 我已将代码更新为: 问题答案: 您可以使用方法来拆分流。此方法使用谓词将源流分成几个流。 以下代码取自kafka-streams

  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp

  • 问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终