当前位置: 首页 > 知识库问答 >
问题:

Spring Webflux型合并流量的安全性

阎裕
2023-03-14

在服务器上,我添加了一个字符串类型的流量连接:

Flux<String> addDataConnection() {
        return Flux.create(emitter -> {
            fluxSinks.add(emitter);
        });
    }

然后,每当我收到消息时,它就会传播消息:

void propagateMessage(String messageContent) {
        fluxSinks.parallelStream()
                .forEach(fluxSink -> fluxSink.next(messageContent));
    }

这些数据来自外部来源,因此传入事件的频率超出了我的控制范围。

Flux<String> addHeartbeat() {
    return Flux.interval(Duration.ofSeconds(2))
            .map(i -> "heartbeat");
}
Flux<String> initiateFluxStreams() {
        return Flux.merge(addDataConnection(), addHeartbeat());
    }

共有1个答案

公孙森
2023-03-14

对于不同类型的合并通量,没有什么“神奇”的方法可以做到这一点,因为您不能保证这些类型将有任何共同的祖先(除了Object,但正如您所指出的,这并不太有用)

如果您的类型是foo,则可以使用foo的“特殊”实例(或子类)来表示空的heartbeat类型,但我并不特别喜欢这种方法--它可能会做出设计牺牲,以便在其中插入某个类型。

我首选的方法是使用流量 > optional,其中的“heartbeat”触发器只是空选项。类似于:

Flux<Foo> addDataConnection() {
    return Flux.create(emitter -> {
        fluxSinks.add(emitter);
    });
}

Flux<Optional<Foo>> initiateFluxStreams() {
    return Flux.merge(addDataConnection().map(f -> Optional.of(f)), addHeartbeat().map(h -> Optional.empty()));
}

Flux<Long> addHeartbeat() {
    return Flux.interval(Duration.ofSeconds(2));
}

然后,您只需筛选并映射结果流量,以便在处理之前清除心跳:

fluxOfOptionalFoo
        .filter(Optional::isPresent)
        .map(Optional::get);

我更喜欢这种方法,原因有很多:

  • 不需要对基础对象结构进行任何更改
  • 更具描述性,您有一个流量类型“可能在那里,也可能不在那里”
  • 容易过滤掉空选项(如果您想用heartbeat函数做一些特别的事情,则不需要)
 类似资料:
  • 问题内容: 我在寻找可以收集流的方法,但它是空安全的。如果collection为null,则返回空流。像这样: 我创建了自己的方法: 但是我很好奇,标准的JDK中是否有类似的东西? 问题答案: 您可以使用org.apache.commons.collections4.CollectionUtils :: emptyIfNull函数:

  • 我有两个AWS实例,< code>i-1和< code>i-2。它们分别位于不同的安全组中:< code>sg-1和< code>sg-2。两款机器都有弹性IP。 < code>sg-2被配置为允许来自< code>sg-1的所有流量,而不考虑端口、源IP或协议。 当试图与的流量实际上来自其弹性IP这一事实。 这是意料之中的吗?除了手动将< code>i-1的弹性IP添加到< code>sg-2之

  • 有没有办法通过合并多个单声道来创建流量,合并的单声道正在读取前一个响应的值。 诸如此类:

  • 问题内容: 请参阅下面的简单示例,该示例计算列表中每个单词的出现次数: 最后是。 但是我的数据流很大,我想并行化作业,所以我写: 但是我注意到这很简单,所以我想知道是否需要显式请求并发映射以确保线程安全: 非并行收集器可以安全地与并行流一起使用吗?从并行流中收集时,我是否应该仅使用并发版本? 问题答案: 非并行收集器可以安全地与并行流一起使用吗?从并行流中收集时,我是否应该仅使用并发版本? 在并行

  • 尽管在通常情况下编写一个按照预期运行的软件很简单, 但想要确保没有人能够以出乎意料的方式使用它就困难多了。 在 Solidity 中,这一点尤为重要,因为智能合约可以用来处理通证,甚至有可能是更有价值的东西。 除此之外,智能合约的每一次执行都是公开的,而且源代码也通常是容易获得的。 当然,你总是需要考虑有多大的风险: 你可以将智能合约与公开的(当然也对恶意用户开放)、甚至是开源的网络服务相比较。

  • 为了安全地运行CGI等程序,Ruby设置了安全结构。 Ruby的安全模型由“对象的污染”和“安全级别”构成。 对象的污染 Ruby有时会认为对象“遭到了污染”,这主要有两种用途。 第一,以不安全的输入为基础制成的对象就是“受污染”的对象,不能用作“危险操作”的参数。这主要是为了防止恶意数据导致程序作出一些意外的危险动作。 第二,可以使安全对象(未遭污染的对象)得到保护,免遭不安全对象的威胁。若安全