我正在使用KafkaIO API https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/Kafka/kafkao.html流式传输来自Kafka主题的消息
管道流程如下:
Window.into[Array[Byte]](FixedWindows.of(Duration.standardSeconds(10)))
.triggering(
Repeatedly
.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(10))
)
)
.withAllowedLateness(Duration.standardSeconds(0))
.discardingFiredPanes()
)
根据documenattion窗口是必要的,如果我们正在进行任何计算,如GroupByKey等。因为我只是解码数组字节消息并将它们存储到BigQuery,所以可能不需要。
请告诉我,我是否需要使用窗口?
已经发布了一个类似问题的答案,其中的数据来自PubSub。主要思想是,不可能收集一个无界PCollections的所有元素,因为新元素不断被添加,因此必须实施以下两个策略之一:
还可能需要通过使用以下命令设置选项的适当arg参数来启用管道中的流:
pipeline_options.view_as(StandardOptions).streaming = True
apachebeam中的核心转换(Map、Filter、flatte)是否使用并行处理来处理数据元素,如果是,具体什么时候应该使用ParDo转换?
尝试通过首先运行 然后运行 并得到以下错误 [错误]执行目标组织失败。科德豪斯。project word count beam上的mojo:exec-maven插件:1.4.0:java(默认cli):执行java类时发生异常。null:InvocationTargetException:java。lang.IllegalStateException:无法找到d的注册器- 然而,如果我运行2017
我正在尝试阅读使用apache Beam上的KafkaIO的多个kafka代理。偏移量管理的默认选项是kafka分区本身(不再使用来自kafka>0.9的zookeper)。使用此设置,当我重新启动作业/管道时,会出现重复和丢失记录的问题。 从我读到的内容来看,处理这一点的最佳方法是管理对外部数据存储的偏移量。用当前版本的apache beam和Kafkaio是否可以做到这一点?我现在使用的是2.
我有一个现有的hbase表,已经把所有的数据在凤凰格式。然后尝试在上面创建一个凤凰桌。在Phoenix4.9中,一切都很好。但是Phoenix4.10有一个列映射https://blogs.apache.org/phoenix/entry/column-mapping-and-immutable-data,所以我的列限定符不能是原始字符串,比如地址、名称等。但是必须映射到某个整数,我该怎么做呢?有
我正在使用Apache Beam的kafkaIO阅读一个主题,该主题在Confluent schema Registry中有一个avro模式。我可以反序列化消息并写入文件。但最终我想写给BigQuery。我的管道无法推断架构。我如何提取/推断模式并将其附加到管道中的数据,以便我的下游进程(写入BigQuery)能够推断模式? 下面是我使用模式注册表url设置反序列化器的代码,以及我从Kafka读到
我们正在使用Apache Phoenix访问HBase数据存储。作为某些需求的一部分,我们需要记录从任何Phoenix客户端发出的每个更新操作,例如写和删除表命令。Phoenix的日志记录是否已经以可解析的格式捕获了这些命令?如果没有,我如何捕捉这些信息?