当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者与Spark-Kafka-Consumer的区别

章昆琦
2023-03-14

我有一个Kafka主题,我正在通过Kafka生产者发送数据。现在在消费者方面,我有两个选择。

  @EnableKafka
@Configuration
@PropertySource("kaafka.properties")
public class RawEventKafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(RawEventKafkaConsumer.class);
    @Autowired
    private DataModelServiceImpl dataModelServiceImpl;

    private PolicyExecutor policyExecutor;

    public RawEventKafkaConsumer() {
         policyExecutor = new PolicyExecutor();
    }


    @Value("${spring.kafka.topic}")
    private String rawEventTopicName;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootStrapServer;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;



    @Bean
    public DefaultKafkaConsumerFactory<String, BaseDataModel> rawEventConsumer() {
        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

    @Bean(name="kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> kafkaListenerContainerFactory() {
         logger.info("kafkaListenerContainerFactory called..");
        ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(rawEventConsumer());
        return factory;
    }

    @KafkaListener(topics = "rawEventTopic",  containerFactory = "kafkaListenerContainerFactory")
    public void listen(String baseDataModel) {

        ObjectMapper mapper = new ObjectMapper();
        BaseDataModel csvDataModel;
        try {
            csvDataModel = mapper.readValue(baseDataModel, BaseDataModel.class);

            //saving the datamodel in elastic search.
            //dataModelServiceImpl.save(csvDataModel);
            System.out.println("Message received " + csvDataModel.toString());
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

}
 @Service
    public class RawEventSparkStreamConsumer {

        private final Logger logger = LoggerFactory.getLogger(RawEventSparkStreamConsumer.class);

        @Autowired
        private DataModelServiceImpl dataModelServiceImpl;


        @Autowired
        private JavaStreamingContext streamingContext;

        @Autowired
        private JavaInputDStream<ConsumerRecord<String, String>> messages;


        @PostConstruct
        private void sparkRawEventConsumer() {

            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(()->{
                messages.foreachRDD((rdd) -> {
                    System.out.println("RDD coming *************************______________________________---------------------.." + rdd.count());
                    rdd.foreach(record -> {
                        System.out.println("Data is comming...." + record);
                    });
                });

                streamingContext.start();

                try {
                    streamingContext.awaitTermination();
                } catch (InterruptedException e) { // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            });

        }
    }
    null

共有1个答案

柏高丽
2023-03-14

简单的回答是,您需要一个Spark集群以分布式方式运行Spark代码,而Kafka使用者只在一个JVM中运行,您手动运行同一个应用程序的多个实例来扩展它。

换句话说,您将以不同的方式运行它们。spark-submitvsjava-jar。我不相信使用Spring会改变

另一个区别是“普通消费者”对Kafka配置有更多的控制,您一次只获得一个记录。Spark RDD可以是许多事件,并且它们必须都属于相同的“模式”,除非您需要复杂的解析逻辑,否则使用RDD对象编写复杂的解析逻辑比使用为您提取的ConsumerRecord值更难。

总的来说,我认为把它们结合起来不是个好主意。

如果他们读的是相同的主题,那么Kafka使用者协议只能为每个分区分配一个使用者...不清楚您的主题有多少个分区,但这可以解释为什么一个可以工作,而另一个不行

 类似资料:
  • 这部分包含新的 Apache Kafka consumer API. 兼容性 Apache Kafka 版本 0.10+ 写入Kafka 您可以通过创建 org.apache.storm.kafka.bolt.KafkaBolt 实例并将其作为组件附加到您的topology.如果您使用 trident ,您可以通过使用以下对象完成 org.apache.storm.kafka.trident.Tr

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我有一个Kafka代理,有多个主题,每个主题都有一个分区。 我有一个消费者,它可以很好地使用主题中的消息 我的问题是,我需要通过增加分区的数量来提高消息队列的吞吐量,比如说,我在一个主题上有四个分区,有没有一种方法可以让我编写四个消费者,每个消费者都指向该主题上的各个分区??? }

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 本文向大家介绍Kafka 新旧消费者的区别相关面试题,主要包含被问及Kafka 新旧消费者的区别时的应答技巧和注意事项,需要的朋友参考一下 旧的 Kafka 消费者 API 主要包括:SimpleConsumer(简单消费者) 和 ZookeeperConsumerConnectir(高级消费者)。SimpleConsumer 名字看起来是简单消费者,但是其实用起来很不简单,可以使用它从特定的分区