当前位置: 首页 > 知识库问答 >
问题:

弗林Kafka消费环境小组。消费时id不能正常工作

丌官翰采
2023-03-14

我用flink1。10.0,然后发现一个奇怪的问题。

我提交同一份工作两次。

$ flink list -r
28.02.2020 18:04:24 : f9ad14cb86a14c388ed6a146c80988fd : ReadKafkaJob (RUNNING)
28.02.2020 18:07:23 : e05bf26ee986573ffc01af8b1f5d1d59 : ReadKafkaJob (RUNNING)

两份工作属于同一组。id,但他们每个人都可以读取数据。下面的日志显示同一事件消耗两次。

2020-02-28 18:08:29,600 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.1,"eventTs":1582884509239,"seqNo":0}
2020-02-28 18:08:29,601 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.1,"eventTs":1582884509239,"seqNo":0}
2020-02-28 18:08:34,442 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.5,"eventTs":1582884514437,"seqNo":1}
2020-02-28 18:08:34,442 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.5,"eventTs":1582884514437,"seqNo":1}
2020-02-28 18:08:39,448 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.2,"eventTs":1582884519443,"seqNo":2}
2020-02-28 18:08:39,448 INFO  com.stc.sls.stream_process.examples.ReadKafkaJob              - eventDo={"id":"odd奇数","value":0.2,"eventTs":1582884519443,"seqNo":2}

我已经设定了“团队”。html" target="_blank">代码中的id。

String kafkaTopic = params.get("kafka-topic", "flink-test");
String brokers = params.get("brokers", "192.168.0.100:9092");
String groupId = "simple.read.kafka.job";

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", brokers);
kafkaProps.setProperty("group.id", groupId);

FlinkKafkaConsumer<EventDo> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new EventDeSerializer(), kafkaProps);

那么,为什么同一群体的两名客户会两次消费Kafka的产品呢?

FlinkKafkaConsumer有什么特别的实现吗?

更新:

我做了一些测试,启动了两个控制台消费者和一个flink消费者。

如果我使用Kafka控制台消费者消费,如下所示,客户端。id=123

kafka-console-consumer --bootstrap-server 192.168.0.100:9092 --topic flink-test --consumer-property group.id=simple.read.kafka.job --consumer-property client.id=123

另一个client.id=456的消费者

kafka-console-consumer --bootstrap-server 192.168.0.100:9092 --topic flink-test --consumer-property group.id=simple.read.kafka.job --consumer-property client.id=456

然后我开始闪烁工作在IDEA消费主题flink.test与group.id="simple.read.kafka.job"

20:38:17,107 INFO  org.apache.kafka.clients.consumer.KafkaConsumer               - [Consumer clientId=cid0931c3, groupId=simple.read.kafka.job] Subscribed to partition(s): flink-test-0

我可以检查连接并找到两个消费者。

➜  bin descKafkaConsumerGroup simple.read.kafka.job --members          
Java HotSpot(TM) Server VM warning: G1 GC is disabled in this release.

GROUP                 CONSUMER-ID                              HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT  
simple.read.kafka.job 123-5925201d-a767-4216-acdc-b46f058db0df /192.168.0.100  123             1               flink-test(0)
simple.read.kafka.job 456-01190de7-5d4e-43c1-9cb6-b599c9c69b41 /192.168.0.101  456             0               -

但是临时工消费者在哪里?

两个控制台消费者在一个组中行为,Flink job消费者在另一个组中行为。

更新2

我已经结账了。整个代码如下。

import com.stc.sls.stream_process.examples.model.StringDeSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;
import java.util.UUID;


@Slf4j
public class SimpleReadKafkaJob {

    final static String clientId = "cid" + StringUtils.remove(UUID.randomUUID().toString(), '-').substring(0, 6);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        String kafkaTopic = params.get("kafka-topic", "flink-test");
        String brokers = params.get("brokers", "192.168.0.100:9092");
        String groupId = "simple.read.kafka.job";

        System.out.printf("Reading kafka topic %s @ %s\n", kafkaTopic, brokers);
        System.out.println();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", brokers);
        kafkaProps.setProperty("group.id", groupId);
        kafkaProps.setProperty("client.id", clientId);

        FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new StringDeSerializer(), kafkaProps);
        kafka.setStartFromGroupOffsets();
        kafka.setCommitOffsetsOnCheckpoints(true);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(20000);
        env.setStateBackend((StateBackend) new FsStateBackend("file:///Users/ym/tmp/checkpoint"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


        DataStream<String> dataStream = env.addSource(kafka);
        dataStream.map((MapFunction<String, String>) s -> {
            log.info("message={}", s);
            return s;
        }).addSink(new DiscardingSink<>());

        env.execute(SimpleReadKafkaJob.class.getSimpleName());
    }


}

并记录如下

23:00:47,643 INFO  org.apache.kafka.clients.consumer.KafkaConsumer               - [Consumer clientId=cidb30879, groupId=simple.read.kafka.job] Subscribed to partition(s): flink-test-0
23:00:47,673 INFO  org.apache.kafka.clients.Metadata                             - Cluster ID: LsItbMw1T_SHYQvJMt_6Fw
23:00:47,677 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=cidb30879, groupId=simple.read.kafka.job] Discovered group coordinator 192.168.0.100:9092 (id: 2147483647 rack: null)
23:01:02,160 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1583420462134 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:02,572 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 437 ms).
23:01:22,136 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1583420482135 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:22,147 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 10 ms).
23:01:42,139 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 3 @ 1583420502138 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:01:42,151 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 3 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 12 ms).
23:02:02,138 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 4 @ 1583420522137 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:02,147 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 4 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 9 ms).
23:02:22,141 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 5 @ 1583420542139 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:22,152 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 5 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 12 ms).
23:02:42,138 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 6 @ 1583420562137 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:02:42,149 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 6 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 11 ms).
23:03:02,140 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 7 @ 1583420582139 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:03:02,148 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 7 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 8 ms).
23:03:18,031 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 1, eventTs: 1583420589670, id: even偶数, value: 2.76}
23:03:22,141 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 8 @ 1583420602141 for job 4d97a614ef4f928b93c8159f30e9c22e.
23:03:22,152 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 8 for job 4d97a614ef4f928b93c8159f30e9c22e (1236 bytes in 10 ms).
23:03:25,544 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 2, eventTs: 1583420598370, id: even偶数, value: 5.18}
23:03:33,181 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 3, eventTs: 1583420605939, id: odd奇数, value: 0.89}
23:03:40,659 INFO  com.stc.sls.stream_process.examples.SimpleReadKafkaJob        - message={seqNo: 4, eventTs: 1583420613564, id: even偶数, value: 9.29}


但使用Kafka消费群体显示没有活跃成员

bin descKafkaConsumerGroup simple.read.kafka.job --members
Java HotSpot(TM) Server VM warning: G1 GC is disabled in this release.

Consumer group 'simple.read.kafka.job' has no active members.

共有2个答案

罗韬
2023-03-14

在Flink 1.10中,Flink使用FlinkKafkaConsumer提供Kafka消费能力。

flinkkafaconsumer将使用名为KafkaFetcher的类来消耗数据。

真正的消费工作由KafkaFetcher担任的KafkaConsumerThread不使用KafkaConsumer#subscribe()API,而是使用KafkaConsumer#assign()API。

这意味着Flink不使用Kafka的消费者组功能(如果您打开检查点),而是通过检查点机制管理分区偏移本身。

因此,group pid机制将无法在Flink应用中工作。

董权
2023-03-14

默认情况下,Flink通过检查点实现偏移量保存机制。这意味着偏移量将在检查点保存到Kafka。这样,当作业崩溃时,可以安全地重播尚未完全处理的事件。

因此,当您部署两个作业时,完全有可能并且预期这两个作业都会在定期保存偏移量时处理记录。有各种可能改变这种行为,例如启用具有适当间隔的自动提交。不过,这可能会对性能产生负面影响,或导致重新启动时数据丢失。

有关更多信息,请参阅此处的文档。

 类似资料:
  • 我是一个新的Kafka。我开始做Kafka,我面临以下问题,请帮助我解决这一个,提前谢谢。首先,我正在编写生产者API,它工作良好,但在编写消费者API时,消息不会显示。 我的代码是这样的: 已订阅主题Hello-Kafka records::org.apache.kafka.clients.consumer.consumerRecords@76b0bfab org.apache.kafka.cl

  • 我的结构是这样的:日志文件 但我卡在Kafka到Logstash部分。 首先,Filebeat可以向Kafka生成消息,我可以使用以下方式检查它: 也可以由命令使用: 但是,当我尝试使用logstash来消费主题时,没有任何东西可以被检索到,Zoomaster一直在抛出: 2017-11-13 16:11:59205[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.

  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?