当前位置: 首页 > 知识库问答 >
问题:

为什么两个火花流作业从具有相同组id的相同Kafka主题拉消息不平衡负载,但得到相同的消息?

淳于坚壁
2023-03-14

Kafka 0.8官方文档对Kafka消费者描述如下:

“消费者用一个消费者组名称给自己贴标签,发布到主题的每条消息都被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在不同的进程中或在不同的机器上。如果所有消费者实例都有相同的消费者组,那么这就像传统的队列平衡消费者的负载一样。”

我用Kafka0.8.1.1设置一个Kafka集群,并使用Spark Streaming作业(Spark 1.3)从其主题中提取数据。Spark Streaming代码如下:

    ... ...

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokerList);
    kafkaParams.put("group.id", groupId);

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );

    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {

        @Override
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
            long msgNum = strJavaRDD.count();
            System.out.println("There are " + msgNum + " messages read from Kafka.");

        ... ...

        return null;}});

然后我提交了两个Spark流媒体作业,以访问具有相同组id的同一主题。我假设当我向该主题发送100条消息时,这两个作业总共收到100条消息(例如job1 get 50和job2 get 50;或者job1 get 100和job2 get 0)。然而,他们分别得到了100分。这样的结果似乎与Kafka博士所说的有所不同。

我的密码有什么问题吗?我是否正确设置了组id配置?这是createDirectStream()的错误还是设计?

测试环境:Kafka0.8.1.1Spark 1.3.1

共有2个答案

壤驷坚
2023-03-14

创建两个不同的spark应用程序来用相同的消息做相同的事情是没有意义的。使用一个有更多执行者的应用程序。

金飞
2023-03-14

组是Kafka0.9版之前的高级消费API的一个功能,它在简单消费API中不可用createDirectStream使用简单的消费者API。

一些提示:

>

  • 使用简单消费者实现的主要原因是,您希望对分区消耗的控制比消费者组提供的更大。(例如:多次阅读一条消息)

    CreateDirectStream:这种方法不是使用接收器来接收数据,而是定期查询Kafka每个主题分区中的最新偏移量,并相应地定义每个批处理中要处理的偏移量范围。

    参考:

    1. Spark Streaming Kafka集成指南

    Kafka 0.9.0版本添加了一个新的Java使用者,以取代现有的基于ZooKeeper的高级使用者和低级使用者API。然后您可以使用group,同时提交偏移量手册。

  •  类似资料:
    • 我对Kafka是陌生的。我用spring boot创建了一个kafka消费者(spring-kafka dependency)。在我的应用程序中,我使用了consumerFactory和producerfactory beans进行配置。所以在我的应用程序中,我创建了如下的kafka消费者。 我的配置如下 所以我想并行消费,因为我可能会收到更多的消息。关于使用并行主题,我发现我需要为一个主题创建多

    • 我正在尝试对消费者群体进行实验 这是我的代码片段 } 当我同时运行两个spark流媒体作业时,它会出错 线程“main”java中出现异常。lang.IllegalStateException:当前没有分配给组织上的分区venkat4-1。阿帕奇。Kafka。客户。消费者内部。订阅状态。组织上的assignedState(SubscriptionState.java:251)。阿帕奇。Kafka。

    • 我是Kafka的初学者。我知道具有相同组id的多个消费者不能在一个主题中使用来自同一个分区的消息。我想知道如果来自一个消费组的多个Kafka消费者从一个分区读取相同的消息会发生什么,为什么这是一件坏事。 。

    • 我正在编写一个概念验证应用程序来使用Apache Kafka0.9.0.0中的消息,看看是否可以使用它而不是通用的JMS消息代理,因为Kafka提供了好处。这是我的基本代码,使用新的消费者API: 我使用默认设置启动了一个kafka服务器,并使用shell工具启动了一个kafka生产者,以便将消息写入我的主题。然后,我使用这段代码与两个使用者连接,发送正确的服务器来连接,发送主题来订阅,其他一切都

    • 问题内容: 在以下代码中,我不明白为什么当它属于两个不同的对象时具有相同的ID? 问题答案: 我认为这是正在发生的事情: 取消引用时,将在内存中创建其副本。该存储位置由以下位置返回 由于没有引用到刚刚创建的方法的副本,因此GC将其回收,并且该内存地址再次可用 取消引用时,将在相同的内存地址(可用)中创建它的副本,您可以再次使用该地址。 第二个副本是GCd 如果您要运行一堆其他代码并再次检查实例方法

    • 我有一个SOAP Web服务,它发送一个kafka请求消息,并等待一个kafka响应消息(例如,consumer.poll(10000))。 每次调用web服务时,它都会创建一个新的Kafka生产者和一个新的Kafka消费者。 每次调用web服务时,使用者都会收到相同的消息(例如,具有相同偏移量的消息)。 我使用的是Kafka0.9,启用了自动提交,并且自动提交频率为100毫秒。 更新0001