# p is beam.Pipeline()
_ = (p | beam.io.ReadFromPubSub(subscription="my_sub")
| beam.Map(add_timestamping)
| beam.WindowInto(window.FixedWindows(60))
| beam.Map(lambda elem: elem) # exracting the key somehow, not relevant here
| beam.GroupByKey()
# (...)
| beam.io.WriteToPubSub("output_topic")
)
p.run()
def add_timestamping(elem):
import json
import apache_beam as beam
msg = json.loads(elem)
unix_timestamp = msg['timeStamp'] / 1000
return beam.window.TimestampedValue(msg, unix_timestamp)
sc.pubsubSubscription[String]("my_sub")
.applyTransform(ParDo.of(new CustomTs()))
.withFixedWindows(Duration.standardSeconds(60))
.map(x => x) // exracting the key somehow, not relevant here
.groupByKey
// (...)
.saveAsPubsub("output_topic")
import io.circe.parser._
class CustomTs extends DoFn[String, String] {
@ProcessElement
def processElement(@Element element: String, out: DoFn.OutputReceiver[String]): Unit = {
val json = parse(element).right.get
val timestampMillis: Long = json.hcursor.downField("timeStamp").as[Long].getOrElse(0)
out.outputWithTimestamp(element, new Instant(timestampMillis))
}
}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.IllegalArgumentException:
Cannot output with timestamp 2019-03-02T00:51:39.124Z.
Output timestamps must be no earlier than the timestamp of the current input
(2019-03-28T14:57:53.195Z) minus the allowed skew (0 milliseconds).
我的问题是:如何使用自定义时间戳处理数据,并能够在使用Beam API定义的windows上操作?
我正在构建一个事件驱动的微服务架构,它应该是云不可知的(尽可能多)<由于这最初是在GCP中进行的,我不想在配置和所有这些方面花费太长时间,我打算直接将GCP的发布/订阅用于事件队列,并在稍后处理其他云实现,但后来我遇到了Spring云数据流,这看起来很好,因为这些是Spring Boot微服务,我需要一种方法来协调它们 Spring Cloud数据流是否支持Pub Sub作为事件队列? 在配置和设
我曾经使用过SpringCloudDataFlow、rabbitmq和kafka,但我想知道是否可以使用GooglePub/sub安装scdf。 我不想创建一个流(新的应用程序spring cloud stream),将源或接收器连接到gcp,我希望google pub/sub over spring cloud data flow server用作中间消息代理。 有什么建议吗?
还有其他实现“数据绑定”的模式吗?
在我的项目中,我希望在Google Dataflow中使用流式管道来处理发布/订阅消息。在清理输入数据时,我还希望从BigQuery获得一个侧面输入。这会导致两个输入中的一个不工作。 我在流媒体的Pipeline选项中设置了=True,这允许Pub/Sub输入正确处理。但是BigQuery与流媒体管道不兼容(见下面的链接): https://cloud.google.com/dataflow/do
如果我想发送消息到谷歌PubSub并使用它的消息。您建议使用Spring cloud GCP库还是只使用Google cloud Java API。 有人能区分这两者吗?或者与谷歌云pubsub库相比,Spring Cloud gcp提供了哪些功能。
我有一些问题。 基于类中的时间戳,我想做一个逻辑,排除在1分钟内输入N次或更多次的数据。 UserData类有一个时间戳变量。 起初我试着用一个翻滚的窗户。 但是,滚动窗口的时间计算是基于固定时间的,因此无论UserData类的时间戳如何,它都不适合。 如何处理流上窗口UserData类的时间戳基? 谢谢。 附加信息 我使用这样的代码。 我试了一些测试。150个样本数据。每个数据的时间戳增加1秒。