我的应用程序由一个带有POST方法的REST控制器组成,用于提交我必须使用生产者发送到主题的数据。
这是控制器
@RestController
@RequestMapping("/producer/v1")
public class ApiController {
@Autowired
Producer producer;
@ApiOperation(value = "Invia un messaggio al topic", response = String.class)
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 400, message = "Bad Request"),
@ApiResponse(code = 500, message = "Failure")})
@PostMapping(path = "/sendMessage", consumes = "application/json", produces = "application/json")
public void sendMessage(@RequestBody() @Valid MyMessage myMessage) {
producer.produce(myMessage.getId(), myMessage);
}
}
使用Spring-Cloud-Stream版本
@Component
public class Producer {
private final ProducerChannelInterface producerChannelInterface;
public Producer(ProducerChannelInterface producerChannelInterface) {
this.producerChannelInterface = producerChannelInterface;
}
public void produce(int messageId, Object message) {
MessageChannel messageChannel = producerChannelInterface.kafkaOutChannel();
messageChannel.send(
MessageBuilder
.withPayload(message)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.setHeader(PARTITION_KEY, messageId)
.build()
);
}
}
从3.1版开始,@Output
和@EnableBinding
注释被弃用,所以我尝试切换到新的方式来设置生产者,我就是这样工作的
@Component
public class Producer {
@Bean
public Supplier<Message<Object>> produce() {
return () -> {
int messageId = ...;
Object message = new MyMessage();
return MessageBuilder
.withPayload(message)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.setHeader(PARTITION_KEY, messageId)
.build();
};
}
}
最后在应用程序中。yaml我有这个
spring:
cloud:
stream:
bindings:
#Channel name
produce-out-0:
destination: spring-cloud-topic
contentType: application/json
producer:
partitionKeyExpression: headers['partitionKey']
partitionCount: 1
errorChannelEnabled: true
...
kafka:
bindings:
produce-out-0:
producer:
configuration:
retries: 10
max.in.flight.requests.per.connection: 1
request.timeout.ms: 20000
现在的问题是,当我启动应用程序时,方法产生()
被无限调用(我在主题中看到消息)。然后使用供应商似乎我被迫在供应商内部定义消息数据。我需要从Rest控制器传递数据。
你能帮忙吗?谢谢
普通的SCS3供应商有几个“问题”,但您可以解决这些问题。主要问题是投票。默认情况下,SCS将每隔几秒钟轮询一次消息。当然,您可以使用DefaultPollerProperties
来配置它。但是,当它轮询消息时,您必须提供一个。如果您可以设置适当的轮询超时,那么您可以使用队列来提供消息,并且在供应商
中,您可以对该队列使用轮询
。但这不是正确的解决方案,因为在rest中,您无法计算消息之间的延迟,您希望立即发布消息,当然,不要等待rest调用之间的最长时间。。。更好的方法是使用反应流。因此,当消息通过rest接口到达时,您可以发送消息,然后消息将立即发布。
您可以使用StreamBridge
将消息推送到任何流中。这会将消息推送到流中,但流必须存在(非反应流也存在同样的问题)。你可以把它注入到你的服务中,并通过网桥发送信息。请注意,它接受一个对象
参数,因此必须为给定流提供正确的类型。
@Autowired
private lateinit var bridge: StreamBridge
然后你就可以打电话了
bridge.send("produce-out-0", messageObject)
反应流是更好的方法。
您可以使用前面的空Flux
方法。这不会发出任何消息。所以您只能使用StreamBridge发布消息。
@Bean
public Supplier<Flux<Message<MyType>>> produce() {
return () -> {
Flux.empty<>()
};
}
更好的方法是创建一个接收器
,并通过以下方式发送消息:
Sinks.Many<Object> processor = Sinks.many().unicast().onBackpressureBuffer<Object>()
然后你可以用它作为供应商的来源
@Bean
public Supplier<Flux<Message<Object>>> produce() {
return () -> {
processor.asFlux()
};
}
并在其中插入一条信息:
processor.emitNext(messageObject, Sinks.EmitFailureHandler.FAIL_FAST)
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f
所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职
我正在尝试编写具有两种方法的批处理邮件服务: :可以发送邮件,由生产者调用 :刷新服务。消费者应该取一个List,然后调用另一个(昂贵的)方法。通常只有在达到批量大小后才应该调用昂贵的方法。 这有点类似于这个问题:生产者/消费者-生产者将数据添加到集合而不阻塞,消费者批量消费集合中的数据 使用具有超时的< code>poll()可以做到这一点。但是,如果生产者不想等待超时,它应该能够刷新邮件服务,
我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统
一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,