当前位置: 首页 > 知识库问答 >
问题:

Apache波束处理序列单元

牧熙云
2023-03-14
driver_id,x,y,event_time,processing_time
1,1,1,100001,20001
1,5,5,100004,20002
1,4,5,100003,20003
driver_id, distance, event_time
1,3,100001
1,1,100003

所以我的问题是:

  1. 如何使用Apache beam计算相对于事件晚入的距离?
  2. 如何处理位于两个不同窗口中的连续坐标?
  3. 还有其他可供选择的流处理框架可以解决我的问题吗?
  4. 事件可能在事件时间后两天交付,但我希望在确保它们都交付之前有一个大致的结果。

共有1个答案

闾丘冠玉
2023-03-14

我建议您花一些时间来研究如何在Windows中处理延迟到达的数据。

您可以创建一个固定窗口,然后允许延迟数据;但是,您还必须定义如何处理临时累加。

根据您的键结构,也许您可以在driver上键,然后提供一个更具体的启发式来确定完整性。

 类似资料:
  • 我没有找到任何文档允许将错误处理应用于此步骤,也没有找到将其重写为DOFN的方法。对此应用错误处理有什么建议吗?谢谢

  • 在我的数据流作业中,我需要初始化配置工厂,并在实际处理开始之前将某些消息记录在审核日志中。 我将配置工厂初始化代码审计日志记录放在父类PlatformInitializer中,并在我的主管道类中扩展它。 因此,我还必须在我的管道类中实现可序列化接口,因为beam抛出了错误-<代码>java。io。NotSerializableException:组织。德维塔姆。自定义作业 在PlatformIni

  • 我正在构建一个读取Avro通用记录的管道。为了在各个阶段之间传递GenericRecord,我需要注册avrocoder。文档表明,如果我使用通用记录,模式参数可以是任意的:https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/avrocoder.html#of-java.lang.class-org.a

  • 我正在将我的google dataflow java 1.9迁移到beam 2.0,并尝试使用BigtableIO。写 在大舞台前的巴黎公园里,我正在努力让它变得更容易接受。 上述代码引发以下异常InvalidProtocolBufferException:协议消息结束组标记与预期标记不匹配 v是对象列表(Vitals.class)。hbase api使用Put方法创建变异。如何创建将与Bigta

  • 我试图在y有最高值时显示x位置。在if语句中,我试图将y值与120进行比较,但它似乎从来都不是真的,所以它没有使用文本函数显示我的x位置。我也试着取整y值,但结果仍然不是我想要的。有人能帮我吗?

  • 填充序列pad_sequences keras.preprocessing.sequence.pad_sequences(sequences, maxlen=None, dtype='int32', padding='pre', truncating='pre', value=0.) 将长为nb_samples的序列(标量序列)转化为形如(nb_samples,nb_timesteps)

  • 填充序列pad_sequences keras.preprocessing.sequence.pad_sequences(sequences, maxlen=None, dtype='int32',padding='pre', truncating='pre', value=0) 将长为nb_samples的序列(标量序列)转化为形如(nb_samples,nb_timesteps)2D num

  • 嗨,我已经创建了一个apache beam管道,测试了它,并在eclipse内部运行了它,包括本地和使用dataflow Runner。我可以在eclipse控制台中看到管道正在运行。e.控制台上的日志。 这是我在cmd提示符中使用的maven命令, 这是我用来创建管道和设置选项的代码段。