当前位置: 首页 > 知识库问答 >
问题:

如何使用api rest传递flink流作为参数并返回转换后的流

咸琪
2023-03-14

我是阿帕奇·Flink的新手。我有一个flink scala项目,它使用来自kafka集群的数据,我需要将流结果作为参数传递,以使用返回此转换流的api。这是我的密码

class Testing {
  def main(args: Array[String]): Unit = {}
  def streamTest(): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "test1.server.local:9092,test2.server.local:9092,test3.server.local:9092")
    val consumer_test = new FlinkKafkaConsumer[String]("topic_test", new SimpleStringSchema(), properties)
    consumer_test.setStartFromEarliest()
    val stream =  env.addSource(consumer_test).setParallelism(5)
    val api_test = "http://api-test.server.local/test/?msg=%s"
    // Here I need pass stream as parameter to api and return transformed stream
    env.execute()
  }   
}

有什么帮助吗?

共有2个答案

富辰阳
2023-03-14

这是我的最终代码希望能帮上忙

class Testing extends Serializable{
  def main(args: Array[String]): Unit = {}
  def streamTest(): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "test1.server.local:9092,test2.server.local:9092,test3.server.local:9092")
    val consumer_test = new FlinkKafkaConsumer[String]("topic_test", new SimpleStringSchema(), properties)
    consumer_test.setStartFromEarliest()
    val stream =  env.addSource(consumer_test)
    // Here I need pass stream as parameter to api and return transformed stream
    val result = stream.flatMap{
      (str, out: Collector[String]) =>
        val api_test = "http://api-test.server.local/test/?msg=%s"
        out.collect {
          getUrl(api_test.format(URLEncoder.encode(str, "UTF-8")))
        }        
    }    
    env.execute()
  }

  def getUrl(url: String): String = {
    val timeout = 5
    val config = RequestConfig.custom.setConnectTimeout(timeout * 1000).setConnectionRequestTimeout(timeout * 1000).setSocketTimeout(timeout * 1000).build
    val client: CloseableHttpClient = HttpClientBuilder.create.setDefaultRequestConfig(config).build
    val request = new HttpGet(url)
    val response = client.execute(request)
    val entity = response.getEntity
    val get_result = EntityUtils.toString(entity)
    get_result
  }     
}
齐振
2023-03-14

您应该使用熟悉的任何http/rest库,然后使用asyncIO。

 类似资料:
  • 问题内容: 我正在为我的Java应用程序设计一个简单的数据访问对象。我有一些类(记录),它们代表像和中的表格中的一行。 我想有一种方法来获取特定类型的所有记录。 就目前而言,我是这样的: 但是我想有一个像这样的多态方法(错误): 使用示例: 如何用Java做到这一点? 问题答案: 既然您说不想在不同的类中使用数据访问方法(在Anish的回答中),所以我想为什么不尝试这样的方法。 编辑: 我想再添加

  • 现在,我有一个类,它能够添加相同数据类型的向量。对于不同的类型,我必须调用显式转换: 是否可以将向量隐式转换为函数“运算符+()”?下面是我的向量代码:

  • 问题内容: 我是存储过程的新手。 假设我有一个IDCategory(int),并将其传递给存储过程。在正常情况下,它会: 找到我所有与IDCategory等于我要告诉您的IDCategory的列表。 因此,它会显示3清单,并创建一个带有列的表: IDListing,IDCategory,价格,卖方,图像。 我怎样才能做到这一点? 问题答案: 要从存储过程填充数据集,您将具有如下代码: 您的连接字符

  • 问题内容: 如何在Java中将 转换为a 并返回? 我正在尝试将a转换为a,以便能够通过TCP连接发送。另一方面,我想将其转换为。 问题答案: 或包装在一个类中,以避免重复创建: 由于它变得如此流行,我只想提一提,我认为在大多数情况下,最好使用像这样的库。而且,如果你对库有一些奇怪的反对意见,则可能应该首先针对本机Java解决方案考虑此答案。我认为我的答案真正要解决的主要问题是,你不必自己担心系统

  • 我有静态方法在我的类 这就是定义 这里用的是 这是我得到的一个错误 E0167类型为“void(TV_DepthCamAgent::)(int count,int copied_file)”的参数与类型为“void()(int,int)”的参数不兼容 错误C3867“TV_DepthCamAgent::progress_callback”:非标准语法;使用' 我做错了什么?

  • 问题内容: 我在将int64_t转换为char数组并返回时遇到麻烦。我不知道下面的代码有什么问题,对我来说,这完全符合逻辑。该代码适用于如图所示的代码,但第二个数字显然不在int64_t范围内。 输出是 该代码还给出了两个我不知道如何摆脱的警告。 问题答案: 这很简单,问题在于您正在移位char数组中的位,但是大小为4个字节(向上转换为),因此您的移位超出了范围。尝试将其替换为您的代码: 这样,您