在我的新公司,我是一名数据工程师,负责构建google cloud platform(GCP)批处理ETL管道。我的团队的数据科学家最近给了我一个数据模型(用Python3.6编写的.py文件)。
数据模型有一个主函数,我可以调用它并获得一个dataframe作为输出,我打算将这个dataframe附加到一个bigquery表中。我是否可以只导入这个主函数,并使用apache beam(Dataflow)将其集成到管道中,而不必将数据模型重新编码到pTransform中?或者仅仅使用云调度器和云功能来实现我想要的目标会更好吗?
我是一个完全的初学者与数据流和apache beam,所以任何帮助或链接到指南将非常感谢!
如果您有一个dataframe,最简单的方法是将其转换为CSV,然后将其加载到BigQuery(加载作业)中
不要忘记添加job_config.write_disposition='write_append'
将数据添加到现有表中。查看其他参数、模式自动检测、CSV分隔器、跳过前导行可以帮助您实现负载。
在调度程序触发的函数或云运行中执行。效果很好!
您可以利用BigQuery与Pandas的内置集成,而不是转换为CSV文件并将其加载到BigQuery中(这是一个更长且可能更昂贵的过程):
import pandas
df = pandas.DataFrame(
{
'my_string': ['a', 'b', 'c'],
'my_int64': [1, 2, 3],
'my_float64': [4.0, 5.0, 6.0],
}
)
full_table_id = 'my_dataset.new_table'
project_id = 'my-project-id'
df.to_gbq(full_table_id, project_id=project_id)
在管道编排方面,我个人喜欢与Cloud Composer集成得很好的Apache Airflow
编辑:查看df.to_GBQ的大量文档,了解如何加载数据帧的更多控制。
主题中的Kafka数据可以被流式传输、消费和吸收到BigQuery/云存储中,有哪些可能的选项。 按照,是否可以将Kafka与Google cloud Dataflow一起使用 GCP自带Dataflow,它建立在Apache Beam编程模型之上。KafkaIO与Beam Pipeline一起使用是对传入数据执行实时转换的推荐方式吗? https://beam.apache.org/releas
与Firebase console中的Firebase Firestore一样,谷歌云平台中的Firestore中也有相同的数据,同样的,Firebase Realtime Database(json文件)在谷歌云平台中也有
我在谷歌云平台上托管了一个基本的网络应用程序,我注意到在过去的几个月里,我的成本在慢慢上升。在过去的30天里,它真的加速了(幸运的是,在一个很小的基础上--我仍然在每天不到2美元的水平上滴答作响)。我已经几个月没有添加任何新的功能或客户端了,所以这有点令人惊讶。 我的第一直觉是交通增加了。我在App Engine仪表板上看不到类似的内容,但我放入了一堆优化,并大幅降低了QPS以防万一。没有变化。
顺便说一句:我的应用程序是一些REST控制器和一些批处理作业的组合。那么使用云数据流有意义吗?如果没有,那么是否有更好的控制台管理器用于批处理作业(如重新启动、取消作业门户)等?
有人能帮我做这个吗?
新增数据项 更新数据项 删除数据项 获取数据项 查询数据项 分页和排序 地理位置操作