升级版本
从 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x 升级到1.0.0
Kafka 1.0.0 介绍了通信协议方面的改变。 遵循下面的滚动升级计划,可以保证您在升级过程中不用停机。 在升级之前,请先查看1.0.0版本中显著的变化。
滚动升级计划:
- 更新所有代理上的server.properties 并添加以下属性: CURRENT_KAFKA_VERSION代表指您要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION代表当前正在使用的消息格式版本。 如果您以前重写过消息格式版本,则应保留当前值。或者如果您正从0.11.0.x之前的版本升级,则应将current_message_format_version设置为与current_kafka_version匹配的值。
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0)。
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (请参阅 升级后在性能方面潜在的影响 ,了解有关此配置的详细信息。
如果您从0.11.0.x升级,且没有重写消息格式,那么您只需要覆盖inter-broker协议格式。
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0)。
- 一次升级一个代理:关闭代理,更新代码,重新启动代理。
- 整个群集升级后,通过编辑修改协议版本
inter.broker.protocol.version
并将其设置为1.0。 - 重新启动代理,以使新的协议版本生效。
- 如果您按照上面的指示重写了消息格式版本,则需要再执行一次滚动重启才能将其升级到最新版本。一旦所有(或大部分的)consumer升级到0.11.0或更高版本,请将每个代理上的log.message.format.version更改为1.0,然后逐个重启它们。 请注意,以前的Scala consumer 不支持0.11中新的消息格式,因此为了避免转换中的性能成本(或者使用一次语义),必须使用较新的Java consumer 。
其他升级说明:
- 如果你可以接受停机,那么你可以把所有的broker关闭,更新代码并重启。系统将默认启动新的协议。
- 在升级broker后,可以随时更新协议版本并重启。这不需要在升级broker后立即进行。更新消息格式版本也是如此。
1.0.0 中显著的变化
- 由于功能稳定,所以默认启动删除topic功能。希望保留以前操作的用户请将代理配置
delete.topic.enable
设置为false。请记住,在topic中删除数据的操作是不可逆的(即没有“撤销删除”操作)。 - 对于可以按时间戳搜索的topic,如果找不到分区的偏移量,则会将该分区显示在搜索结果中,并将偏移量值设置为空。在以前的版本中,这类分区不会显示。这种更改是为了使搜索行为与不支持时间戳搜索的topic相一致。
- 如果
inter.broker.protocol.version
是1.0或更高版本,即使存在脱机日志目录,代理也会一直保持联机,并在实时日志目录上提交副本。由硬件故障导致的IOException,日志目录可能会变为脱机状态。用户需要监控每个代理度量标准offlineLogDirectoryCount
来检查是否存在离线日志目录。 - 增加了一个可回溯的异常 KafkaStorageException 。 如果客户端的FetchRequest或ProducerRequest的版本不支持KafkaStorageException,则KafkaStorageException将在响应中转换为NotLeaderForPartitionException。
- -XX:在默认的JVM设置中,+ DisableExplicitGC被-XX:+ ExplicitGCInvokesConcurrent替换。在某些情况下,这有助于避免通过直接缓冲区分配本机内存时出现的内存异常。
- 重写的
handleError
方法已经从以下过时类中除去kafka.api
:包FetchRequest
,GroupCoordinatorRequest
,OffsetCommitRequest
,OffsetFetchRequest
,OffsetRequest
,ProducerRequest
,和TopicMetadataRequest
。这只是为了在代理上使用,但是实际上它已经不再被使用了,实现也没有被维护。只是因为二进制兼容性,保留了一个存根。 - Java客户端和工具现在接受任何字符串作为客户端ID。
kafka-consumer-offset-checker.sh
工具已被弃用。使用kafka-consumer-groups.sh
来得到consumer group 的详细信息- SimpleAclAuthorizer默认将拒绝访问日志记录到授权人日志中。
AuthenticationException
中的一个子类向客户端报告身份验证失败日志。如果客户端连接失败,并不会重新进行验证 。- 自定义
SaslServer
实现可能会向客户端抛出SaslAuthenticationException
来提供有关身份验证失败的错误信息。同时应注意在异常信息中,不要向未授权的客户泄露任何安全方面的关键信息。 - 向JMX提供版本和提交ID 的
app-info
将被弃用,由提供这些属性的metrics(度量)来替换。 - Kafka metrics 现在可能包含非数字值。
org.apache.kafka.common.Metric#value()
已被弃用并返回0.0
以最大限度地减少用户读取每个客户端值时系统断开的概率(用户调用MetricsReporter
或metrics()
来读取)。org.apache.kafka.common.Metric#metricValue()
用来检索数字和非数字的度量值 - 每个 Kafka 速率指标都有相应的累计计数度量标准,带后缀
-total
方便后续处理。 例如,records-consumed-rate
对应的度量标准是records-consumed-total
。 - 当系统属性
kafka_mx4jenable
设置为true
时,Mx4j才会启用。以前它是默认启用的,如果kafka_mx4jenable
设置为true
,则禁用Mx4j。 - 客户端jar 包中的
org.apache.kafka.common.security.auth
包现在是公有的,已被添加到javadocs中。这个包中的内部类已经移到其他地方了。 - 当使用授权且用户对topic没有必备的权限时,broker 返回TOPIC_AUTHORIZATION_FAILED错误表示broker对于已存在的topic无权限。如果用户具有权限但topic不存在,则返回UNKNOWN_TOPIC_OR_PARTITION错误。
- 为新的consumer配置config/consumer.properties 文件中的属性。
新的版本协议
- KIP-112: LeaderAndIsrRequest v1 引入一个分区字段
is_new
。 - KIP-112: UpdateMetadataRequest v4 引入一个分区字段
offline_replicas
。 - KIP-112: MetadataResponse v5 引入一个分区字段
offline_replicas
。 - KIP-112: ProduceResponse v4 引入了错误代码KafkaStorageException。
- KIP-112: FetchResponse v6 引入了错误代码KafkaStorageException。
- KIP-152:添加SaslAuthenticate request来报告身份验证失败。当SaslHandshake request版本大于0,将使用此请求。
升级 1.0.0 Kafka Streams 应用程序
- 将Streams应用程序从0.11.0升级到1.0.0不需要使用代理。Kafka Streams 1.0.0应用程序可以连接到0.11.0,0.10.2和0.10.1的代理(却不能连接到0.10.0代理)。
- 如果您正在监控 streams 指标,则需要更改一下报告和代码中的指标名称,因为传递指标的层次结构已更改。
- 有些公共的API,如
ProcessorContext#schedule()
、Processor#punctuate()
、KStreamBuilder
和TopologyBuilder
正在被新的API取代。我们建议进行相应的代码更改,在升级时这些改变是细微的,因为新的API看起来非常相似。 - 更多详细信息,请参阅 1.0.0版本中Streams API 的变化。 。
从 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x 或 0.10.2.x 升级到 0.11.0.0
Kafka 0.11.0.0 引入了一个新的消息格式版本,在有线协议方面也有变化。 遵循下面的滚动升级计划,可以保证您在升级过程中不用停机。在升级之前,请先查看0.11.0.0版本中显著的变化。
从0.10.2 版本开始,Java客户端(生产者和消费者)已经可以与旧代理进行通信,0.11.0版本客户可以与0.10.0及其以上的代理进行通信。但如果代理版本大于0.10.0,则须先升级Kafka集群中的所有代理,然后再升级客户端。0.11.0版本的代理支持0.8.x及其以上的客户端。
对于滚动升级:
- 更新所有代理上的server.properties并添加以下属性:CURRENT_KAFKA_VERSION指将要升级的版本。CURRENT_MESSAGE_FORMAT_VERSION指当前正在使用的消息格式版本。如果您以前没有重写消息格式,那么应该将CURRENT_MESSAGE_FORMAT_VERSION设置为与CURRENT_KAFKA_VERSION匹配的版本。
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如: 0.8.2,0.9.0,0.10.0,0.10.1 或0.10.2).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (想了解有关此配置的详细信息,请参阅 升级后潜在的性能影响。)
- 一次升级一个代理:关闭代理,更新代码并重启。
- 整个群集升级后,通过编辑修改协议版本
inter.broker.protocol.version
为0.11.0,但不要更改log.message.format.version
。 - 重启代理,以使新的协议版本生效。
- 一旦所有(或大部分)消费者升级到0.11.0及以上版本,则将每个代理的log.message.format.version更改为0.11.0,然后逐一重启它们。请注意,较低版本的Scala消费者不支持新的消息格式,因此为了避免向下转换的性能成本(或者利用一次语义),必须使用新的Java消费者。
其他升级说明:
- 在升级broker后,可以随时更新协议版本并重启。这不需要在升级broker后立即进行。更新消息格式版本也是如此。
- 在更新全局设置
log.message.format.version
之前,也可以使用主题管理工具(bin/kafka-topics.sh
)在各个topic上启用0.11.0消息格式。 - 如果要从0.10.0之前的版本升级,则在切换到0.11.0之前,不必先将消息格式更新为0.10.0。
升级 0.10.2 Kafka Streams 应用程序
- 将Streams应用程序从 0.10.2 升级到 0.11.0 不需要使用代理。Kafka Streams 0.11.0应用程序可以连接到0.11.0,0.10.2和0.10.1的代理(却不能连接到0.10.0代理)。
- 如果您自定义配置
key.serde
,value.serde
和timestamp.extractor
,建议使用替换的配置参数,因为这些配置已被弃用。 - 更多详细信息,请参阅 0.11.0版本中Streams API 的变化。
0.11.0.0版本中显著的变化
- 现在默认禁用 Unclean leader选择。这一新的默认值有利于耐用性而非可用性。希望保留原有配置的用户可将代理配置
unclean.leader.election.enable
改为为true
。 - 生产者配置
block.on.buffer.full
,metadata.fetch.timeout.ms
和timeout.ms
已被删除。他们在 0.9.0.0版本中就被弃用。 offsets.topic.replication.factor
配置现在被限制由 topic 自动生成。当群集大小不满足复制因子要求时,topic 内部自动生成将失败并返回 GROUP_COORDINATOR_NOT_AVAILABLE 错误。- 快速压缩数据时,为提高压缩率,制造商和代理默认使用的压缩块大小为(2 x 32 KB)而不是1 KB。有报告表明:使用较小的块压缩后,数据占用的空间比使用大的块多50%对于这种情况来说,拥有5000个分区的生产者需要额外的315 MB的JVM堆。
- 同样,使用gzip压缩数据时,生产者和代理会将缓冲区大小设置为8 KB而不是1 KB。gzip的默认值过低(512字节)。
- 代理配置
max.message.bytes
现在指批量消息的大小。之前将其应用于批量压缩的消息,或单独应用于未压缩的消息。批量消息可能只包含单个消息,因此大多数情况下,单个消息的大小只能通过批量格式的上限来控制。不过,消息格式转换有一些微妙的含义(详见 below for more detail)。请注意,代理以前会确保在每个提取请求中至少返回一条消息(无论总分区级别和分区级别的提取大小),但这一行为现在适用于批量消息。 - 默认启用GC日志旋转,详情请参阅KAFKA-3754。
- RecordMetadata,MetricName和Cluster类的构造函数已被删除。
- 通过提供用户header读写访问,新Headers接口增加了对用户header的支持。
- ProducerRecord和ConsumerRecord通过
Headers headers()
方法调用新的Headers API。 - ExtendedSerializer和ExtendedDeserializer接口用来来支持头文件的序列化和反序列化。如果配置的串行器和解串器不是上述类,那么头文件将被忽略。
- 引入了一个新的配置
group.initial.rebalance.delay.ms
,该配置指定时间以毫秒为单位。GroupCoordinator
将延迟初始消费者以实现再平衡。当有新成员加入group时,将根据group.initial.rebalance.delay.ms
的值进行平衡,延迟的时间最高可达max.poll.interval.ms
(默认为3秒)。在开发和测试中,为了不延迟执行的时间,可能需要将其设置为0。 - 在主题请求的元数据不存在时,
org.apache.kafka.common.Cluster#partitionsForTopic
、partitionsForNode
和availablePartitionsForTopic
方法会返回一个空列表,而不是null
(这被认为是不好的做法)。 - Streams API 的配置参数
timestamp.extractor
、key.serde
和value.serde
分别被default.timestamp.extractor
、default.key.serde
和default.value.serde
替代。 - 当实例RetriableCommitFailedException通过提交回调时,如遇Java消费者
commitAsync
API中的偏移提交失败,我们不再公布底层原因。更多详细信息,请参阅KAFKA-5052。
新的版本协议
- KIP-107: FetchRequest v5 引入了一个分区字段
log_start_offset
。 - KIP-107:FetchResponse v5 引入了一个分区字段
log_start_offset
。 - KIP-82:ProduceRequest v3 在消息协议中引入了一组包含
key
字段和value
字段的header
。 - KIP-82:FetchResponse v5 在消息协议中引入了一组包含
key
字段和value
字段的header
。
有关一次语义的注记
在生产者方面,Kafka 0.11.0 支持幂等和事务性能力。幂等传递确保消息在单个生产者的生命周期内仅给特定的主题分区传递一次。事务交付允许生产者给多个分区发送数据,这样所有的消息都会被传递成功或失败。这些功能使Kafka符合“恰好一次语义”。有关这些功能的更多详细信息,请参阅用户指南。下面我们将指出一些有关升级群集过程中的特定注意事项。请注意,启用EoS不是必需的,如未使用,不会影响broker的行为。
- 只有新的Java生产者和消费者支持一次语义。
- 这些功能主要取决于0.11.0的消息格式。在旧版本中使用将导致不被支持的版本错误。
- 事务状态存储在一个新的内部主题
__transaction_state
中。在首次使用事务性请求API时才创建此主题。同样地,消费者偏移主题也有几个配置设置用来控制主题。如transaction.state.log.min.isr
控制主题的最小ISR。请参阅用户指南中的配置部分以获取完整的选项列表。 - 对于安全集群,事务性API需要新的ACL,它可以被
bin/kafka-acls.sh
工具打开。 - Kafka的EoS引入了新的请求API,并修改了几个现有的API。更多详细信息,请参阅 KIP-98
有关0.11.0中新消息格式的说明
为了更好地支持生产者的交付语义(见KIP-98)以及提升复制容错能力(参见KIP-101),0.11.0消息格式增强了几个主要的功能。虽然新格式包含了更多信息以实现这些改进,但我们已经使批处理格式更有效率。只要每批消息的数量大于2,就可以降低整体开销。然而,对于单批次,可能会有一些轻微的性能影响。请参阅这里 以便了解我们对新消息格式初始性能的分析结果。您也可以在KIP-98方案中找到更多有关信息格式的细节。
新消息格式中,一个显著的差异是:未压缩的消息会被存储为一个批次。这会对代理配置max.message.bytes
(它限制单个批次的大小)有一些影响。首先,如果一个旧版的客户端使用旧格式生产消息到主题分区,且每个消息都比max.message.bytes
小,那么通过上述转换,合并成单批次后,代理仍可能会拒绝它们。通常,这可能发生在单个消息的聚合大小大于max.message.bytes
时。旧的消费者阅读从新格式转换来的消息时也有类似的影响:如果提取大小未被设置为 max.message.bytes
,即使单个未压缩的消息小于已配置的获取大小,消费者也可能无法取得进展。此行为不影响0.10.1.0及更高版本的Java客户端,因为它的获取协议是新的,该协议保证即使超过获取大小也能返回至少一条消息。为了解决这些问题,你应该确保:1)生产者的批量大小没有大于max.message.bytes
,并且2)消费者的获取大小为max.message.bytes
。
大多数关于升级到0.10.0消息格式对性能影响的讨论,仍然与0.11.0升级有关。这主要影响不使用TLS保护的群集,因为在这种情况下“零复制”传输是不可行的。为了避免下变换的成本,您应确保客户应用程序升级到最新的0.11.0版本。值得注意的是,由于旧消费者在0.11.0.0已经被弃用,它不支持新的消息格式。您必须升级才能使用新消费者及新的消息格式,这不需要下转换的成本。请注意,0.11.0消费者向后兼容0.10.0及更高版本的代理,所以可以先在升级代理之前升级客户端。
从0.8.x,0.9.x,0.10.0.x或0.10.1.x升级到0.10.2.0
0.10.2.0在线协议方面有变化。遵循下面的滚动升级计划,可以保证您在升级过程中不用停机。请在升级之前,请查看0.10.2.0中的显著更改。
从0.10.2版本开始,Java客户端(生产者和消费者)获得了与旧代理进行通信的能力。0.10.2版本客户端可以与0.10.0版本及其以上的代理进行通信。但是,如果您的代理低于0.10.0版本,则必须先升级Kafka集群中的所有代理,然后再升级您的客户端。0.10.2版代理支持0.8.x及以上的客户端。
对于滚动升级:
- 更新所有代理上的server.properties文件并添加以下属性:
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION 例如 0.8.2, 0.9.0, 0.10.0 或 0.10.1)。
- log.message.format.version=CURRENT_KAFKA_VERSION (请参阅升级后潜在的性能影响 ,了解有关此配置的详细信息。)
- 一次升级一个代理:关闭代理,更新代码并重启。
- 整个群集升级完后,设置inter.broker.protocol.version为0.10.2来改变协议版本。
- 如果以前的消息格式是0.10.0,则将log.message.format.version更改为0.10.2(对于0.10.0,0.10.1和0.10.2而言消息格式是相同的)。如果以前的消息格式版本低于0.10.0,则不要更改log.message.format.version - 只有在所有使用者都升级到0.10.0.0或更高版本后,才应更改此参数。
- 重启代理,以使新的协议版本生效。
- 如果此时log.message.format.version仍然低于0.10.0,请等到所有使用者都升级到0.10.0及以上版本,然后将每个代理的log.message.format.version改为0.10.2,逐一重启。
注意:如果你可以接受停机,那么你可以把所有的broker关闭,更新代码并重启。系统将默认启动新的协议。
注意:在升级broker后,可以随时更新协议版本并重启。这不需要在升级broker后立即进行。
升级0.10.1 Kafka Streams应用程序
- 将Streams应用程序从0.10.1升级到0.10.2不需要升级代理。Kafka Streams 0.10.2应用程序可以连接到0.10.2和0.10.1代理(但是不能连接到0.10.0代理)。
- 你需要重新编译你的代码。只交换Kafka Streams库的jar文件将不起作用,并会破坏您的应用程序。
- 如果您使用自定义的(即用户实现的)时间戳提取器,则需要更新代码,因为
TimestampExtractor
界面已更改。 - 如果您使用自定义指标,则需要更新代码,因为
StreamsMetric
界面已更改。 - 请参阅0.10.2中Streams API的更改以获取更多详细信息。
0.10.2.1中的显著变化
- StreamsConfig类中两个配置的默认值已更改,以提高Kafka Streams应用程序的灵活性。Kafka Streams 消费者中
retries
的默认值从0更改为10,max.poll.interval.ms
的默认值从300000改为Integer.MAX_VALUE
。
0.10.2.0中的显著变化
- Java客户端(生产者和消费者)可以与旧版本代理通信。0.10.2版本客户端可以与0.10.0及其以上版本的代理进行通信。请注意,使用旧代理时某些功能可能不可用或受限。
- 如果调用线程中断,Java消费者的几个方法可能会抛出
InterruptException
。请参阅KafkaConsumer
Javadoc以获得更深入的解释。 - Java消费者现在会优雅地关闭。默认情况下,消费者最多等待30秒以完成待处理的请求。已添加了一个新的超时关闭API
KafkaConsumer
来控制最大等待时间。 - 用逗号分隔的多个正则表达式可以通过新的Java消费者中--whitelist选项传递给MirrorMaker。当使用旧的Scala消费者时,这行为与MirrorMaker一致。
- 将Streams应用程序从0.10.1升级到0.10.2不需要升级代理。Kafka Streams 0.10.2应用程序可以连接到0.10.2和0.10.1代理(但不能连接到0.10.0代理)。
- 已从Streams API中删除Zookeeper依赖项。Streams API现在使用Kafka协议来管理内部主题,而不是通过直接修改Zookeeper。这消除了直接访问Zookeeper的权限,也不必在Streams应用中设置“StreamsConfig.ZOOKEEPER_CONFIG”了。如果Kafka集群被保护,Streams应用程序必须具有所需的安全权限才能创建新主题。
- StreamsConfig类添加了几个新的字段,包括"security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" 和 "request.timeout.ms"。用户应注意默认值,并根据需要进行设置。欲了解更多详情,请参阅3.5 Kafka Streams 配置。
新版本协议
- KIP-88:如果
topics
数组设置为null
, OffsetFetchRequest v2 将支持检索所有主题的偏移量, - KIP-88: OffsetFetchResponse v2 引入了一个顶级字段
error_code
。 - KIP-103:UpdateMetadataRequest v3 引入一个
listener_name
字段作为end_points
数组的元素。 - KIP-108: CreateTopicsRequest v1引入
validate_only
字段。 - KIP-108:CreateTopicsResponse v1 引入
error_message
字段作为topic_errors
数组的元素。
从0.8.x,0.9.x或0.10.0.X升级到0.10.1.0
0.10.1.0 在线协议方面有变化。遵循下面的滚动升级计划,可以保证您在升级过程中不用停机。请在升级之前,请查看0.10.1.0中的显著更改。
注意:由于引入了新协议,在升级客户端之前先升级您的Kafka群集是非常重要的(即,0.10.1.x客户端仅支持0.10.1.x或更高版本的代理,而0.10.1.x代理支持较旧的客户端)。
对于滚动升级:
- 更新所有代理上的server.properties文件并添加以下属性:
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如 0.8.2.0, 0.9.0.0 或 0.10.0.0)。
- log.message.format.version=CURRENT_KAFKA_VERSION (请参阅升级后潜在的性能影响,了解有关此配置的详细信息。)
- 一次升级一个代理:关闭代理,更新代码并重启。
- 整个群集升级完后,设置 inter.broker.protocol.version为0.10.1.0来改变协议版本。
- 如果以前的消息格式是0.10.0,则将log.message.format.version更改为0.10.1(对于0.10.0和0.10.1而言消息格式是相同的)。如果以前的消息格式版本低于0.10.0,则不要更改log.message.format.version - 只有在所有使用者都升级到0.10.0.0或更高版本后,才应更改此参数。
- 重启代理,以使新的协议版本生效。
- 如果此时log.message.format.version仍然低于0.10.0,请等到所有使用者都升级到0.10.0及以上版本,然后将每个代理的log.message.format.version改为0.10.1,并逐一重启。
注意:如果你可以接受停机,那么你可以把所有的broker关闭,更新代码并重启。系统将默认启动新的协议。
注意:在升级broker后,可以随时更新协议版本并重启。这不需要在升级broker后立即进行。
0.10.1.0中潜在的突出变化
- 日志保留时间不再基于日志段的上次修改时间。相反,它将基于日志段消息的最大时间戳。
- 日志滚动时间不再取决于日志段创建时间。相反,它现在基于消息中的时间戳。进一步来说,如果段中第一条消息的时间戳是T,则当新消息的时间戳大于或等于T + log.roll.ms时,日志滚动。
- 由于每个段添加了时间索引文件,0.10.0的开放文件处理程序将增加33%。
- 时间索引和偏移索引共享相同的索引大小配置。由于每次索引条目是偏移索引大小的1.5倍,用户可能需要增加log.index.size.max.bytes以避免可能的日志频繁滚动。
- 由于索引文件数量的增加,在一些具有大量日志段(如大于15K)的代理中,代理启动期间的日志加载过程可能会更长。根据我们的实验,将num.recovery.threads.per.data.dir设置为1可能会减少日志加载时间。
升级0.10.0 Kafka Streams应用程序
- 将Streams应用程序从0.10.0升级到0.10.1需要代理升级,因为Kafka Streams 0.10.1应用程序只能连接到0.10.1代理。
- 有几个API有变化且不向后兼容(更多的细节请参考0.10.1中Streams API的变化)。因此,您需要更新并重新编译您的代码。只交换Kafka Streams库的jar文件将不起作用,并会破坏您的应用程序。
0.10.1.0中的显著变化
- 新的Java消费者已经通过测试阶段,我们推荐使用它来开发。以前的Scala消费者仍然支持,但是在下一个版本中它们将被弃用,并会在以后的版本中删除。
--new-consumer
/--new.consumer
转换已不再需要MirrorMaker和消费者控制台之类的工具。只需要通过Kafka broker来连接,而不必使用ZooKeeper。此外,旧消费者控制台已被弃用,并将在以后的版本中删除。- Kafka群集现在可以由群集ID唯一标识。当代理升级到0.10.1.0后,它会自动生成。群集ID可通过设置kafka.server:type = KafkaServer,name = ClusterId来获得,它是元数据响应的一部分。通过ClusterResourceListener接口,串行器,客户端拦截器和度量记录器可以接收集群ID。
- BrokerState "RunningAsController" (value 4)已被删除。由于一个bug,代理在转换出来之前会一直处于这个状态,因此此时移除该BrokerState的影响是最小的。推荐用于检测指定代理是否为控制器的方法是使用kafka.controller:type = KafkaController,name = ActiveControllerCount指标。
- 新的Java消费者允许用户通过分区上的时间戳搜索偏移量。
- 新的Java消费者可以获得后台线程的状态。新的配置
max.poll.interval.ms
可以控制用户主动离开组前轮询调用的最大时间(默认为5分钟)(在用户主动离开组前)。配置request.timeout.ms
的值必须总是大于max.poll.interval.ms
,因为这是JoinGroup请求平衡服务器消费的最大时间,所以我们已经将其默认值更改为5分钟以上。session.timeout.ms
的默认值已经调整到10秒,max.poll.records
的默认值已经改为500。 - 当使用授权并且用户没有相关主题的授权时,代理不再返回TOPIC_AUTHORIZATION_FAILED错误,因为这会泄漏主题名称。相反会返回UNKNOWN_TOPIC_OR_PARTITION错误。这可能会导致在使用生产者和消费者时出现意外超时或延迟,因为Kafka客户端在出现未知主题错误时会自动重试。如果您担心这种情况发生,可以查看客户端日志。
- 提取响应的默认大小是固定的(消费者为50 MB,复制为10 MB)。现有分区的限制也适用(消费者和复制均为1 MB)。请注意,这些限制都不是以后的绝对最大值。
- 如果发现大于响应/分区大小限制的消息,则消费者和副本仍然可以进行。更具体地说,如果提取的第一个非空分区中第一条消息大于一个或两个限制,则该消息仍会被返回。
- 重载构造函数被添加到
kafka.api.FetchRequest
和kafka.javaapi.FetchRequest
且可以让调用者指定分区的顺序(因为在v3中顺序是重要的)。原先的构造函数已被弃用,在发送请求之前将对分区进行洗牌以避免饥饿问题。
新的版本协议
- ListOffsetRequest v1支持基于时间戳的精确偏移搜索。
- MetadataResponse v2 引入了一个新的字段: "cluster_id"。
- FetchRequest v3可以限制响应大小(除已存在的分区限制外),如果请求进行更改,则返回大于限制的消息,而且分区的顺序是很重要的。
- JoinGroup v1 引入了一个新的字段:"rebalance_timeout"。
从0.8.x或0.9.x升级到0.10.0.0
0.10.0.0 版本有 潜在的重大变化(请在升级之前查看)并可能有 升级后的性能影响。请遵循以下建议的滚动升级计划,可确保在升级过程中和升级完成后不会出现停机和性能影响。
注意:由于引入了新的协议,在升级客户端之前,升级您的Kafka集群是非常重要的。 针对0.9.0.0版本客户的说明:由于0.9.0.0版本中的一个错误,依赖于ZooKeeper的客户端(使用旧Scala高级Consumer和MirrorMaker的客户端)不能与0.10.0.x代理一起工作。因此,在将代理升级到0.10.0.x 之前,应将0.9.0.0客户端升级到0.9.0.1。对于0.8.X或0.9.0.1版本,这一步不是必需的。
对于滚动升级:
- 更新所有代理上的server.properties文件并添加以下属性:
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如 0.8.2 或0.9.0.0).
- log.message.format.version=CURRENT_KAFKA_VERSION (请参阅升级后潜在的性能影响,以了解有关此配置的详细信息。)
- 升级代理:关闭代理,更新代码,重启。
- 整个群集升级完后,设置inter.broker.protocol.version为0.10.0.0来改变协议版本。注意:您不应该编辑log.message.format.version - 只有当所有使用者都升级到0.10.0.0后,该参数才能被更改。
- 重启代理,以使新的协议版本生效。
- 如果所有使用者都升级到0.10.0,更改每个代理的log.message.format.version为0.10.0,然后逐个重启。
注意: 如果你可以接受停机,那么你可以把所有的broker关闭,更新代码并重启。系统将默认启动新的协议。
注意: 在升级broker后,可以随时更新协议版本并重启。这不需要在升级broker后立即进行。
在升级到0.10.0.0后,可能会对性能造成的影响
0.10.0中的消息格式包含一个新的时间戳字段,并使用压缩消息的相对偏移量。磁盘上的消息格式可以通过server.properties文件中的log.message.format.version来配置。磁盘上默认的消息格式是0.10.0。如果消费者客户端使用的是0.10.0.0之前的版本,则只能适用0.10.0之前的消息格式。在这种情况下,代理可以将消息从0.10.0格式转换为较早的格式,然后将响应发送给旧版本的消费者。但在这种情况下,代理不能使用零复制转移。Kafka社区中关于性能影响的报告显示,在升级之后,CPU使用率从20%上升到100%,这会迫使所有客户端立即升级,以使性能恢复正常。为避免这种消息转换,可以在代理升级到0.10.0.0时,将log.message.format.version设置为0.8.2或0.9.0。这样,代理仍可使用零拷贝将数据发送给以前的消费者。消费者升级之后,可以将代理上的消息格式更改为0.10.0,就可以使用新时间戳和改进压缩的新消息格式。Kafka支持这一转换以确保兼容性也有利于一些尚未更新到较新客户端的应用程序。但即使在过度配置的群集上也不支持所有消费者间的通信。因此,在代理升级后,尽量避免信息转换是非常重要的,但大多数客户端还没有。
对于升级到0.10.0.0的客户端,不会影响性能。
注意:通过设置消息格式版本,可以让所有现有消息都在该版本及其以下。且10.0.0之前的消费者可能会中断。尤其是,把消息格式设置为0.10.0后,不应将其更改回以前的格式,因为它可能会中断0.10.0.0版本前的使用者。
注意:由于在每条消息中额外地引入了时间戳,由于增加开销,发送小消息的生产者可能会产生消息吞吐量下降。同样,复制里现在每个消息额外传输8个字节。如果您运行的集群接近网络容量,则可能会压垮网卡,并由过载而导致故障和性能问题。
注意:如果您在生产者上启用了压缩,您可能会注意到某些情况下,生产者吞吐量或代理的压缩率会降低。在接收压缩消息时,0.10.0的代理会避免重新压缩消息,通常会减少延迟并提高吞吐量。然而,在某些情况下,这可能会减少生产者的批量大小,导致更差的吞吐量。如果发生这种情况,用户可以调整生产者的linger.ms和batch.size以获得更好的吞吐量。另外,如果用snappy压缩消息的生产者缓冲区比代理缓冲区小,可能会不利于磁盘消息的压缩率。我们打算在更高版本的Kafka中进行配置。
0.10.0.0中潜在的重大更改
- 从Kafka 0.10.0.0开始,Kafka中的消息格式版本代表Kafka版本。例如,消息格式0.9.0指的是Kafka 0.9.0支持的最高消息版本。
- 已经引入消息格式0.10.0,并默认使用。它包含消息中的时间戳字段及用于压缩消息的相对偏移量。
- ProduceRequest / Response v2被引入,默认支持消息格式0.10.0
- FetchRequest / Response v2被引入,默认支持消息格式0.10.0
- MessageFormatter接口从
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
改为def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
- MessageReader接口从
def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
改为def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
- MessageFormatter的包从
kafka.tools
改为kafka.common
- MessageReader的包从
kafka.tools
改为kafka.common
- MirrorMakerMessageHandler不再公开
handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])
方法,因为它从未被调用过。 - 0.7 KafkaMigrationTool不再与Kafka打包在一起。如果您需要从0.7迁移到0.10.0,请先迁移到0.8,然后按照升级过程从0.8升级到0.10.0。
- 新的消费者已经标准化它的API,接收
java.util.Collection
序列类型作为方法的参数。现有的代码可能需要更新才能使用0.10.0客户端库。 - LZ4压缩的消息使用可互操作的帧规范(LZ4f v1.5.1)进行处理。为了保持与旧客户端的兼容性,此更改仅适用于0.10.0及更高版本。使用v0 / v1(消息格式0.9.0)生成/获取LZ4压缩消息的客户端应该继续使用0.9.0成帧实现。使用Produce / Fetch协议v2或更高版本的客户端应使用可互操作的LZ4f成帧。可在http://www.lz4.org/上找到可互操作的LZ4库列表。
0.10.0.0中的显著变化
- 从Kafka 0.10.0.0开始,一个名为Kafka Streams的新客户端库被用于对存储在Kafka主题中的数据进行流处理。由于上面提到的消息格式更改,这个新的客户端库仅适用于0.10.x及其以上版本的代理。欲了解更多信息,请阅读 Streams 文档。
- 新用户的配置参数receive.buffer.bytes默认为64K。
- 新的使用者公开配置参数
exclude.internal.topics
以防止内部主题(例如消费者偏移主题)意外地被符合正则表达式的订阅源订阅。默认情况下,该设置启用。 - 旧的Scala生产者已被弃用。用户应尽快将其代码迁移到kafka-clients JAR中的Java生产者。
- 新的消费者API已稳定。
从0.8.0,0.8.1.X或0.8.2.X升级到0.9.0.0
0.9.0.0有潜在的重大变化(请在升级之前查看),而且代理协议也有所改变。这意味着升级的代理和客户端可能与旧版本不兼容。在升级客户端之前升级您的Kafka集群是非常重要的。如果您正在使用MirrorMaker,则应先升级下游群集。
对于滚动升级:
- 更新所有代理上的server.properties文件并添加以下属性:inter.broker.protocol.version=0.8.2.X
- 升级代理。这可以通过将其关闭,更新代码并重启来完成。
- 一旦整个群集升级完毕,将inter.broker.protocol.version设置为0.9.0.0来改变协议版本。
- 重启代理,以使新的协议版本生效
注意:如果你可以接受停机,那么你可以把所有的broker关闭,更新代码并重启。系统将默认启动新的协议。
注意:在升级broker后,可以随时更新协议版本并重启。这不需要在升级broker后立即进行。
0.9.0.0中潜在的重大变化
- 不再支持Java 1.6。
- 不再支持Scala 2.9。
- 现在超过1000的Broker IDs默认自动分配broker IDs并保留。如果您的群集具有高于该阈值的现有代理ID,请确保添加代理配置属性reserved.broker.max.id。
- 删除配置参数replica.lag.max.messages。在决定哪些副本同步时,分区leaders不再考虑滞后消息的数量。
- 现在,配置参数replica.lag.time.max.ms不仅指自上次从副本获取请求后经过的时间,还指最后一次抓取副本的时间。副本仍从leaders 获取最新消息却没有赶上replica.lag.time.max.ms时,将被视为不同步。
- 被压缩的topics不接受没有密钥的消息,如果尝试这样做,生产者会抛出异常。在0.8.x中,没有密钥的消息会导致日志压缩线程出错并退出(并停止压缩所有被压缩的主题)。
- MirrorMaker不再支持多目标群集。因此它只接受一个--consumer.config参数。要镜像多个源群集,每个群集至少需要一个MirrorMaker实例并有自己的消费者配置。
- 在org.apache.kafka.clients.tools.*中的打包工具已被移至org.apache.kafka.tools.*。所有包含的脚本仍照常运行,只有直接导入这些类的自定义代码才会受到影响。
- kafka-run-class.sh中更改了默认的Kafka JVM性能选项(KAFKA_JVM_PERFORMANCE_OPTS)。
- kafka-topics.sh脚本(kafka.admin.TopicCommand)现在退出时返回非0。
- 当在主题名字中使用'.' 或 '_'而导致风险度量标准冲突及实际碰撞冲突时,kafka-topics.sh脚本(kafka.admin.TopicCommand)将显示警告。
- kafka-console-producer.sh脚本(kafka.tools.ConsoleProducer)默认使用Java生产者而不是旧的Scala生产者,用户须在“old-producer”中指定使用旧的生产者。
- 默认情况下,所有命令行工具将打印一切日志消息到stderr而不是stdout。
0.9.0.1中的显著变化
- 将broker.id.generation.enable设置为false可以禁用新的代理ID生成功能。
- 配置参数log.cleaner.enable默认为true。这意味着cleanup.policy = compact的主题默认被压缩,根据log.cleaner.dedupe.buffer.size,128 MB的堆将被分配给清理进程。您可以根据您使用的压缩主题来查看log.cleaner.dedupe.buffer.size和其他log.cleaner配置值。
- 新用户的配置参数fetch.min.bytes默认为1。
0.9.0.0中弃用的部分
- <li不建议使用kafka-topics.sh脚本(kafka.admin.topiccommand)更改主题配置。今后,请使用kafka-configs.sh脚本(kafka.admin.configcommand)来实现此功能。< li="">
- kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已被弃用。今后,请使用kafka-consumer-groups.sh(kafka.admin.ConsumerGroupCommand)来实现此功能。
- kafka.tools.ProducerPerformance类已被弃用。今后,请使用org.apache.kafka.tools.ProducerPerformance来实现此功能(kafka-producer-perf-test.sh也使用新类)。
- 生产者配置block.on.buffer.full已被弃用,并会在未来的版本中删除。目前其默认值为false。KafkaProducer不再抛出BufferExhaustedException,而是使用max.block.ms值并抛出一个TimeoutException。如果block.on.buffer.full属性被设置为true,则会将max.block.ms设置为Long.MAX_VALUE,且不遵守metadata.fetch.timeout.ms
</li不建议使用kafka-topics.sh脚本(kafka.admin.topiccommand)更改主题配置。今后,请使用kafka-configs.sh脚本(kafka.admin.configcommand)来实现此功能。<>
从0.8.1升级到0.8.2
0.8.2与0.8.1完全兼容。升级代理可以通过将其关闭,更新代码并重启来完成。
从0.8.0升级到0.8.1
0.8.1与0.8完全兼容。0.8.1与0.8完全兼容。
从0.7升级
0.7版本与新版本不兼容。新版本对API、ZooKeeper数据结构、协议以及配置进行了重大更改,以便添加副本(在0.7中没有)。从0.7升级到更高版本需要专门的迁移工具。可以在不停机的情况下完成迁移。