当前位置: 首页 > 知识库问答 >
问题:

带Kafka源代码和数据流运行程序的Beam java SDK 2.10.0:窗口计数。perElement从不释放数据

燕扬
2023-03-14

我在Google数据流上运行Beam SDK to 2.10.0作业时遇到问题

流程很简单:我使用Kafka作为源,然后应用固定窗口,然后按键计数元素。但看起来数据永远不会离开计数阶段,直到工作耗尽。计数的输出集合。联合收割机/联合收割机。珀基(计数)/联合收割机。分组值。out0始终为零。元素仅在排出数据流作业后发出。

代码如下:

public KafkaProcessingJob(BaseOptions options) {

    PCollection<GenericRecord> genericRecordPCollection = Pipeline.create(options)
                     .apply("Read binary Kafka messages", KafkaIO.<String, byte[]>read()
                           .withBootstrapServers(options.getBootstrapServers())
                           .updateConsumerProperties(configureConsumerProperties())
                           .withCreateTime(Duration.standardMinutes(1L))
                           .withTopics(inputTopics)
                           .withReadCommitted()
                           .commitOffsetsInFinalize()
                           .withKeyDeserializer(StringDeserializer.class)
                           .withValueDeserializer(ByteArrayDeserializer.class))

                    .apply("Map binary message to Avro GenericRecord", new DecodeBinaryKafkaMessage());

                    .apply("Apply windowing to records", Window.into(FixedWindows.of(Duration.standardMinutes(5)))
                                       .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                                       .discardingFiredPanes()
                                       .withAllowedLateness(Duration.standardMinutes(5)))

                    .apply("Write aggregated data to BigQuery", MapElements.into(TypeDescriptors.strings()).via(rec -> getKey(rec)))
                            .apply(Count.<String>perElement())
                            .apply(
                                new WriteWindowedToBigQuery<>(
                                    project,
                                    dataset,
                                    table,
                                    configureWindowedTableWrite()));   
}

private Map<String, Object> configureConsumerProperties() {
    Map<String, Object> configUpdates = Maps.newHashMap();
    configUpdates.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return configUpdates;
}

private static String getKey(GenericRecord record) {
    //extract key
}

看起来流永远不会离开的阶段。

有人能帮忙吗?


共有1个答案

万俟棋
2023-03-14

我已经找到原因了。

它与这里使用的时间戳策略有关(. with CreateTime(Duration.standard分钟(1L)))。

由于在我们的Kafka主题中存在空分区,主题水印从未使用默认的时间戳策略进行推进。我需要实施自定义策略来解决这个问题。

 类似资料:
  • 我有一个TestNG测试方法,使用数据提供程序使用多个参数运行: 有时我想并行运行检查测试,有时则不想。我可以通过将代码注释设置为DataProvider(parallel=true)或DataProvider(parallel=false)来控制这一点,但我希望将其作为运行时参数,而不是代码中的设置。 如何设置数据提供程序是否在TestNG套件XML文件中作为命令行参数或属性派生多个线程?

  • 我在创建用于聚合数据的SerDes时遇到了一些问题,需要通过“”发送到另一个主题。然而,我需要为窗口化数据创建一个SerDes,我不知道该怎么做。

  • 我正在遵循入门指南[1],但是我已经从配置设置中删除了MySQL和analytics的内容,因为我不打算使用任何分析函数。但是,scdf服务后来崩溃了,因为没有配置数据源。 好的,所以似乎仍然需要在scdf-config-kafka.yml[2]中配置数据源(尽管从阅读文档来看,我认为它只用于分析内容)。 但为了什么?数据源用于持久化Kafka消息,还是在节点之间建立云流消息? 我找不到任何关于大

  • 我发送字符串从Arduino到PC使用串行通信。消息的格式包括字符、值和空格(分隔数据)。示例消息:。我在Qt中解码这条消息有问题,因为当我使用例如Utf-8解码它时,我将整数转换为字符(以简化的方式),并收到类似的东西:

  • 我正在从Spring XD迁移到Spring Cloud Data Flow。当我寻找模块列表时,我意识到一些源码没有在Spring Cloud Flow中列出--其中一个是Kafka源码。 我的问题是为什么在spring cloud data flow中KAFKA源从标准源列表中删除?

  • 主要内容:1. Tableau数据窗口,2. Tableau数据类型1. Tableau数据窗口 数据窗口是一种显示Tableau和数据源之间连接的方法。可以在单个工作簿中连接到多个不同的数据源。与数据连接关联的小图标提供有关连接性质的其他详细信息。 这是一个以三种不同数据连接显示的工作簿: 全局超级市场数据连接旁边的绿线表示它是工作表中的活动连接。因此,电子表格中的条形图是使用该数据源中的“维度和度量”创建的。因此,使用数据源中的维度和度量创建条形图。 奥林匹克