我有来自不同Apache Kafka主题的4个输入数据流(JSON消息)的Apache Flink作业,而我只有一个对象XFilterFunction--它执行一些筛选。我写了一些数据管道逻辑(原始示例): 在作业中使用一个新对象XFilterFunction是好还是坏? 还是使用两个新对象XFilterFunction更好?(2个流->2个新筛选器对象)
不幸的是,Kafka Flink连接器只支持-csv、json和avro格式。因此,我不得不使用较低级别的API(数据流)。 问题:如果我可以从datastream对象中创建一个表,那么我就可以接受在该表上运行的查询。它将使转换部分无缝和通用。是否可以在数据流对象上运行SQL查询?
我有一个带有一些键的流,我想为每个键存储一些状态。我的流看起来如下所示: 在KeyedProcessFunction中,我有一个状态变量: 我对此还这么陌生,我做错了什么?
RichAsyncFunction的Flink文档说明: 尚不支持RuntimeContext中与状态相关的API,因为在访问工作线程中的状态时,键可能会发生更改。 的文档在这里。
我有一个高频率的流发布事件,每个事件包含一些关于汽车的信息。我需要处理这个事件流,但排除具有特定城市和车牌号组合的事件。这些列入黑名单的城市和车牌号码组合的信息来自一个每天更新的S3文件。 S3文件如下所示:例如。假设它被称为 方法: 将S3文件用作流源。 转换事件以产生以下结构: 广播状态解决方案: 但我现在碰到了冷启动问题。在处理主数据之前,确保广播状态可用的可靠方法是什么。
这个问题在这里已经被问到了,但是两年过去了,我想知道是否有什么改变。 我有一个用例,我希望在两个Flink操作符之间共享状态: > A流是主流,它连续流动 流B只是富集数据的数据集。它很大(几个GBs),因此不能作为广播流。 流B有一个与之相关联的运算符(FlatMap,但可以是任何实际的),它充当状态加载器,并将浓缩数据作为列表状态加载到RocksDB中。 null
我读到了四个Kinesis流的数据。每个流中的数据都是不同的数据类型。读取所有四个流后,我分配时间戳和水印,并聚合来自每个流的数据。四个聚合的结果都是使用相同的泛型对象输出的。我想合并四个流的结果,这样我就可以将合并后的流发送到一个ProcessFunction。这基本上允许我像使用CoProcessFunction一样使用ProcessFunction,但我可以处理来自两个以上流的数据(在本例中
问题: 在运行时,引擎会为每个数据流创建一个线程吗?还是每个操作员一个线程? 是否可以在作业启动时在运行时动态创建数据流?(即,如果作业启动时从文件中读取N,并且需要创建相应的N个流) 当创建大量流(N~10000个)时,与在单个流中创建N个分区相比,是否有任何特定的性能影响?
我可以在文档中看到: Flink目前只为没有迭代的作业提供处理保证。对迭代作业启用检查点会导致异常。为了在迭代程序上强制检查点,用户需要在启用检查点时设置一个特殊的标志:env.enablecheckpointing(interval,force=true)。 如果是一个而不是一个(这意味着它也可以保存状态),会有什么变化吗?
我需要知道flinkcep的实现背后是否存在文件。如果有,那它们是什么?
我正在使用一个Flink流式Java应用程序,输入源为Kafka。在我的应用程序中总共使用了4个流。一个是主数据流,另一个3个用于广播流。 我加入了使用任何一种类型的三个广播流。我已经作为流B广播,并且能够在广播过程函数上下文状态(即在processBroadcastElement())中接收。 我的问题是, > 是否可以在广播状态下存储大数据? 注意:根据我的理解,Flink广播状态在运行时保存
在Flink-Job中,我目前有两个流,一个是每分钟从Kafka主题更新的主数据流,另一个流(广播流)用于KeyedBroadcastProcessFunction的process元素函数中,用于对主流数据进行一些计算。 2)主数据可以有两个广播流吗? 3)由于流数据是完全不同的数据,广播,第三个数据流不经常变化,所以连接是不起作用的。它就像一个主数据,在计算中和主数据流一起使用,找不到任何解决方
我正在使用apache flink构建一个相当复杂的数据流网络。其思想是,用Flink实现一个规则引擎。 作为应用程序的基本描述,它应该是这样工作的: 数据由kafka消费者源接收,并用多个数据流处理,直到最终发送到kafka生产者接收器。传入的数据包含具有逻辑键(“object-id”)的对象,传入的消息可能引用相同的object-id。对于每个给定的object-id,必须在整个应用程序中保留
我的问题可能会引起一些混乱,所以请先看说明。找出我的问题可能会有帮助。我会在问题的后面添加我的代码(也欢迎任何关于我的代码结构/实现的建议)。感谢您事先提供的任何帮助! 我的问题: 有什么用途?我的代码会输出没有这句话的结果。如果我加上这句话,就会出现一个例外: - 描述:编程新手。最近我需要使用Flink批处理来处理一些数据(分组数据、计算标准差等)。然而,我到了一个点,我需要输出两个数据集。结