我实现了一个使用Spring Kafka的基本Spring Boot应用程序。我希望我的制作人在第一个<代码>之前连接到Kafka主题。send()被调用,但我找不到这样做的方法。这可能吗?
日志显示 KafkaTemplate 仅在我在 16:12:44
触发 .send
方法后连接到 Kafka 主题:
2021-11-24 16:12:12.602 INFO 63930 --- [ main] c.e.k.KafkaProducerExampleApplication : The following profiles are active: dev
2021-11-24 16:12:13.551 INFO 63930 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2021-11-24 16:12:13.559 INFO 63930 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-11-24 16:12:13.559 INFO 63930 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.53]
2021-11-24 16:12:13.613 INFO 63930 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2021-11-24 16:12:13.613 INFO 63930 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 974 ms
2021-11-24 16:12:13.989 INFO 63930 --- [ main] pertySourcedRequestMappingHandlerMapping : Mapped URL path [/v2/api-docs] onto method [springfox.documentation.swagger2.web.Swagger2Controller#getDocumentation(String, HttpServletRequest)]
2021-11-24 16:12:14.190 INFO 63930 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2021-11-24 16:12:14.190 INFO 63930 --- [ main] d.s.w.p.DocumentationPluginsBootstrapper : Context refreshed
2021-11-24 16:12:14.207 INFO 63930 --- [ main] d.s.w.p.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)
2021-11-24 16:12:14.239 INFO 63930 --- [ main] s.d.s.w.s.ApiListingReferenceScanner : Scanning for api listing references
2021-11-24 16:12:14.336 INFO 63930 --- [ main] c.e.k.KafkaProducerExampleApplication : Started KafkaProducerExampleApplication in 7.055 seconds (JVM running for 7.341)
2021-11-24 16:12:44.550 INFO 63930 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-24 16:12:44.550 INFO 63930 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-11-24 16:12:44.551 INFO 63930 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2021-11-24 16:12:44.649 INFO 63930 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
智能生命周期
bean为我们工作,谢谢。
@Component
class KafkaProducer (
private val userChangeLogTemplate: KafkaTemplate<String, UserChangeLog>
private val kafkaProperties: MizenKafkaProperties
) : NotificationProducer{
@Bean
fun connector(pf: ProducerFactory<String, Any>): SmartLifecycle {
return object : SmartLifecycle {
override fun stop() {}
override fun start() {
pf.createProducer().close()
}
override fun isRunning(): Boolean {
return false
}
}
}
override fun sendUserChangeLog(message: UserChangeLog) {
userChangeLogTemplate.send(kafkaProperties.userChangeLogTopic, message)
}
}
使用非事务生产者
(不提供TransactionIdPrefix),当您第一次调用KafkaTemplate.send时,它会委托给的生产工厂
以获取生产者
的单个实例。此时,因为以前没有生产者
的单个实例,生产者工厂
将为您创建这个实例(这就是为什么您看到了日志生产者配置:生产者配置值…
)。这个生产者实例现在被所有客户端使用/共享。
因此,如果您想预先创建上面的生产者实例,您可以直接在< code>ProducerFactory上调用它,例如:
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
kafkaProducerFactory.createProducer();
return kafkaTemplate;
...
关于Linh Vu的回答,最好不要在bean定义中创建连接——这在应用程序上下文的生命周期中还为时过早。
相反,添加一个实现智能生命周期
的 bean 并在 start()
中创建连接;这样,上下文将在连接之前完全初始化。
@Bean
SmartLifecycle connector(ProducerFactory<Object ,Object> pf) {
return new SmartLifecycle() {
@Override
public void stop() {
}
@Override
public void start() {
pf.createProducer().close();
}
@Override
public boolean isRunning() {
return false;
}
};
}
我有一个Spring启动应用程序,它使用来自 Kafka 集群中某个主题(例如 topic1)的消息。这就是我的代码目前的样子。 现在我想从另一个Kafka集群中的不同主题开始消费。一种方法是为此创建另一个bean。但是有更好的方法吗?
我正在使用 kafka 和 spring boot,我需要将 JSON 对象发送到 kafka,关键是我能够将一个对象作为配置 KafkaTemplate 的 JSON 发送,但仅适用于此对象。 但是如果现在我想发送一个新的DTO对象呢?我是否必须声明一个新的
我将kafka-connect docker映像()部署到ECS/fargate,为我的ECS服务分配了一个安全组,该安全组允许传入的zooper keeper和kafka bootstrap服务器通信量(纯文本和TLS),并分配了一个IAM角色,该角色允许我的ECS任务对MSK集群运行kafka操作,但connect集群在尝试从MSK集群获取代理列表时仍会超时。 在AWS中,kafka conn
我在我的微服务中定义了一个Kafka消费者。我已经部署了我的应用程序的5个实例。我已将ConvoltKafkaListenerContainerFactory中的并发参数设置为2。这是否意味着每个应用程序实例有2个消费者实例,或者我连接的整个主题有2个消费者实例?
我有一个服务器a,在服务器a中,我安装了kafka并启动了kafka和Zookeeper。我还创建了一个主题作为my_topic。现在我有一个应用程序B运行在服务器B中,应用程序B有一些数据,我想把这些数据推送到服务器A中的my_topic。我是否也需要在服务器B中安装kafka并在服务器B中创建一个生产者?如果是,如何将来自服务器B的消息推送到服务器A中的主题?介质是什么?
当我尝试使用start-slave.sh连接到主服务器时,spark://master:port如这里所述 我正在得到这个错误日志 我尝试使用本地ip和本地名称访问主服务器(我设法同时使用和不使用密码ssh到主服务器、用户和root用户) 谢了!