当前位置: 首页 > 知识库问答 >
问题:

在spring boot中创建KafkaTemplate的正确方法

扶文光
2023-03-14

我尝试在Spring Boot应用程序中配置apache kafka。我阅读了这篇文档,并按照以下步骤操作:

1)我将以下行添加到aplication.yaml:

spring:
  kafka:
    bootstrap-servers: kafka_host:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

2)我创建新主题:

    @Bean
    public NewTopic responseTopic() {
        return new NewTopic("new-topic", 5, (short) 1);
    }
private final KafkaTemplate<String, byte[]> kafkaTemplate;

public KafkaEventBus(KafkaTemplate<String, byte[]> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

@Bean
public KafkaTemplate<String, byte[]> myMessageKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}
@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_hist4:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

共有1个答案

袁永贞
2023-03-14

我认为你可以放心地忽略IDEA的警告;我没有问题布线在引导的模板与不同的泛型类型...

@SpringBootApplication
public class So55280173Application {

    public static void main(String[] args) {
        SpringApplication.run(So55280173Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, Foo foo) {
        return args -> {
            template.send("so55280173", "foo");
            if (foo.template == template) {
                System.out.println("they are the same");
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so55280173", 1, (short) 1);
    }

}

@Component
class Foo {

    final KafkaTemplate<String, String> template;

    @Autowired
    Foo(KafkaTemplate<String, String> template) {
        this.template = template;
    }

}

they are the same
 类似资料:
  • 我正在处理一个数据库,我用AutoCloseabe的extends创建了一个名为Dao的公共接口,所以我有一个实现这个接口的类,但我想在那里创建一些私有方法,但它们仍然需要Autocloseable。所以我的问题是,我不能在接口中创建私有方法而不在接口中定义它们。如果我在类中创建一个私有方法,但不从DAO重写,就会发生这种情况?他们不会有自动关闭的,是吗?。如果没有,我可以实现哪种解决方案?

  • 我想了3种方法来创建一个命令并执行它。 每个命令都有不同方法。IE: 或者3)像第二个but一样,方法作为参数but接收,拆分为 这样做的正确方法是什么?由于某种原因,第三个看起来不对,但我不知道为什么。

  • 假设我有一个使用CustomObject列表的API操作。对于这些对象中的每一个,它都会调用一个创建Mono的服务方法。如何以一种惯用的无阻塞方式从这些单一对象创建流量? 我现在想到的是这个。我更改了方法名称,以更好地反映它们的预期目的。 此外,我需要订阅通量才能真正让它返回一些东西吗?

  • 本文向大家介绍Powershell创建数组正确、更快的方法,包括了Powershell创建数组正确、更快的方法的使用技巧和注意事项,需要的朋友参考一下 通常当新的对象添加到一个数组中,根据经验你最担心其性能问题。下面第一个例子将告诉你一个错误的操作方法: 在这个循环中,这个数组通过符号“+=”增加了许多新的对象。这样做会需要很长时间,因为在你改变其大小时Powershell每次需要去创造一个新的数

  • 我想知道在静态编程语言中创建全局常量的最佳方法是什么。Java,我们将使用一个包含常量的类,我们只需要导入这个类就可以访问所有常量。但是在静态编程语言中,有两种主要的方法: > 您可以创建一个包含所有常量的对象: 对象常量{const valCONST_1="foo"const valCONST_2="bar"const valCONST_3="toto"} 但这不是推荐的方式,正如一位语言开发者

  • 我是一个项目的一部分,我正在尝试添加TypeScript Web Server,这将与Swagger兼容。 什么是最基本的策略来实现它,考虑到容易的可运维性。 > 对于TypeScript,我注意到存在用于从TypeScript接口生成JSON模型的Tyson库。 对于Swagger,我尝试使用“Swagger node restify”库,因为它支持向Swagger添加JSON模型。 然而,我遇