当前位置: 首页 > 知识库问答 >
问题:

不使用netcat服务器从套接字读取Flink中的字符串datastream

昌乐
2023-03-14

我有一个案例场景,其中我有一个流生成器客户机,它正在生成多个流,合并它们并将其发送到socket,我希望Flink程序作为服务器监听它。正如我们所知,服务器必须首先打开,以便它能够监听客户机请求。我尝试使用下面给出的代码来做同样的事情

 public static void main(String[] args)  throws Exception {

    //setting the envrionment variable as StreamExecutionEnvironment
      StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        environment.setParallelism(1);


        DataStream<String> stream1 = environment.socketTextStream("localhost", 9000);
        stream1.print();



                //start the execution
     environment.execute(" Started the execution ");



}// main

下面给出了充当客户端的流生成器的代码

    DataStream<Event> stream1  = envrionment
                .addSource(new EventGenerator(2,60,1,1,100, 200 ))
                .name("stream 1")
                .setParallelism(parallelism_for_stream_rr);

    DataStream<Event> stream2  = envrionment
            .addSource(new EventGenerator(3,60,1,2,10, 20 ))
            .name("stream 2")
            .setParallelism(parallelism_for_stream_rr);


    DataStream<Event> stream3  = envrionment
            .addSource(new EventGenerator(5,60,1,3,30, 40 ))
            .name("stream 3")
            .setParallelism(parallelism_for_stream_rr);


    DataStream<Event> merged = stream1.union(stream2,stream3);

    merged.print();



            // sending data to Mobile Cep via socket

            merged.map(new MapFunction<Event, String>() {

                @Override
                public String map(Event event) throws Exception {
                    String tuple = event.toString();


                    return tuple + "\n";

                }
            }).writeToSocket("localhost", 9000, new SimpleStringSchema() );
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

共有1个答案

卢出野
2023-03-14

Flink的套接字源和它的接收器都不会启动TCP服务器并等待传入连接。它们都是连接到已经启动的TCP服务器的客户端。这也是为什么在启动作业之前必须启动netcat的原因。如果您想从套接字写入和读取,那么您必须编写一个TCP服务器,该服务器可以缓冲传入的数据,并在客户机连接到该数据时转发这些数据。

 类似资料:
  • 我有两个通过字符串与套接字通信的Java项目。一个是客户端,另一个是服务器。服务器通过“ServerSocket”接受连接,并使用新创建的“套接字”创建新的“会话”(线程)。 同时,客户端只有一个“套接字”,一旦连接了这个套接字,客户端就会创建一个“ClientSession”(线程,非常类似于“session”)。 我想要的是服务器通过“username”字符串询问客户机他的用户名,客户机用他的

  • 我正在尝试使用MQTT-Rime网关构建MQTT-SN。我成功地用串行套接字将传感器数据发送到网关,但我的网关也必须将一些数据发送到微尘。我的问题是,我不知道如何从远程的套接字读取数据。有人能帮我吗?

  • **服务器** **客户** 并且我运行它,ip是环回(127.0.0.1),端口是7755。 客户端套接字的对等ip:127.0.0.1 客户端套接字的对等端口6311 -1 0 0 0 我的问题是,如果监听(服务器)套接字只处理客户端的连接,那么它的对等端不应该存在?那个错误的IP和端口是什么?只是垃圾值?

  • 问题内容: 我目前正在尝试使用boost-asio的套接字API通过网络将一些JSON数据从客户端传输到服务器。我的客户基本上是这样做的: 在服务器端,我可以选择各种功能。我想使用JsonCpp解析接收到的数据。在研究JsonCpp API(http://jsoncpp.sourceforge.net/class_json_1_1_reader.html)时,我发现Reader可以在char数组或

  • 我想找个人来澄清一个我相信我有的误解。我在读Java上的套接字编程,并不真正理解事情的实际流程是什么。以下是我对以下场景的两种可能的解释。 创建套接字实例 null 服务器接受客户端之后的连接会是这样的吗?(客户端与服务器创建的套接字通信,而不是与服务器套接字通信) 或者图表会像这样吗?(客户端通过ServerSocket与服务器保持通信。服务器通过接受连接时创建的套接字进行通信。)

  • 线程“main”java.net.ConnectException:连接超时:在java.net.dualStackplainsockeTimpl.Connect0(本机方法)在java.net.dualStackplainsockeTimpl.socketConnect(DualStackplainsockeTimpl.java:69)在java.net.abstractplainsockeTi