当前位置: 首页 > 知识库问答 >
问题:

Quarkus从Kafka主题中提取并将JSON有效负载发送到RESTendpoint

羊时铭
2023-03-14

因此,我使用Quarkus和Microprofile Reactive Messaging framework(带有SmallRye Kafka连接器)以及RxJava2 Flowable streams对象来进行Reactive消息接收/发送。我有一个微服务,它使用@incoming和@outgoing注释来正确地使用通道来拉回主题并将消息推送到主题。

但是,现在我想修改它,这样我仍然可以从Kafka主题中提取,然后将JSON负载发送到RESTendpoint。据我所知,没有Quarkus HTTP兼容的SmallRye连接器。有没有人碰巧知道有什么方法可以让它工作?

示例函数

    @Incoming("pre-check")
    @Outgoing("post-check")
    @Broadcast
    public Flowable<CustomMessage> publishToApi(CustomMessage customMessage) {

        LOGGER.info("Message received from topic = {}", customMessage);

        if (customMessage.ready) {
            return Flowable.just(customMessage);
        }
        else {
            return Flowable.empty();
        }
    }

共有1个答案

江鹏
2023-03-14

删除@outgoing并使用任何HTTP客户端处理消息以将其发送到某个服务器

或者使输出通道成为客户端的响应

 类似资料:
  • 控制器 Json有效负载从邮递员发送

  • 我正在尝试从同一个Kafka主题反序列化不同的JSON有效负载。这里问的其他问题引导我进行了第一次尝试,但我无法让它运行。 正如Gary所提到的(这里),有一些提示(JsonSerializer.ADD\u TYPE\u INFO\u HEADERS),但当我发送和接收这两条消息时,我会收到一个异常。 ... LoggingErrorHandler在ConsumerRecord中已经提到了一个(正

  • 问题内容: 我想从以下位置检索JSON数据:https : //git.eclipse.org/r/#/c/11376/ 要求网址: 请求方法: 请求标头: 请求有效负载: 我已经尝试过这个答案,但是我得到了。 谁能帮我解决这个问题? 谢谢。 问题答案: 以下代码对我有用。 方法实现:

  • 我的任务是编写一个java程序,从一个主题中读取xml,将其转换为JSON并发送到另一个主题。我已经创建了一个将xml转换为json的程序,但我不知道接下来该怎么做,比如如何使用该主题中的xml并将其发送给另一个主题。

  • 问题内容: 我有我的自定义Java对象,希望利用JVM的内置序列化将其发送到Kafka主题,但是序列化失败并出现以下错误 org.apache.kafka.common.errors.SerializationException:无法将com.spring.kafka.Payload类的值转换为value.serializer中指定的org.apache.kafka.common.serializ

  • 我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr