我有一个场景,我看到了不同的行为。共有3种不同的服务
现在,在我在第二个服务中启用事务和隔离级别后,问题是我无法读取任何消息,如果我在第二个服务中禁用了事务,我就能够读取所有消息。
编辑:下面是我的yml的样子
- kafka:
- binder:
- transaction:
- transaction-id-prefix:
- brokers:
- configuration:
all my consumer properties (ssl, sasl)
已更新(yml with spring cloud):
spring:
cloud.stream:
bindings:
input:
destination: test_input
content-type: application/json
group: test_group
output:
destination: test_output
content-type: application/json
kafka.binder:
configuration:
isolation.level: read_committed
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
ssl.truststore.location: jks
ssl.truststore.password:
ssl.endpoint.identification.algorithm: null
brokers: broker1:9092,broker2:9092,broker3:9092
auto-create-topics: false
transaction:
transaction-id-prefix: trans-2
producer:
configuration:
retries: 2000
acks: all
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
ssl.truststore.location: jks
ssl.truststore.password:
ssl.endpoint.identification.algorithm: null
更新(带有Spring kafka的yml):
spring:
kafka:
bootstrap-servers: broker1:9092,broker2:9092,broker3:9092
consumer:
properties:
isolation.level: read_committed
ssl.truststore.location: truststore.jks
ssl.truststore.password:
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
producer:
transaction-id-prefix: trans-2
retries: 2000
acks: all
properties:
ssl.truststore.location: truststore.jks
ssl.truststore.password:
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
admin:
properties:
ssl.truststore.location: truststore.jks
ssl.truststore.password:
security.protocol: SASL_SSL
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
使用动态目标更新
Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:810) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.0.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:423) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:351) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
尝试了动态目标解析器问题的两种方法:动态目标解析器
这对我来说很好;这些都在同一个应用程序中,但这不会有什么不同。。。
@SpringBootApplication
@EnableBinding(Channels.class)
public class So55419549Application {
public static void main(String[] args) {
SpringApplication.run(So55419549Application.class, args);
}
@Bean
public IntegrationFlow service1(MessageChannel out1) {
return IntegrationFlows.from(() -> "foo", e -> e
.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
.log(Level.INFO, m -> "s1 " + m.getPayload())
.channel(out1)
.get();
}
@StreamListener("in2")
@SendTo("out2")
public String service2(String in) {
System.out.println("s2 " + in);
return in.toUpperCase();
}
@StreamListener("in3")
public void service3(String in) {
System.out.println("s3 " + in);
}
}
interface Channels {
@Output
MessageChannel out1();
@Input
MessageChannel in2();
@Output
MessageChannel out2();
@Input
MessageChannel in3();
}
和
spring:
cloud:
stream:
bindings:
out1:
destination: topic1
in2:
group: s2
destination: topic1
out2:
destination: topic2
in3:
group: s3
destination: topic2
kafka:
binder:
transaction:
transaction-id-prefix: tx
bindings:
in2:
consumer:
configuration:
isolation:
level: read_committed
in3:
consumer:
configuration:
isolation:
level: read_committed
kafka:
producer:
# needed again here so boot declares a TM for us
transaction-id-prefix: tx
retries: 10
acks: all
logging:
level:
org.springframework.kafka.transaction: debug
和
2019-03-29 12:57:08.345 INFO 75700 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler
: s1 foo
2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-03-29 12:57:08.353 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6790c874, txId=txs2.topic1.0]]
s2 foo
2019-03-29 12:57:08.357 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-03-29 12:57:08.358 DEBUG 75700 --- [container-0-C-1] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@820ef3d, txId=txs3.topic2.0]]
s3 FOO
编辑
绑定器未在事务管理器上启用事务同步。作为解决方法,添加
TransactionSynchronizationManager.setActualTransactionActive(true);
发送到您的StreamListener。
我在活页夹上打开了一个错误。
对于已由侦听器容器启动的事务,我们需要为所有应用实例设置相同的事务id前缀。对于仅生产事务,我们需要为每个实例设置不同的值。 我在应用程序中使用了Spring Cloud Stream Kafka活页夹,它既有事务类型,也有属性Spring。云流动Kafka。粘合剂交易事务id前缀用于创建公共事务管理器。 我想知道如何使这一切正常工作,因为似乎你不能同时拥有这两种方式。
我正在编写一个程序,其中几个生产者生成一些应该由几个消费者处理的数据。由于每条数据的消耗大约需要100ms,而目标平台有很多处理器,所以在我看来,每个生产者和每个消费者都得到自己的线程似乎是很自然的。我的问题是:Qt信号/插槽是将数据块从生产者传递到消费者的好方法吗?还是建议更好的解决方案(强烈首选Qt)。 为了防患于未然,制作者每小时产生几十万个数据。
我发现Kafka有一些非常奇怪的地方。 我有一个制片人,有三个经纪人: 然后,我尝试使用新的API运行消费者: 我什么都没有!但是如果我使用旧的API: 我收到我的留言了! 我怎么了? PS:我用的是Kafka10
我正在与Java一起研究生产者-消费者问题的多生产者和消费者用例。代码在GitHub上。同样的实现适用于单个生产者和消费者用例,但对于多生产者和消费者用例却表现得很奇怪。 我有一些关于输出的问题: 一开始,所有生产者和一个消费者都有锁: 我想所有的线程都应该竞争锁,并且最多应该有一个线程拥有所有时间的锁?是不是所有的制作人都共用这个锁?当生产者线程t1持有锁时,使用者线程t5是如何获得锁的? 它运
我试图用阻塞队列实现一些消费者-生产者问题。为了达到某种目的,我决定编写文件搜索工具。 我认为搜索机制是递归工作的,每个新目录都将有新的线程池来提高搜索速度。 我的问题是,我不知道如何实现最终停止打印线程(消费者)的机制——当搜索线程完成工作时。 我试图用一些想法来做到这一点,比如毒丸,但它效果不佳(线程在打印任何结果之前停止)。任何想法我该怎么做? 下面是一些代码: 搜索机制: } 打印机: }
与服务人员一起进行脱机工作,从https://angular.io/guide/service-worker-config 应用程序脱机工作正常唯一的问题是,如果我使用应用程序脱机工作正常,但缓存中的文件在服务器上的文件更新时从不更新。如果我使用“installMode”:“lazy”`应用程序无法脱机工作。 以下是。 还尝试了,但仍然缓存未在更改时更新的文件