当前位置: 首页 > 知识库问答 >
问题:

Spring Aws动态信息未按顺序使用

戚甫
2023-03-14

我正在用1个shrad将100条消息推送到流媒体。

spring:
  cloud:
    stream:
      bindings:
        myOutBound:
          destination: my-stream
          contentType: application/json

我正在循环推送消息以进行测试

@EnableBinding(MyBinder.class)
public class MyProcessor {

  @Autowired
  private MyBinder myBinder;

  public void processRollup() {
    List<MyObject> myObjects =  IntStream.range(1, 100)
        .mapToObj(Integer::valueOf)
        .map(s-> new MyObject(s))
        .collect(toList());
    myObjects.forEach(messagePayload ->{
      System.out.println(messagePayload.getId());
      myBinder.myOutBound()
          .send(MessageBuilder.withPayload(messagePayload)
              .build());
        }
    );
  }

}

我正在消费如下消息

spring:
  cloud:
    stream:
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-stream
          content-type: application/json

消息使用未排序。

我错过了什么吗?

共有1个答案

巫马泓
2023-03-14

有几件事需要考虑。首先,Binder中的生产者默认基于异步模式的Kinesis MessageHandler

messageHandler.setSync(producerProperties.getExtension().isSync());

因此,即使它寻找您以正确的顺序一个接一个地发送这些消息,这并不意味着它们以相同的顺序到达AWS上的流。

此外,即使您以同步模式发送它们,也不能保证它们以相同的顺序在AWS上结算。

请参见此处:亚马逊运动和保证订购

此外,您还可以通过显式的sequenceNumber在同一切分中实现订单保证:

要保证严格增加排序,请串行写入分片并使用SequenceNumberForOrding参数。

https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

不幸的是,Kinesis活页夹目前不支持该选项,但我们可以通过显式AwsHeaders来克服它。在将消息发送到活页夹的输出目的地之前,在消息中设置序列号:

String sequenceNumber = messageHeaders.get(AwsHeaders.SEQUENCE_NUMBER, String.class);
    if (!StringUtils.hasText(sequenceNumber) && this.sequenceNumberExpression != null) {
        sequenceNumber = this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
    }
 类似资料:
  • 消费者输出: 我对Kafka是新手,不知道我错过了什么来让它正确地工作。根据Kafka文档,消息的顺序是为我的用例保证的,所以一定有一些愚蠢的错误,我正在做,无法找出它。 除了Kafka,我还有别的选择吗? 谢谢

  • 我每个websocket接收几十条消息,这些消息可能只差几毫秒就能到达。我需要用操作来处理这些数据,这些操作有时会花费一些时间(例如,在DB中的插入)。为了处理接收到的新消息,必须完成对前一个消息的处理。 我的第一个想法是用Node.js Bull(用Redis)准备一个队列,但恐怕太长了,无法运行。这些消息的处理必须保持快速。 我尝试使用JS迭代器/生成器(直到现在我还从未使用过),我测试了如下

  • 尽管有些系统管理员喜欢将他们自己与其它使用成堆的老旧打印机的办公室相隔绝, 但是我们还是需要随时与其他部门交换信息。 例如,你可能需要插入数据到你的 Puppet 配置清单,这些数据是从外部数据源派生出来的。 generate 函数在这方面相当有用。 准备工作 在 Puppetmaster 上使用如下代码创建脚本 /usr/local/bin/latest-puppet.rb: #!/usr/bi

  • 我在一个返回一个巨大选择的过程中工作,我不想定义它是如何排序的。我不想从三个字段中选择一个,如果它是升序还是降序,如果三个选项都没有定义,它会默认返回降序中的第一个字段 这边 当然,这不起作用... mysql指责单词DESC和ASC中的错误,我怎么能让这个工作??

  • 我正在尝试从页面对象示例运行示例 它在Windows 7上运行良好,但当我试图在Linux ( Fedora 15)上运行时,我遇到了一个错误: 我尝试将系统.set属性设置为: 但是得到了同样的错误。

  • 当我使用chrome驱动程序时,它显示驱动程序服务器启动超时。当我尝试使用Firefox时,它显示会话意外退出。我正在尝试使用python中的selenium在服务器上运行远程驱动程序,我应该怎么做?无法创建新会话。 未知错误:等待驱动程序服务器启动时超时。构建信息:版本:'3.141.0',版本:'2ecb7d9a',时间:'2018-10-31T20:22:52'系统信息:主机:'server