当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka 模板 - 在 Spring 启动时连接到 Kafka 主题

司徒宇
2023-03-14

我实现了一个使用Spring Kafka的基本Spring Boot应用程序。我希望我的制作人在第一个<代码>之前连接到Kafka主题。send()被调用,但我找不到这样做的方法。这可能吗?

日志显示 KafkaTemplate 仅在我在 16:12:44 触发 .send 方法后连接到 Kafka 主题:

2021-11-24 16:12:12.602  INFO 63930 --- [           main] c.e.k.KafkaProducerExampleApplication    : The following profiles are active: dev
2021-11-24 16:12:13.551  INFO 63930 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
2021-11-24 16:12:13.559  INFO 63930 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2021-11-24 16:12:13.559  INFO 63930 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.53]
2021-11-24 16:12:13.613  INFO 63930 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2021-11-24 16:12:13.613  INFO 63930 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 974 ms
2021-11-24 16:12:13.989  INFO 63930 --- [           main] pertySourcedRequestMappingHandlerMapping : Mapped URL path [/v2/api-docs] onto method [springfox.documentation.swagger2.web.Swagger2Controller#getDocumentation(String, HttpServletRequest)]
2021-11-24 16:12:14.190  INFO 63930 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2021-11-24 16:12:14.190  INFO 63930 --- [           main] d.s.w.p.DocumentationPluginsBootstrapper : Context refreshed
2021-11-24 16:12:14.207  INFO 63930 --- [           main] d.s.w.p.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)
2021-11-24 16:12:14.239  INFO 63930 --- [           main] s.d.s.w.s.ApiListingReferenceScanner     : Scanning for api listing references
2021-11-24 16:12:14.336  INFO 63930 --- [           main] c.e.k.KafkaProducerExampleApplication    : Started KafkaProducerExampleApplication in 7.055 seconds (JVM running for 7.341)
2021-11-24 16:12:44.550  INFO 63930 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-24 16:12:44.550  INFO 63930 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-11-24 16:12:44.551  INFO 63930 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2021-11-24 16:12:44.649  INFO 63930 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 

共有3个答案

郑正阳
2023-03-14

智能生命周期bean为我们工作,谢谢。

@Component
class KafkaProducer (
    private val userChangeLogTemplate: KafkaTemplate<String, UserChangeLog>
    private val kafkaProperties: MizenKafkaProperties
) : NotificationProducer{

    @Bean
    fun connector(pf: ProducerFactory<String, Any>): SmartLifecycle {
        return object : SmartLifecycle {
            override fun stop() {}
            override fun start() {
                pf.createProducer().close()
            }

            override fun isRunning(): Boolean {
                return false
            }
        }
    }

    override fun sendUserChangeLog(message: UserChangeLog) {
        userChangeLogTemplate.send(kafkaProperties.userChangeLogTopic, message)
    }
}
长孙诚
2023-03-14

使用非事务生产者(不提供TransactionIdPrefix),当您第一次调用KafkaTemplate.send时,它会委托给的生产工厂以获取生产者的单个实例。此时,因为以前没有生产者的单个实例,生产者工厂将为您创建这个实例(这就是为什么您看到了日志生产者配置:生产者配置值…)。这个生产者实例现在被所有客户端使用/共享。

因此,如果您想预先创建上面的生产者实例,您可以直接在< code>ProducerFactory上调用它,例如:

@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
        kafkaProducerFactory.createProducer();
        return kafkaTemplate;
...
湛安宁
2023-03-14

关于Linh Vu的回答,最好不要在bean定义中创建连接——这在应用程序上下文的生命周期中还为时过早。

相反,添加一个实现智能生命周期的 bean 并在 start() 中创建连接;这样,上下文将在连接之前完全初始化。

@Bean
SmartLifecycle connector(ProducerFactory<Object ,Object> pf) {
    return new SmartLifecycle() {
        
        @Override
        public void stop() {
        }
        
        @Override
        public void start() {
            pf.createProducer().close();
        }
        
        @Override
        public boolean isRunning() {
            return false;
        }
        
    };
}
 类似资料:
  • 我有一个Spring启动应用程序,它使用来自 Kafka 集群中某个主题(例如 topic1)的消息。这就是我的代码目前的样子。 现在我想从另一个Kafka集群中的不同主题开始消费。一种方法是为此创建另一个bean。但是有更好的方法吗?

  • 我正在使用 kafka 和 spring boot,我需要将 JSON 对象发送到 kafka,关键是我能够将一个对象作为配置 KafkaTemplate 的 JSON 发送,但仅适用于此对象。 但是如果现在我想发送一个新的DTO对象呢?我是否必须声明一个新的

  • 我将kafka-connect docker映像()部署到ECS/fargate,为我的ECS服务分配了一个安全组,该安全组允许传入的zooper keeper和kafka bootstrap服务器通信量(纯文本和TLS),并分配了一个IAM角色,该角色允许我的ECS任务对MSK集群运行kafka操作,但connect集群在尝试从MSK集群获取代理列表时仍会超时。 在AWS中,kafka conn

  • 我在我的微服务中定义了一个Kafka消费者。我已经部署了我的应用程序的5个实例。我已将ConvoltKafkaListenerContainerFactory中的并发参数设置为2。这是否意味着每个应用程序实例有2个消费者实例,或者我连接的整个主题有2个消费者实例?

  • 我有一个服务器a,在服务器a中,我安装了kafka并启动了kafka和Zookeeper。我还创建了一个主题作为my_topic。现在我有一个应用程序B运行在服务器B中,应用程序B有一些数据,我想把这些数据推送到服务器A中的my_topic。我是否也需要在服务器B中安装kafka并在服务器B中创建一个生产者?如果是,如何将来自服务器B的消息推送到服务器A中的主题?介质是什么?

  • 当我尝试使用start-slave.sh连接到主服务器时,spark://master:port如这里所述 我正在得到这个错误日志 我尝试使用本地ip和本地名称访问主服务器(我设法同时使用和不使用密码ssh到主服务器、用户和root用户) 谢了!