当前位置: 首页 > 知识库问答 >
问题:

使用SmallRye反应式消息动态发布/订阅MQTT

谢旻
2023-03-14

我们尝试使用小型反应式消息传递发布和订阅MQTT协议。我们设法通过以下简单代码将消息实际发布到特定主题/频道

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;

@ApplicationScoped
public class Publish {
    
    @Outgoing("pao")
    public Multi<String> generate() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> "A Message in here");
    }
}

我们想要做的是在我们需要生成()方法时以某种方式调用动态主题,用户将在其中定义它。那是我们的问题,但后来我们在github中从该存储库中找到了这些类。包名称io.smallrye.reactive.messaging.mqtt

例如,我们发现有一个类说它对MQTT代理进行了发布调用(Mosquito server up)。

在该语句中SendingMqttMessage

UPDATE(发布完成)最后向mqtt代理(蚊子服务器)发出发布请求,所有这些都使用用户配置的动态主题。正如我们发现之前的ClassSendingMqttMessage根本不应该使用。我们发现我们还需要和发射器来实际发出带有动态主题的发布请求。

    @Inject
    @Channel("panatha")
    Emitter<String> emitter;

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createUser(Device device) {
        System.out.println("New Publish request: message->"+device.getMessage()+" & topic->"+device.getTopic());
        emitter.send(MqttMessage.of(device.getTopic(), device.getMessage()));
        return Response.ok().status(Response.Status.CREATED).build();
    }

现在我们需要了解如何动态订阅主题。


共有1个答案

庾远航
2023-03-14

首先让我们回到同一个页面:
反应式消息不适用于主题,而是适用于频道。这一点很重要,因为您可以专门读取或写入某个频道。因此,如果您想同时提供这两个频道,您需要配置指向同一主题的两个频道,一个传入频道和一个传出频道

你用发射器做了一个很好的开始,但是你仍然缺乏你想要的动态特性。在您的示例中,您通过CDI获得了该发射器
这就是我们所需要的,使其动态化,因为我们可以在运行时使用如下CDI动态注入bean:

private Emitter<byte[]> dynamicEmitter(String topic){
        return CDI.current().select(new TypeLiteral<Emitter<byte[]>>() {}, new ChannelAnnotation(topic)).get();
    }

另请注意,我正在创建一个类型为byte[]的发射器,因为这是根据其留档的小Rey-mqtt连接器(版本3.4.0)目前唯一支持的类型。

要从反应式消息传递通道读取消息,您可以使用发射器的对应物,即发布者。
它可以用于模拟:

private Publisher<byte[]> dynamicReceiver(String topic){
        return CDI.current().select(new TypeLiteral<Publisher<byte[]>>() {}, new ChannelAnnotation(topic)).get();
    }

然后,您可以以任何您喜欢的方式处理这些Date。作为演示,它将其挂在一个简单的RESTendpoint上

@GET
    @Produces(MediaType.SERVER_SENT_EVENTS) 
    public Multi<String> stream(@QueryParam("topic") String topic) {
        return Multi.createFrom().publisher(dynamicReceiver(topic)).onItem().transform(String::new); 
    }
    
    @GET
    @Path("/publish")
    public boolean publish(@QueryParam("msg") String msg, @QueryParam("topic") String topic) {
        dynamicEmitter(topic).send(msg.getBytes());
        return true; 
    }

创建此解决方案时,我遇到了一些您应该知道的陷阱:

  1. Quarkus会删除任何“未使用”的CDI-Bean。因此,如果您想动态注入它们,您需要排除它们,或者关闭该功能。
  2. 必须配置以这种方式注入的所有通道。否则注入将失败。
  3. 由于某些原因,(即使完全禁用删除)我无法动态注入发射器,除非它们曾经被注入其他地方。
 类似资料:
  • 我是新的数据流和发布子工具在GCP。 需要将prem过程中的电流迁移到GCP。 当前流程如下: 我们有两种类型的数据馈送 Full Feed–其adhoc作业–完整XML的大小约为100GB(单个XML–非常复杂的一个–完整的数据–ETL作业处理此XML并将其加载到约60个表中) 单独的ETL作业用于处理完整提要。ETL作业过程完全馈送并创建负载就绪文件,所有表将被截断并重新加载 源系统每30分钟

  • 我想使用这个扩展:[Quarkus Smallrye响应消息Kafka] 但是在我的应用程序中,主题的名称是事先不知道的,它是根据运行时从用户那里收到的消息指定的。如何在没有注释的情况下以编程方式指定主题名称和与主题相关的设置?(仅用于向Kafka发送消息- 或者这些配置应该在运行时以编程方式设置 因为我不认识路,所以我用了本地的Kafka驱动程序

  • 问题内容: 我试图弄清Reactor Project,现在正在寻找取消订阅的方法。我知道在进行例如Flux的订阅后,我可以获取可用于发送onCancel信号的Cancellation对象的引用,但这仅是在进行订阅之后,并且我需要将该引用保留在某种Collection中。 有更好的方法来获取Cancellation对象吗?或只是取消订阅。也许某个地方包含对所有活动订阅的引用-是的,这太棒了… 问题答

  • 问题 你有一个基于线程通信的程序,想让它们实现发布/订阅模式的消息通信。 解决方案 要实现发布/订阅的消息通信模式, 你通常要引入一个单独的“交换机”或“网关”对象作为所有消息的中介。 也就是说,不直接将消息从一个任务发送到另一个,而是将其发送给交换机, 然后由交换机将它发送给一个或多个被关联任务。下面是一个非常简单的交换机实现例子: from collections import default

  • 微信文档:https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/subscribe-message/subscribeMessage.addTemplate.html 组合模板并添加至帐号下的个人模板库 $tid = 563; // 模板标题 id,可通过接口获取,也可登录小程序后台查看获取 $kidLi

  • 开普勒消息目前分为三大类:公告,告警和通知。 通知中根据不同的操作事件类型,分为十几个事件。每个事件都跟项目操作相关。便于接收项目操作变更的通知。 分类 事件 公告 Alarm 告警 Proclaim 通知 Build,Apply,Audit,Delete,Rollback,Logging,Reboot,Command,Storage,Extend... 订阅界面: 用户中心,点击头像,下拉菜单→