当前位置: 首页 > 知识库问答 >
问题:

如何将反应流连接到quarkus/smallrye中的AMQP代理

蓬宾白
2023-03-14

我正在尝试将我的Artimis-MQ客户端迁移到quarkus微服务。当试图发送消息时,我总是会收到一个“流未连接”错误。

我试图遵循答案中的建议(使用Microprofile-reactive-messaging):Quarkus与ActiveMQ?

dependencies {
    // ...
    implementation enforcedPlatform("io.quarkus:quarkus-bom:0.15.0")
    implementation 'io.quarkus:quarkus-resteasy'
    implementation 'io.quarkus:quarkus-resteasy-jsonb'
    implementation 'io.quarkus:quarkus-smallrye-metrics'
    implementation 'io.quarkus:quarkus-smallrye-health'
    implementation 'io.quarkus:quarkus-smallrye-reactive-messaging'
    implementation 'io.quarkus:quarkus-vertx'

    implementation 'io.smallrye.reactive:smallrye-reactive-messaging-amqp:0.0.8'
}
@Path("/send")
public class MessageResource {
    @Inject
    @Stream("emitter-topic")
    Emitter<String> topic;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String send(@QueryParam("msg") final String msg) {
        final String message = Objects.requireNonNullElse(msg, "").isBlank() ? "no message" : msg;
        topic.send(message);
        return "sent: " + message;
    }
}
smallrye.messaging.source.emitter-topic.type=io.smallrye.reactive.messaging.amqp.Amqp
smallrye.messaging.source.emitter-topic.address=test-amqp
smallrye.messaging.source.emitter-topic.containerId=test-clientid
smallrye.messaging.source.emitter-topic.host=localhost
smallrye.messaging.source.emitter-topic.port=5672

我不断地看到“非法状态例外”。我可以从日志中看出,smallrye找到了amqp连接器,但从未实际初始化连接。

2019-06-02 12:19:50,055 INFO  [io.sma.rea.mes.ext.MediatorManager] (main) Deployment done... start processing
2019-06-02 12:19:50,101 INFO  [io.sma.rea.mes.imp.ConfiguredStreamFactory] (main) Found incoming connectors: [class io.smallrye.reactive.messaging.amqp.Amqp]
2019-06-02 12:19:50,102 INFO  [io.sma.rea.mes.imp.ConfiguredStreamFactory] (main) Found outgoing connectors: [class io.smallrye.reactive.messaging.amqp.Amqp]
2019-06-02 12:19:50,103 INFO  [io.sma.rea.mes.imp.ConfiguredStreamFactory] (main) Stream manager initializing...
2019-06-02 12:19:50,106 INFO  [io.sma.rea.mes.imp.LegacyConfiguredStreamFactory] (main) Stream manager initializing...
2019-06-02 12:19:50,125 INFO  [io.sma.rea.mes.ext.MediatorManager] (main) Initializing mediators
2019-06-02 12:19:50,127 INFO  [io.sma.rea.mes.ext.MediatorManager] (main) Connecting mediators
2019-06-02 12:19:50,136 INFO  [io.quarkus] (main) Quarkus 0.15.0 started in 1.487s. Listening on: http://[::]:8080
2019-06-02 12:19:50,137 INFO  [io.quarkus] (main) Installed features: [cdi, resteasy, resteasy-jsonb, smallrye-health, smallrye-metrics, smallrye-reactive-messaging, smallrye-reactive-streams-operators, vertx]
2019-06-02 12:20:01,964 ERROR [io.und.request] (executor-thread-1) UT005023: Exception handling request to /send: org.jboss.resteasy.spi.UnhandledException: java.lang.IllegalStateException: Stream not yet connected
        at org.jboss.resteasy.core.ExceptionHandler.handleApplicationException(ExceptionHandler.java:106)
        at org.jboss.resteasy.core.ExceptionHandler.handleException(ExceptionHandler.java:372)
        at org.jboss.resteasy.core.SynchronousDispatcher.writeException(SynchronousDispatcher.java:209)
        at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:496)
        at org.jboss.resteasy.core.SynchronousDispatcher.lambda$invoke$4(SynchronousDispatcher.java:252)
        at org.jboss.resteasy.core.SynchronousDispatcher.lambda$preprocess$0(SynchronousDispatcher.java:153)
        at org.jboss.resteasy.core.interception.jaxrs.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:362)
        at org.jboss.resteasy.core.SynchronousDispatcher.preprocess(SynchronousDispatcher.java:156)
        at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:238)
        at org.jboss.resteasy.plugins.server.servlet.ServletContainerDispatcher.service(ServletContainerDispatcher.java:234)
        at io.quarkus.resteasy.runtime.ResteasyFilter$ResteasyResponseWrapper.sendError(ResteasyFilter.java:72)
        at io.undertow.servlet.handlers.DefaultServlet.doGet(DefaultServlet.java:175)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:686)

共有1个答案

贲永思
2023-03-14

好吧,我解决了我的问题。在application.properties中,我反向使用了sourcesink。将发射器-主题描述为接收器而不是源解决了这个问题。

 类似资料:
  • 目前我有一个Quarkus应用程序,它从一个Kafka话题消费,并在另一个Kafka话题上生产。它使用SmallRye反应消息传递。效果很好。由于外部更改,要在其上生成的主题和要从其消费的主题将在不同集群上的Kafka服务器上(并且不应该/不能组合在一个集群中)。 在这里添加一个服务器并没有帮助,它会尝试将数据传播到代理上,这不是我的本意。 是否可能连接到多个集群(可能每个主题设置一个服务器)?在

  • 我遵循以下指南https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2/testing/testing.html在没有kafka代理的情况下进行测试。我已设置了以下QuarKustestResource:

  • 我有两个可用的gRPCendpoint和一个ServerInterceptor,它应该在抛出异常时拦截异常。 gRPC服务定义如下: gRPC服务使用SmallRye Mutiny Responsive来处理请求。 其他帖子解释说,应该重写onHalfClose方法,并插入try/catch块来捕获自定义异常,然后映射到StatusRuntimeException gRPC可以使用的状态。我尝试了

  • 我对反应式编程非常陌生,希望转换一个返回<代码>列表的方法 我尝试创建Publisher等,但找不到合适的示例来进一步进行。有人能提供一些指导或建议吗? 正如下面的代码所提供的,我有一个方法可以提供随机字符串列表。在这里,我需要等到所有元素都被添加到List后才能返回。我想实现反应式流方法,而不是等待所有元素被添加到列表中,我希望在生成后立即返回它。 我相信使用可以实现这一点,您能让我知道如何实现

  • 我有一个Quarkus应用程序,它使用hibernate-reactive-panache运行一些查询,然后处理结果并通过Rest调用返回JSON。对于每个Rest调用,执行5 DB查询,最后一个查询将加载大约20k行: 前4次都很好,但第5次我接到电话 我检查了https://quar kus . io/guides/reactive-SQL-clients # pooled-connectio

  • 我想使用这个扩展:[Quarkus Smallrye响应消息Kafka] 但是在我的应用程序中,主题的名称是事先不知道的,它是根据运行时从用户那里收到的消息指定的。如何在没有注释的情况下以编程方式指定主题名称和与主题相关的设置?(仅用于向Kafka发送消息- 或者这些配置应该在运行时以编程方式设置 因为我不认识路,所以我用了本地的Kafka驱动程序