我正在从查找中读取一些客户记录并将其写入bigquery表,然后从同一个表中读取一些必需的数据栏并尝试将该数据(Json)作为消息推送到pubsub中,在批处理模式下使用数据流管道。但是收到错误:“ValueError:Cloud Pub/Sub目前仅可用于流媒体管道”。 我想在数据流管道的批处理模式中使用PubSub
我试图弄清楚GCP上是否有一项服务,允许使用发布/订阅的流,并将累积的数据转储/批处理到云存储中的文件中(例如,每X分钟一次)。我知道这可以通过Dataflow实现,但如果有现成的解决方案,我会寻找更多的解决方案。 例如,这是可以使用AWS Kinesis Firehose进行的操作—纯粹在配置级别—可以告诉AWS定期或在累积数据达到一定大小时将流中累积的任何内容转储到S3上的文件。 这样做的原因
我有一个谷歌云发布/订阅和云数据流处理架构,我需要有保证的消息排序。是否可以设置订阅消息流,以便每个主题的数据流工作线程是固定的,因此消息通过相同的数据流工作线程路由,因此应该实现消息排序。 谢谢
我正在开发一个物联网应用程序,需要从PubSub主题读取流数据。我想使用Google云数据流SDK读取这些数据。我正在使用Java 1.8 我正在使用谷歌云平台的试用版。当我使用PubSubIO时。Read方法读取流数据时,我在日志文件中发现错误,我的项目没有足够的CPU配额来运行应用程序。 所以我想使用谷歌云数据流SDK读取流数据。 请有人告诉我在哪里可以找到使用Google Cloud Dat
我已经使用Google云数据流SDK编写了一个流式管道,但我想在本地测试我的管道。我的管道从Google Pub/Sub获取输入数据。 是否可以使用DirectPipelineRunner(本地执行,而不是在Google云中)运行访问发布/订阅(pubsubIO)的作业? 我在以普通用户帐户登录时遇到权限问题。我是项目的所有者,我正在尝试访问发布/子主题。
是否可以通过注释在Spring Cloud Circuit Breaker上使用Resilience4j?我找不到任何关于它的留档,只有关于通过代码使用弹性4j的示例
问题:我正在尝试创建一个云数据流管道,该管道使用Python SDK从Google云存储读取Avro文件,进行一些处理并在Google云存储上写回Avro文件。在查看ApacheBeam网站上提供的一些示例后,我尝试运行以下代码。我使用了和函数。我试图实现的是读取一个Avro文件并使用Dataflow写入同一个Avro文件,但它给了我以下警告,并且没有输出Avro文件。 警告/错误: 代码: 编辑
我已经设置了Spring Cloud Stream中提供的Spring Avro模式注册表,以便在RabbitMQ中使用。我看到的大多数示例都使用Maven Avro插件从模式资源文件生成Java类。然后在架构注册表中注册架构文件。我的理解是,此注册表允许消息仅通过对已注册架构的引用进行SERDE,而不是在消息中包含整个架构。我不明白的是,在设计时,这些模式文件是如何在所有服务之间分发的,以生成J
我无法使用功能供应商发送Avro消息。SCSt尝试将消息作为JSON发送,但失败。有人能指出是否需要任何其他配置吗? 这是供应商的功能bean 和配置
我们目前正在将一个复杂的spring boot batch+admin UI系统迁移到一个spring-cloud-task基础设施中,该基础设施将被管理云数据流。 作为POC的第一阶段,我们必须能够将所有Spring批处理作业打包在同一个部署JAR下,并且能够使用自定义作业参数一个接一个地运行它们,并且支持某种REST API远程执行作业/任务。 我们删除了所有spring-batch管理依赖项
我正在尝试找出是否有任何GCP数据流模板可用于使用“Pub/Sub to Cloud Spanner”进行数据摄取。我发现已经有一个默认的GCP数据流模板可用于示例-“Cloud Pub/Sub to BigQuery”。所以,我有兴趣看看我是否可以在流或批处理模式下对扳手进行数据摄取,以及行为会如何
我正在开发一个从谷歌云存储(GCS)目录中读取约500万个文件的管道。我已经将其配置为在谷歌云数据流上运行。 问题是,当我启动管道时,需要花费数小时“计算”所有文件的大小: 如你所见,计算大约5.5M文件的大小需要一个半小时(5549秒),然后从头开始!又花了2个小时运行第二遍,然后又启动了第三遍!截至本文撰写之时,该作业在数据流控制台中仍然不可用,这使我相信这一切都发生在我的本地机器上,并且没有
我正在构建一个应用程序,该应用程序不断附加到缓冲区,而许多阅读器独立地从该缓冲区中消费(写一次读多/WORM)。起初我想使用Apache Kafka,但由于我更喜欢即服务选项,我开始研究AWS Kinesis Streams KCL,似乎我可以用它们完成这项任务。 基本上,我需要两个特性:排序(所有读卡器必须以相同的顺序读取事件)和在缓冲区中选择读卡器开始消费的偏移量的能力。 现在我也在评估谷歌云
我正在评估Kafka/Spark/HDFS开发NRT(sub-sec)java应用程序的能力,该应用程序从外部网关接收数据,并将其发布到桌面/移动客户端(消费者),用于各种主题。同时,数据将通过流式处理和批处理(持久性)管道传输,用于分析和ML。 例如,流将是。。。 独立的TCP客户端从外部TCP服务器读取流数据 客户端根据数据包(Kafka)发布不同主题的数据,并将其传递给流管道进行分析(Spa
我们在datalab中运行了一个Python管道,它从google云存储(导入google.datalab.storage)中的存储桶中读取图像文件。最初我们使用DirectRunner,效果很好,但现在我们尝试使用DataflowRunner,并且出现导入错误。即使在管道运行的函数中包含“import google.datalab.storage”或其任何变体,也会出现错误,例如“没有名为'da