3.3 Producer 配置
优质
小牛编辑
135浏览
2023-12-01
以下是JAVA生产者的配置:
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
bootstrap.servers | 这是一个用于建立初始连接到kafka集群的"主机/端口对"配置列表。不论这个参数配置了哪些服务器来初始化连接,客户端都是会均衡地与集群中的所有服务器建立连接。—配置的服务器清单仅用于初始化连接,以便找到集群中的所有服务器。配置格式: host1:port1,host2:port2,... . 由于这些主机是用于初始化连接,以获得整个集群(集群是会动态变化的),因此这个配置清单不需要包含整个集群的服务器。(当然,为了避免单节点风险,这个清单最好配置多台主机)。 | list | high | ||
key.serializer | 关键字的序列化类,实现以下接口: org.apache.kafka.common.serialization.Serializer 接口。 | class | high | ||
value.serializer | 值的序列化类,实现以下接口: org.apache.kafka.common.serialization.Serializer 接口。 | class | high | ||
acks | 此配置是 Producer 在确认一个请求发送完成之前需要收到的反馈信息的数量。 这个参数是为了保证发送请求的可靠性。以下配置方式是允许的:
| string | 1 | [all, -1, 0, 1] | high |
buffer.memory | Producer 用来缓冲等待被发送到服务器的记录的总字节数。如果记录发送的速度比发送到服务器的速度快, Producer 就会阻塞,如果阻塞的时间超过 max.block.ms 配置的时长,则会抛出一个异常。这个配置与 Producer 的可用总内存有一定的对应关系,但并不是完全等价的关系,因为 Producer 的可用内存并不是全部都用来缓存。一些额外的内存可能会用于压缩(如果启用了压缩),以及维护正在运行的请求。 | long | 33554432 | [0,...] | high |
compression.type | Producer 生成数据时可使用的压缩类型。默认值是none(即不压缩)。可配置的压缩类型包括:none , gzip , snappy , 或者 lz4 。压缩是针对批处理的所有数据,所以批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩)。 | string | none | high | |
retries | 若设置大于0的值,则客户端会将发送失败的记录重新发送,尽管这些记录有可能是暂时性的错误。请注意,这种 retry 与客户端收到错误信息之后重新发送记录并无区别。允许 retries 并且没有设置max.in.flight.requests.per.connection 为1时,记录的顺序可能会被改变。比如:当两个批次都被发送到同一个 partition ,第一个批次发生错误并发生 retries 而第二个批次已经成功,则第二个批次的记录就会先于第一个批次出现。 | int | 0 | [0,...,2147483647] | high |
ssl.key.password | key store 文件中私钥的密码。这对于客户端来说是可选的。 | password | null | high | |
ssl.keystore.location | key store 文件的位置。这对于客户端来说是可选的,可用于客户端的双向身份验证。 | string | null | high | |
ssl.keystore.password | key store 文件的密码。这对于客户端是可选的,只有配置了 ssl.keystore.location 才需要配置该选项。 | password | null | high | |
ssl.truststore.location | trust store 文件的位置。 | string | null | high | |
ssl.truststore.password | trust store 文件的密码。如果一个密码没有设置到 trust store ,这个密码仍然是可用的,但是完整性检查是禁用的。 | password | null | high | |
batch.size | 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。 当记录的大小超过了配置的字节数, Producer 将不再尝试往批次增加记录。 发送到 broker 的请求会包含多个批次的数据,每个批次对应一个 partition 的可用数据 小的 batch.size 将减少批处理,并且可能会降低吞吐量(如果 batch.size = 0的话将完全禁用批处理)。 很大的 batch.size 可能造成内存浪费,因为我们一般会在 batch.size 的基础上分配一部分缓存以应付额外的记录。 | int | 16384 | [0,...] | medium |
client.id | 发出请求时传递给服务器的 ID 字符串。这样做的目的是为了在服务端的请求日志中能够通过逻辑应用名称来跟踪请求的来源,而不是只能通过IP和端口号跟进。 | string | "" | medium | |
connections.max.idle.ms | 在此配置指定的毫秒数之后,关闭空闲连接。 | long | 540000 | medium | |
linger.ms | producer 会将两个请求发送时间间隔内到达的记录合并到一个单独的批处理请求中。通常只有当记录到达的速度超过了发送的速度时才会出现这种情况。然而,在某些场景下,即使处于可接受的负载下,客户端也希望能减少请求的数量。这个设置是通过添加少量的人为延迟来实现的—即,与其立即发送记录, producer 将等待给定的延迟时间,以便将在等待过程中到达的其他记录能合并到本批次的处理中。这可以认为是与 TCP 中的 Nagle 算法类似。这个设置为批处理的延迟提供了上限:一旦我们接受到记录超过了分区的 batch.size ,Producer 会忽略这个参数,立刻发送数据。但是如果累积的字节数少于 batch.size ,那么我们将在指定的时间内“逗留”(linger),以等待更多的记录出现。这个设置默认为0(即没有延迟)。例如:如果设置linger.ms=5 ,则发送的请求会减少并降低部分负载,但同时会增加5毫秒的延迟。 | long | 0 | [0,...] | medium |
max.block.ms | 该配置控制KafkaProducer.send() 和KafkaProducer.partitionsFor() 允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。 | long | 60000 | [0,...] | medium |
max.request.size | 请求的最大字节数。这个设置将限制 Producer 在单个请求中发送的记录批量的数量,以避免发送巨大的请求。这实际上也等同于批次的最大记录数的限制。请注意,服务器对批次的大小有自己的限制,这可能与此不同。 | int | 1048576 | [0,...] | medium |
partitioner.class | 指定计算分区的类,实现 org.apache.kafka.clients.producer.Partitioner 接口。 | class | org.apache.kafka.clients.producer.internals.DefaultPartitioner | medium | |
receive.buffer.bytes | 定义读取数据时 TCP 接收缓冲区(SO_RCVBUF)的大小,如果设置为-1,则使用系统默认值。 | int | 32768 | [-1,...] | medium |
request.timeout.ms | 客户端等待请求响应的最大时长。如果超时未收到响应,则客户端将在必要时重新发送请求,如果重试的次数达到允许的最大重试次数,则请求失败。这个参数应该比 replica.lag.time.max.ms (Broker 的一个参数)更大,以降低由于不必要的重试而导致的消息重复的可能性。 | int | 30000 | [0,...] | medium |
sasl.jaas.config | SASL 连接使用的 JAAS 登陆上下文参数,以 JAAS 配置文件的格式进行配置。 JAAS 配置文件格式可参考这里。值的格式: ' (=)*;' | password | null | medium | |
sasl.kerberos.service.name | Kafka 运行时的 Kerberos 主体名称。可以在 Kafka 的 JAAS 配置文件或者 Kafka 的配置文件中配置。 | string | null | medium | |
sasl.mechanism | 用于客户端连接的 SASL 机制。可以是任意安全可靠的机制。默认是 GSSAPI 机制。 | string | GSSAPI | medium | |
security.protocol | 与 brokers 通讯的协议。可配置的值有: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. | string | PLAINTEXT | medium | |
send.buffer.bytes | 定义发送数据时的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果设置为-1,则使用系统默认值。 | int | 131072 | [-1,...] | medium |
ssl.enabled.protocols | 可用于 SSL 连接的协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | medium | |
ssl.keystore.type | key store 文件的文件格类型。这对于客户端来说是可选的。 | string | JKS | medium | |
ssl.protocol | 用于生成SSLContext的SSL协议。默认设置是TLS,大多数情况下不会有问题。在最近的jvm版本中,允许的值是TLS、tlsv1.1和TLSv1.2。在旧的jvm中可能会支持SSL、SSLv2和SSLv3,但是由于存在已知的安全漏洞,因此不建议使用。 | string | TLS | medium | |
ssl.provider | 用于 SSL 连接security provider 。默认值是当前 JVM 版本的默认 security provider 。 | string | null | medium | |
ssl.truststore.type | trust store 的文件类型。 | string | JKS | medium | |
enable.idempotence | 当设置为true时, Producer 将确保每个消息在 Stream 中只写入一个副本。如果为false,由于 Broker 故障导致 Producer 进行重试之类的情况可能会导致消息重复写入到 Stream 中。请注意,启用幂等性需要确保max.in.flight.requests.per.connection 小于或等于5,retries 大于等于0,并且ack 必须设置为all 。如果这些值不是由用户明确设置的,那么将自动选择合适的值。如果设置了不兼容的值,则将抛出一个ConfigException的异常。 | boolean | false | low | |
interceptor.classes | 配置 interceptor 类的列表。实现org.apache.kafka.clients.producer.ProducerInterceptor 接口之后可以拦截(并可能改变)那些 Producer 还没有发送到 kafka 集群的记录。默认情况下,没有 interceptor 。 | list | null | low | |
max.in.flight.requests.per.connection | 在发生阻塞之前,客户端的一个连接上允许出现未确认请求的最大数量。注意,如果这个设置大于1,并且有失败的发送,则消息可能会由于重试而导致重新排序(如果重试是启用的话)。 | int | 5 | [1,...] | low |
metadata.max.age.ms | 刷新元数据的时间间隔,单位毫秒。即使没有发现任何分区的 leadership 发生变更也会强制刷新以便能主动发现新的 Broker 或者新的分区。 | long | 300000 | [0,...] | low |
metric.reporters | 用于指标监控报表的类清单。实现org.apache.kafka.common.metrics.MetricsReporter 接口之后允许插入能够通知新的创建度量的类。JmxReporter 总是包含在注册的 JMX 统计信息中。 | list | "" | low | |
metrics.num.samples | 计算 metrics 所需要维持的样本数量。 | int | 2 | [1,...] | low |
metrics.recording.level | metrics 的最高纪录级别。 | string | INFO | [INFO, DEBUG] | low |
metrics.sample.window.ms | 计算 metrics 样本的时间窗口。 | long | 30000 | [0,...] | low |
reconnect.backoff.max.ms | 当重新连接到一台多次连接失败的 Broker 时允许等待的最大毫秒数。如果配置该参数,则每台主机的 backoff 将呈指数级增长直到达到配置的最大值。当统计到 backoff 在增长,系统会增加20%的随机波动以避免大量的连接失败。 | long | 1000 | [0,...] | low |
reconnect.backoff.ms | 在尝试重新连接到给定的主机之前,需要等待的基本时间。这避免了在一个紧凑的循环中反复连接到同一个主机。这个 backoff 机制应用于所有客户端尝试连接到 Broker 的请求。 | long | 50 | [0,...] | low |
retry.backoff.ms | 在尝试将一个失败的请求重试到给定的 topic 分区之前需要等待的时间。这避免在某些失败场景下在紧凑的循环中重复发送请求。 | long | 100 | [0,...] | low |
sasl.kerberos.kinit.cmd | Kerberos kinit 命令的路径。 | string | /usr/bin/kinit | low | |
sasl.kerberos.min.time.before.relogin | 重新尝试登陆之前,登录线程的休眠时间。 | long | 60000 | low | |
sasl.kerberos.ticket.renew.jitter | 随机抖动增加到更新时间的百分比。 | double | 0.05 | low | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将持续休眠直到上一次刷新到 ticket 的过期时间窗口,在此时间窗口它将尝试更新 ticket 。 | double | 0.8 | low | |
ssl.cipher.suites | 密码套件列表。密码套件是利用 TLS 或 SSL 网络协议来实现网络连接的安全设置,是一个涵盖认证,加密,MAC和密钥交换算法的组合。默认情况下,支持所有可用的密码套件。 | list | null | low | |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的 endpoint 识别算法。 | string | null | low | |
ssl.keymanager.algorithm | key manager factory 用于 SSL 连接的算法。默认值是Java虚拟机配置的 key manager factory 算法。 | string | SunX509 | low | |
ssl.secure.random.implementation | 用于 SSL 加密操作的 SecureRandom PRNG 实现。 | string | null | low | |
ssl.trustmanager.algorithm | trust manager factory 用于SSL连接的算法。默认值是Java虚拟机配置的 trust manager factory 算法。 | string | PKIX | low | |
transaction.timeout.ms | 主动中止进行中的事务之前,事务协调器等待 Producer 更新事务状态的最长时间(以毫秒为单位)。如果此值大于 Broker 中的 max.transaction.timeout.ms 设置的时长,则请求将失败并提示"InvalidTransactionTimeout"错误。 | int | 60000 | low | |
transactional.id | 用于事务交付的 TransactionalId。 这使跨越多个生产者会话的可靠性语义成为可能,因为它可以保证客户在开始任何新的事务之前,使用相同的 TransactionalId 的事务都已经完成。 如果没有提供 TransactionalId ,则 Producer 被限制为幂等递送。 请注意,如果配置了 TransactionalId,则必须启用 enable.idempotence 。 缺省值为空,这意味着无法使用事务。 | string | null | non-empty string | low |
如果对老的Scala版本的 Producer 配置感兴趣,请点击 这里.