当前位置: 首页 > 知识库问答 >
问题:

Kafka streams联接中的RecordTooLargeException

宇文迪
2023-03-14

我有一个KStream x KStream连接,它正在中断,但有以下例外。

Exception in thread “my-clicks-and-recs-join-streams-4c903fb1-5938-4919-9c56-2c8043b86986-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_15, processor=KSTREAM-SOURCE-0000000001, topic=my_outgoing_recs_prod, partition=15, offset=9248896
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_15] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:59)
    at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:105)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:107)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:100)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

尽管如此,我尝试将属性设置为config.put(“max.request.size”,“31457280”);,即30MB。我不希望推荐记录超过这个限制。但是,代码还是崩溃了。

我不能更改Kafka集群中的配置,但如果需要,我可以更改Kafka中相关主题的属性。

有人能建议我还能试试什么吗?

如果什么都不起作用,我愿意忽略这样的超大信息。但是,我不知道处理这个RecordToolargeException的方法。

执行连接的代码如下所示。

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, JOINER_ID + "-" + System.getenv("HOSTNAME"));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
config.put("max.request.size", "314572800");
config.put("message.max.bytes", "314572800");
config.put("max.message.bytes", "314572800");


KStreamBuilder builder = new KStreamBuilder();

KStream<String, byte[]> clicksStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), clicksTopic);
KStream<String, byte[]> recsStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), recsTopic);

KStream<String, ClickRec> join = clicksStream.join(
        recsStream,
        (click, recs) -> new ClickRec(click, recs),
        JoinWindows.of(windowMillis).until(3*windowMillis));

join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);

KafkaStreams streams = new KafkaStreams(builder, config);
streams.cleanUp();
streams.start();

clickrec是连接的对象(它远小于recomment对象,我不希望它大于几KBs)。

共有1个答案

栾越
2023-03-14

在不同级别上有多个配置:

  1. 您有一个代理设置message.max.bytes(默认值为1000012)(参见http://kafka.apache.org/documentation/#brokerconfigs)
  2. 有一个主题级配置max.message.bytes(默认值为1000012)(cf http://kafka.apache.org/documentation/#topicconfigs)
  3. 生产者具有max.request.size(默认值为1048576)(参见http://kafka.apache.org/documentation/#producerconfigs)

堆栈跟踪指示您需要在代理或主题级别更改设置:

为什么首先需要这个:

当您执行KStream-KStream联接时,join运算符建立状态(它必须缓冲两个流中的记录,以便计算联接)。状态默认由Kafaka主题支持--本地状态基本上是一个缓存,而Kafaka主题是真理的来源。因此,您的所有记录都将写入Kafka Streams自动创建的“变更日志主题”。

 类似资料:
  • 我有以下mysql查询: 我已经编辑了我的问题,以添加我正在使用的两个实体。第一个实体包含频率,我想要加入从另一个实体选择的项目。TFrequency表有一个不变的常量值。这个频率使用ID映射到tEXCELSMSTOSENDSchedule实体。但是,我希望从tfrequence中选择name,而不是映射的id。然后我用它来填充我的数据表。我需要创建第三个实体吗?我不知道这是如何工作的,我一直在尝

  • 问题内容: 在我的办公室里,关于sql联接中联接列的顺序正在进行很大的讨论。我很难解释它,所以我只介绍两个sql语句。考虑到sql最佳实践,哪个更好? 或者 因此,在联接的ON部分中,是否编写是否重要? 问题答案: 此处的最佳实践是选择一个并 在团队中 坚持使用。就我个人而言,我更喜欢它,因为它对我来说似乎更干净。

  • 问题内容: 这个问题已经在这里有了答案 : 在同一网页上显示两个表的数据 (1个答案) 7年前关闭。 我在CodeIgniter中使用联接查询,但无法使其正常工作。它仅显示一个表数据,而不显示其他表数据。我是CodeIgniter的新手,无法解决此问题。请别人帮我。提前谢谢。 看法 控制器 模型 编辑 的结果 是一个如下数组: 表结构 证书 cid(PRIMARY),姓名,second_name,

  • 我需要从数据库中选择具有活动地址的公司(address.address_status_id=1)。如果地址不活动,则地址列应包含NULL。 有没有一种方法可以重新表述SQL语句,使它可以用JPA/QueryDSL表示? 编辑:我们将使用Oracle DB和Hibernate JPA provider(如果它能起作用的话)。

  • 问题内容: 我知道SQL Server中的联接。 例如。有两个表Table1,Table2。 它们的表结构如下。 表1的数据如下: 表2数据如下: 如果我执行下面提到的两个SQL语句,则两个输出将是相同的 请在上述SQL语句中说明左右联接之间的区别。 问题答案: Select * from Table1 left join Table2 … 和 确实是完全可以互换的。但是,请尝试(或同一对)进行区

  • 问题内容: 假设我们有下表t1和t2: 我们希望找到以下结果: 这基本上是右连接与左连接的并集。以下代码有效,但感觉很笨拙: 有没有更好的方法来实现这一目标? 问题答案: