美好的一天 我正在通过flink/kafka接收数据(流数据)。我连接的端口与我需要写回消息的端口相同 TCP/IP- 连接到 URL 和端口工作正常。我接收并处理写入主题的数据 现在我还需要写回我连接到的同一URL和端口。{由于 Url 和端口可以同时发送和接收数据} 我把它写到另一个端口 这个管用...问题是试图写入同一个端口。当我使用我从中读取的同一个端口时...flink作业失败 有什么想
我使用Flink数据流API中的< code > keyedcorprocessfunction 类来实现一个超时用例。场景如下:我有一个输入kafka主题和一个输出kafka主题,一个服务从输入主题中读取并处理它(持续可变的时间),然后在输出Kafka主题中发布响应。 现在要实现超时(必须使用Flink datastream API),我有一个从kafka输入主题读取的和另一个从kafka输出主
我们有一个在 Java 上编写并在 AWS Kinesis Data Analytics 上运行的 flink 应用程序。应用程序从 AWS 托管服务 Kafka(kafka 主题 1)读取输入流,然后应用业务逻辑(一些计算),最后将输出写入另一个 Kafka 主题(kafka 主题 2)。 并行度为 10,主题有 15 个分区。预计在 5 分钟内处理 ~20K 并发数据。但是,经过所有优化后,我
我有一个流应用程序,它从Kafka主题读取数据,从文件读取数据,聚合数据并创建结果。 每5分钟,我想得到多少记录被消耗和记录从文件中读取的计数,并将其发送到另一个流。 我该怎么做?
我有以下配置: 一个具有 2 个分区的 kafka 主题 一个动物园管理员实例 一个 kafka 实例 具有相同组 ID 的两个使用者 Flink 作业片段: 方案 1: 我在eclipse上写了一个flink job (Producer ),它从一个文件夹中读取一个文件,并在kafka主题上放置msgs。 所以当我使用eclipse运行这段代码时,它工作得很好。 例如:如果我放置一个有100条记
我想打印Flink已开始读取的Kafka主题的每个分区的起始偏移量?
在Flink中有没有任何方法可以自动推断出Kafka主题DDL,而不需要手动查询,就像Spark中的情况一样。
这就是flink-quickstart-scala的建议: 它还与Flink项目配置保持一致: 然而,Flink Hive Integration docs的建议恰恰相反: 如果您正在构建自己的程序,那么在mvn文件中需要以下依赖项。建议不要在生成的jar文件中包含这些依赖项。您应该在运行时添加上面所述的依赖项。
null 其中lambda1、2等是条件检查函数,例如 但不知什么原因对我不起作用,也许还有其他方法?正如我从文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html)中了解到的,OutputTag用于创建标记为tag的附加消息。还是我错了?
我有两个flink。例如:和。我想将这两个流合并成1个流,这样我就可以使用与的dag相同的处理函数来处理它们。
我有一个案例场景,其中我有一个流生成器客户机,它正在生成多个流,合并它们并将其发送到socket,我希望Flink程序作为服务器监听它。正如我们所知,服务器必须首先打开,以便它能够监听客户机请求。我尝试使用下面给出的代码来做同样的事情 下面给出了充当客户端的流生成器的代码
我正在学习如何使用Flink处理流数据。 根据我的理解,我可以多次使用函数进行各种转换。 表示数据源不断向Flink发送字符串。所有字符串都是JSON格式的数据,如下所示: 下面是我的代码: 正如您所看到的,我的示例非常简单:获取并反序列化数据-->将string转换为Json对象-->将Json对象转换为string并获取所需内容(这里只需要)。 就目前而言,似乎一切都很好。我确实从日志文件中获
我们和Flink玩了一会儿。到目前为止,我们一直在Hadoop2.x/Yarn上使用Spark和标准M/R。 除了YARN上的Flink执行模型,即AFAIK不像spark那样是动态的,执行者动态地获取和释放YARN中的虚核,问题的要点如下。 Flink似乎很神奇:对于流媒体API,我只想说它很棒,太棒了。 不幸的是,这并不是一个小问题,因为在90%的用例中,您在HDFS上有一个大数据分区存储,通
另外,如果我想进行性能比较并自己实现它,是否有任何流API(如twitter API)比twitter提供更高的吞吐量,并且是开源的? 谢谢!