3.3 Producer 配置

优质
小牛编辑
117浏览
2023-12-01
以下是JAVA生产者的配置:
NameDescriptionTypeDefaultValid ValuesImportance
bootstrap.servers这是一个用于建立初始连接到kafka集群的"主机/端口对"配置列表。不论这个参数配置了哪些服务器来初始化连接,客户端都是会均衡地与集群中的所有服务器建立连接。—配置的服务器清单仅用于初始化连接,以便找到集群中的所有服务器。配置格式: host1:port1,host2:port2,.... 由于这些主机是用于初始化连接,以获得整个集群(集群是会动态变化的),因此这个配置清单不需要包含整个集群的服务器。(当然,为了避免单节点风险,这个清单最好配置多台主机)。listhigh
key.serializer关键字的序列化类,实现以下接口: org.apache.kafka.common.serialization.Serializer 接口。classhigh
value.serializer值的序列化类,实现以下接口: org.apache.kafka.common.serialization.Serializer 接口。classhigh
acks此配置是 Producer 在确认一个请求发送完成之前需要收到的反馈信息的数量。 这个参数是为了保证发送请求的可靠性。以下配置方式是允许的:
  • acks=0 如果设置为0,则 producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。
  • acks=1 如果设置为1,leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。在这种情况下,如果 leader 节点在接收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失。
  • acks=all 如果设置为all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1与acks=all是等效的。
string1[all, -1, 0, 1]high
buffer.memoryProducer 用来缓冲等待被发送到服务器的记录的总字节数。如果记录发送的速度比发送到服务器的速度快, Producer 就会阻塞,如果阻塞的时间超过 max.block.ms 配置的时长,则会抛出一个异常。

这个配置与 Producer 的可用总内存有一定的对应关系,但并不是完全等价的关系,因为 Producer 的可用内存并不是全部都用来缓存。一些额外的内存可能会用于压缩(如果启用了压缩),以及维护正在运行的请求。

long33554432[0,...]high
compression.typeProducer 生成数据时可使用的压缩类型。默认值是none(即不压缩)。可配置的压缩类型包括:none, gzip, snappy, 或者 lz4 。压缩是针对批处理的所有数据,所以批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩)。stringnonehigh
retries若设置大于0的值,则客户端会将发送失败的记录重新发送,尽管这些记录有可能是暂时性的错误。请注意,这种 retry 与客户端收到错误信息之后重新发送记录并无区别。允许 retries 并且没有设置max.in.flight.requests.per.connection 为1时,记录的顺序可能会被改变。比如:当两个批次都被发送到同一个 partition ,第一个批次发生错误并发生 retries 而第二个批次已经成功,则第二个批次的记录就会先于第一个批次出现。int0[0,...,2147483647]high
ssl.key.passwordkey store 文件中私钥的密码。这对于客户端来说是可选的。passwordnullhigh
ssl.keystore.locationkey store 文件的位置。这对于客户端来说是可选的,可用于客户端的双向身份验证。stringnullhigh
ssl.keystore.passwordkey store 文件的密码。这对于客户端是可选的,只有配置了 ssl.keystore.location 才需要配置该选项。passwordnullhigh
ssl.truststore.locationtrust store 文件的位置。stringnullhigh
ssl.truststore.passwordtrust store 文件的密码。如果一个密码没有设置到 trust store ,这个密码仍然是可用的,但是完整性检查是禁用的。passwordnullhigh
batch.size当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。

当记录的大小超过了配置的字节数, Producer 将不再尝试往批次增加记录。

发送到 broker 的请求会包含多个批次的数据,每个批次对应一个 partition 的可用数据

小的 batch.size 将减少批处理,并且可能会降低吞吐量(如果 batch.size = 0的话将完全禁用批处理)。 很大的 batch.size 可能造成内存浪费,因为我们一般会在 batch.size 的基础上分配一部分缓存以应付额外的记录。

int16384[0,...]medium
client.id发出请求时传递给服务器的 ID 字符串。这样做的目的是为了在服务端的请求日志中能够通过逻辑应用名称来跟踪请求的来源,而不是只能通过IP和端口号跟进。string""medium
connections.max.idle.ms在此配置指定的毫秒数之后,关闭空闲连接。long540000medium
linger.msproducer 会将两个请求发送时间间隔内到达的记录合并到一个单独的批处理请求中。通常只有当记录到达的速度超过了发送的速度时才会出现这种情况。然而,在某些场景下,即使处于可接受的负载下,客户端也希望能减少请求的数量。这个设置是通过添加少量的人为延迟来实现的—即,与其立即发送记录, producer 将等待给定的延迟时间,以便将在等待过程中到达的其他记录能合并到本批次的处理中。这可以认为是与 TCP 中的 Nagle 算法类似。这个设置为批处理的延迟提供了上限:一旦我们接受到记录超过了分区的 batch.size ,Producer 会忽略这个参数,立刻发送数据。但是如果累积的字节数少于 batch.size ,那么我们将在指定的时间内“逗留”(linger),以等待更多的记录出现。这个设置默认为0(即没有延迟)。例如:如果设置linger.ms=5 ,则发送的请求会减少并降低部分负载,但同时会增加5毫秒的延迟。long0[0,...]medium
max.block.ms该配置控制KafkaProducer.send()KafkaProducer.partitionsFor() 允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。long60000[0,...]medium
max.request.size请求的最大字节数。这个设置将限制 Producer 在单个请求中发送的记录批量的数量,以避免发送巨大的请求。这实际上也等同于批次的最大记录数的限制。请注意,服务器对批次的大小有自己的限制,这可能与此不同。int1048576[0,...]medium
partitioner.class指定计算分区的类,实现 org.apache.kafka.clients.producer.Partitioner 接口。classorg.apache.kafka.clients.producer.internals.DefaultPartitionermedium
receive.buffer.bytes定义读取数据时 TCP 接收缓冲区(SO_RCVBUF)的大小,如果设置为-1,则使用系统默认值。int32768[-1,...]medium
request.timeout.ms客户端等待请求响应的最大时长。如果超时未收到响应,则客户端将在必要时重新发送请求,如果重试的次数达到允许的最大重试次数,则请求失败。这个参数应该比 replica.lag.time.max.ms (Broker 的一个参数)更大,以降低由于不必要的重试而导致的消息重复的可能性。int30000[0,...]medium
sasl.jaas.configSASL 连接使用的 JAAS 登陆上下文参数,以 JAAS 配置文件的格式进行配置。 JAAS 配置文件格式可参考这里。值的格式: ' (=)*;'passwordnullmedium
sasl.kerberos.service.nameKafka 运行时的 Kerberos 主体名称。可以在 Kafka 的 JAAS 配置文件或者 Kafka 的配置文件中配置。stringnullmedium
sasl.mechanism用于客户端连接的 SASL 机制。可以是任意安全可靠的机制。默认是 GSSAPI 机制。stringGSSAPImedium
security.protocol与 brokers 通讯的协议。可配置的值有: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.stringPLAINTEXTmedium
send.buffer.bytes定义发送数据时的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果设置为-1,则使用系统默认值。int131072[-1,...]medium
ssl.enabled.protocols可用于 SSL 连接的协议列表。listTLSv1.2,TLSv1.1,TLSv1medium
ssl.keystore.typekey store 文件的文件格类型。这对于客户端来说是可选的。stringJKSmedium
ssl.protocol用于生成SSLContext的SSL协议。默认设置是TLS,大多数情况下不会有问题。在最近的jvm版本中,允许的值是TLS、tlsv1.1和TLSv1.2。在旧的jvm中可能会支持SSL、SSLv2和SSLv3,但是由于存在已知的安全漏洞,因此不建议使用。stringTLSmedium
ssl.provider用于 SSL 连接security provider 。默认值是当前 JVM 版本的默认 security provider 。stringnullmedium
ssl.truststore.typetrust store 的文件类型。stringJKSmedium
enable.idempotence当设置为true时, Producer 将确保每个消息在 Stream 中只写入一个副本。如果为false,由于 Broker 故障导致 Producer 进行重试之类的情况可能会导致消息重复写入到 Stream 中。请注意,启用幂等性需要确保max.in.flight.requests.per.connection小于或等于5,retries 大于等于0,并且ack必须设置为all 。如果这些值不是由用户明确设置的,那么将自动选择合适的值。如果设置了不兼容的值,则将抛出一个ConfigException的异常。booleanfalselow
interceptor.classes配置 interceptor 类的列表。实现org.apache.kafka.clients.producer.ProducerInterceptor接口之后可以拦截(并可能改变)那些 Producer 还没有发送到 kafka 集群的记录。默认情况下,没有 interceptor 。listnulllow
max.in.flight.requests.per.connection在发生阻塞之前,客户端的一个连接上允许出现未确认请求的最大数量。注意,如果这个设置大于1,并且有失败的发送,则消息可能会由于重试而导致重新排序(如果重试是启用的话)。int5[1,...]low
metadata.max.age.ms刷新元数据的时间间隔,单位毫秒。即使没有发现任何分区的 leadership 发生变更也会强制刷新以便能主动发现新的 Broker 或者新的分区。long300000[0,...]low
metric.reporters用于指标监控报表的类清单。实现org.apache.kafka.common.metrics.MetricsReporter接口之后允许插入能够通知新的创建度量的类。JmxReporter 总是包含在注册的 JMX 统计信息中。list""low
metrics.num.samples计算 metrics 所需要维持的样本数量。int2[1,...]low
metrics.recording.levelmetrics 的最高纪录级别。stringINFO[INFO, DEBUG]low
metrics.sample.window.ms计算 metrics 样本的时间窗口。long30000[0,...]low
reconnect.backoff.max.ms当重新连接到一台多次连接失败的 Broker 时允许等待的最大毫秒数。如果配置该参数,则每台主机的 backoff 将呈指数级增长直到达到配置的最大值。当统计到 backoff 在增长,系统会增加20%的随机波动以避免大量的连接失败。long1000[0,...]low
reconnect.backoff.ms在尝试重新连接到给定的主机之前,需要等待的基本时间。这避免了在一个紧凑的循环中反复连接到同一个主机。这个 backoff 机制应用于所有客户端尝试连接到 Broker 的请求。long50[0,...]low
retry.backoff.ms在尝试将一个失败的请求重试到给定的 topic 分区之前需要等待的时间。这避免在某些失败场景下在紧凑的循环中重复发送请求。long100[0,...]low
sasl.kerberos.kinit.cmdKerberos kinit 命令的路径。string/usr/bin/kinitlow
sasl.kerberos.min.time.before.relogin重新尝试登陆之前,登录线程的休眠时间。long60000low
sasl.kerberos.ticket.renew.jitter随机抖动增加到更新时间的百分比。double0.05low
sasl.kerberos.ticket.renew.window.factor登录线程将持续休眠直到上一次刷新到 ticket 的过期时间窗口,在此时间窗口它将尝试更新 ticket 。double0.8low
ssl.cipher.suites密码套件列表。密码套件是利用 TLS 或 SSL 网络协议来实现网络连接的安全设置,是一个涵盖认证,加密,MAC和密钥交换算法的组合。默认情况下,支持所有可用的密码套件。listnulllow
ssl.endpoint.identification.algorithm使用服务器证书验证服务器主机名的 endpoint 识别算法。stringnulllow
ssl.keymanager.algorithmkey manager factory 用于 SSL 连接的算法。默认值是Java虚拟机配置的 key manager factory 算法。stringSunX509low
ssl.secure.random.implementation用于 SSL 加密操作的 SecureRandom PRNG 实现。stringnulllow
ssl.trustmanager.algorithmtrust manager factory 用于SSL连接的算法。默认值是Java虚拟机配置的 trust manager factory 算法。stringPKIXlow
transaction.timeout.ms主动中止进行中的事务之前,事务协调器等待 Producer 更新事务状态的最长时间(以毫秒为单位)。如果此值大于 Broker 中的 max.transaction.timeout.ms 设置的时长,则请求将失败并提示"InvalidTransactionTimeout"错误。int60000low
transactional.id用于事务交付的 TransactionalId。 这使跨越多个生产者会话的可靠性语义成为可能,因为它可以保证客户在开始任何新的事务之前,使用相同的 TransactionalId 的事务都已经完成。 如果没有提供 TransactionalId ,则 Producer 被限制为幂等递送。 请注意,如果配置了 TransactionalId,则必须启用 enable.idempotence 。 缺省值为空,这意味着无法使用事务。stringnullnon-empty stringlow

如果对老的Scala版本的 Producer 配置感兴趣,请点击 这里.