当前位置: 首页 > 知识库问答 >
问题:

如何根据分区的闪烁流完成触发气流作业?

何星鹏
2023-03-14

我有一个flink流媒体作业,它从Kafka读取数据并写入文件系统中适当的分区。例如,作业被配置为使用一个bucketing接收器,该接收器写入/数据/日期=${date}/小时=${hour}。

如何检测分区是否已准备好使用,以便相应的气流管道可以在这一小时内进行批处理?

共有1个答案

林鸿飞
2023-03-14

您可以查看ContinuousFileMonitoringSource的实现,以了解它是如何监视文件系统的。然后做一些类似于David Anderson在你的另一个问题中建议的事情,重新创建一个定制的ProcessFunction。

 类似资料:
  • 我在Google Cloud Composer中从Airflow调用数据流作业, a、 b和c是调用数据流作业的任务。我只想在数据流作业完成后运行b,问题是它们都同时运行。 我怎么能等到之前的工作完成?

  • 我有一个关于在Kinesis流中分片数据的问题。我想在向我的kinesis流发送用户数据时使用一个随机分区键,这样碎片中的数据是均匀分布的。为了使这个问题更简单,我想通过在Flink应用程序中键入用户ID来聚合用户数据。

  • 我正在使用flink 1.5.2解决CEP问题。 我的数据来自一个列表,当系统运行时,其他一些进程将向该列表添加新的事件对象。它不是套接字或网络消息。我一直在阅读官方网站的示例。以下是我想我应该做的步骤。 使用env创建数据流。fromCollection(列表) 定义图案图案 使用CEP获取PatternStream。模式(数据流,模式) 使用pattern\u流。选择(…实现选择接口…)以数据

  • 我有一个使用Apache Flink(Flink版本:1.8.1)使用Scala进行流式处理的工作。flow作业要求如下:Kafka->写给Hbase->用不同的主题再次发送给Kafka 在向Hbase写入过程中,需要从另一个表中检索数据。为确保数据不为空(NULL),作业必须(在一定时间内)重复检查数据是否为空。 编辑:我的意思是,有了我在内容中描述的问题,我想过必须在作业流中创建某种类型的作业

  • 我想运行流作业。 当我尝试使用和Flink Web界面在本地运行该作业时,没有问题。 但是,我当前正在尝试使用Flink on YARN(部署在Google Dataproc上)运行我的作业,并且当我尝试取消它时,取消状态将永远持续,并且TaskManager中仍有一个插槽被占用。 这是我得到的日志:

  • 我正在尝试对Flink中的KeyedStream执行映射操作: JsonToObjectMapper运算符的输出是类MessageObject的POJO,它有一个字符串字段'keyfield'。然后在该字段上键入流。 代码抛出NullPointer异常: 似乎其中一个KeyedStream的keyedState中的键为null,尽管我已经验证了'keyfield'始终是有效字符串。根据Flink文