当前位置: 首页 > 工具软件 > vertx-starter > 使用案例 >

springboot 集成 vertx-kafka-client

淳于知
2023-12-01

为什么尝试做这个集成

vertx是一套封装了netty的异步事件驱动的框架,netty采用的线程模型可以高效处理某些情况下的网络通讯,然而这套框架需要程序员使用函数编程的方式,不是传统的方式。本项目主要是为了构建一个框架。熟悉springboot编程的程序员只需要通过注解或者接口编程的式就可以使用到 vertx-kafka-client。

项目依赖

集成demo采用的依赖如下,主要是spring-boot-starter-web和vertx-kafka-client。

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/io.vertx/vertx-kafka-client -->
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-kafka-client</artifactId>
            <version>4.0.0</version>
        </dependency>

初始化生产者和消费者Bean

初始化生产者

初始化生产者的逻辑很简单,通过ApplicationContext取出所有的Consumer类,再通过反射取到消费者上的注解 MessageHandler所标注的信息(包括topic,msgType等)。在config类中,注册了消费者的事件,并进行topic监听。代码如下:

@Bean
    public List<KafkaConsumer> kafkaConsumers(){
// use consumer for interacting with Apache Kafka
        List<KafkaConsumer> kafkaConsumers = new ArrayList<>();
        Map<String, IKafkaHandler> consumerHandlers = this.context.getBeansOfType(IKafkaHandler.class);
        for(String kafkaHandlerBean : consumerHandlers.keySet()){


            //通过反射获取MessageHandler里的元信息

            try {
                IKafkaHandler handler = consumerHandlers.get(kafkaHandlerBean);
                Class clazz = handler.getClass();
                Method handleMethod = clazz.getDeclaredMethod("handle",Object.class);
                MessageHandler anno = handleMethod.getAnnotation(MessageHandler.class);
                String consumerGroup = anno.consumerGroup();
                Class msgType = anno.msgType();
                Map<String, String> config = new HashMap<>();
                config.put("bootstrap.servers", "localhost:9092");
                config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                if(Strings.isNotBlank(consumerGroup)) {
                    config.put("group.id", consumerGroup);
                }
                config.put("auto.offset.reset", "earliest");
                config.put("enable.auto.commit", "false");
                KafkaConsumer<String,String> consumer = KafkaConsumer.create(vertx, config);

                String topic = anno.topic();
                consumer.handler(message->{
                    String value = message.record().value();
                    String key = message.record().key();
                    try {
                        if (msgType.getSimpleName().equals("String")) {
                            handler.handle(value);
                        } else {
                            Object var1 = JSON.parseObject(value, msgType);
                            handler.handle(var1);
                        }
                    } catch (Exception e){
                        logger.info("consume error,msg = {}",value);
                    }
                });
                consumer.exceptionHandler(error->logger.info("consumer出错{}",error.toString()));
                consumer.subscribe(topic);
                kafkaConsumers.add(consumer);
            } catch (Exception e) {
                logger.error("error",e);
            }

        }
        return kafkaConsumers;
    }
初始化生产者

生产者的初始化很简单,直接初始化一个Bean即可。代码如下:

    @Bean
    public KafkaProducer kafkaProducer(){
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("acks", "1");

// use producer for interacting with Apache Kafka
        KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);
        return producer;
    }

如何使用消费者

用户可以通过接口编程的方式的实现来进行 consumer 的使用。继承如下接口即可,使用 MessageHanlder 表明其 topic, 示例说明:

public class Topic1Handler implements IKafkaHandler<Message> {

    private static final Logger logger = LoggerFactory.getLogger(Topic1Handler.class);

    /**
     * msgType 需要和 handle里的参数类型相同
     * @param message
     */
    @MessageHandler(topic = "topic1",msgType = Message.class)
    @Override
    public void handle(Message message) {
        logger.info("topic1 收到消息:{}",message);
    }
}

如何使用生产者

通过注入producer即可实现一个默认配置的KafkaProducer,进行消息生产的代码。

@Autowired
KafkaProducer producer;

以上便是大概的代码,在我的git上有完整的demo,大家可以看看。
https://github.com/chifanchen/intergrate-springboot-with-vertx-kafka-client

 类似资料: