当前位置: 首页 > 知识库问答 >
问题:

有没有办法根据大小刷新Kafka Streams的Windows Store?

楚嘉
2023-03-14

我正在使用将从kafka主题读取元组的DSL应用编程接口编写Kafka Streams应用程序。在拓扑结构中,我想批处理元组。然后如果(1)30秒已经过去或(2)批处理的大小超过1 GB,我想将浴写入磁盘上的文件。

我写的拓扑使用TimeWindowedKStream对元组进行分组。然后调用聚合并传递窗口存储。

我的问题是,当国有商店试图写入Kafka变更日志时,我得到了一个

组织。阿帕奇。Kafka。常见的错误。RecordTooLargeException

例外。

特别地:

原因:org.apache.kafka.streams.errors.StreamsExctive:任务[1_1]由于用以前的记录捕获错误(key\x00\x00\x00\x06\x00\x00\x01h$\xE7\x88\x00\x00\x00\x00\x00\x00值[B@419761c时间戳1546807396524)主题ibv2-capt-消费者-组-3-记录-存储-变更日志由于org.apache.kafka.common.errors.RecordTooLargeExctive:请求包含的消息大于服务器将接受的最大消息大小。.

我尝试将缓存\u最大字节\u缓冲\u配置设置为1MB,但正如文档中所述,该配置适用于整个拓扑。

这是我的拓扑结构

这是我一直在使用的Scala代码。注意,我在这里使用的是kafka streams scala。

val builder = new StreamsBuilderS()

import com.lightbend.kafka.scala.streams.DefaultSerdes._

implicit val recordSerde = (new RecordSerde).asInstanceOf[Serde[Record]]
implicit val recordSeqSerde = (new RecordSeqSerde).asInstanceOf[Serde[RecordSeq]]

val inputStream: KStreamS[String, Record] = builder.stream[String,Record](topic)

val keyed = inputStream.selectKey[Int]((k,r) => random.nextInt(10)) 

val grouped: TimeWindowedKStreamS[Int, Record] = keyed.groupByKey.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30L)))

import org.apache.kafka.common.utils.Bytes

val windowedStore: Materialized[Int, RecordSeq, WindowStore[Bytes, Array[Byte]]] = Materialized
  .as[Int,RecordSeq,WindowStore[Bytes, Array[Byte]]]("record-store")
  .withKeySerde(integerSerde)
  .withValueSerde(recordSeqSerde)
  .withLoggingEnabled(ChangeLogConfig.getChangeLogConfig.asJava)  // increased max.request.size to 10 x default

val records: KTableS[Windowed[Int], RecordSeq] = grouped.aggregate(
  () => RecordSeq(Seq()),
  (randon: Int, record: Record, recordSeq: RecordSeq) => RecordSeq(recordSeq.records :+ record),
  windowedStore
)

val recordSeqStream: KStreamS[String, RecordSeq] = records.toStream((ws, r) => s"${ws.key()}-${ws.window().start()}-${ws.window().end()}")

recordSeqStream.foreach((k: String, rs: RecordSeq) => WrappedRecordFileWriter.write(k, rs))

注意:case类RecordSeq(记录:Seq[记录])


共有1个答案

羊舌兴文
2023-03-14

主题可以有消息中定义的最大大小的记录。最大字节数属性。这是代理可以在主题中接收和附加的最大消息大小。您的记录大小可能超过该限制。因此,您需要更改此属性的配置,以允许更大的记录大小。

它可以在代理级别和主题级别设置。您可以在这里参考更多详细信息:

http://kafka.apache.org/documentation/#brokerconfigs

http://kafka.apache.org/documentation/#topicconfigs

 类似资料:
  • 我有下一个树状图代码 而这个输出 如何更改每个子情节的大小?

  • 我想知道我是否可以设置div比率响应像一个图像在CSS。例如,我有一个图像(800px x400px),并将css设置为宽度:100%。当我使用桌面屏幕(widht:1400px)时,图像将自动调整大小,得到屏幕全宽=>图像大小(1400px x 700px)。但是元素,我必须同时设置宽度和高度有办法设置空宽度800px,高度400px,它将根据屏幕大小自动调整大小,并且它仍然像图像一样保持比例。

  • 问题内容: 我正在制作带有Netflix徽标的pure徽标,但遇到问题了。我创建了span元素,也创建了span 2的元素。但是,当与.netflix span:nth-​​进行比较时,我不知道如何将.netflix span:nth-​​child(2)放在z轴上。 child(2):之前。基本上,我的问题是,有什么方法可以将元素放置在z轴上,而不是根据它们的父元素而不是我们想要的任何元素。在此

  • 我希望从C中的浮点数组中计算中值: FloatArray包含一个常规的C浮点数组。 我正在使用,但想知道是否有像这样的工具可以处理数据?现在,我正在制作一个副本,然后在扔掉副本之前执行。如果数据没有像这样的东西,是否有更有效的方法使用复制步骤来计算信息,从而避免潜在的额外O(n)循环?也许性能影响可以忽略不计?我的数组大小可能在20亿量级。

  • 我有一个数据框,比如说一些投资数据。我需要根据某些条件(比如说,U类型)从这个数据帧中提取数据。有许多可用的基金类型,我只需要提取与特定基金类型匹配的数据。 funding_type有风险、种子、天使、股权等价值。我只需要数据匹配资金类型比如种子和天使 我试着跟着 这里MF1是我的数据帧。这将提供与种子基金类型相关的所有数据 我需要的条件有点像 MF1[MF1['funding_round_typ

  • 我刚刚开始使用Jetpack作曲工具包。我添加了和一个设置 作为参数,它工作得非常好。但是每次我添加或更改某些内容时,它本身都不会自动刷新,因此我必须手动执行此操作。