我是Apache Beam的新手,所以我在以下场景中有点吃力:
class getFirestoreUsers(beam.DoFn):
def process(self, element, customerId):
print(f'Getting Users from Firestore, ID: {customerId}')
# Call function to initialize Database
db = intializeFirebase()
""" # get customer information from the database
doc = db.document(f'Customers/{customerId}').get()
customer = doc.to_dict() """
usersList = {}
# Get Optin Users
try:
docs = db.collection(
f'Customers/{customerId}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
usersList = {user.id: user.to_dict() for user in docs}
except Exception as err:
print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
print(err)
return([usersList])
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--topic',
type=str,
help='Pub/Sub topic to read from')
parser.add_argument(
'--output',
help=('Output local filename'))
args, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = True
options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=options)
users = (p | 'Create chars' >> beam.Create([
{
"clientMac": "7c:d9:5c:b8:6f:38",
"username": "Louis"
},
{
"clientMac": "48:fd:8e:b0:6f:38",
"username": "Paul"
}
]))
# Get Dictionary from Pub/Sub
data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
| 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e))
)
# Get customerId from Pub/Sub information
PcustomerId = (data | 'get customerId from Firestore' >>
beam.ParDo(lambda x: [x.get('customerId')]))
PcustomerId | 'print customerId' >> beam.Map(print)
# Get Users from Firestore
custUsers = (users | 'Read from Firestore' >> beam.ParDo(
getFirestoreUsers(), customerId=beam.pvalue.AsSingleton(PcustomerId)))
custUsers | 'print Users from Firestore' >> beam.Map(print)
我不清楚示例代码中如何使用users
PCollection(因为element
在process
定义中没有处理)。我使用窗口重新排列了代码,并使用customer_id
作为主要输入。
class GetFirestoreUsers(beam.DoFn):
def setup(self):
# Call function to initialize Database
self.db = intializeFirebase()
def process(self, element):
print(f'Getting Users from Firestore, ID: {element}')
""" # get customer information from the database
doc = self.db.document(f'Customers/{element}').get()
customer = doc.to_dict() """
usersList = {}
# Get Optin Users
try:
docs = self.db.collection(
f'Customers/{element}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
usersList = {user.id: user.to_dict() for user in docs}
except Exception as err:
print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
print(err)
return([usersList])
data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
| beam.WindowInto(window.FixedWindow(60))
| 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e)))
# Get customerId from Pub/Sub information
customer_id = (data | 'get customerId from Firestore' >>
beam.Map(lambda x: x.get('customerId')))
customer_id | 'print customerId' >> beam.Map(print)
# Get Users from Firestore
custUsers = (cutomer_id | 'Read from Firestore' >> beam.ParDo(
GetFirestoreUsers())
custUsers | 'print Users from Firestore' >> beam.Map(print)
从你的评论来看:
当使用来自pub/sub的原始JSON数据运行“main”PCollection时,所需的数据(首先是customerID,然后是customers数据)尚未就绪
在阅读pub/sub主题时,你的意思是firestore中的数据还没有准备好?
您总是可以在主函数中将逻辑拆分为2个管道,然后一个接一个地运行它们。
我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。
我试图在Apache Beam中使用BigtableIO的运行时参数来写入BigTable。 我创建了一个从 BigQuery 读取并写入 Bigtable 的管道。当我提供静态参数时,管道工作正常(使用 ConfigBigtableIO 和 ConfigBigtableConfiguration,请参阅此处的示例 - https://github.com/GoogleCloudPlatform/
我有一个批次处理作业在数据流运行在gcp下版本apache-梁[gcp]==2.19.0的数据流运行。我为作业创建了一个自定义模板。作业正在按预期运行,但我还想添加最大作业持续时间。我在wait_until_finish()方法中找到了持续时间(毫秒)参数,它应该是可用的。问题是:如何让模板化批处理作业在运行时间超过持续时间时自动停止?我不需要保存任何数据,我只希望工作运行时间过长时停止。我已经实
从这个问题中,我了解到Apache Flink中的SplitStream现在已被弃用,建议改用side输出。有人能举一个侧面输出如何替代splitStream的例子吗? e、 g.如何修改下面的代码段以使用侧输出?
我正在处理一个更大的数据流管道,它在批处理模式下工作得很好,但完成后的重构确实有侧输入的问题。如果我将管道置于流模式并移除侧输入,管道在Google的数据流上可以完美地工作。 如果把所有东西都剥离下来,构建以下简短的脚本来封装这个问题,并能够与它一起玩。 在Google的Dataflow中以批处理作业的形式运行这个脚本可以完成它需要做的事情。请参阅从数据流中可视化的管道:
问题内容: 在Linux上,命令ps aux输出每个统计信息具有多个列的进程列表。例如 我希望能够使用Python进行阅读,然后将每一行然后每一列分开,以便可以将它们用作值。 在大多数情况下,这不是问题: 我现在可以遍历流程以获取每一行并将其按空格分开,例如 但是,问题在于命令的最后一列有时在其中有空格。在上面的示例中,可以在命令中看到 将被拆分为 但我真的希望它是: 所以我的问题是,我该如何拆分