当前位置: 首页 > 知识库问答 >
问题:

Quarkus SSE Redis订阅

逄宁
2023-03-14

我喜欢做一个有redis回应的SSE。在quarkus中订阅。

我有一个来自quarkus快速入门的简单SSE示例

 @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.TEXT_PLAIN)
  @Path("{name}/streaming")
  public Multi<String> greeting(@org.jboss.resteasy.annotations.jaxrs.PathParam String name) {
    return Multi.createFrom().publisher(vertx.periodicStream(2000).toMulti())
        .map(l -> String.format("Hello %s! (%s)%n", name, new Date()));
  }

这个效果很好,每2秒钟我就会收到Hello。。。。在我的web浏览器中

现在我尝试订阅Redis,所以我应该会收到Redis的消息。

Redis示例:

(cmd window 1)
SUBSCRIBE message-channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "message-channel"
3) (integer) 1

(cmd window 2)
PUBLISH  message-channel HelloWorld
(integer) 1

(cmd window 1)
1) "message"
2) "message-channel"
3) "HelloWorld"

现在,我用quarkus SSE尝试以下方法:

  @Inject
  ReactiveRedisClient reactiveRedisClient;

 @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.TEXT_PLAIN)
  @Path("sse/redissse")
  public Multi<String> redissse() {
    List<String> subscriberList = new ArrayList();
    subscriberList.add("message-channel");

    return reactiveRedisClient.subscribe(subscriberList)
        .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
        .onItem().castTo(String.class);
  }

我收到的是一个例外:

WARNING [io.ver.red.cli.imp.RedisConnectionImpl] (vert.x-eventloop-thread-0) No handler waiting for message: [subscribe, message-channel, 1]

有人能支持我吗?有一个简单的例子吗?我对此一无所知,我无法接收带有“订阅”发布的Redis消息。

任何建议。。。

共有2个答案

逑景铄
2023-03-14

现在,我执行以下操作:

  @Inject
  @RedisClientName("second")
  RedisClient redisClient2;

void onStart(@Observes StartupEvent ev) throws IOException {
  this.redisClient2.subscribe(List.of("message-channel"));
}


  @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.TEXT_PLAIN)
  @Path("/redis/subscribe")
  public Publisher<String> subscribechannel(){
     return eventBus.<String>consumer("io.vertx.redis.message-channel").toPublisherBuilder()
        .map(Message::body)
        .buildRs();
  }

现在它可以工作了,但是如果我从多个浏览器进行SSE,它们会共享事件。所以每个消费者(浏览器)之后只有一个收到事件。

陈增
2023-03-14

我没有使用Redis pub sub,但我确实使用了Redis streams,我需要做的是:

`

return Multi.createBy().repeating()
    .supplier(() -> this.reactiveRedisClient.subscribe(subscriberList)
                        .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
                        .onItem().castTo(String.class))
        .indefinitely()
        .onItem().disjoint();

`

我想既然pub-sub是非阻塞的,它只运行一次,就不会等到另一条消息到达。您必须以反应式的方式实现自己的while(true)循环。

 类似资料:
  • mysql会员订阅数据表的设计应该如何设计?产品有订阅商品和非订阅的,每次都只能购买一个。 订阅有1个月 3个月的 每次到期自动扣费。如果在一个月类购买了几个订阅商品 则扣费按照最新的一个 然后延长到期时间。其实是不是每次订阅都不需要生成新订单的 翻阅了其他资料都找不到很好的设计

  • 订阅指过滤表(table)的规则,Canal 客户端发送给客户端订阅规则,那么服务端将会推送符合规则的表数据过来,采用正则匹配。 允许所有表:.\*\\\\..\*

  • 我是新的数据流和发布子工具在GCP。 需要将prem过程中的电流迁移到GCP。 当前流程如下: 我们有两种类型的数据馈送 Full Feed–其adhoc作业–完整XML的大小约为100GB(单个XML–非常复杂的一个–完整的数据–ETL作业处理此XML并将其加载到约60个表中) 单独的ETL作业用于处理完整提要。ETL作业过程完全馈送并创建负载就绪文件,所有表将被截断并重新加载 源系统每30分钟

  • 我想问以下问题:例如,考虑一下我购买专业订阅。过了一段时间(几个月左右)我决定取消我的订阅...那么我已经用CodenameOne开发的应用程序会发生什么呢?它们会继续在谷歌Play商店和/或苹果应用商店上提供吗?我主要关心的是推送通知功能...也就是说,Play Store和/或App Store中已经发布的应用程序是否会保持完整的功能?

  • 假设我有10个产品,并想提供他们在一个订阅。 正如你所看到的,每个月的价格都不一样。这在木业商业中是可能的吗?(已订阅产品/计划,但每月支付金额不等) 我在WooCommerce文档中找不到任何讨论此场景的内容。 多谢了。

  • 可用版本: >= 2.3.0 注意: 暂不支持键空间通知功能 Pika 发布订阅命令 以下为Pub/Sub发布订阅命令, 与Redis完全兼容 PUBSUB subcommand [argument [argument ...]] PUBLISH channel message SUBSCRIBE channel [channel ...] PSUBSCRIBE pattern [pattern

  • 什么是 Subscription ? - Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe,它不需要任何参数,只是用来清理由 Subscription 占用的资源。在上一个版本的 RxJS 中,Subscription 叫做 "Disposable" (可清理对象)。 var observ

  • 我必须从两个订阅服务器获取数据,但我总是获取第一个订阅服务器的数据。 我有一个数据共享服务: 在离开搜索组件之前,我调用update方法。 现在,我在results组件上。我得到的共享数据如下: 我的问题是:我需要共享数据来订阅另一个可观察的数据。首先,我构造了一个物体乘坐,在我调用搜索方法之后 问题是我总是从数据服务获取数据,而不是从api调用。api工作导致我在存储中拦截结果,而不是在组件中。