当前位置: 首页 > 知识库问答 >
问题:

flink水槽只支持bio吗?

束雅达
2023-03-14

接收器的调用方法似乎没有办法使异步io?例如返回Future

例如,redis连接器使用jedis lib同步执行redis命令:

https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java

然后,它将阻止flink的任务线程等待redis服务器对每个命令的网络响应?!其他操作符是否可以与sink在同一线程中运行?如果是这样,那么它也会阻止他们吗?

我知道flink有asyncio api,但它似乎不是为接收器impl使用的?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

共有1个答案

濮嘉茂
2023-03-14

正如@Dexter所提到的,您可以使用RichAsyncFunction。下面是一个示例代码(可能需要进一步更新才能正常工作;)

    AsyncDataStream.orderedWait(ds, new RichAsyncFunction<Tuple2<String,MyEvent>, String>() {
        transient private RedisClient client;
        transient private RedisAsyncCommands<String, String> commands;
        transient private ExecutorService executor;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            client = RedisClient.create("redis://localhost");
            commands = client.connect().async();
            executor = Executors.newFixedThreadPool(10);
        }

        @Override
        public void close() throws Exception {
            // shut down the connection and thread pool.
            client.shutdown();
            executor.shutdown();

            super.close();
        }

        public void asyncInvoke(Tuple2<String, MyEvent> input, final AsyncCollector<String> collector) throws Exception {
            // eg.g get something from redis in async
            final RedisFuture<String> future = commands.get("key");
            future.thenAccept(new Consumer<String>() {
                @Override
                public void accept(String value) {
                     collector.collect(Collections.singletonList(future.get()));
                }
            });
        }
    }, 1000, TimeUnit.MILLISECONDS);
 类似资料:
  • 我有一个Flink 1.11作业,它使用来自Kafka主题的消息,键入它们,过滤它们(keyBy后跟自定义ProcessFunction),并通过JDBC接收器将它们保存到db中(如下所述:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html) Kafka消费者使用以下选项初始化:

  • 我正在尝试编写一个Flink应用程序,它从Kafka读取事件,从MySQL丰富这些事件并将这些数据写入HBase。我正在中进行MySQL丰富,我现在正在尝试弄清楚如何最好地写入HBase。我想批量写入HBase,所以我目前正在考虑使用,后跟标识(仅返回),然后编写,它获取记录列表并批处理放入。 这是正确的做事方式吗?仅仅为了进行基于时间的缓冲而使用所有窗口和应用窗口感觉很奇怪。

  • 我正在编写一个flink代码,其中我正在从本地系统读取一个文件,并使用“writeUsingOutputFormat”将其写入数据库。 现在我的要求是写入hdfs而不是数据库。 你能帮我在Flink怎么办吗。 注意:hdfs已启动并在本地计算机上运行。

  • 我的要求是将数据发送到不同的ES接收器(基于数据)。例如:如果数据包含特定信息,则将其发送到sink1,否则将其发送到sink2等(基本上是根据数据动态发送到任何一个接收器)。我还想分别为ES sink1、ES sink2、ES sink3等设置并行度。 有什么简单的方法可以在flink中实现上述目标吗? 我的解决方案:(但并不满意) 我可以想出一个解决方案,但有中间Kafka主题,我写(topi

  • 我正在尝试使用hdfs水槽运行水槽。hdfs在不同的机器上正常运行,我甚至可以与水槽机器上的hdfs交互,但是当我运行水槽并向其发送事件时,我收到以下错误: 同样,一致性不是问题,因为我可以使用hadoop命令行与hdfs交互(水槽机不是datanode)。最奇怪的是,在杀死水槽后,我可以看到tmp文件是在hdfs中创建的,但它是空的(扩展名仍然是. tmp)。 关于为什么会发生这种情况的任何想法

  • 我遇到了Flume的问题(Cloudera CDH 5.3上的1.5): 我想做的是:每5分钟,大约20个文件被推送到假脱机目录(从远程存储中抓取)。每个文件包含多行,每行是一个日志(在JSON中)。文件大小在10KB到1MB之间。 当我启动代理时,所有文件都被成功推送到HDFS。1分钟后(这是我在flume.conf中设置的),文件被滚动(删除. tmp后缀并关闭)。 但是,当在假脱机目录中找到