Kafka SDK

优质
小牛编辑
138浏览
2023-12-01

概述

现如今,越来越多流行的架构模式甚至是商业模式都是围绕着实时数据展开的。在技术上,基于实时数据,我们可以实现事件驱动的响应式架构,来对复杂系统进行更好的解耦,并显著提高系统的性能和可扩展性;而在业务上,基于实时数据可以做的事情就更多了:比如,进行实时的数据统计、将营销活动在最恰当的时机推送给用户、通过跟踪用户行为并配合各种计算模型来及时掌控安全隐患等等。

诸葛通过在业务中进行埋点的方式,实时采集了用户所有的行为事件。这就意味着,当您选择了诸葛,您不但拥有了一套软件,同时也构建起了自己的实时数据体系。而为了能让客户充分利用起这些实时数据,我们在私有部署的最初版,就开放了各个数据处理阶段的Kafka数据格式,客户可以根据自己的业务需要来选择合适的Topic进行接入。

但在服务客户的过程中,我们发现,接入Kafka并不仅仅只是调用一下API这么简单,需要处理很多技术方面的细节或者重复的劳动,诸如:

  1. 如何将Kafka中的数据解码成业务中的领域对象?Kafka原始数据格式中的字段增删,会对领域对象解码造成怎样的影响?
  2. 对于Offset的提交,有几种语义,比如At most once、At least once、Exactly once,我们的业务适合怎样的语义?
  3. 错误应该被如何处理?一旦出错就终止消费?还是可以忽略错误?还是可以在一定程度上忽略错误?(比如一分钟发生的错误不超过10个就可以忽略)
  4. 是否实现了平滑关闭?当对进程执行终止时,是否能从容不迫的将当前没有处理完的数据处理完?
  5. 业务是否对时效性要求非常严格?是否需要通过固定partition消费的方式来避免Kafka再均衡对业务产生波动?
  6. 业务是计算密集型的,还是IO密集型的?该采用怎样的线程模型对读取到的消息进行处理?
  7. 业务中是否使用了线程池?当消息读取的速度大于消息处理的速度,线程池会有怎样的行为?队列大小或者线程数目是否会随之暴涨?设置的RejectPolicy是否会导致数据丢失甚至系统崩溃?
  8. 是否向外界暴露了足够的监控信息?比如当前消费线程是否存活?
  9. 是否能以HTTP接口/操作系统信号的方式控制消费的进程(比如暂时停止消费),便于运维操作?

……

为了能简化Kafka接入的二次开发工作,我们实现了对接Kafka的SDK。通过该SDK,您可以更容易的掌控并利用好您的实时数据流。

1. 快速接入

建议通过Maven在项目中集成SDK:

<dependency>
  <groupId>com.zhugeio</groupId>
  <artifactId>zhuge-server-sdk-kafka</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

SDK使用了与Kafka官方客户端一致的日志接口 —— slf4j,因此,如果您的项目依赖中没有slf4j的具体实现,那么需要引入一个,比如logback,或者log4j与slf4j的桥接:

logback:

<dependency>
  <groupId>ch.qos.logback</groupId>
  <artifactId>logback-classic</artifactId>
  <version><!-- 按需选择版本 --></version>
</dependency>

log4j和slf4j的桥接:

<dependency>
  <artifactId>log4j-slf4j-impl</artifactId>
  <groupId>org.apache.logging.log4j</groupId>
  <version><!-- 按需选择版本 --></version>
</dependency>

依赖集成完毕后,接下来,我们通常只需要两行代码,就可以使用SDK来处理数据了:

package com.zhugeio;

import com.zhugeio.sdk.common.consumer.ZhugeDataConsumer;
import com.zhugeio.sdk.kafka.ZhugeETLTopics;
import com.zhugeio.sdk.kafka.ZhugeKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestKafkaConsumer {

  private static final Logger logger = LoggerFactory.getLogger(TestKafkaConsumer.class);

  public static void main(String[] args) {
    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer("localhost:9092", "test-kafka-sdk", ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM);
    consumer.consumeEvent(event -> logger.info("Event:" + event));
  }
}
  • 在main方法的第一行代码,我们通过三个参数构建了一个消费者对象:第一个参数是Kafka broker的地址,如果有多个节点,那么需要用逗号对地址进行分隔;第二个参数是Kafka Consumer Group ID,命名最好能体现出当前消费者的具体业务;第三个参数是要消费的Topic,诸葛在不同的ETL处理阶段,会将数据写入到不同的Topic中,这里选择的是pay_zg_total_random这个Topic,即做了id映射之后的数据。
  • 在main方法的第二行代码,就是开始实时消费数据流中事件类型的对象(即dt字段为evt的数据)。SDK会专门启动一个消费者线程,从Kafka读取数据,过滤出dt为evt类型的数据,并解码为ZhugeEvent对象,然后执行用户所指定的回调逻辑。在这里,我们只是把事件通过日志打印了出来。

以上代码会源源不断的执行:从Kafka读取并筛选数据、解码成ZhugeEvent对象、执行回调逻辑、提交Offset、再次读取数据…… 除非我们对consumer对象执行了暂停/关闭操作,或者杀掉相关进程,整个过程才会停止。

而对于同一个consumer对象,我们只能调用一次consume类型的方法。

2. 可消费的数据类型

诸葛分析平台包含了三种主要的数据类型:用户、设备、行为事件。SDK也分别为这三种数据类型提供了不同的领域对象,分别为:

  • com.zhugeio.sdk.common.domain.ZhugeUser
  • com.zhugeio.sdk.common.domain.ZhugePlatform
  • com.zhugeio.sdk.common.domain.ZhugeEvent

它们都是继承自了com.zhugeio.sdk.common.domain.ZhugeData

2.1 消费用户数据

如果我们只是要处理用户类型的数据,那么可以通过consumeUser或consumeUsers方法,前者提供了针对单个数据对象的处理视图,后者提供了针对批量数据的处理视图。

消费单个对象:

    consumer.consumeUser(zhugeUser -> {
      // 获取唯一用户ID
      final String cuid = zhugeUser.getCUID();
      logger.info(String.format("CUID: %s", cuid));

      // 对于系统属性的获取,也可以通过getSystemXxxProperty方法
      final String cuid2 = zhugeUser.getSystemStringProperty("cuid", "");
      logger.info(String.format("CUID: %s", cuid2));

      // 获取诸葛ID,只有在经过Identified之后的Topic才有该属性,其它Topic将会返回0
      final long zgId = zhugeUser.getZhugeId();
      logger.info(String.format("诸葛ID: %d", zgId));

      // 获取字符串类型的自定义属性,如果该属性不存在,则返回一个默认值
      // 支持各种类型的数据转换:包括:String、int、long、float、double、BigDecimal
      final String name = zhugeUser.getCustomStringProperty(
          "name", "unknown");
      logger.info(String.format("姓名:%s", name));

      // 获取IP地址
      final String ipAddress = zhugeUser.getIP();
      logger.info(String.format("IP地址:%s", ipAddress));

      // 获取数据上传时间
      final LocalDateTime uploadTime = zhugeUser.getUploadTime();
      logger.info(String.format("数据上传时间:%s", uploadTime));
    });

消费批量数据:

    consumer.consumeUsers(zhugeUsers -> {
      zhugeUsers.forEach(zhugeUser -> {
        logger.info(StringUtils.repeat('-', 10));
        final String cuid = zhugeUser.getCUID();
        logger.info(String.format("CUID: %s", cuid));

        final long zgId = zhugeUser.getZhugeId();
        logger.info(String.format("诸葛ID: %d", zgId));

        final String name = zhugeUser.getCustomStringProperty(
            "name", "unknown");
        logger.info(String.format("姓名:%s", name));

        final String ipAddress = zhugeUser.getIP();
        logger.info(String.format("IP地址:%s", ipAddress));

        final LocalDateTime uploadTime = zhugeUser.getUploadTime();
        logger.info(String.format("数据上传时间:%s", uploadTime));
      });
    });

如上所示,我们可以通过ZhugeUser对象来很方便的获取用户的各种系统属性和自定义属性。

需要值得注意的是,对于自定义属性的获取,由于埋点不周等原因,我们并不能确保属性总是存在,为了避免引发空指针等错误,在获取自定义属性的时候,应该总是为其指定一个默认值。

关于属性类型

诸葛在Kafka中的数据拥有三种属性类型,分别为:

  • 系统属性,比如cuid、诸葛ID等等,这些都是诸葛的系统内置属性,在Kafka原始的json数据中,属性名都是以$开头。而在SDK中,这些属性都有对应的方法可以获取值。比如像上文的getCUID。如果要是个别系统属性在SDK中找不到对应的获取方法,也可以通过getSystemXxxProperty方法,以字符串指定属性名作为参数进行获取。
  • 用户自定义属性,这些属性都是跟业务息息相关的。比如用户名称、性别、是不是VIP等等。它们在Kafka原始的json数据中,属性名都是以_开头。需要通过getCustomXxxProperty方法来获取。
  • 上传属性,这些属性描述了客户端上传环境的相关信息,比如IP地址、上传时间、UserAgent等等。在SDK中,这些属性也都有对应的方法可以获取到值。

在一些任务中,比如数据导出,我们可能需要一次性获取某种类型所有的属性,而非按照单个属性去读取,对此,SDK也提供了相应的方法:

    consumer.consumeUser(user -> {
      // 获取所有系统属性
      final Map<String, Object> systemProperties = user.getSystemProperties();
      logger.info(String.format("系统属性: %s", systemProperties));

      // 获取所有自定义属性
      final Map<String, Object> customProperties = user.getCustomProperties();
      logger.info(String.format("自定义属性:%s", customProperties));

      // 获取所有上传属性
      final Map<String, Object> uploadProperties = user.getUploadProperties();
      logger.info(String.format("上传属性:%s", uploadProperties));

      // 获取所有属性
      final Map<String, Object> allProperties = user.getProperties();
      logger.info(String.format("所有属性:%s", allProperties));
    });

关于数据类型

我们所采集的数据来源于不同平台下不同语言的SDK,因此对于数据类型,难以采取什么严格的标准。比如,对于货币类型,有些开发者在埋点时可能采用了字符串来表示,而有些则可能采用了浮点数来表示,但我们都知道,在Java平台上,表示货币最好的类型是BigDecimal。甚至客户端自己做了诸如AOP之类的自动化工作,采集的所有数据类型都是字符串。因此,我们在Kafka SDK这一侧,根据用户读取的意愿来实现了灵活的数据类型转换。

final BigDecimal price = zhugeUser.getCustomDecimalProperty("价格", "0.00");

上传数据的价格可能是字符串格式的"125.34",也可能是浮点数格式的125.34,但是当我们通过getCustomDecimalProperty方法,我们读取出来的都是一个BigDecimal类型。

同样,对于其它读取方法,也都实现了自动的类型转化:

  • getCustomStringProperty 无论原始数据是什么类型,都会被转化为字符串。比如1.0会被转化成"1.0"
  • getCustomIntProperty 无论原始数据是什么类型,都会尝试转化为数字,如果转化失败,则抛出异常。比如,像字符串"123"是可以被转化为数字123的,但是对于"abc",却无法被转化为数字。

关于单个和批量消费模式

如上所示,我们的SDK提供了单个和批量两种消费模式。开发者可能会问,为什么要提供两种?原因主要还是考虑到不同的场景有不同的需要。

假设我们的程序只是在内存中为符合筛选条件的用户计数,那么可以使用单个对象的消费模式;而如果我们的程序是要把符合条件的数据通过RPC接口提交到某个远程服务,或者是插入到MySQL之类的数据库中,那么要是再按照单个对象进行处理,显然是非常不合适的,这会引发大量的IO操作,导致整体吞吐量不佳,我们更希望对数据进行批量处理,以此来减少IO次数,从而提升吞吐量。

consumer.consumeUsers(zhugeUserDAO::batchSaveUsers);  // 批量保存用户信息到数据库

那么如何控制批量消费一次的对象数目呢?这个就需要通过Kafka消费者客户端的max.poll.records选项来进行配置了。该选项默认是500。我们将在专门的章节来介绍如何修改Kafka消费者客户端的参数。

需要注意:max.poll.records只是描述获取Kafka原始记录的数目,而为了提升客户端SDK的上传效率,诸葛在Kafka中的原始数据记录格式属于复合型。也就是说,在一条记录里面会包含多个事件、用户、设备的上传记录。因此该参数并不能精准控制一次批量消费的记录数,只能给出一个大概的限制,如果您的业务依赖精准的记录数目,那么还需要在消费逻辑中自行进行计数。

比如,我们有个数据保存接口,一次最多只能接收100条记录,那么就需要这样处理:

consumer.consumeUsers(zhugeUsers -> {
  final List<ZhugeUser> collector = new LinkedList<>();
  for (ZhugeUser zhugeUser : zhugeUsers) {
    collector.add(zhugeUser);
    if (collector.size() == 100) {
      userDAO.batchSave(collector);
      collector.clear();
    }
  }
  if (collector.size() > 0) {
    userDAO.batchSave(collector);
  }
})

这样就能严格的确保每次向业务接口写入的记录数目是小于等于100条的。

2.2 消费事件数据

对于事件类型数据,我们也可以采取同用户数据一致的方式进行消费。

消费单个对象:

    consumer.consumeEvent(event -> {
      logger.info(StringUtils.repeat('-', 20));

      // 事件名称
      final String eventName = event.getEventName();
      logger.info(String.format("事件名称:%s", eventName));

      // 事件唯一ID
      final String uuid = event.getUUID();
      logger.info(String.format("事件唯一ID:%s", uuid));

      // 用户唯一ID,如果是匿名用户的行为事件,则会返回空字符串
      final String cuid = event.getCUID();
      logger.info(String.format("用户唯一ID: %s", cuid));

      // 诸葛ID
      final long zgId = event.getZhugeId();
      logger.info(String.format("诸葛ID:%d", zgId));

      // 获取自定义的事件属性
      final String page = event.getCustomStringProperty("页面", "");
      logger.info(String.format("页面:%s", page));

      // IP地址
      final String ipAddress = event.getIP();
      logger.info(String.format("IP地址:%s", ipAddress));

      // 数据上传时间
      final LocalDateTime uploadTime = event.getUploadTime();
      logger.info(String.format("数据上传时间:%s", uploadTime));
    });

消费批量数据:

    consumer.consumeEvents(this::batchHandleEvents);

2.3 消费平台数据

平台数据中包含了客户端SDK上传的设备信息,比如操作系统、分辨率、语言环境、设备生产厂商等等。

消费单个对象:

    consumer.consumePlatform(platform -> {
      logger.info(StringUtils.repeat('-', 20));

      // 操作系统
      final String os = platform.getOS();
      logger.info(String.format("操作系统:%s", os));

      // 设备品牌
      final String brand = platform.getBrand();
      logger.info(String.format("设备品牌:%s", brand));

      // 运营商
      final String carrier = platform.getCarrier();
      logger.info(String.format("运营商:%s", carrier));

      // IMEI编号
      final String imei = platform.getIMEI();
      logger.info(String.format("IMEI:%s", imei));

      // 设备所属的用户唯一标识ID
      final String cuid = platform.getCUID();;
      logger.info(String.format("用户唯一ID:%s", cuid));

      // 数据上传时间
      final LocalDateTime uploadTime = platform.getUploadTime();
      logger.info(String.format("数据上传时间:%s", uploadTime));
    });

消费批量数据:

    consumer.consumePlatforms(this::batchHandlePlatforms);

2.4 消费混合数据

上述消费者逻辑都是对单一的数据类型进行处理,SDK自动根据数据类型做了过滤。而如果我们要在同一个消费者逻辑中处理所有的数据类型,那么就需要一种混合类型的消费模式。

consumer.consume(zhugeData -> {
      logger.info(StringUtils.repeat('-', 20));
      if (zhugeData.isUser()) {
        logger.info("接收到用户类型数据");
        final ZhugeUser zhugeUser = zhugeData.asUser();
        logger.info(String.format("用户信息:%s", zhugeUser));
      } else if (zhugeData.isEvent()) {
        logger.info("接收到事件类型数据");
        final ZhugeEvent zhugeEvent = zhugeData.asEvent();
        logger.info(String.format("事件信息:%s", zhugeEvent));
      } else if (zhugeData.isPlatform()) {
        logger.info("接收到平台类型数据");
        final ZhugePlatform zhugePlatform = zhugeData.asPlatform();
        logger.info(String.format("平台信息:%s", zhugePlatform));
      }
});

通过以上方法,我们就能在消费者逻辑中同时处理各种类型的对象了。

ZhugeData是所有数据类型的父类,它提供了isXxx方法来判断具体的数据类型,以及asXxx方法来将数据转化为具体的子类。

另外,ZhugeData也提供了一些公共方法,使得在一些场景中,我们无需关心具体的数据类型也能进行工作:

    consumer.consume(zhugeData -> {
      logger.info(StringUtils.repeat('-', 20));

      final String cuid = zhugeData.getSystemStringProperty("cuid", "");
      logger.info(String.format("CUID: %s", cuid));

      final ZhugeDataType zhugeDataType = zhugeData.getType();
      logger.info(String.format("数据类型:%s", zhugeDataType));

      // 获取所有系统属性
      final Map<String, Object> systemProperties = zhugeData.getSystemProperties();
      logger.info(String.format("系统属性: %s", systemProperties));

      // 获取所有自定义属性
      final Map<String, Object> customProperties = zhugeData.getCustomProperties();
      logger.info(String.format("自定义属性:%s", customProperties));

      // 获取所有上传属性
      final Map<String, Object> uploadProperties = zhugeData.getUploadProperties();
      logger.info(String.format("上传属性:%s", uploadProperties));

      // 获取所有属性
      final Map<String, Object> allProperties = zhugeData.getProperties();
      logger.info(String.format("所有属性:%s", allProperties));
    });

我们当然也可以对混合数据进行批量处理:

    consumer.consumeBatch(this::batchHandleZhugeData);

3. 配置Kafka消费者

通常,我们仅仅需要Kafka Broker地址、Consumer Group ID以及目标Topic的名称,就能配置出一个可以满足大多数场景的消费者对象。而如果当默认配置并不能满足要求,SDK也允许我们自定义Kafka消费者参数。

    final ZhugeKafkaDataReaderOptions readerOptions = new ZhugeKafkaDataReaderOptions(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM
    );
    readerOptions.setMaxPollRecords(100);
    readerOptions.setHeartbeatIntervalMs(500);
    readerOptions.setKafkaConsumerProperty("fetch.max.wait.ms", "1000");
    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(readerOptions);
    consumer.consume(event -> logger.info("Event: " + event));

在构建消费者对象之前,我们需要先通过ZhugeKafkaDataReaderOptions对象来配置相关参数。对于常用的Kafka Consumer配置参数,比如max.poll.records,我们在readerOptions中都提供了现成的方法,可以直接通过这些方法进行设置。

也可以通过setKafkaConsumerProperty以字符串格式的kv进行设置。

3.1 常用的Kafka Consumer配置参数

在这里,我们整理了常用的Kafka Consumer配置参数,方便开发者进行参考:

  • bootstrap.servers Kafka集群连接字符串
  • group.id Kafka Consumer Group,最好起跟业务相关的名字
  • key.deserializer Key反序列化器
  • value.deserializer Value反序列化器
  • fetch.min.bytes 消费者从服务器获取记录的最小字节数,当broker收到请求时,如果可用的数据量小于fetch.min.bytes所指定的大小,那么它会等到有足够可用数据时才会返回给消费者。这样可以降低消费者和broker之间的通信频率,减少broker的负载。
  • fetch.max.wait.ms 获取数据最大等待时长,与fetch.min.bytes配合使用,哪个配置先达到条件,哪个就生效。
  • max.partition.fetch.bytes 服务器从每个分区里面返回给消费者的最大字节数,默认是1MB。注意,该项必须必broker配置的max.message.size大,否则可能导致无法读取消息。另外需要避免poll()一次返回的数据量太多,而当消费者在一次消费周期处理太多数据,可能会导致心跳会话过期,因此也要避免将该值设置的过大。
  • session.timeout.ms 指定心跳超时时间,如果超过该时间没有发送心跳给broker,则会认为consumer已经死亡,系统会触发再均衡。
  • heartbeat.interval.ms 指定了poll方法向broker发送心跳的频率,heartbeat.interval.ms一定要比session.timeout.ms小,大了会出问题。
  • auto.offset.reset 在从ZK或者broker中找不到偏移量的情况下,Consumer的读取行为,默认是latest,从最新记录开始,还可以设置为earliest,从最早记录开始。
  • enable.auto.commit 是否允许自动提交偏移量,true or false。
  • auto.commit.interval.ms 自动提交偏移量时间间隔。
  • max.poll.records 控制单次调用poll方法能够返回的记录数目。
  • send.buffer.bytes Socket请求缓冲区大小,设置为-1,会使用操作系统默认值。
  • receive.buffer.bytes Socket接收缓冲区大小,设置为-1,会使用操作系统默认值。
  • partition.assignment.strategy 分区分配策略

3.2 偏移量提交策略

在Kafka Consumer所有的配置当中,偏移量提交策略应该是最为重要的,因为如果配置不当,会出现数据被重复消费或者丢弃的情况。

自动提交

SDK可以通过KafkaOffsetCommitPolicy来灵活设置偏移量的提交策略。首先,我们的提交策略分为了自动和手动。当选择了自动提交,无需任何操作,Kafka客户端会以固定的时间间隔来对偏移量进行提交。设置自动提交的方法:

    final ZhugeKafkaDataReaderOptions readerOptions = new ZhugeKafkaDataReaderOptions(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM,
        new KafkaOffsetCommitPolicy().auto()  // 设置为自动提交策略
    );
    readerOptions.setAutoCommitIntervalMs(1000);  // 设置自动提交时间间隔
    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(readerOptions);
    consumer.consumeEvent(event -> logger.info("Event: " + event));

自动提交在多数场景显然是不可控的,它的最大优点是省心,我们只需要把offset全权交给Kafka客户端去处理,而不需要关心别的细节,自动提交显然也只适合那些不那么严谨的业务场景,在指定的时间间隔左右(之所以是”左右“,是因为自动提交是采用异步的方式,因此并不能确保在规定的时间间隔总是提交成功),丢失少量数据或者重复消费对业务造成的影响可以忽略不计。

手动提交

而当选择手动提交偏移量时,我们需要重点考虑三个问题:

  1. 在什么时机进行提交,是在记录被消费之前还是之后?
  2. 以怎样的消费规模进行提交,是批量消费多条记录后进行提交还是每消费一条记录就提交?
  3. 以怎样的方式进行提交,同步还是异步,同步就是指只有offset提交成功,才可以进行接下来的动作,而异步则不保证提交成功,就开始执行下一步操作。

这三个问题相互结合,会影响到数据是否被重复消费或丢失,以及消费者整体的吞吐量。

通过KafkaOffsetCommitPolicy,我们可以对这三个问题进行控制:

new KafkaOffsetCommitPolicy().before().batch().sync();  // 在通过poll读取的一批数据被消费之前,采用同步的方式进行提交
new KafkaOffsetCommitPolicy().after().batch().async();  // 当一批数据被消费完毕之后,采用异步的方式进行提交
new KafkaOffsetCommitPolicy().before().single().async();  // 在单条数据被消费之前进行异步提交
new KafkaOffsetCommitPolicy().after().single().async();  // 在单条数据被消费之后进行异步提交

KafkaOffsetCommitPolicy允许我们将上述三个问题的答案按照自己的需要进行组合,来定制适合业务的偏移量提交策略。下面,我们将结合消费者的三大语义,来对上述策略进行详细说明。

At most once

At most once为最多一次语义,确保数据最多只被消费一次。在Kafka消费者实现中,我们一般通过在正式消费数据之前来提交offset的方式来实现At most once。以下策略均可以实现At most once:

new KafkaOffsetCommitPolicy().before().batch().sync();
new KafkaOffsetCommitPolicy().before().single().sync();

At most once的缺点在于,当消费逻辑出现错误时,容易出现数据丢失的情况。比如,假设我们的策略是new KafkaOffsetCommitPolicy().before().batch().sync(),当我们通过KafkaConsumer的poll方法从broker读取了一批数据时,我们就立即提交了offset,如果此时消费线程因为异常而终止,数据还没来得及被完整处理,而当它再次被恢复时,因为包含这批数据的offset已经被提交了,消费者会从下一批数据开始消费,所以这批数据其实在业务的角度看来就算是丢失了。

At most once最适合那种”宁少勿多“的场景,最典型的就是发送营销所需要的短信、推送、邮件。我们往往希望,当出现故障时,宁愿少发一些,也不愿意对同一个用户发送多次,造成打扰。

At least once

At lease once为最少一次语义,确保数据会被消费大于等于1次。在Kafka消费者实现中,我们一般通过在正式消费数据之后来提交offset的方式来实现At least once。以下策略均可以实现At least once:

new KafkaOffsetCommitPolicy().after().batch().sync();
new KafkaOffsetCommitPolicy().after().single().sync();

At least once的缺点在于,当消费逻辑出现错误时,容易出现重复消费的情况。比如,假设我们的策略是new KafkaOffsetCommitPolicy().after().batch().sync(),当我们通过KafkaConsumer的poll方法从broker读取了一批数据,然后开始执行消费逻辑,假设这个时候消费线程崩溃了,那么这批数据的offset并没有被提交,当消费者再次被恢复时,这批数据就会被重复消费。

At least once适合那种拥有良好幂等性的业务场景,即向同样的逻辑,应用多次同样的数据,所得到的最终状态是相同的。最典型的例子就是我们把消费者接收的数据写入到一个拥有主键并支持upsert语义的数据库,如果是重复的记录,数据库会自动忽略掉。

我们当前SDK默认的配置就是基于批量的最多一次,即KafkaOffsetCommitPolicy().after().batch().sync()

Exactly once

Exactly once为只有一次语义,数据被严格保证只会消费一次,这是最为理想的消费者语义。但事实上,我们无法只通过配置偏移量提交策略就能够实现这个语义。要实现该语义,通常要结合”外力“。比如通过和数据库事务相配合,来决定偏移量的提交时机。或者采取更为简单的方法,在At least once的基础上,通过构建具有幂等性的业务来实现与Exactly once同样的效果。很多时候,让业务具备幂等性并不是一件很难做到的事情,通过对业务逻辑进行去重检查往往就可以实现(当然,如果确实是非常严肃的场景,这里可能需要事务支持,那就不简单了)。如果你的业务确实需要严格的Exactly once语义,在对记录进行消费时,做一下这些额外工作往往是值得的。

3.3 按固定Partition进行消费

通常,我们并需要显式的为Kafka Consumer选择partition,Kafka会自动完成这些工作。而且除了自动分配partition之外,Kafka还拥有”再均衡“的特性,这个能力可以在多个消费者当中实现高可用。但是,并不是所有的场景均适合”再均衡“这一特性,比如:

  1. 消费者进程是有状态的,一个消费者进程应当总是处理固定的partition,否则会发生错误。
  2. 业务对时间极为敏感,如果发生再均衡,系统抖动可能会影响业务。

如果您的业务属于这些特殊情况,那么可以通过固定partition的方式进行消费,即同一个进程应该总是消费某个或几个固定的partiton。

    final ZhugeKafkaDataReaderOptions readerOptions = new ZhugeKafkaDataReaderOptions(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM,
        new KafkaOffsetCommitPolicy().batch().after().sync(),
        Collections.singletonList(0)  // 指定固定要消费的partition编号
    );

    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(readerOptions);
    consumer.consume(zhugeData -> logger.info(String.format("Data: %s", zhugeData)));

4. 控制消费者线程

SDK允许我们对消费者线程执行很多控制操作:暂停、恢复、查看消费状态、关闭、强制关闭等等。

当我们使用consume一族的方法开始对数据进行消费时,消费逻辑并不会在调用该方法的线程中执行,SDK会单独启动一个专门的消费者线程,源源不断的从Kafka读取数据并回调指定的消费逻辑。消费并不会阻塞当前调用SDK的线程,我们可以继续在这个线程中做其它大的工作,例如:

  public static void main(String[] args) throws Exception {
    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM
    );
    consumer.consumeEvent(event -> logger.info("Event:" + event));
    logger.info("我可以继续运行呢");
    TimeUnit.SECONDS.sleep(10L);
    consumer.pause();
    TimeUnit.SECONDS.sleep(5L);
    consumer.rerun();
    TimeUnit.SECONDS.sleep(3L);
    consumer.close();
  }

consumeEvent方法被调用之后,我们发现主线程仍然可以运行,它会先打印一行日志,然后休眠10秒钟,将消费者线程暂停,然后再过5秒钟,将暂停的消费者恢复运行,再等三秒钟,平滑关闭消费者线程。这时候,消费者线程和主线程都没有逻辑可以执行了,整个JVM进程退出。

我们也可以通过join方法来让主线程阻塞,直到消费者线程被关闭为止:

  public static void main(String[] args) {
    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM
    );
    final ScheduledExecutorService scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor();
    scheduledExecutorService.schedule(
        consumer::close,
        10,
        TimeUnit.SECONDS
    );
    consumer.consume(event ->
        logger.info(String.format("Event: %s", event)));
    consumer.join();
    logger.info("我要走了,再见");
    scheduledExecutorService.shutdown();
 }

在调用consumeEvent方法之后,主线程会执行join,进入阻塞状态,之后输出日志的代码将无法执行,只有等10秒后,时间调度器线程关闭了消费者线程,主线程才会被解除阻塞,往下执行输出日志的语句,然后退出。

4.1 平滑关闭

有时候,即便我们精心配制了偏移量的提交策略,并且小心翼翼的处理了所有异常,如果在最终关闭消费者进程时使用了不恰当的操作(比如执行了kill -9),那么也会导致数据丢失或者业务逻辑被中断而出现错误。

SDK提供了平滑关闭消费者进程的方法close() ,当该方法被调用时,消费者线程会将当前已经读取的数据都消费完,才会结束运行,以避免在数据一致性上出现问题。并且SDK默认通过addShutdownHook方法注册了关闭钩子,当我们通过kill -term来终止进程时,只有当消费者线程被平滑关闭,进程才会退出。

如果您希望能自己控制消费者线程的关闭时机,而不需要自动注册关闭钩子,那么可以配置消费者选项得以实现:

  public static void main(String[] args) {
    final ZhugeKafkaDataReaderOptions readerOptions = new ZhugeKafkaDataReaderOptions(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM
    );
    final BasicZhugeDataConsumerOptions consumerOptions = new BasicZhugeDataConsumerOptions();
    consumerOptions.setJvmShutdownHook(false);  // 取消SDK对关闭钩子的注册
    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(
        readerOptions,
        consumerOptions
    );
    consumer.consume(event ->
        logger.info(String.format("Event: %s", event)));
    logger.info("HEHE");

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {  // 自行添加关闭钩子
      consumer.close();
      consumer.join();  // 这里务必需要用join,因为close()方法是一个异步的关闭请求,如果没有join,JVM在这里可能会直接退出。
    }));
 }

通过上述配置,我们就禁止了SDK自行注册关闭钩子,而使用了我们自己编写的关闭钩子逻辑。当我们在消费逻辑中使用了多种外部资源,需要严格控制关闭的顺序,就需要开发者自己去严格的定义关闭钩子的逻辑。比如当我们在消费过程中使用了数据库连接,如果先关闭了数据库连接,再关闭消费者,那么再处理残余数据的时候就会出现问题 —— 此时数据库连接已经被关闭了。这种情况下,正确的顺序应该是先关闭消费者线程,再关闭数据库连接,才能实现真正的平滑关闭。

总之,无论如何,当要终止进程时,请务必首先确保消费者被平滑关闭。如果实在无法实现平滑关闭(比如进程长时间没有响应),才可以通过强行关闭来终止进程。

4.2 性能监控日志

为了能够了解消费者在各个数据处理阶段的执行效率,SDK内置了一套性能监控日志。在默认情况下,性能监控日志是被关闭的,可以通过调用API的方式来将其设置为开启:

    PerfLog.setEnable(true);  // 开启性能监控日志
    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM
    );
    consumer.consume(events -> {
      logger.info(String.format("Events: %s", events));
    });

该日志开启之后,我们就会在日志输出中发现一个名为zhuge_sdk_performance的logger,输出的内容大致如下:

10:17:20.166 [Thread-2] INFO zhuge_sdk_performance - --------------------
10:17:20.166 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.poll_from_kafka used time: 6
10:17:20.166 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.consume_data.decode_data used time: 182
10:17:20.166 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.consume_data.run_callback used time: 0
10:17:20.166 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.consume_data.decode_data used time: 2
10:17:20.167 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.consume_data.run_callback used time: 0
10:17:20.167 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.consume_data used time: 184
10:17:20.167 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.commit_offset used time: 4
10:17:20.167 [Thread-2] INFO zhuge_sdk_performance - consumer_loop used time: 194

以上就是SDK执行一次数据消费各个阶段的性能消耗:

  • consumer_loop.poll_from_kafka 通过Kafka客户端poll API读取数据所使用的时间。
  • consumer_loop.consume_data.decode_data 将原始数据解码成领域对象所使用的时间
  • consumer_loop.consume_data.run_callback 执行回调逻辑所使用的时间
  • consumer_loop.consume_data 整个数据消费过程,包含数据解码和回调逻辑以及一些其它的判断逻辑所使用的时间
  • consumer_loop.commit_offset 提交偏移量所使用的时间
  • consumer_loop 整个消费循环所使用的时间

而除了SDK所设置的这些数据处理阶段之外,您也可以在自己的业务逻辑中定义要监控的阶段,这些阶段的监控数据会随着SDK内置的阶段一并输出:

    consumer.consumeEvents(events -> {
      PerfLog.beginSpan("output_log");
      logger.info(String.format("Events: %s", events));
      PerfLog.endSpan();

      PerfLog.beginSpan("save_events");
      saveEvents(events);
      PerfLog.endSpan();
    });

PerfLog.beginSpan 方法会开启一个新阶段的计时,而PerfLog.endSpan方法会结束该阶段的计时。这俩方法务必要成对出现。

接下来,我们会看到日志输出结果:

10:28:00.880 [Thread-2] INFO zhuge_sdk_performance - --------------------
10:28:00.880 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.poll_from_kafka used time: 7
10:28:00.880 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.consume_data.decode_data used time: 151
10:28:00.880 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.consume_data.decode_data used time: 2
10:28:00.880 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.consume_data.run_callback.output_log used time: 0
10:28:00.880 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.consume_data.run_callback.save_events used time: 0
10:28:00.880 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.consume_data.run_callback used time: 0
10:28:00.880 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.consume_data used time: 154
10:28:00.880 [Thread-2] INFO zhuge_sdk_performance - consumer_loop.commit_offset used time: 2
10:28:00.880 [Thread-2] INFO zhuge_sdk_performance - consumer_loop used time: 163

发现多了两个阶段,一个是consumer_loop.consume_data.run_callback.output_logconsumer_loop.consume_data.run_callback.save_events 分别为我们在回调逻辑中所添加的两个数据处理阶段衡量。

建议在您所使用的日志框架中,对zhuge_sdk_performance这个logger进行单独配置,将输出结果写入到一个专门的日志文件中,这样有助于更好的观察和分析。而通过HTTP接口,我们可以在应用运行的过程中,就能随时打开或关闭性能监控日志,方便及时的了解当前应用的性能瓶颈。

下面,我们将介绍如何通过HTTP接口来控制消费者线程的行为。

4.3 使用HTTP接口来控制消费者线程

很多时候,我们希望能在运行时来控制消费者的行为以及了解其运行状态。为了能方便您的操作,SDK提供了一系列的HTTP API来对消费者进行运行时进行操作。

HTTP API默认是不启动的,如果需要开启,需要构建一个API Server:

    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM
    );
    final ZhugeDataConsumerAPIServer apiServer = new ZhugeDataConsumerAPIServer(
        "localhost", 1989, consumer);
    apiServer.start();
    consumer.consumeEvents(events -> logger.info(String.format("Events: %s", events)));
    consumer.join();
    apiServer.stop();

我们只需要通过三个参数来构建一个api server:监听地址、端口以及要操作的消费者对象,然后再构建好的API Server对象上调用start()方法,就可以启动HTTP API了。另外,当程序结束时,最好将api server也通过stop()方法来关闭。

接下来,可以尝试访问以下API:

暂停消费

[GET] /v1/pause

恢复被暂停的消费者执行

[GET] /v1/rerun

平滑关闭消费者

[GET] /v1/close

获取当前消费者的状态

[GET] /v1/status

返回的数据:

{"startTime":1606876063456,"lastAliveTime":1606877241642,"times":1175,"consumedObjects":4,"status":"RUNNING"}
  • startTime:进程启动时间戳
  • lastAliveTime:消费者最近一次活跃时间
  • times:执行的消费循环次数
  • consumedObjects:总共消费的对象数目
  • status:当前消费者状态

这个接口对于监控消费者的状态至关重要,比如,很多时候,我们的消费者可能进程还在,但是内部的处理过程已经卡住了,这种情况,我们就可以通过lastAliveTime来得知,正常情况下,lastAliveTime每执行一次消费循环,都是会递增的,如果发现这个时间戳不动了,那么极有可能是内部处理过程已经卡死。这时候就可以通过jstack等问题排查工具,来发现具体是卡在哪里。

开启性能监控日志

[GET] /v1/enablePerfLog

关闭性能监控日志

[GET] /v1/disablePerfLog

4.4 控制消费时间间隔

为了控制消费速率,SDK在消费完一批数据之后,可以指定一个时间间隔,再消费另一批数据。这个时间间隔默认是1000毫秒,即1秒钟。用户也可以自行改变这个时间间隔:

    final ZhugeKafkaDataReaderOptions readerOptions = new ZhugeKafkaDataReaderOptions(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM
    );
    final BasicZhugeDataConsumerOptions consumerOptions = new BasicZhugeDataConsumerOptions();
    consumerOptions.setSleepTime(5000);
    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(
        readerOptions,
        consumerOptions
    );
    consumer.consumeEvents(events -> logger.info(String.format("Events: %s", events)));

这样,我们就把消费时间间隔调整到了5秒钟一次。

细心的开发者可能发现,SDK针对Consumer的配置分成了两组,一组是readerOptions,用来控制数据读取相关的选项,比如Kafka的配置;还有一组是consumerOptions,用来控制消费者线程的行为。

5. 错误处理

对于实时数据应用,错误处理并不是一件容易的事情。我们往往不小心就选择了某种极端的处理方式:

  1. 只要出现了错误,就让消费线程立即退出。这种策略通常被称为”任其崩溃“, 一些面向严肃事务的开发框架,比如Erlang OTP,非常提倡这种错误处理方式。它的好处是:错误不会被隐瞒,并且程序不会在带着错误逻辑的情况下执行下去;但是,对于有些对时效性要求要大于对错误容忍的场景,这种处理策略就显得大题小做了。比如,在营销活动中,要通过监控实时数据来发送短信或邮件,像网络请求超时一类偶然发生的错误,我们是可以接受的,但如果因为这种错误导致消费线程立即退出,最终使得大量的顾客没有及时收到消息,这才是我们最不能容忍的。
  2. 将处理逻辑完全通过try ... catch 封装起来,并在catch语句中捕获所有的异常类型,任何异常都不允许被抛出到上层的处理框架。这种处理方式的好处在于,应用不会因为偶发的错误就中断数据处理,但却非常容易让错误被隐瞒,并且会让程序带着错误的逻辑执行下去。它造成了一种假象的”可用性“,是非常不推荐的一种错误处理方式。

SDK没有对错误处理做任何假设,它只是为开发者提供了一些便于错误处理的工具,开发者根据业务的需要,既可以选择”任其崩溃“的处理策略,也可以完全把所有错误catch起来,或者选择一种更为折中的处理方案。

可以通过设置错误处理器的方式来处理异常,当异常发生时,错误处理器会获得一个ConsumerErrorContext类型的参数,该参数中包含了本次异常的类型、连续的异常发生次数、一段时间内的异常发生次数等信息。我们可以根据这些信息来做决策,决定消费者下一步的行为:是停止工作,还是依然坚守工作岗位。

通过setErrorHandler方法来设置错误处理器:

  public static void main(String[] args) {
    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM
    );
    consumer.setErrorHandler(errorContext -> {
      if (errorContext.getContinueErrorCount() >= 5) {
        errorContext.close();
      }
    });
    consumer.consumeEvents(this::batchHandleEvents);
 }

以上错误处理器的逻辑,是SDK默认的错误处理逻辑,意思就是,如果消费时连续出现了5次异常,那么就会终止消费。

如果我们要实现“任其崩溃”的错误处理逻辑,那就可以把5改成1,只要发现了一次错误,那么就终止消费:

    consumer.setErrorHandler(errorContext -> {
      if (errorContext.getContinueErrorCount() >= 1) {
        errorContext.close();
      }
    });

也可以基于异常类型来实现不同的处理策略:

    consumer.setErrorHandler(errorContext -> {
      final Throwable t = errorContext.getThrowable();
      if (t instanceof BigErrorException) {

      } else if (t instanceof NoNeedCareException) {

      }
    });

还可以根据一段时间内发生的错误数目来进行处理,应用对于偶尔的错误是可以容忍的,但如果是一段时间内错误数超过了某个阈值,那么就需要进行处理了:

    final ZhugeKafkaDataReaderOptions readerOptions = new ZhugeKafkaDataReaderOptions(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM
    );

    final BasicZhugeDataConsumerOptions consumerOptions = new BasicZhugeDataConsumerOptions();
    consumerOptions.setKeepErrorRecordsNum(10);
    consumerOptions.setErrorRecordTimeUnit(TimeUnit.MINUTES);  // 设置保留最近10分钟的错误记录

    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM
    );
    consumer.setErrorHandler(errorContext -> {
      if (errorContext.getLatestErrorCount(3, ChronoUnit.MINUTES) >= 3) {  // 如果最近三分钟内,出现超过3次错误,则消费终止
        errorContext.close();
      }
    });
    consumer.consumeEvents(
        events -> logger.info(String.format("Events: %s", events)));

需要注意:当使用错误处理器时,我们无需将异常显示的通过日志输出,SDK会确保异常堆栈总是被输出到日志。

5.1 建议的错误处理策略

然而,在实践中,我们应当采用怎样的错误处理策略?建议我们将错误分为两大类来看:

  • 第一类错误是我们可以预知并且能够掌控的,对于这类错误,我们应该在消费者的业务逻辑中对错误进行处理,而不是直接抛给SDK的错误处理器。
  • 第二类错误是我们无法预知,或者即便可以预知,也无法在业务中进行处理的,对于这类错误,我们才应该将其交付给错误处理器进行处理。

举个例子,假设我们要实现一个对新用户发邮件的业务,发送邮件的动作是通过网络调用云服务,在这里面,我们能预知的错误就是网络请求超时,当碰到这种情况,我们希望进行重试来处理,另外,有些邮件地址可能不合法,我们会直接忽略;而对于其它的错误,我们无法预料,如果发生了,我们希望通过错误处理器来判断:如果在一定时间内出现了多次,那么就终止消费,我们根据错误日志再去排查具体的原因:

    final ZhugeKafkaDataReaderOptions readerOptions = new ZhugeKafkaDataReaderOptions(
        "localhost:9092",
        "test-kafka-sdk",
        ZhugeETLTopics.PAY_ZG_TOTAL_RANDOM
    );

    final BasicZhugeDataConsumerOptions consumerOptions = new BasicZhugeDataConsumerOptions();
    consumerOptions.setKeepErrorRecordsNum(10);
    consumerOptions.setErrorRecordTimeUnit(TimeUnit.MINUTES);  // 设置保留最近10分钟的错误记录

    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(
        readerOptions,
        consumerOptions
    );
    consumer.setErrorHandler(errorContext -> {
      if (errorContext.getLatestErrorCount(3, ChronoUnit.MINUTES)
          >= 3) {  // 如果最近三分钟内,出现超过3次错误,则消费终止
        errorContext.close();
      }
    });
    consumer.consumeEvent(event -> {
      final String eventName = event.getEventName();
      final String emailAddress = event.getCustomStringProperty("email", "");
      try {
        if ("注册成功".equals(eventName)) {
          sendWelcomeMail(emailAddress);
        }
      } catch (TimeoutException e) {
        retry(emailAddress);  // 如果发送超时,进行重试
      } catch (InvalidEmailAddressException e) {
        logger.error(String.format("Invalid email address: %s", emailAddress));  // 非法的邮件地址,直接略过
      }
    });

在上述示例中,我们对两类错误分别做了处理:对于超时和非法的邮件地址这类可控的错误,我们是在消费者中直接处理的;而对于其它未知的错误,我们将其抛给了错误处理器,SDK将在日志中记下错误的堆栈信息,并且回调错误处理器。

总之,我们最好应当将错误处理器视为一种”兜底“的错误处理策略。

5.2 错误处理与偏移量提交

需要注意的是,错误处理会影响到SDK对偏移量的提交。如果我们设置的偏移量提交是在消费数据之后(即At least once语义),那么当无法被catch的错误产生时,偏移量是不会被提交的,直到消费循环没有错误产生,才会再次提交偏移量。这么设计的主要目的,是为了防止当错误发生时,偏移量被提交,导致数据在业务层面被丢失,但也可能会造成重复消费。但是一般说来,如果选择了At least once,那么往往就意味着业务对重复消费是可以容忍的。

6. 并发模型

SDK的默认都是通过单个消费者线程进行数据处理,并且也建议开发者如果在性能上可以满足需要,也尽量使用单线程来完成业务逻辑。如果确实需要通过并发的方式来对数据进行处理,最好的实践是通过批量消费的视图,然后通过JDK自带的parallelStream()进行:

    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(readerOptions);
    consumer.consumeEvents(events -> {
      events.parallelStream().forEach(event -> {
        // 最佳实践
      });
    });
    consumer.join();

严禁直接在消费者逻辑中直接构建线程,或者是不断的将数据处理任务加入到自行构造的线程池而不等待结果:

    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(readerOptions);
    consumer.consumeEvents(events -> {
      new Thread(() -> handleEvents(events)).start();  // 很危险的做法
    });
    consumer.join();
    final ZhugeDataConsumer consumer = new ZhugeKafkaConsumer(readerOptions);
    consumer.consumeEvents(events -> {
      myThreadPool.submit(() -> handleEvents(events));  // 很危险的做法
    });
    consumer.join();

以上做法会存在非常严重的隐患:

  1. 如果线程处理数据的速度跟不上消费者数据读取的速度,则极有可能造成线程池的队列快速膨胀以至于发生了OOM,或者创建了过多的线程,或者如果配置了AbortPolicy/DiscardPolicy,线程池会将过多的任务拒绝,导致数据丢失。
  2. 偏移量的提交策略将变得不可控,SDK将难以获知一批数据到底是什么时候被处理完毕的。
  3. 产生线程安全问题,比如不小心访问了非线程安全的公共对象。

建议我们在一个进程中总是使用一个消费者线程,然后根据消费的速度,启动小于等于partition数目的进程,如果需要并发处理数据,建议通过parallelStream来进行。如果必须使用自行构建的线程池,那么需要在提交任务之后,等待所有的Future都执行完毕。

开发者在使用多线程时,请务必小心翼翼。