当前位置: 首页 > 知识库问答 >
问题:

如何使用Apache Flink读取websocket数据

赫连晋
2023-03-14

我正在尝试使用ApacheFlink从websocket读取数据

我的Flink工作是连接到webSocket,但它不是从webSocket拉数据。

下面是我尝试使用ApacheFlink API连接到websocket的示例代码

RichSourceFunction中的run()方法既不执行也不抛出任何错误。

@Slf4j
public class Main {

    public static final int CHECKPOINTING_INTERVAL_MS = 5000;
    private static final String JOB_NAME = "Flink Streaming Java API Skeleton";

    /**
     * Main Flink job.
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final ObjectMapper objectMapper = new ObjectMapper();

        env.setParallelism(4);

        ParameterTool paramTool = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(paramTool);

        DataStreamSource<String> mySocketStream = env.addSource(new MyWebSocketSourceFunc());
        mySocketStream.map(new MapIt()).print();
//        mySocketStream.print();

        env.enableCheckpointing(CHECKPOINTING_INTERVAL_MS);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.execute(JOB_NAME);
    }

    public static class MyWebSocketSourceFunc extends RichSourceFunction<String> {
        private boolean running = true;
        transient AsyncHttpClient client;
        transient BoundRequestBuilder boundRequestBuilder;
        transient WebSocketUpgradeHandler.Builder webSocketListener;
        private BlockingQueue<String> messages = new ArrayBlockingQueue<>(100);

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            WebSocketUpgradeHandler webSocketUpgradeHandler = webSocketListener.addWebSocketListener(
                    new WebSocketListener() {

                        private final ObjectMapper myMapper = new ObjectMapper();

                        private String getRsvpId(String payload) {
                            try {
                                Map map = myMapper.readValue(payload, Map.class);
                                Object rsvpId = map.get("rsvp_id");
                                return rsvpId != null ? rsvpId.toString() : "NOT FOUND";
                            } catch (IOException e) {
                                log.error("Mapping failed, returning 'null'");
                                return "NULL";
                            }
                        }

                        @Override
                        public void onOpen(WebSocket webSocket) {
                        }

                        @Override
                        public void onClose(WebSocket webSocket, int i, String s) {
                        }

                        @Override
                        public void onError(Throwable throwable) {
                        }

                        @Override
                        public void onTextFrame(String payload, boolean finalFragment, int rsv) {
                            log.debug("onTextFrame({}), rsvp_id={}", hash(payload), getRsvpId(payload));
                            if (payload != null) {
                                try {
                                    messages.put(payload);
                                } catch (InterruptedException e) {
                                    log.error("Interrupted!", e);
                                    Thread.currentThread().interrupt();
                                }
                            }
                        }
                    }).build();
            boundRequestBuilder.execute(webSocketUpgradeHandler).get();

            while (running) {
                ctx.collect(messages.take());
            }
            running = false;
        }

        @Override
        public void cancel() {
            log.info("cancel function called");
            running = false;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            log.info("open function called");
            super.open(parameters);
            client = Dsl.asyncHttpClient();
            boundRequestBuilder = client.prepareGet("ws://stream.meetup.com/2/rsvps");
            webSocketListener = new WebSocketUpgradeHandler.Builder();
        }

        private String hash(String input) {
            if (input == null) {
                return "-- NULL --";
            }

            try {
                MessageDigest md = MessageDigest.getInstance("MD5");
                md.update(input.getBytes());
                byte[] digest = md.digest();
                return DatatypeConverter.printHexBinary(digest).toUpperCase();
            } catch (NoSuchAlgorithmException e) {
                log.error("Cound not instantiate MD5", e);
                return "--NOT CALCULATED--";
            }
        }
    }

    public static class MapIt extends RichMapFunction<String, String> {

        final ObjectMapper objectMapper = new ObjectMapper();

        @Override
        public String map(String value) throws Exception {
            Map<String, Object> mapped = objectMapper.readValue(value, Map.class);
            Object rsvp = mapped.get("rsvp_id");
            return rsvp != null ? rsvp.toString() : "null" ;
        }
    }
}

这是我执行这项工作所遵循的参考文件

蒂娅。


共有1个答案

公孙英飙
2023-03-14

Flink包括一个内置的插座源连接器。您将在文档中找到一个演示如何使用它的示例。这将比调试其他实现更容易。

还要注意,不建议在生产应用程序中使用套接字,因为它们无法提供任何容错保证(因为它们不支持检查点)。

 类似资料:
  • 我只找到TextInputFormat和CsvInputFormat。那么,如何使用ApacheFlink读取HDFS中的拼花文件呢?

  • 所以我必须检索存储在HDFS中的文件的内容,并对其进行某些分析。 问题是,我甚至无法读取文件并将其内容写入本地文件系统中的另一个文本文件。(我是Flink的新手,这只是一个测试,以确保我正确读取了文件) HDFS中的文件是纯文本文件。这是我的密码: 在我运行/tmp之后,它没有输出。 这是一个非常简单的代码,我不确定它是否有问题,或者我只是做了一些别的错误。正如我所说,我对Flink完全是新手 此

  • 问题内容: 以下是表格HTML源代码,对于selenium读取其内容而言似乎非常复杂。有人可以帮助我,使用selenium将数据读入python吗? 问题答案: 要使用Selenium Webdriver读取此表,xpath似乎是简单的方法- 我不太了解python,所以代码可能是错误的,但是这个主意似乎是正确的- 要找出中的div标签数量,我们使用xpath- 它将返回一个 大小为6 的列表。

  • 本文向大家介绍如何使用JSP读取表单数据?,包括了如何使用JSP读取表单数据?的使用技巧和注意事项,需要的朋友参考一下 JSP使用getParameter()方法读取简单参数,并使用getInputStream()方法读取来自客户端的二进制数据流来处理请求。 使用JSP读取表单数据 JSP根据情况使用以下方法自动处理表单数据解析- getParameter():您调用request.getPara

  • 问题内容: 在wss://ws-feed.gdax.com上编写bash脚本以连接到GDAX的Websocket Feed ,但是在我得到curl时似乎不支持此功能 问题答案: 好吧,您可以尝试模拟所需的标头以使用curl获得一些响应: https://gist.github.com/htp/fbce19069187187ec1cc486b594104f01d0或 Linux Bash:如何以客户

  • 问题内容: 我正在尝试将数据从一页传递到另一页。 www.mints.com?name=某物 如何使用JavaScript 阅读? 问题答案: 下面的a代码可以工作,并且在不可用的情况下仍然有用,但是它是在JavaScript中没有本机解决方案的时候编写的。在现代浏览器或Node.js中,更喜欢使用内置功能。 用法如下: 它返回一个像这样的对象: 所以 给