当前位置: 首页 > 知识库问答 >
问题:

Kafka StreamsExcture TimeoutExcture:正在物化KTable的N条过期记录

房泉
2023-03-14

我的流应用程序只是从一个记录主题中物化一个KTable。与100K记录在主题,没有问题。但是,由于主题中15M记录,一旦我们得到几百万条记录,实例将崩溃,出现以下异常:

Thread“公司-de1f21f9-b445-449e-a59b-5e0cecfa54d1-StreamThread-1”组织中的例外情况。阿帕奇。Kafka。溪流。错误。StreamsException:任务[0_0]中止发送,因为以前的记录(时间戳1601327726515)捕获到错误,因此发送到主题公司。由于组织原因,读取变更日志。阿帕奇。Kafka。常见的错误。TimeoutException:公司的40条记录过期。read-changelog-0:120001 ms自批创建以来已通过]

下面是我们正在运行的服务的详细示例的要点。

令我不解的是,使我的streams应用程序崩溃的错误(如下)是引用了一个过载的生产者,然而,这个服务只是实现了一个KTable。

java prettyprint-override">streamsBuilder
  .stream(egressTopic, Consumed.with(Serdes.String(), companySerde))
  .toTable(Materialized.<String, Company, KeyValueStore<Bytes, byte[]>>as(companyKTableName)
    .withKeySerde(Serdes.String())
    .withValueSerde(companySerde));

我已经调整过的属性试图让它名义上运行:

  • batch.size10000
  • linger.ms1000
  • request.timeout.ms300000
  • max.block.ms300000
  • retry.backoff.ms1000
  • replication.factor3

共有1个答案

慕容恩
2023-03-14

每个表都有一个用于容错的changelog主题作为支持。因此,每次写入KTable也是写入相应的变更日志主题。

如果您输入的主题配置了日志压缩,您可以重写您的程序

streamsBuilder.table(
    egressTopic,
    Materialized.<String, Company, KeyValueStore<Bytes, byte[]>>as(companyKTableName)
        .withKeySerde(Serdes.String())
        .withValueSerde(companySerde)
);

此外,还可以启用拓扑。optimization=“all”:在这种情况下,输入主题将被重新用作变更日志以恢复状态,并且不会创建其他变更日志主题。

 类似资料:
  • 如何识别主题的KTable物化何时完成? 例如,假设KTable只有几百万行。下面的伪代码: 在某个时间点,我想安排一个线程来调用以下内容,该内容写入主题:kt.toStream().to(“output_topic_name”); 跟进问题: 约束 1)好的,我看到kstream和ktable在kafkastream启动后是无界/无限的。但是,ktable物化(压缩主题)不会在指定的时间段内为同

  • 2018-04-19 15:12:57[kafka-producer-network-thread producer-1]错误O.s.K.s.LoggingProducerListener-向主题xxxxx-v1:org.apache.kafka.common.errors.TimeoutException:自批处理创建后已超过xxxxx-v1-3:60043毫秒的过期1条记录,有效负载=“{79

  • 问题内容: 我想做一个查询,从中选择一堆数据,但是我希望能够通过仅选择每三个记录,甚至每个百分之一的记录来降低数据的分辨率。任何。 有什么简单的方法可以用ActiveRecord做到这一点吗? 问题答案: 在Oracle中,我将其编写如下: 这样做的好处是,过滤器发生在数据库中,因此不会检索所有内容。 在PostgreSQL中,这称为(实际上是SQL标准)。在MySQL中,不支持此功能。 在mys

  • 问题内容: 我在数据库上有一些性能测试结果,我要做的是将每1000条记录 分组 (以前按日期升序排列),然后将结果与 AVG 进行汇总。 我实际上正在寻找标准的SQL解决方案,但是任何T-SQL特定的结果也值得赞赏。 查询如下所示: 问题答案: 这样的事情应该会让您入门。如果您可以提供实际的架构,我可以进行适当的更新。

  • 下面是我用来推送主题的方法: 使用命令shell脚本 ./kafka-console-producer.sh--broker-list 10.0.1.15:9092--主题DomainEntityCommandStream ./kafka-console-consumer.sh--boostrap-server 10.0.1.15:9092-topic DomainEntityCommandStr

  • 问题内容: 假设我有这样的pandas DataFrame: 我想获得一个新的DataFrame,其中每个ID的前2个记录如下: 我可以对分组依据中的记录进行编号: 但是,有没有更有效/更优雅的方法来做到这一点?还有一种更优雅的方法来对每个组中的数字进行记录(例如SQL窗口函数row_number())。 问题答案: 你试过了吗 Ouput生成: (请记住,根据数据,你可能需要先进行订购/排序)