当前位置: 首页 > 面试题库 >

使用tf.contrib.data.parallel_interleave并行化tf.from_generator

弘志勇
2023-03-14
问题内容

我有一堆JSON数组文件(准确地说是AVRO),每个文件都产生多个样本来训练Keras模型。通过使用@GPhilo和@jsimsa的想法,我能够想到这一点来并行化我的输入管道。无法弄清楚如何设计generator(n)来划分处理文件的工作。代码内部失败,parse_file(f)因为该函数需要一个字符串文件路径而不是一个Tensor

N = num_cores = 2
files_to_process = ["f1.avro", "f2.avro", "f3.avro"]
shuffle_size = prefetch_buffer = 1000
batch_size = 512

def generator(n):
    size = math.ceil(len(files_to_process) / N)
    start_index = n * size
    end_index = start_index + size

    def gen():
        # for f in files_to_process[start_index:end_index]:
        for f in tf.slice(files_to_process, start_index, size):
            yield f

    return gen

def dataset(n):
    return tf.data.Dataset.from_generator(generator(n), (tf.string,))

def process_file(f):
    examples_x, examples_y = parse_file(f)
    return examples_x, examples_y

ds = tf.data.Dataset.range(N)
ds = ds.apply(tf.contrib.data.parallel_interleave(dataset, cycle_length=N))
ds = ds.map(process_file, num_parallel_calls=N)
ds = ds.prefetch(prefetch_buffer)
ds = ds.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
ds = ds.batch(batch_size).shuffle(shuffle_size)

...
myTfKerasModel.fit( ds.make_one_iterator(), NUM_TRAIN_SAMPLES // batch_size )
  • generator(n)这里正确的设计方法是什么
  • 这是使用parallel_interleave和设计输入管道的优化方法吗?flat_map

问题答案:

在我看来,发电机不必要地使您的生活变得复杂。这就是我实现您的输入管道的方式:

def parse_file_tf(filename):
    return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])

# version with map
files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.map(parse_file_tf, num_parallel_calls=N)
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
dataset = dataset.batch(batch_size).shuffle(shuffle_size).prefetch(2)
it = dataset.make_one_shot_iterator()

为了测试它,我定义一个虚拟对象parse_file为:

i=0
def parse_file(f):
    global i
    i += 1
    return np.asarray([i]*i, dtype=np.float32), np.asarray([i]*i, dtype=np.float32) # mimicks variable-length examples_x, examples_y

我输入了一个基本循环,该循环显示了迭代器返回的内容:

sess = tf.Session()
try:
    while True:
        x, y = it.get_next()
        vx, vy = sess.run([x,y])
        print(vx)
        print(vy)
except tf.errors.OutOfRangeError:
    pass
sess.close()

运行上面的代码可以打印:

[2. 3. 2. 1. 3. 3.]
[2. 3. 2. 1. 3. 3.]

管道说明

本质上,我将并行化问题留给map,可以在其中传递应运行的线程数。无需生成器迭代范围和那些额外的复杂性。

我选择map
overparallel_interleave是因为map要求您为Dataset返回的每个项生成一个实例,在您的情况下,这实际上没有任何意义,因为在运行时已将所有值加载到内存中parse_file
parallel_interleave如果您缓慢地生成值(例如,通过应用tf.data.TFRecordDataset到文件名列表)会很有意义,但是如果您的数据集适合内存,请使用map

关于tf.py_func限制,它们不会影响您训练有素的网络,只会影响输入管道。理想情况下,您将为培训和网络的最终使用使用不同的管道。您只需要注意后者的局限性,而对于培训(除非您使用分布式培训和/或在机器之间移动培训进行非常具体的操作),则可以相当安全地进行。

带发电机的版本

如果您的JSON文件很大,并且其内容无法容纳在内存中,则可以使用生成器,但与您最初使用的方法略有不同。这个想法是,生成器遍历JSON文件并yield一次记录一个记录。然后,生成器必须是您的parse_file功能。例如,假设您具有以下parse_file生成器:

i = 3
def parse_file(filename):
    global i
    i += 1
    ctr = 0
    while ctr < i:
        yield ctr, ctr

在这种情况下,管道如下所示:

def wrap_generator(filename):
    return tf.data.Dataset.from_generator(parse_file(filename), [tf.int32, tf.int32])

files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.apply(tf.contrib.data.parallel_interleave(wrap_generator, cycle_length=N))
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
it = dataset.make_one_shot_iterator()

请注意,此处需要使用,parallel_interleave因为我们将生成器转换Dataset为从中提取值的实例。其余的保持不变。

将其馈送到与上述相同的示例循环中:

[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]


 类似资料:
  • 我有一个用于将实时数据移动到测试环境中的事务数据置乱的过程。该表包含大约一亿行,分布在50个分区中。每月添加一个新分区。随着音量的增加,过程的执行速度比以前慢。 我正在考虑在我的代码中引入某种程度的并行化。这是一个新领域,我想知道是否有任何最佳实践。也许使用dbms_parallel_execute将更新拆分为块? 任何关于如何优化我的代码的建议都非常感谢! 编辑,我的解决方案基于以下反馈:重写部

  • 是否有人有任何使用TBB有效并行std::分区的技巧?这已经完成了吗? 以下是我的想法: 如果数组很小,std::将其分区(串行)并返回 否则,使用自定义迭代器将数组视为2个交错数组(在缓存大小的块中交错) 为每对迭代器启动一个并行分区任务(递归到步骤1) 在两个分区/中间指针之间交换元素* 返回合并的分区/中间指针 *我希望在一般情况下,与数组的长度相比,或者与将数组划分为连续块时所需的交换相比

  • 我已经用RxJava成功地完成了一个小型Java程序。代码为: 使用此代码,一切正常。现在我正在尝试将此代码传递给Android: 在finished()方法中,我正在更新GUI(finishedListener是当前活动正在实现的接口)。 我在使用map(I)的线路上遇到错误- 内置。gradle(用于应用程序)我正在使用: 我如何解决这个问题?

  • 手头的问题写了一个尝试改进的双克生成器在行上工作,考虑到句号和类似的东西。结果如所愿。它不使用mapPartitions,但如下所示。 提前谢了。我一定是漏掉了一些基本的问题。 输出&结果z:Array[(String,String)]=Array((hello,how),(how,are),(are,you),(you,today),(i,am),(am,fine),(fine,but),(bu

  • 问题内容: 您对将尝试获取代码并将其自动拆分为线程的项目有何看法(可能是编译时,可能是在运行时)。 看下面的代码: 这种代码可以自动拆分为两个并行运行的线程。您是否认为有可能?从理论上讲,我感觉这是不可能的(这使我想起了停顿的问题),但是我不能证明这种想法是正确的。 您认为这是一个有用的项目吗?有没有类似的东西? 问题答案: 在一般情况下是否可以知道一段代码是否可以并行化并不重要,因为即使您的算法

  • 我想循环两个列表,将组合传递给函数,并获得以下输出: 由于这是Pyspark,我想将其并行化,因为函数的每个迭代都可以独立运行。 注:我的实际函数是pyspark中的一个长而复杂的算法。只是想贴一个简单的例子来概括。 最好的方法是什么?​