当前位置: 首页 > 知识库问答 >
问题:

在ConFluent S3 Kafka连接器中压缩Avro数据

唐俊英
2023-03-14

我有一个 Confluent 接收器连接器,它正在从 Kafka 主题获取数据。然后,它会摄取到 S3 存储桶中。

摄取工作正常,一切都很好,但是现在我需要在将Avro数据放入存储桶之前对其进行压缩。

我已经尝试了以下配置

   {
  "name":"--private-v1-s3-sink",
  "connector.class":"io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "1",
  "s3.region":"eu-west-1",
  "partition.duration.ms":"3600000",
  "rotate.schedule.interval.ms": "3600000",
  "topics.dir":"svs",
  "flush.size":"2500",
  "schema.compatibility":"FULL",
  "file.delim":"_",
  "topics":"--connect.s3.format.avro.AvroFormat",
  "key.converter":"org.apache.kafka.connect.storage.StringConverter",
  "value.converter":"io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url":"--systems",
  "schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "storage.class":"io.confluent.connect.s3.storage.S3Storage",
  "s3.bucket.name": "${S3_BUCKET}",
  "s3.acl.canned":"bucket-owner-full-control",
  "avro.codec": "snappy",
  "locale":"en-GB",
  "timezone": "GMT",
  "errors.tolerance": "all",
  "path.format":"'ingest_date'=yyyy-MM-dd",
  "timestamp.extractor":"Record"

“avro.code”,我以为会压缩数据,但它没有。取而代之的是,我还尝试了“ ”s3.compression.type“: ”snappy“ ',仍然没有运气!但是,这确实适用于JSON和GZIP。

不太清楚哪里出了问题?

共有2个答案

霍书
2023-03-14

对于将来可能会遇到这种情况的人。

我在此设置之间运行了测试,使用 BZIP2 而不是 snappy 并且没有启用压缩。

结果是:

No compression 58.2MB / 406 total objects
BZIP    19.9MB / 406 total objects
Snappy 31.1MB / 406 total objects

在24小时的时间里,所有人都从同一个话题中提取并放入自己的桶中。

正如你所看到的,上面使用snappy的配置实际上是有效的。

BZIP提供了更高的压缩率,而且似乎更快。

最终,我们不得不使用snappy,因为Redshift摄取仅允许使用snappy压缩Avro,无论如何,此时。

戴浩初
2023-03-14

这些设置仅适用于S3 Avro编写器,不适用于来自制作方的动态数据,这些数据必须在制作方或代理/主题级别进行“压缩”,而不是连接设置。

参考compression.type主题配置

 类似资料:
  • 问题内容: 有人可以向我解释zlib库在Nodejs中如何工作吗? 我对Node.js很陌生,还不确定如何使用缓冲区和流。 我的简单情况是一个字符串变量,我想将字符串压缩或解压缩(压缩或膨胀,gzip或gunzip等)到另一个字符串。 即(我希望它如何工作) 感谢您的帮助:) 问题答案: 更新 :没意识到在节点0.5中有一个新的内置“ zlib”模块。我在下面的答案是针对第三方node- zlib

  • 我设置了一个Kafka JDBC接收器以将事件发送到PostgreSQL。我编写了这个简单的生产者,它将带有模式(avro)数据的JSON发送到一个主题,如下所示: producer.py(kafka-python) 价值架构: 连接器配置(无主机、密码等) 但我的连接器出现严重故障,有三个错误,我无法找出其中任何一个错误的原因: TL;博士;日志版本 完整日志 有人能帮我理解这些错误和潜在的原因

  • 问题内容: 我在缓存层(redis)中放入了一些较大的JSON值,我认为它们可以使用一些压缩来稍微减少我的内存使用量。 您使用哪个node.js压缩模块?出于某种原因,joyent / node Modules Wiki 上列出的所有内容看起来都很肮脏-要么是404,没有超过一年的提交,很少有人在看,或者打开了内存泄漏的报告。 Snappy看起来不错,但我宁愿选择更轻便的东西。 我自然希望使用异步

  • 问题内容: 我正在使用以下代码来压缩和解压缩字符串数据,但是我面临的问题是,它很容易被压缩而不会出错,但是decompress方法会引发以下错误。 线程“主”中的异常java.io.IOException:不是GZIP格式 仍然不知道如何解决此问题!!! 问题答案: 这是因为 发送您可以从中获得的,并在其中使用它来构建您的。以下是需要在代码中进行的更改。

  • 我正在使用Julia的ZipFile包来提取和处理csv文件。没问题,但是当我遇到zip文件中的zip文件时,我也想处理它,但是遇到了一个错误。 Julia ZipFile文档如下:https://zipfilejl.readthedocs.io/en/latest/ 对如何做到这一点有什么想法吗?

  • 问题内容: 如果我有几个带有压缩zlib数据的二进制字符串,是否有一种方法可以 有效地 将它们组合成单个压缩字符串,而不需要解压缩所有内容? 我现在要做的示例: 我想要的例子: 我对zlib和DEFLATE算法了解不多,因此从理论上讲这可能是完全不可能的。另外,我必须使用use zlib; 因此我无法包装zlib并提出自己的协议,该协议透明地处理级联流。 注意:我不介意该解决方案在Python中不