当前位置: 首页 > 知识库问答 >
问题:

在Flink/Kafka中从同一个插座读写

郦磊
2023-03-14

美好的一天

我正在通过flink/kafka接收数据(流数据)。我连接的端口与我需要写回消息的端口相同

TCP/IP-

// 1. Connect to TCP Stream (TCP (Socket) -> Kafka Stream INPUT)
val consumer_stream = env.socketTextStream(url, port, '\n')

// 2. Processing Data
.....

// 3. Write result to kafka topic 
consumer_stream.addSink(new FlinkKafkaProducer09[String](broker_url, topic_name, new SimpleStringSchema()))

// 4. Send result back to connected url ie.(Ref Step 1 URL) (url+port)
(This is where I need Assistance)

连接到 URL 和端口工作正常。我接收并处理写入主题的数据 现在我还需要写回我连接到的同一URL和端口。{由于 Url 和端口可以同时发送和接收数据}

我把它写到另一个端口

// write to Different PORT
val socket_write: DataStreamSink[String] = out_data.writeToSocket(url, diff_port, new SimpleStringSchema())

这个管用...问题是试图写入同一个端口。当我使用我从中读取的同一个端口时...flink作业失败

有什么想法吗

问候

共有1个答案

闾丘照
2023-03-14

您可以使用自定义的SinkFunction将数据写回URL。

stream.addSink(new SinkFunction<String>() {
    // initialise the client to send the data
    public void invoke(String value) throws Exception {
        // send here.               
    }
}

或使用<code>SocketClientSink</code>进行

env.socketTextStream("localhost", 5555).map(x => { println(x); x }).addSink(new SocketClientSink[String]("localhost", 5555, new SimpleStringSchema))
 类似资料:
  • 我试图连接到我的本地机器上的Kafka(2.1),并在Flink(1.7.2)附带的scalashell中读取。 下面是我正在做的: 之后,最后一条语句我得到了以下错误: 我已经创建了一个名为“topic”的主题,我能够通过另一个客户端正确地生成和读取来自它的消息。我正在使用java版本1.8.0\u 201,并遵循https://ci.apache.org/projects/flink/flin

  • 我有一个用例,我需要处理存储在s3中的文件中的数据,并将处理后的数据写入本地文件。s3 文件会不断添加到存储桶中。 每当一个文件被添加到bucket中时,完整的路径被发布到一个kafka主题。 我想在一份工作中实现以下目标: 从kafka(无界流)读取文件名 一个计算器,它接收文件名,从s3(第二个源)读取内容并创建数据流 处理数据流(为每行添加一些逻辑) 接收到文件 我设法完成了设计的第一、第三

  • 我们有一个Kafka主题,有源源不断的数据。为了处理它,我们有一个无状态的Flink管道,它使用该主题并写入另一个主题。 我们是不是漏掉了什么?我们误会什么了吗?有没有更好的解决办法? 谢了!

  • 在Flink中,我执行以下代码: 我推出3次同样的工作。 如果我用一个代理执行这段代码,它工作得很好,但是用3个broke(在3个不同的机器上)只读取一个分区。 null

  • 我试图了解Socketchannes和NIO的总体情况。我知道如何使用常规套接字,以及如何为每个客户机服务器创建一个简单的线程(使用常规阻塞套接字)。 所以我的问题是: 什么是袜子通道 当我使用SocketChannel而不是Socket时,额外得到了什么 通道和缓冲区之间的关系是什么 什么是选择器 文档中的第一句话是

  • 我一直在努力阅读java项目中的文本文件,我一整天都在寻找解决方案,我尝试了很多方法,但没有一个有效。其中一些:(另外,我必须使用文件和扫描程序类) 异常线程"main"java.nio.file.NoSuchFileExc0019: test\fileTest.txt 异常线程"main"java.lang.NullPointerExc0019 线程“main”java中出现异常。木卫一。Fil