import os
import csv
import datetime
import numpy as np
import tensorflow as tf
import tensorflow_transform as tft
from apache_beam.io import textio
from apache_beam.io import tfrecordio
from tensorflow_transform.beam import impl as beam_impl
from tensorflow_transform.beam import tft_beam_io
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
import apache_beam as beam
NUMERIC_FEATURE_KEYS = ['feature_'+str(i) for i in range(2000)]
def _create_raw_metadata():
column_schemas = {}
for key in NUMERIC_FEATURE_KEYS:
column_schemas[key] = dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())
raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(column_schemas))
return raw_data_metadata
def preprocessing_fn(inputs):
outputs={}
for key in NUMERIC_FEATURE_KEYS:
outputs[key] = tft.scale_to_0_1(inputs[key])
return outputs
def main():
output_dir = '/tmp/tmp-folder-{}'.format(datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
RUNNER = 'DirectRunner'
with beam.Pipeline(RUNNER) as p:
with beam_impl.Context(temp_dir=output_dir):
raw_data_metadata = _create_raw_metadata()
_ = (raw_data_metadata | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(os.path.join(output_dir, 'rawdata_metadata'), pipeline=p))
m = numpy_dataset = np.random.rand(100,2000)*100
raw_data = (p
| 'CreateTestDataset' >> beam.Create([dict(zip(NUMERIC_FEATURE_KEYS, m[i,:])) for i in range(m.shape[0])]))
raw_dataset = (raw_data, raw_data_metadata)
transform_fn = (raw_dataset | 'Analyze' >> beam_impl.AnalyzeDataset(preprocessing_fn))
_ = (transform_fn | 'WriteTransformFn' >> tft_beam_io.WriteTransformFn(output_dir))
(transformed_data, transformed_metadata) = ((raw_dataset, transform_fn) | 'Transform' >> beam_impl.TransformDataset())
transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)
_ = transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(os.path.join(output_dir, 'train'), file_name_suffix='.gz', coder=transformed_data_coder)
if __name__ == '__main__':
main()
此外,我的生产代码(未显示)会出现以下消息:作业图太大。请使用较小的作业图重试,或将作业拆分为两个或多个较小的作业。
有什么提示吗?
有一种方法可以解决这个问题,不是为进入tft.scale_to_0_1的每个张量创建阶段,我们可以通过首先将它们堆叠在一起,然后用“elementwise=true”将它们传递到tft.scale_to_0_1来融合它们。
结果是一样的,因为最小值和最大值是按“列”计算的,而不是在整个张量上计算的。
它看起来如下所示:
stacked = tf.stack([inputs[key] for key in NUMERIC_FEATURE_KEYS], axis=1)
scaled_stacked = tft.scale_to_0_1(stacked, elementwise=True)
for key, tensor in zip(NUMERIC_FEATURE_KEYS, tf.unstack(scaled_stacked, axis=1)):
outputs[key] = tensor
我当前正尝试将Dataflow与pub/sub一起使用,但出现以下错误: 工作流失败。原因:(6E74E8516C0638CA):刷新凭据时出现问题。请检查:1。已为项目启用Dataflow API。2.您的项目有一个机器人服务帐户:service-[project number]@dataflow-service-producer-prod.iam.gserviceAccount.com应该可以
其他流式框架(如Apache Samza、Storm或Nifi)是否可以实现这一点? 我们非常期待得到答复。
我已经开始使用Scala SDK Scio开发我的第一个DataFlow工作。数据流作业将以流模式运行。 有谁能建议最好的部署方法吗?我已经在Scio文档中阅读了他们使用的,然后将其部署到Docker容器中。我也读过关于使用数据流模板的内容(但不是很详细)。 什么是最好的?
我试图从一个数据流作业中运行两个分离的管道,类似于下面的问题: 一个数据流作业中的并行管道 如果我们使用单个p.run()使用单个数据流作业运行两个分离的管道,如下所示: 我认为它将在一个数据流作业中启动两个独立的管道,但它会创建两个包吗?它会在两个不同的工人上运行吗?
我正在运行数据流作业从气流。我需要说我是气流的新手。数据流(从气流运行)正在成功运行,但我可以看到气流在获得工作状态时遇到了一些问题,我收到了无限的消息,比如: 谷歌云数据流作业尚不可用。。 以下是将所有步骤添加到数据流后的日志(我将{project ectID}和{jobID}放在它所在的位置): 你知道这是什么原因吗?我找不到与此问题相关的任何解决方案。我应该提供更多信息吗? 这是我在DAG中