当前位置: 首页 > 知识库问答 >
问题:

如何在Flink中将avro文件写入S3?

吴弘壮
2023-03-14

我想从kafka主题中读取流数据,并以avro或parquet格式写入S3。数据流看起来像是json字符串,但我无法以avro或parquet格式转换并写入S3。

val stream=env.AddSource(myConsumerSource).AddSink(sink)

请帮忙,谢谢!

共有1个答案

仲智
2023-03-14

解决方案可以在基本etl之后使用AWS Kinesis Firehose,将SQL查询Flink表转换为String,并从AWS控制台写入Kinesis,然后作为Parquet写入S3。

https://github.com/kali786516/flinkstreamandsql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/transactionexample/transactionproducer.scala

https://github.com/kali786516/flinkstreamandsql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/transactionexample/transactionproducer.scala

Kafka示例:-https://github.com/kali786516/flinkstreamandsql/tree/master/src/main/scala/com/aws/examples/Kafka

 类似资料:
  • 我正在开发一个Flink流媒体程序,可以读取Kafka消息,并将消息转储到AWS s3上的ORC文件中。我发现没有关于Flink的BucketingSink和ORC file writer整合的文件。BucketingSink中没有这样的ORC文件编写器实现。 我被困在这里了,有什么想法吗?

  • 问题内容: 我在一个免费的支持PHP的服务器上安装了此脚本: 它创建文件,但为空。 如何创建文件并向其中写入内容,例如“猫追老鼠”行? 问题答案: 您可以使用更高级别的函数,例如,与调用,相同,然后依次将数据写入文件。

  • 问题内容: 我正在尝试使用Go写入日志文件。 我尝试了几种方法,但都失败了。这是我尝试过的: 日志文件被创建,但是没有任何打印或附加到该文件。为什么? 问题答案: 过去的工作方式一定不同,但这对我有用: 基于Go文档,不能用于,因为它会打开文件“供阅读:” 打开命名文件以供读取。如果成功,则可以使用返回文件上的方法进行读取;关联的文件描述符具有mode 。如果有错误,它将是类型。 编辑 检查后移至

  • 问题内容: 我试图将XML 存储到XML文件中,以便稍后可以检索信息,然后将其显示回控制台。 有人可以告诉我最有效的方法吗? 编辑: 这是我要写入外​​部文件的内容 这一切都创建了一个Bank用户,该用户被扔到中,然后我想存储他们的信息,以便稍后返回并重新显示。 问题答案: //根据需要修改下面的类 //下面的类实际写了

  • 在我的工作中,我通过将AVRO文件复制到HDFS中,然后在impala中执行“refresh”,将这些文件导入impala表。 但是当我想用压缩文件做的时候,它没有起作用。 hive>设置avro.output.codec=bzip2; 创建表: 创建表(bigint COMMENT“from deserializer”、string COMMENT“from deserializer”、stri

  • 我试图将数据写入csv文件,我创建了四列作为 除了序列号,其他三个字段是列表