我想在由安全kafka集群的kafka主题支持的Flink SQL表上执行一个查询。我能够以编程方式执行查询,但无法通过Flink SQL客户端执行。我不知道如何通过Flink SQL客户端传递JAAS配置()和其他系统属性。 FlinkSQL以编程方式查询 这很好。 通过SQL客户端Flink SQL查询 运行此命令将导致以下错误。 中没有任何内容,除了以下注释 SQL客户端运行命令 Flink
我正在尝试每 提供了Kafka主题中的数据,但它不保留顺序。我在循环中做错了什么?此外,必须将Flink的版本从< code>1.13.5更改为< code>1.12.2。 我最初使用的是< code > Flink < code > 1 . 13 . 5 、< code >连接器和< code>2.11的< code>Scala。我到底错过了什么?
我有一个这样的数据集 我想选择第3列和第4列作为我的键和值,我如何在Apache Flink中执行平均操作。 我最多能做到“按键分组”。但是我无法对每个键的值执行平均运算。
我正在评估Apache Flink的流处理,作为Apache Spark的替代品/补充。我们通常使用Spark解决的任务之一是数据扩充。 也就是说,我有来自物联网传感器的带有传感器ID的数据流,并且我有一组传感器元数据。我想将输入流转换为传感器测量传感器元数据流。 在星火中,我可以和RDD一起加入数据流。 我可以用Apache Flink做同样的技巧吗?我在这方面没有看到直接的API。我唯一的想法
在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。
我有两个事件流 L=(l1, l3, l8,...)-更稀疏,表示用户登录IP E=(e2, e4, e5, e9,...)-是特定IP的日志流 较低的索引表示时间戳...如果我们将两个流连接在一起并按时间对它们进行排序,我们将得到: l1, e2, l3, e4, e5, l8, e9,... 能否实现自定义的< code > Window /< code > Trigger 函数来将事件分组到
我正在启动一个新的Flink应用程序,允许我的公司执行大量报告。我们有一个现有的遗留系统,我们需要的大部分数据都保存在SQL Server数据库中。在开始使用新部署的Kafka流中的更多数据之前,我们首先需要使用这些数据库中的数据。 我花了很多时间阅读Flink的书和网页,但我有一些简单的问题和假设,我希望你能帮助我进步。 首先,我想使用DataStream API,这样我们既可以使用历史数据,也
我一直在关注Flink 1.14中针对有界数据的不同全局数据排序选项。我发现Stackoverflow和其他网站上关于这个的很多问题都是好几年前的问题了,关于不推荐使用的API或者没有完全回答这个问题。由于Flink正在快速发展,我想问一下最新稳定的Flink (1.14)中的可用选项。 以下是我如何理解当前的情况(这可能是错误的)。我的问题也附上。Flink 有两个 API —— 和 , 它们可
假设我有两个
我想在 Flink 中的输入数据流上应用 ProcessFunction(),以使用单个缓存对象处理每个传入元素。我的代码看起来像这样: 当我并行化此作业时,我假设作业的每个并行实例都有自己的缓存对象,因此,单个缓存键可能存在于多个缓存对象中。但是,我希望特定键有一个缓存条目,也就是说,对应于特定键的所有记录必须由单个实例和单个缓存对象处理。在 myStream 上使用 keyBy() 是否可确保
在Apache Flink中,如果我在一个主键上连接两个数据集,我会得到一个元组2,其中包含每个数据集中相应的数据集条目。 问题是,当将方法应用于即将到来的tuple 2数据集时,它看起来并不漂亮,尤其是如果两个数据集的条目都具有大量功能。 在两个输入数据集中使用元组会给我一些这样的代码: 我不介意使用POJO或case类,但我不明白这会如何使它变得更好。 问题1:有没有一个很好的方法来扁平化元组
我正在尝试使用< code > DataSet . writeastext(" file:///path/to/my/file ")将数据集API程序的结果写入一个文件。 但是,该程序不产生任何输出。也不会创建输出文件。这可能是什么原因?
来自Flink的官方文件: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/joining.html#interval-join 示例代码是: 从上面的代码中,我想知道如何指定执行此间隔连接的开始时间(例如,从今天开始)(开始时间之前的数据将不考虑在内)。 例如,我已经运行了3天的程
我一直在批处理模式下测试使用TableApi和DataStream api的简单联接。然而,我得到了非常糟糕的结果,所以它一定是我做错了什么。用于联接的数据集约为 900gb 和 3gb。用于测试的环境是具有 10 * m5.xlarge 工作节点的 EMR。 使用的TableApi方法是在数据s3路径上创建一个表,并在目标s3路径上对创建的表执行插入语句。通过调整任务管理器内存、numberOf
我尝试通过IDs连接两个数据流,发现有两个API集可以这样做, < Li > https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining . html < Li > https://ci . Apache . org/projects/flink/flink-docs-r