当前位置: 首页 > 知识库问答 >
问题:

Kafka到Google云平台的数据流摄取

应子真
2023-03-14

主题中的Kafka数据可以被流式传输、消费和吸收到BigQuery/云存储中,有哪些可能的选项。

按照,是否可以将Kafka与Google cloud Dataflow一起使用

GCP自带Dataflow,它建立在Apache Beam编程模型之上。KafkaIO与Beam Pipeline一起使用是对传入数据执行实时转换的推荐方式吗?

https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/kafka/kafkaio.html

Kafka数据可以推送到云pub-sub上,然后再推送到BigQuery表上。也可以使用GCP之外的Kafka流/Spark作业。

如果数据完全托管在谷歌云平台(GCP)上,那么在设计决策过程中需要考虑哪些因素?

共有3个答案

崔绍辉
2023-03-14

另一个可能的选择是使用Google维护的Kafka Connect连接器将数据从Kafka上传到pub-sub。从Pub-Sub,您可以轻松地使用Dataflow在BigQuery或其他Google服务中摄取。

罗伟兆
2023-03-14

您可以使用Kafka Connect和BigQuery或GCS连接器。

  • 以下是WebAy的示例。
  • 本博客展示了使用Kafka Connect将数据从Kafka写入GCS和BigQuery。
  • 此回购协议中有各种资源,可供您自己在GCP上运行Kafka Connect。

在转换方面,您可能对KSQL(它构建在Kafka Streams上)感兴趣,并且在同一博客中也有涉及。

免责声明:我为汇流公司工作,并撰写了一些上述材料。

伍弘盛
2023-03-14

卡夫卡支持在2016年加入到Apache Beam中,使用了KafkaIO转换集。这意味着Dataflow也支持它。

将数据加载到BigQuery中的最简单的方法是在DataFlow上运行Apache Beam管道。管道看起来如下所示:

Pipeline p = Pipeline.create();

p.apply("ReadFromKafka", KafkaIO.read()
                                .withTopic(myTopic)...)
 .apply("TransformData", ParDo.of(new FormatKafkaDataToBigQueryTableRow(mySchema))
 .apply(BigQueryIO.writeTableRows()
                  .to(myTableName)
                  .withSchema(mySchema));

p.run().waitUntilFinish();

在Dataflow上使用Beam流水线的优点是,您不必管理数据读取的偏移量、状态和一致性(相对于从Kafka->BQ读取的自定义编写的进程);也不是集群(相对于火花作业)。

最后,这里是一个使用Kafkaio的管道示例。

 类似资料:
  • 在我的新公司,我是一名数据工程师,负责构建google cloud platform(GCP)批处理ETL管道。我的团队的数据科学家最近给了我一个数据模型(用Python3.6编写的.py文件)。 数据模型有一个主函数,我可以调用它并获得一个dataframe作为输出,我打算将这个dataframe附加到一个bigquery表中。我是否可以只导入这个主函数,并使用apache beam(Dataf

  • 与Firebase console中的Firebase Firestore一样,谷歌云平台中的Firestore中也有相同的数据,同样的,Firebase Realtime Database(json文件)在谷歌云平台中也有

  • 我试图弄清楚GCP上是否有一项服务,允许使用发布/订阅的流,并将累积的数据转储/批处理到云存储中的文件中(例如,每X分钟一次)。我知道这可以通过Dataflow实现,但如果有现成的解决方案,我会寻找更多的解决方案。 例如,这是可以使用AWS Kinesis Firehose进行的操作—纯粹在配置级别—可以告诉AWS定期或在累积数据达到一定大小时将流中累积的任何内容转储到S3上的文件。 这样做的原因

  • 我在谷歌云平台上托管了一个基本的网络应用程序,我注意到在过去的几个月里,我的成本在慢慢上升。在过去的30天里,它真的加速了(幸运的是,在一个很小的基础上--我仍然在每天不到2美元的水平上滴答作响)。我已经几个月没有添加任何新的功能或客户端了,所以这有点令人惊讶。 我的第一直觉是交通增加了。我在App Engine仪表板上看不到类似的内容,但我放入了一堆优化,并大幅降低了QPS以防万一。没有变化。

  • 我正在尝试使用DataFlow(Java)将数据从云存储插入到Big Query中。我可以批量上传数据;但是,我想要设置一个流式上传代替。因此,当新对象添加到我的bucket时,它们将被推送到BigQuery。 我已经将PipelineOptions设置为流,并且在GCP控制台UI中显示dataflow管道是流类型的。bucket中的初始文件/对象集被推送到BigQuery。 但是当我向桶中添加新

  • 有人能帮我做这个吗?