我在Google数据流上运行Beam SDK to 2.10.0作业时遇到问题
流程很简单:我使用Kafka作为源,然后应用固定窗口,然后按键计数元素。但看起来数据永远不会离开计数阶段,直到工作耗尽。计数的输出集合。联合收割机/联合收割机。珀基(计数)/联合收割机。分组值。out0
始终为零。元素仅在排出数据流作业后发出。
代码如下:
public KafkaProcessingJob(BaseOptions options) {
PCollection<GenericRecord> genericRecordPCollection = Pipeline.create(options)
.apply("Read binary Kafka messages", KafkaIO.<String, byte[]>read()
.withBootstrapServers(options.getBootstrapServers())
.updateConsumerProperties(configureConsumerProperties())
.withCreateTime(Duration.standardMinutes(1L))
.withTopics(inputTopics)
.withReadCommitted()
.commitOffsetsInFinalize()
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class))
.apply("Map binary message to Avro GenericRecord", new DecodeBinaryKafkaMessage());
.apply("Apply windowing to records", Window.into(FixedWindows.of(Duration.standardMinutes(5)))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardMinutes(5)))
.apply("Write aggregated data to BigQuery", MapElements.into(TypeDescriptors.strings()).via(rec -> getKey(rec)))
.apply(Count.<String>perElement())
.apply(
new WriteWindowedToBigQuery<>(
project,
dataset,
table,
configureWindowedTableWrite()));
}
private Map<String, Object> configureConsumerProperties() {
Map<String, Object> configUpdates = Maps.newHashMap();
configUpdates.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return configUpdates;
}
private static String getKey(GenericRecord record) {
//extract key
}
看起来流永远不会离开的阶段。
有人能帮忙吗?
我已经找到原因了。
它与这里使用的时间戳策略有关(. with CreateTime(Duration.standard分钟(1L))
)。
由于在我们的Kafka主题中存在空分区,主题水印从未使用默认的时间戳策略进行推进。我需要实施自定义策略来解决这个问题。
我有一个TestNG测试方法,使用数据提供程序使用多个参数运行: 有时我想并行运行检查测试,有时则不想。我可以通过将代码注释设置为DataProvider(parallel=true)或DataProvider(parallel=false)来控制这一点,但我希望将其作为运行时参数,而不是代码中的设置。 如何设置数据提供程序是否在TestNG套件XML文件中作为命令行参数或属性派生多个线程?
我在创建用于聚合数据的SerDes时遇到了一些问题,需要通过“”发送到另一个主题。然而,我需要为窗口化数据创建一个SerDes,我不知道该怎么做。
我正在遵循入门指南[1],但是我已经从配置设置中删除了MySQL和analytics的内容,因为我不打算使用任何分析函数。但是,scdf服务后来崩溃了,因为没有配置数据源。 好的,所以似乎仍然需要在scdf-config-kafka.yml[2]中配置数据源(尽管从阅读文档来看,我认为它只用于分析内容)。 但为了什么?数据源用于持久化Kafka消息,还是在节点之间建立云流消息? 我找不到任何关于大
我发送字符串从Arduino到PC使用串行通信。消息的格式包括字符、值和空格(分隔数据)。示例消息:。我在Qt中解码这条消息有问题,因为当我使用例如Utf-8解码它时,我将整数转换为字符(以简化的方式),并收到类似的东西:
我正在从Spring XD迁移到Spring Cloud Data Flow。当我寻找模块列表时,我意识到一些源码没有在Spring Cloud Flow中列出--其中一个是Kafka源码。 我的问题是为什么在spring cloud data flow中KAFKA源从标准源列表中删除?
主要内容:1. Tableau数据窗口,2. Tableau数据类型1. Tableau数据窗口 数据窗口是一种显示Tableau和数据源之间连接的方法。可以在单个工作簿中连接到多个不同的数据源。与数据连接关联的小图标提供有关连接性质的其他详细信息。 这是一个以三种不同数据连接显示的工作簿: 全局超级市场数据连接旁边的绿线表示它是工作表中的活动连接。因此,电子表格中的条形图是使用该数据源中的“维度和度量”创建的。因此,使用数据源中的维度和度量创建条形图。 奥林匹克