当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream功能生产者/消费者

郎恺
2023-03-14

我的应用程序由一个带有POST方法的REST控制器组成,用于提交我必须使用生产者发送到主题的数据。

这是控制器

@RestController
@RequestMapping("/producer/v1")
public class ApiController {
    @Autowired
    Producer producer;

    @ApiOperation(value = "Invia un messaggio al topic", response = String.class)
    @ApiResponses(value = {
            @ApiResponse(code = 200, message = "Success"),
            @ApiResponse(code = 400, message = "Bad Request"),
            @ApiResponse(code = 500, message = "Failure")})
    @PostMapping(path = "/sendMessage", consumes = "application/json", produces = "application/json")
    public void sendMessage(@RequestBody() @Valid MyMessage myMessage) {
        producer.produce(myMessage.getId(), myMessage);
    }
}

使用Spring-Cloud-Stream版本

@Component
public class Producer {
    private final ProducerChannelInterface producerChannelInterface;

    public Producer(ProducerChannelInterface producerChannelInterface) {
        this.producerChannelInterface = producerChannelInterface;
    }
    public void produce(int messageId, Object message) {
        MessageChannel messageChannel = producerChannelInterface.kafkaOutChannel();
        messageChannel.send(
                MessageBuilder
                        .withPayload(message)
                        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                        .setHeader(PARTITION_KEY, messageId)
                        .build()
        );
    }
}

从3.1版开始,@Output@EnableBinding注释被弃用,所以我尝试切换到新的方式来设置生产者,我就是这样工作的

@Component
public class Producer {
    @Bean
    public Supplier<Message<Object>> produce() {
        return () -> {
            int messageId = ...;
            Object message = new MyMessage();
            return MessageBuilder
                .withPayload(message)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .setHeader(PARTITION_KEY, messageId)
                .build();
        };
    }
}

最后在应用程序中。yaml我有这个

spring:
  cloud:
    stream:
      bindings:
        #Channel name
        produce-out-0:
          destination: spring-cloud-topic
          contentType: application/json
          producer:
            partitionKeyExpression: headers['partitionKey']
            partitionCount: 1
            errorChannelEnabled: true

...
      kafka:
        bindings:
          produce-out-0:
            producer:
              configuration:
                retries: 10
                max.in.flight.requests.per.connection: 1
                request.timeout.ms: 20000

现在的问题是,当我启动应用程序时,方法产生()被无限调用(我在主题中看到消息)。然后使用供应商似乎我被迫在供应商内部定义消息数据。我需要从Rest控制器传递数据。

你能帮忙吗?谢谢

共有1个答案

谭曦
2023-03-14

普通的SCS3供应商有几个“问题”,但您可以解决这些问题。主要问题是投票。默认情况下,SCS将每隔几秒钟轮询一次消息。当然,您可以使用DefaultPollerProperties来配置它。但是,当它轮询消息时,您必须提供一个。如果您可以设置适当的轮询超时,那么您可以使用队列来提供消息,并且在供应商中,您可以对该队列使用轮询。但这不是正确的解决方案,因为在rest中,您无法计算消息之间的延迟,您希望立即发布消息,当然,不要等待rest调用之间的最长时间。。。更好的方法是使用反应流。因此,当消息通过rest接口到达时,您可以发送消息,然后消息将立即发布。

您可以使用StreamBridge将消息推送到任何流中。这会将消息推送到流中,但流必须存在(非反应流也存在同样的问题)。你可以把它注入到你的服务中,并通过网桥发送信息。请注意,它接受一个对象参数,因此必须为给定流提供正确的类型。

@Autowired
private lateinit var bridge: StreamBridge

然后你就可以打电话了

bridge.send("produce-out-0", messageObject)

反应流是更好的方法。

您可以使用前面的空Flux方法。这不会发出任何消息。所以您只能使用StreamBridge发布消息。

@Bean
public Supplier<Flux<Message<MyType>>> produce() {
    return () -> {
      Flux.empty<>()
    };
}

更好的方法是创建一个接收器,并通过以下方式发送消息:

Sinks.Many<Object> processor = Sinks.many().unicast().onBackpressureBuffer<Object>()

然后你可以用它作为供应商的来源

@Bean
public Supplier<Flux<Message<Object>>> produce() {
    return () -> {
      processor.asFlux()
    };
}

并在其中插入一条信息:

 processor.emitNext(messageObject, Sinks.EmitFailureHandler.FAIL_FAST)
 类似资料:
  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

  • 我正在尝试编写具有两种方法的批处理邮件服务: :可以发送邮件,由生产者调用 :刷新服务。消费者应该取一个List,然后调用另一个(昂贵的)方法。通常只有在达到批量大小后才应该调用昂贵的方法。 这有点类似于这个问题:生产者/消费者-生产者将数据添加到集合而不阻塞,消费者批量消费集合中的数据 使用具有超时的< code>poll()可以做到这一点。但是,如果生产者不想等待超时,它应该能够刷新邮件服务,

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,