我们需要创建紧凑的主题,这些主题需要在特定大小(segment.bytes)之后进行压缩,但最重要的是在经过特定时间(segment.ms)之后(即使segment.bytes尚未达到)在主题级配置上进行压缩。
现在,我们已经看到segment.bytes得到了尊重,但是segment.ms没有得到尊重。我已经用融合的Kafka5.x发行版转载了这一期
https://kafka.apache.org/documentation/#topicconfigs
这是我在apache kafka文档中读到的关于segment.ms的内容,它使我相信我们的理解是正确的--segment.ms将覆盖segment.bytes--当kafka在主题上进行压缩时。
segment.ms此配置控制Kafka在段文件未满的情况下强制滚动日志的时间段,以确保保留可以删除或压缩旧数据。
我发送的数据键在0-20值之间旋转,字符串‘springKafka生产者和消费者的例子’和我追加键值到这个字符串。
这是制作人的代码
@Override
public void run(String... strings) throws Exception {
String data = "Spring Kafka Producer and Consumer Example";
for (int j = 0; j < 30000; j++) {
for (int i = 0; i < 20; i++) {
sender.send(new Integer(i).toString(), data + i);
}
}
}
这里的代码示例为https://github.com/leofoto/kafka-producer-consumer.git
我从https://memorynotfound.com/spring-kafka-consout-producer-example/中获取了代码示例(并对此测试案例进行了修改)
我首先创建了compact-topic,在broker日志中看到了以下内容
为/tmp/kafka中的分区my-topic-compact-0创建了日志,其属性为{compression.type->producer,message.format.version->2.0-iv1,file.delete.delay.ms->60000,max.message.bytes->1000012,min.compaction.lag.ms->0,message.timestamp.type->CreateTime,message.downconversion.enable->true,min.insync.replicas->1,segment.jitter.ms->0,(kafka.log.logmanager)[2018-09-17 21:28:00,110]信息[Partition my-topic-compact-0 Broker=0]未找到分区my-topic-compact-0(kafka.cluster.Partition)的检查点高水印
然后当我更改配置使主题紧凑时
./kafka-configs--zookeeper localhost:2181--实体类型主题--实体名称my-topic-compact--alter--add-config min.cleanable.dirty.ratio=0.01,cleanup.policy=compact,segment.ms=12000,delete.retention.ms=100,segment.bytes=200000已完成对实体的配置更新:主题“my-topic-compact”。
Broker日志再次显示它(现在正确地报告它是一个紧凑的主题)
[2018-09-17 22:06:25,745]对/config/changes的信息处理通知(Kafka.Common.ZKnodeChangeNotificationListener)[2018-09-17 22:06:25,746]EntityPath:Topics/My-Topic-Compact with Config:Map的信息处理重写(Cleanup.Policy->compact,Segment.ms->12000,Min.Cleanable.Dirty.Ratio->0.01,Segment.Bytes->200000,Delete.Retention.ms->100)
kafka-config--description命令也清楚地显示了它
./kafka-configs--zookeeper localhost:2181--实体-类型主题--实体-名称my-topic-compact--描述
主题“my-topic-compact”的配置为segment.bytes=200000,min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,segment.ms=12000,cleanup.policy=compact
当我启动kafka服务器时,我看到以下消息
<<以300000毫秒的时间开始日志清理>>[[我确信300秒是代理配置值,本例中主题级别的值是12秒]]
[2018-09-17 22:01:31,215]信息[Log partition=my-topic-non-compact-0,dir=/tmp/kafka-logs]在2毫秒内完成日志加载,其中有1段,日志开始偏移量为0,日志结束偏移量为20(kafka.Log.Log)[2018-09-17 22:01:31,218]信息日志加载在378毫秒内完成。(kafka.log.logmanager)[2018-09-17 22:01:31,224]开始日志清理的信息,周期为300000毫秒。(kafka.log.logmanager)[2018-09-17 22:01:31,225]信息启动日志刷新,默认周期为9223372036854775807毫秒。(kafka.log.logmanager)[2018-09-17 22:01:31,439]信息正在等待0.0.0.0:9092上的套接字连接。(kafka.network.acceptor)[2018-09-17 22:01:31,463]信息[SocketServer BrokerID=0]启动1个接受线程(kafka.network.SocketServer)[2018-09-17 22:01:31,478]信息[ExpirationReaper-0-Product]:启动(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2018-09-17 22:01:31,478]信息[ExpirationReaper-0-Product]:启动(kafka.server.DelayedOperationPurgatoryenerName(明文),明文))(kafka.zk.kafkazkclient)
当我写了很多数据的时候,我也看到了滚动的片段,我看到了很多activity,这推动了压缩的发生。[这很好]我发送了超过300k条记录,压缩发生了,一个新的消费者使用了消息(压缩发生后),它看到了大约3225条记录。
[2018-09-17 22:09:21,602]信息[Log partition=my-topic-compact-0,dir=/tmp/kafka-logs]在4 ms内滚动了偏移量185361处的新日志段。(kafka.Log.Log)[2018-09-17 22:09:21,673]信息[ProducerStateManager Partition=My-Topic-Compact-0]在偏移量188897处写入生产者快照(kafka.Log.ProducerStateManager)[2018-09-17 22:09:21,675]信息[Log Partition=My-Topic-Compact-0,dir=/tmp/kafka-logs]在3毫秒内滚动了偏移量188897处的新日志段。(kafka.Log.Log)[2018-09-17 22:09:21,755]信息[ProducerStateManager Partition=My-Topic-Compact-0]在偏移量192348处写入生产者快照(kafka.Log.ProducerStateManager)[2018-09-17 22:09:21,758]信息[Log Partition=My-Topic-Compact-0,dir=/tmp/kafka-logs]在3毫秒内滚动了偏移量192348处的新日志段。(kafka.Log.Log)[2018-09-17 22:09:21,831]信息[ProducerStateManager Partition=My-Topic-Compact-0]在偏移量195846处写入生产者快照(kafka.Log.ProducerStateManager)[2018-09-17 22:09:21,834]信息[Log Partition=My-Topic-Compact-0,dir=/tmp/kafka-logs]在3毫秒内滚动了偏移量195846处的新日志段。(kafka.Log.Log)[2018-09-17 22:09:21,879]信息[ProducerStateManager Partition=My-Topic-Compact-0]在偏移量199461处写入生产者快照(kafka.Log.ProducerStateManager)[2018-09-17 22:09:21,882]信息[Log Partition=My-Topic-Compact-0,dir=/tmp/kafka-logs]在3毫秒内滚动了偏移量199461处的新日志段。(kafka.Log.Log)[2018-09-17 22:09:21,909]信息[ProducerStateManager Partition=My-Topic-Compact-0]在偏移量203134处写入生产者快照(kafka.Log.ProducerStateManager)[2018-09-17 22:09:21,915]信息[Log Partition=My-Topic-Compact-0,dir=/tmp/kafka-logs]在7毫秒内滚动了偏移量203134处的新日志段。(kafka.Log.Log)[2018-09-17 22:09:21,980]信息[ProducerStateManager Partition=My-Topic-Compact-0]在偏移量206703处写入生产者快照(kafka.Log.ProducerStateManager)[2018-09-17 22:09:21,985]信息[Log Partition=My-Topic-Compact-0,dir=/tmp/kafka-logs]在6毫秒内滚动了偏移量206703处的新日志段。(kafka.log.log)
现在,无论等待多长时间(超过12秒),日志压缩都不会启动
无论我在运行以下命令之前等待了多少时间(每次都有新的消费者组)
./kafka-console-consumer--bootstrap-server localhost:9092-topic my-topic-compact--from-begind--property print.key=true--group new-group16
每一个新的使用者正好消费3225条消息,如果压缩是在主题级segment.ms通过之后发生的,它应该将它压缩到只有20个键和它们的最新值。但我们看不到这种行为。我错过了什么吗。
删除无效
最重要的是,当我为相同的密钥发送空有效负载时,像这样
@Override
public void run(String... strings) throws Exception {
String data = "Spring Kafka Producer and Consumer Example";
for (int j = 0; j < 2; j++) {
for (int i = 0; i < 20; i++) {
sender.send(new Integer(i).toString(), null);
}
}
}
我们期望消息最终在下一个压缩周期被删除。在segment.ms时间过去后(在我们的例子中,主题级别配置为12秒),我们也不会发生这种情况
./kafka-configs--zookeeper localhost:2181--实体-类型主题--实体-名称my-topic-compact--描述
主题“my-topic-compact”的配置为segment.bytes=200000,min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,segment.ms=12000,cleanup.policy=compact
kafka(apache kafka 2.x或confluent distribution 5.x)还不支持基于时间的日志压缩这是我从confluent工程师那里得到的
现在,这对我们是行不通的。分享以下信息供他人参考
一旦https://cwiki.apache.org/confluence/display/kafka/kip-354%3a+add+a+maximum+log+compaction+lag完成并实现,我将鼓励您重新访问该用例。
我需要压缩Weblogic日志。 我检查了Weblogic控制台的日志设置,发现我可以旋转日志,但没有看到可以为日志压缩(压缩到zip文件)设置的任何属性。 当前设置如图所示。 有没有自动压缩这些日志的方法?
日志压缩可确保 Kafka 始终至少为单个 topic partition 的数据日志中的每个 message key 保留最新的已知值。 这样的设计解决了应用程序崩溃、系统故障后恢复或者应用在运行维护过程中重启后重新加载缓存的场景。 接下来让我们深入讨论这些在使用过程中的更多细节,阐述在这个过程中它是如何进行日志压缩的。 迄今为止,我们只介绍了简单的日志保留方法(当旧的数据保留时间超过指定时间、
我不明白的第一个例子是,下面只打印“warning:root:hello from warn”。如果我理解正确的话,“logging.info”实际上调用根记录器,而根记录器默认为警告级别。所以第一个“Hello from Info”被忽略,这很好。但为什么第二个“你好从信息”也没有打印出来? 第二个问题是处理程序和记录器的日志级别。如果我们同时为处理程序和记录器设置日志级别,那么哪一个是有效的?
我尝试使用FFMPEG和这个库压缩视频:https://github.com/guardianproject/android-ffmpeg-java
我想用Gzip压缩我的web应用程序,我使用下面的类 压缩滤波器 更新缓存过滤器工作正常,但仍然没有gzip压缩,下面是Chrome中的响应头。 我有什么办法能让这一切成功吗?我真的需要帮助,谢谢
我有一个带有Kafka使用者的spring应用程序,它使用@KafKalisterner注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的情况。以编程方式实现这一目标的最佳方法是什么?我们不控制Kafka主题配置。