<int-kafka:message-driven-channel-adapter
listener-container="container"
auto-startup="true"
send-timeout="30000"
channel="channelA"/>
<bean id="container" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" parent="kafkaMessageListenerContainerAbstract">
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg
name="topics"
value="test"/>
<property name="transactionManager" ref="KafkaTransactionManager"/>
</bean>
</constructor-arg>
</bean>
.
.
.
<int-kafka:outbound-channel-adapter kafka-template="kafkaTemplate"
auto-startup="true"
channel="channelB"
topic="output"/>
<bean id="dbsenderTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="bootstrap.servers" value="localhost:9092"/>
</map>
</constructor-arg>
<property name="transactionIdPrefix" value="mytest-"/>
<property name="producerPerConsumerPartition" value="false"/>
</bean>
</constructor-arg>
</bean>
启动应用程序的代码如下:
GenericXmlApplicationContext tempContext = new GenericXmlApplicationContext("beans.xml");
tempContext.close();
//POINT A.
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
GenericXmlApplicationContext context = new GenericXmlApplicationContext();
context.load("beans.xml");
context.refresh();
//POINT B
在A点,我只是关闭了上下文来检查哪些beans已经关闭,然后Hibernate60秒,以便有时间检查JMX控制台。我注意到,即使上下文是关闭的,但是生产者仍然在JMX中注册。之后,我跟踪了代码,注意到在上下文关闭时,KafkaTemplate调用以下代码:
public void flush() {
Producer<K, V> producer = getTheProducer();
try {
producer.flush();
}
finally {
closeProducer(producer, inTransaction());
}
}
protected void closeProducer(Producer<K, V> producer, boolean inTx) {
if (!inTx) {
producer.close(this.closeTimeout);
}
}
这意味着它创建了一个生产者,但是因为它是事务性的,所以它不会被关闭。
if (!this.cache.contains(this)
&& !this.cache.offer(this)) {
this.delegate.close(closeTimeout);
}
这使得在关闭上下文时,DefaultKafkaProducerFactory
的Destroy
方法将清除缓存并物理地关闭生成器。但是在我的情况下,创建了应用程序上下文,但是在消费和生成任何消息之前,上下文是关闭的,只有Kafkatemplate
的flush
方法在内部调用,强制它创建事务生成器,但不将其放入缓存中。由于我没有启动生产者,而KafkaTemplate是在flush上启动的,所以DefaultKafkaProducerFactory
在使用它们之前将它们放在缓存中不是很好吗?
我阅读下面的文章,以了解我建立的ELK环境的日志技术。https://tpodolak.com/blog/tag/kibana/ 我在日志中添加了输入路径 C/日志/*.log。我有测试.log文件,它不为空,它有: 我的日志(C:\监控\logstash\日志\C:\监控\logstash\logs.log):
通过调用一个不带括号的函数,在注释和答案中多次说明在生产中不使用此类代码。为什么? 我是JavaScript的初学者,你可以从问题中猜到。如果有人能用外行的语言表达他们的答案,那就太好了,不过也请各位有经验的JS人员回答,他们可能需要一个更详细和技术上更详细的回答。 在生产中使用没有括号的函数可能或确实出错的例子将是对答案的一个很好的补充。 下面是调用函数的示例代码,不带括号,取自该问题的答案。
我认为1534236469超出范围!Leetcode:7。反向整数我无法通过测试输入1534236469。为什么?返回范围为[Integer.MAX_VALUE,Integer.MIN_VALUE],其他应返回零 谢谢你的帮助
TCP可以检测数据包是否成功发送,所以与其等待pong命令,为什么不在ping命令发送时检查是否有错误呢?我只是不觉得需要乒乓球。
问题内容: 已关闭 。这个问题是基于观点的。它当前不接受答案。 想改善这个问题吗? 更新问题,以便通过编辑此帖子以事实和引用的形式回答。 7年前关闭。 改善这个问题 这个问题困扰了我很长时间(正如我之前的问题所证明的):为什么比(确切地说是更pythonic的)更好? 对于那些不知道的人,该语句已更改为Python 3.0中的函数。正式文档在PEP 3105中 ,动机在Guido van Ross
为什么jaxb在下面生成一个名为的类型参数? 这个文件是由JavaTM体系结构用于XML绑定(JAXB)参考实现生成的: 也许这是一个我不知道的设计模式?