我见过很多Apache Beam的例子,从PubSub读取数据并写入GCS bucket,但是有使用KafkaIO并将其写入GCS bucket的例子吗?我可以在哪里解析消息并根据消息内容将其放入适当的桶中?
对于例如。
message = {type="type_x", some other attributes....}
message = {type="type_y", some other attributes....}
type_x --> goes to bucket x
type_y --> goes to bucket y
我的用例是将数据从 Kafka 流式传输到 GCS 存储桶,所以如果有人建议在 GCP 中提供更好的方法,它也欢迎。
谢谢。问候,阿南特。
你可以看看这里的例子-https://github.com/0x0ece/beam-starter/blob/master/src/main/java/com/dataradiant/beam/examples/StreamWordCount.java
一旦您读取了数据元素,如果您想基于特定数据值写入多个目标,则可以使用TupleTagList
查看多个输出,详细信息可在此处找到-https://beam.apache.org/documentation/programming-guide/#additional-输出
您可以使用 Secor 将消息加载到 GCS 存储桶。Secor还能够解析传入的消息,并将它们放在同一个存储桶中的不同路径下。
嗨,我已经创建了一个apache beam管道,测试了它,并在eclipse内部运行了它,包括本地和使用dataflow Runner。我可以在eclipse控制台中看到管道正在运行。e.控制台上的日志。 这是我在cmd提示符中使用的maven命令, 这是我用来创建管道和设置选项的代码段。
这与这个问题最为相似。 我正在Dataflow 2.x中创建一个管道,它从Pubsub队列获取流式输入。进入的每一条消息都需要通过来自Google BigQuery的一个非常大的数据集进行流式传输,并且在写入数据库之前附加了所有相关的值(基于一个键)。 问题是来自BigQuery的映射数据集非常大--任何将其用作侧输入的尝试都失败了,数据流运行程序会抛出错误“java.lang.IllegalAr
我对GCP、Dataflow、Apache Beam、Python和一般的OOP都是新手。我来自函数式javascript领域,对于上下文。 现在,我已经用Apache Beam python sdk构建了一个流管道,并将其部署到GCP的数据流中。管道的源是pubsub订阅,接收器是数据存储。 管道从pubsub订阅中获取消息,根据配置对象+消息内容做出决定,然后根据做出的决定将其放在数据存储中的
我们有一个波束/数据流管道(使用数据流SDK 2.0.0-beta3 但是,我们正在设置 参数,我们可以看到所有二进制文件/jar 等都已上传到我们在 参数中指定的存储桶。 但是,Beam/Dataflow 随后会在我们项目的 GCS 中创建以下僵尸存储桶: 为什么会发生这种情况,如果我们清楚地设置参数?
我们在实验中发现,在DataFlow/Apache Beam管道中设置显式的输出碎片#会导致更差的性能。我们的证据表明,Dataflow在最后秘密地做了另一个GroupBy。我们已经转向让Dataflow自动选择碎片数(shards=0)。但是,对于某些管道,这会导致大量相对较小的输出文件(~15K文件,每个<1MB)。
null 我注意到这太慢了--CPU资源只被利用了几%。我怀疑每个节点都得到了一个zip文件,但是工作不是在本地CPU之间分配的--所以每个节点只有一个CPU在工作。我不明白为什么会这样,因为我使用了平面地图。
我没有找到任何文档允许将错误处理应用于此步骤,也没有找到将其重写为DOFN的方法。对此应用错误处理有什么建议吗?谢谢
结果如何在工作人员之间分配?是使用查询结果创建一个表,工作人员从中读取页面,还是每个工作人员运行查询并读取不同的页面或。。。怎样