问题:我试图逐行读取一个大文件,并将消息放入RabbitMQ中。我想在文件末尾提交给rabbitMQ。如果文件中的任何记录是坏的,那么我想撤销发布到队列的消息。
技术:Spring boot、Spring cloud stream、RabbitMQ
你能帮我实现这个过渡的东西吗。我知道如何使用spring cloud Stream读取文件并发布到队列。
@Transactional
public void sendToQueue(List<Data> dataList) {
for(Data data:dataList)
{
this.output.send(MessageBuilder.withPayload(data).build());
counter++; // I can see message getting published in the queue though management plugin
}
LOGGER.debug("message sent to Q2");
}
spring:
cloud:
stream:
bindings:
# Q1 input channel
tpi_q1_input:
destination: TPI_Q1
binder: local_rabbit
content-type: application/json
group: TPIService
# Q2 output channel
tpi_q2_output:
destination: TPI_Q2
binder: local_rabbit
content-type: application/json
group: TPIService
# Q2 input channel
tpi_q2_input:
destination: TPI_Q2
binder: local_rabbit
content-type: application/json
group: TPIService
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
rabbit:
bindings:
tpi_q2_output:
producer:
#autoBindDlq: true
transacted: true
#batchingEnabled: true
tpi_q2_input:
consumer:
acknowledgeMode: AUTO
#autoBindDlq: true
#recoveryInterval: 5000
transacted: true
spring.cloud.stream.default-binder: local_rabbit
@EnableTransactionManagement
public class QueueConfig {
@Bean
public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
return new RabbitTransactionManager(cf);
}
}
@StreamListener(JmsQueueConstants.QUEUE_2_INPUT)
@Transactional
public void receiveMesssage(Data data) {
logger.info("Message Received in Q2:");
}
>
将生成器配置为使用事务...producer.transacted=true
在事务范围内发布消息(使用RabbitTransactionManager)。
对#2使用普通的Spring事务机制(@transact
注释或TransactionTemplate
)。
@SpringBootApplication
@EnableBinding(Source.class)
@EnableTransactionManagement
public class So50372319Application {
public static void main(String[] args) {
SpringApplication.run(So50372319Application.class, args).close();
}
@Bean
public ApplicationRunner runner(MessageChannel output, RabbitTemplate template, AmqpAdmin admin,
TransactionalSender sender) {
admin.deleteQueue("so50372319.group");
admin.declareQueue(new Queue("so50372319.group"));
admin.declareBinding(new Binding("so50372319.group", DestinationType.QUEUE, "output", "#", null));
return args -> {
sender.send("foo", "bar");
System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
try {
sender.send("baz", "qux");
}
catch (RuntimeException e) {
System.out.println(e.getMessage());
}
System.out.println("Received: " + template.receive("so50372319.group", 3_000));
};
}
@Bean
public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
return new RabbitTransactionManager(cf);
}
}
@Component
class TransactionalSender {
private final MessageChannel output;
public TransactionalSender(MessageChannel output) {
this.output = output;
}
@Transactional
public void send(String... data) {
for (String datum : data) {
this.output.send(new GenericMessage<>(datum));
if ("qux".equals(datum)) {
throw new RuntimeException("fail");
}
}
}
}
和
spring.cloud.stream.bindings.output.destination=output
spring.cloud.stream.rabbit.bindings.output.producer.transacted=true
和
Received: foo
Received: bar
fail
Received: null
Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?
我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,
如何提供一个在条件不匹配时调用的处理程序? 如果我有两个处理程序,第一个有条件,第二个没有条件,当第一个条件匹配时,两个处理程序都被调用。我如何避免这种情况?
我试图在我的应用程序中集成spring cloud stream kinesis,但我找不到手册中的所有配置选项。我看过这个链接: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/as
我不知道spring-cloud-gateway是否支持从领事注册中心读取路由,就像Zuul一样。 另外,我用--debug启动了这一行:
是否可以使用函数()样式,使用多个独立的函数/绑定来实现反应性SCS应用程序?我发现的所有示例总是只注册一个具有默认绑定的函数bean。我想注册多个,每个都有自己的绑定。 传统上,这可以使用来完成,但现在不推荐使用函数支持。