当前位置: 首页 > 知识库问答 >
问题:

WebFlux:从流内对象添加订阅服务器上下文值

牟子真
2023-03-14

我有一个Webflux应用程序,我正在使用订阅服务器上下文填充MDC值,以便它们向下游传播。我已经实现了这个项目中的类来处理订阅者之间的MDC传输,设置webfilter来向传入的请求添加请求ID,并且可以在日志中看到请求ID作为MDC的一部分。

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    return chain.filter(exchange)
            .contextWrite(Context.of("requestId", UUID.randomUUID().toString()));
}

日志输出:

2021-09-30 17:15:29,963 [reactor-tcp-nio-2] INFO  c.p.l.s.RepoService:33 - MDC[requestId=ec0b68cf-ba4d-4c7f-afa4-f67fc97ebcbf] - Found user
public Mono<Response> doProcessing(String userId, Object object) {
    return userRepo.findUserById(userId) //Returns UserEntity object
            .flatMap(userEntity -> service.doMoreProcessing(userEntity, object))
            .contextWrite(Context.of("userId", userId, "email", userEntity.getEmail()));
}
public Mono<Response> doProcessing(String userId, Object object) {
    return userRepo.findUserById(userId) //Returns UserEntity object
            .flatMap(userEntity -> Mono.deferContextual(ctx -> Mono.just(userEntity).contextWrite(Context.of(ctx).putAll(Context.of("userId", userId, "email", userEntity.getEmail()).readOnly()))))
            .flatMap(userEntity -> service.doMoreProcessing(userEntity, object));
}
public Mono<Response> doProcessing(String userId, Object object) {
    return userRepo.findUserById(userId) //Returns UserEntity object
            .transformDeferredContextual((userEntityMono, contextView) -> userEntityMono.flatMap(userEntity -> Mono.just(userEntity).contextWrite(Context.of(contextView).putAll(Context.of("userId", userId, "email", userEntity.getEmail()).readOnly()))))
            .flatMap(userEntity -> service.doMoreProcessing(userEntity, object));
}

...但是什么都没起作用。如何从属于进程流的对象向订阅服务器上下文添加数据?

共有1个答案

高德水
2023-03-14

你可以试试这样的东西

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return chain.filter(exchange)
                .contextWrite(Context.of("your-key", new HashMap<String, Object>()));
}

public Mono<Response> doProcessing(String userId, Object object) {
    return userRepo.findUserById(userId)
            .doOnEach(signal -> {
                if (!signal.isOnNext()) return;
                Optional<Map<String, Object>> optional = signal.getContextView().getOrEmpty("your-key");
                optional.ifPresent(map -> {
                    map.put("userId", signal.get().getId());
                    map.put("email", signal.get().getEmail());
                });
            });
}
 类似资料:
  • 我写了一个逻辑,使用Spring反应器库来获取所有运算符,然后在异步模式下为每个运算符(分页)获取所有设备。 创建了一个通量来获取所有运算符,然后订阅它。 现在,对于每个运算符,我正在获取需要多个订阅才能获得设备mono的设备,该设备通过订阅MONO获得所有页面异步。 此代码工作正常。但我的问题是,从内部订阅mono是一个好主意吗?

  • 有一个由 方法以编程方式创建的 flux: 有一个rest控制器: 有几个web客户端(在独立的进程中): JavaScript中有几个EventSource实例: 只有前两个“订户”将开始接收消息(不管它是web客户端还是EventSource实例)。另一个将打开连接,获取HTTP 200状态,但是事件流保持为空。客户端和服务器端都没有错误。 我不明白,对“2个订阅者”的限制在哪里。如果我想支持

  • 我基本上需要从python服务器向设备发送命令,设备将发布对主题的回复,我需要捕获回复服务器端。要从服务器发布到设备,我正在使用boto3物联网数据模块。但是我如何订阅另一个主题以从设备获得回复?似乎没有办法使用aws python库。我需要使用像paho这样的遗传MQTT客户机吗? 谢谢你。

  • 我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”

  • 刚开始玩推送通知,我设法处理了所有的订阅过程,我正在数据库中保存endpoint和密钥。我的问题是,如果有的话,我应该遵循什么策略来删除数据库中的旧字幕详细信息?。所以,如果有人允许通知,他们撤销了权限,我怎么知道是谁从数据库中删除了详细信息?。因为如果用户取消订阅,我只会从pushManager获得空订阅。

  • 我试图利用固有的WSO2ESB主题发布到jms队列。我已经创建了主题,并提供了一个订阅者URL:jms:/topictest?transport.jms.destinationtype=queue。然而,当我将消息发布到主题时,它不能被传递到队列。日志生成以下内容 “系统无法从jms:/queue?destination=topictest URL推断传输信息。” 另外,我似乎不知道如何发布到WS