当前位置: 首页 > 工具软件 > tf.Transform > 使用案例 >

Keras(十四)tf.data读取csv文件并与tf.keras结合使用

谯志诚
2023-12-01

一,准备csv文件

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf
from tensorflow import keras

# Report the TensorFlow and interpreter versions, then the version of every
# library used in this walkthrough.
print(tf.__version__)
print(sys.version_info)
for module in (mpl, np, pd, sklearn, tf, keras):
    print(f"{module.__name__} {module.__version__}")
    
# 1. Load the California housing dataset through sklearn's
#    fetch_california_housing helper.
from sklearn.datasets import fetch_california_housing

housing = fetch_california_housing()

# 2. Split the data into training, validation and test sets: first hold out
#    the test set, then split the remainder into training and validation.
from sklearn.model_selection import train_test_split

x_train_all, x_test, y_train_all, y_test = train_test_split(housing.data, housing.target, random_state = 7)
x_train, x_valid, y_train, y_valid = train_test_split(x_train_all, y_train_all, random_state = 11)
print("x_train:",x_train.shape, y_train.shape)
print("x_valid:",x_valid.shape, y_valid.shape)
print("x_test:",x_test.shape, y_test.shape)

# 3. Standardize the features before feeding them to the model. The scaler is
#    fitted on the training split only and then applied unchanged to the
#    validation and test splits.
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled = scaler.transform(x_test)

# 4. Create the "generate_csv" output directory.
# exist_ok=True makes this a no-op when the directory is already there and
# avoids the check-then-create race of os.path.exists() followed by os.mkdir().
output_dir = "generate_csv"
os.makedirs(output_dir, exist_ok=True)

# 5,定义保存csv文件的方法
def save_to_csv(output_dir, data, name_prefix,header=None, n_parts=10):
    path_format = os.path.join(output_dir, "{}_{:02d}.csv")
    filenames = []
    
    for file_idx, row_indices in enumerate(np.array_split(np.arange(len(data)), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filenames.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header + "\n")
            for row_index in row_indices:
                f.write(",".join([repr(col) for col in data[row_index]]))
                f.write('\n')
    return filenames

# 6. Append the target column to the feature matrix of each split
#    (training, validation, test).
# Option 1: merge with np.c_[]
# train_data = np.c_[x_train_scaled, y_train]
# valid_data = np.c_[x_valid_scaled, y_valid]
# test_data = np.c_[x_test_scaled, y_test]

# Option 2: merge with np.column_stack()
train_data = np.column_stack((x_train_scaled, y_train))
valid_data = np.column_stack((x_valid_scaled, y_valid))
test_data = np.column_stack((x_test_scaled, y_test))

# 7. Build the CSV header: the feature names plus one column for the target,
#    joined into a single comma-separated string.
#    (The target column was previously misspelled "MidianHouseValue".)
header_cols = housing.feature_names + ["MedianHouseValue"]
header_str = ",".join(header_cols)

# 8. Write the in-memory splits to CSV files.
train_filenames = save_to_csv(output_dir, train_data, "train",header_str, n_parts=20)
valid_filenames = save_to_csv(output_dir, valid_data, "valid",header_str, n_parts=10)
test_filenames = save_to_csv(output_dir, test_data, "test",header_str, n_parts=10)

二,读取目录下的"训练集","测试集","验证集"对应的文件名称列表

import pprint

# Rebuild the three file-name lists from what is on disk. A file belongs to a
# split when its name contains the split's keyword; each list is sorted.
csv_files = os.listdir("./generate_csv")
train_filenames = sorted("./generate_csv/" + name for name in csv_files if "train" in name)
valid_filenames = sorted("./generate_csv/" + name for name in csv_files if "valid" in name)
test_filenames = sorted("./generate_csv/" + name for name in csv_files if "test" in name)

print("train filenames:")
pprint.pprint(train_filenames)
print("valid filenames:")
pprint.pprint(valid_filenames)
print("test filenames:")
pprint.pprint(test_filenames)

三,将文件名列表转化为 tf.data.Dataset类型

"""
# 1. filename -> dataset
# 2. read file -> dataset -> datasets -> merge
# 3. parse csv
"""
# Step 1 of the outline above: wrap the training file names in a dataset and
# print each element.
# NOTE(review): per the TensorFlow docs, list_files shuffles the file order by
# default, so the printed order may differ from the sorted list — confirm.
filename_dataset = tf.data.Dataset.list_files(train_filenames)
for filename in filename_dataset:
    print(filename)

四, 将tf.data.Dataset实例化对象中的元素合并

# Step 2: turn every file into a dataset of its text lines and merge those
# per-file datasets into one; cycle_length is set to n_readers.
n_readers = 5
dataset = filename_dataset.interleave(
    lambda filename: tf.data.TextLineDataset(filename).skip(1),  # skip(1) drops the header row
    cycle_length = n_readers
)
# Print the first 15 lines of the merged dataset.
for line in dataset.take(15):
    print(line.numpy())

五,decode_csv的用法

# tf.io.decode_csv(str, record_defaults)
# Parse one five-field CSV line. record_defaults has one entry per field, in
# field order; the entries below deliberately mix tensors and plain Python
# values of different types.
sample_str = '1,2,3,4,5'
record_defaults = [
    tf.constant(0, dtype=tf.int32),
    0,
    np.nan,
    "hello",
    tf.constant([])
]
parsed_fields = tf.io.decode_csv(sample_str, record_defaults)
print(parsed_fields)

###############################################################################
# Disabled experiment: decode a line whose five fields are all empty and print
# the InvalidArgumentError, if one is raised.
# try:
#     parsed_fields = tf.io.decode_csv(',,,,', record_defaults)
# except tf.errors.InvalidArgumentError as ex:
#     print(ex)

###############################################################################
# Disabled experiment: decode a line with seven fields against five defaults
# and print the InvalidArgumentError, if one is raised.
# try:
#     parsed_fields = tf.io.decode_csv('1,2,3,4,5,6,7', record_defaults)
# except tf.errors.InvalidArgumentError as ex:
#     print(ex)

###############################################################################

六,带入一条数据测试decode_csv的用法

def parse_csv_line(line, n_fields = 9):
    """Decode one CSV line into a ``(features, target)`` tensor pair.

    Args:
        line: A single CSV record (string or string tensor).
        n_fields: Total number of comma-separated fields in the record.

    Returns:
        ``(x, y)`` where ``x`` stacks every field except the last and ``y``
        stacks only the last field.
    """
    # Every column shares the same NaN default.
    nan_default = tf.constant(np.nan)
    column_defaults = [nan_default] * n_fields
    *feature_fields, target_field = tf.io.decode_csv(
        line, record_defaults=column_defaults
    )
    return tf.stack(feature_fields), tf.stack([target_field])

# Try the parser on one literal record: nine comma-separated numbers, of which
# the last one becomes the target.
parse_csv_line(b'-0.9868720801669367,0.832863080552588,-0.18684708416901633,-0.14888949288707784,-0.4532302419670616,-0.11504995754593579,1.6730974284189664,-0.7465496877362412,1.138',
               n_fields=9)

七,使用tf.data.Dataset将csv文件转化为可训练的tensor数据类型的完整代码

# 1. filename -> dataset
# 2. read file -> dataset -> datasets -> merge
# 3. parse csv
def csv_reader_dataset(filenames, n_readers=5,
                       batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    """Build a repeating, shuffled, batched dataset from CSV files.

    Pipeline: file names -> repeat -> interleaved text lines (the first line
    of every file, the header, is skipped) -> shuffle -> ``parse_csv_line``
    -> batch.

    Args:
        filenames: File names handed to ``tf.data.Dataset.list_files``.
        n_readers: ``cycle_length`` of the interleave step.
        batch_size: Number of parsed records per batch.
        n_parse_threads: ``num_parallel_calls`` used for the parsing map.
        shuffle_buffer_size: Buffer size of the shuffle step.

    Returns:
        A ``tf.data.Dataset`` yielding batches of what ``parse_csv_line``
        returns. Because of ``repeat()`` without a count the dataset does
        not end by itself; consumers must bound it (``take`` or
        ``steps_per_epoch``).
    """
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length=n_readers
    )
    # Dataset transformations return a new dataset instead of mutating in
    # place; the result must be re-assigned or the shuffle is silently lost.
    dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

# Smoke test: build the training dataset and print its first two batches.
train_set = csv_reader_dataset(train_filenames, batch_size=32)
for x_batch, y_batch in train_set.take(2):
    print("x:")
    pprint.pprint(x_batch)
    print("y:")
    pprint.pprint(y_batch)

# Build the datasets actually used for training, validation and testing, all
# with the same batch size.
batch_size = 32
train_set = csv_reader_dataset(train_filenames,batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,batch_size = batch_size)

八,使用tf.data.Dataset转化csv文件,代替fit中的训练,验证,测试数据

# A small regression network: one hidden ReLU layer and a single linear output.
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',input_shape=[8]),
    keras.layers.Dense(1),
])
model.compile(loss="mean_squared_error", optimizer="sgd")
callbacks = [keras.callbacks.EarlyStopping(
    patience=5, min_delta=1e-2)]

# The datasets repeat indefinitely, so fit() must be told how many batches
# form one epoch. Derive the counts from the real split sizes instead of
# hard-coding them: the previous literal 11160 was a transposition of the
# actual training-set size (11610) and silently dropped samples per epoch.
history = model.fit(train_set,
                    validation_data = valid_set,
                    steps_per_epoch = len(x_train) // batch_size,
                    validation_steps = len(x_valid) // batch_size,
                    epochs = 100,
                    callbacks = callbacks)

九,总结代码

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf
from tensorflow import keras

# Report the TensorFlow and interpreter versions, then the version of every
# library used in this script.
print(tf.__version__)
print(sys.version_info)
for module in (mpl, np, pd, sklearn, tf, keras):
    print(f"{module.__name__} {module.__version__}")
    
# 1. Load the California housing dataset through sklearn's
#    fetch_california_housing helper.
from sklearn.datasets import fetch_california_housing

housing = fetch_california_housing()

# 2. Split the data into training, validation and test sets: first hold out
#    the test set, then split the remainder into training and validation.
from sklearn.model_selection import train_test_split

x_train_all, x_test, y_train_all, y_test = train_test_split(housing.data, housing.target, random_state = 7)
x_train, x_valid, y_train, y_valid = train_test_split(x_train_all, y_train_all, random_state = 11)
print("x_train:",x_train.shape, y_train.shape)
print("x_valid:",x_valid.shape, y_valid.shape)
print("x_test:",x_test.shape, y_test.shape)

# 3. Standardize the features before feeding them to the model. The scaler is
#    fitted on the training split only and then applied unchanged to the
#    validation and test splits.
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled = scaler.transform(x_test)

# 4. Create the "generate_csv" output directory.
# exist_ok=True makes this a no-op when the directory is already there and
# avoids the check-then-create race of os.path.exists() followed by os.mkdir().
output_dir = "generate_csv"
os.makedirs(output_dir, exist_ok=True)

# 5,定义保存csv文件的方法
def save_to_csv(output_dir, data, name_prefix,header=None, n_parts=10):
    """Write the rows of ``data`` into ``n_parts`` CSV files.

    The row indices ``0 .. len(data) - 1`` are divided with
    ``np.array_split`` and each group of rows is written to its own file
    named ``<name_prefix>_<NN>.csv`` (``NN`` is the two-digit, zero-based
    part index) inside ``output_dir``. The directory must already exist.

    Args:
        output_dir: Directory that receives the CSV files.
        data: Indexable collection of rows (for example a 2-D NumPy array);
            ``data[i]`` must be iterable.
        name_prefix: Prefix of every generated file name.
        header: Optional header line (without trailing newline) written as
            the first line of every file.
        n_parts: Number of files to produce.

    Returns:
        The list of written file paths, in part order.
    """
    path_format = os.path.join(output_dir, "{}_{:02d}.csv")
    filenames = []

    for file_idx, row_indices in enumerate(np.array_split(np.arange(len(data)), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filenames.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header + "\n")
            for row_index in row_indices:
                # NumPy >= 2.0 renders scalars as e.g. "np.float64(1.5)" under
                # repr(), which is not a valid CSV number. Convert NumPy
                # scalars to plain Python values first so the text is the
                # bare, round-trippable literal on every NumPy version.
                f.write(",".join(
                    repr(col.item() if isinstance(col, np.generic) else col)
                    for col in data[row_index]
                ))
                f.write("\n")
    return filenames

# 6. Append the target column to the feature matrix of each split
#    (training, validation, test).
# Option 1: merge with np.c_[]
# train_data = np.c_[x_train_scaled, y_train]
# valid_data = np.c_[x_valid_scaled, y_valid]
# test_data = np.c_[x_test_scaled, y_test]

# Option 2: merge with np.column_stack()
train_data = np.column_stack((x_train_scaled, y_train))
valid_data = np.column_stack((x_valid_scaled, y_valid))
test_data = np.column_stack((x_test_scaled, y_test))

# 7. Build the CSV header: the feature names plus one column for the target,
#    joined into a single comma-separated string.
#    (The target column was previously misspelled "MidianHouseValue".)
header_cols = housing.feature_names + ["MedianHouseValue"]
header_str = ",".join(header_cols)

# 8. Write the in-memory splits to CSV files.
train_filenames = save_to_csv(output_dir, train_data, "train",header_str, n_parts=20)
valid_filenames = save_to_csv(output_dir, valid_data, "valid",header_str, n_parts=10)
test_filenames = save_to_csv(output_dir, test_data, "test",header_str, n_parts=10)


#####################################################################################
import pprint

# Rebuild the three file-name lists from what is on disk. A file belongs to a
# split when its name contains the split's keyword; each list is sorted.
csv_files = os.listdir("./generate_csv")
train_filenames = sorted("./generate_csv/" + name for name in csv_files if "train" in name)
valid_filenames = sorted("./generate_csv/" + name for name in csv_files if "valid" in name)
test_filenames = sorted("./generate_csv/" + name for name in csv_files if "test" in name)

print("train filenames:")
pprint.pprint(train_filenames)
print("valid filenames:")
pprint.pprint(valid_filenames)
print("test filenames:")
pprint.pprint(test_filenames)


"""
# 1. filename -> dataset
# 2. read file -> dataset -> datasets -> merge
# 3. parse csv
"""
# filename_dataset = tf.data.Dataset.list_files(train_filenames)
# for filename in filename_dataset:
#     print(filename)
    
    
# n_readers = 5
# dataset = filename_dataset.interleave(
#     lambda filename: tf.data.TextLineDataset(filename).skip(1),  # skip来去掉首行列名
#     cycle_length = n_readers
# )
# # 取前15条数据
# for line in dataset.take(1):
#     print(line.numpy())
    
    
# # tf.io.decode_csv(str, record_defaults)
# sample_str = '1,2,3,4,5'
# record_defaults = [
#     tf.constant(0, dtype=tf.int32),
#     0,
#     np.nan,
#     "hello",
#     tf.constant([])
# ]
# parsed_fields = tf.io.decode_csv(sample_str, record_defaults)
# print(parsed_fields)


def parse_csv_line(line, n_fields = 9):
    """Decode one CSV line into a ``(features, target)`` tensor pair.

    Args:
        line: A single CSV record (string or string tensor).
        n_fields: Total number of comma-separated fields in the record.

    Returns:
        ``(x, y)`` where ``x`` stacks every field except the last and ``y``
        stacks only the last field.
    """
    # Every column shares the same NaN default.
    nan_default = tf.constant(np.nan)
    column_defaults = [nan_default] * n_fields
    *feature_fields, target_field = tf.io.decode_csv(
        line, record_defaults=column_defaults
    )
    return tf.stack(feature_fields), tf.stack([target_field])

# parse_csv_line(b'-0.9868720801669367,0.832863080552588,-0.18684708416901633,-0.14888949288707784,-0.4532302419670616,-0.11504995754593579,1.6730974284189664,-0.7465496877362412,1.138',
#                n_fields=9)


# 1. filename -> dataset
# 2. read file -> dataset -> datasets -> merge
# 3. parse csv
def csv_reader_dataset(filenames, n_readers=5,
                       batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    """Build a repeating, shuffled, batched dataset from CSV files.

    Pipeline: file names -> repeat -> interleaved text lines (the first line
    of every file, the header, is skipped) -> shuffle -> ``parse_csv_line``
    -> batch.

    Args:
        filenames: File names handed to ``tf.data.Dataset.list_files``.
        n_readers: ``cycle_length`` of the interleave step.
        batch_size: Number of parsed records per batch.
        n_parse_threads: ``num_parallel_calls`` used for the parsing map.
        shuffle_buffer_size: Buffer size of the shuffle step.

    Returns:
        A ``tf.data.Dataset`` yielding batches of what ``parse_csv_line``
        returns. Because of ``repeat()`` without a count the dataset does
        not end by itself; consumers must bound it (``take`` or
        ``steps_per_epoch``).
    """
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length=n_readers
    )
    # Dataset transformations return a new dataset instead of mutating in
    # place; the result must be re-assigned or the shuffle is silently lost.
    dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

# train_set = csv_reader_dataset(train_filenames, batch_size=32)
# for x_batch, y_batch in train_set.take(2):
#     print("x:")
#     pprint.pprint(x_batch)
#     print("y:")
#     pprint.pprint(y_batch)  


# Build the datasets used for training, validation and testing, all with the
# same batch size.
batch_size = 32
train_set = csv_reader_dataset(train_filenames,batch_size = batch_size)
valid_set = csv_reader_dataset(valid_filenames,batch_size = batch_size)
test_set = csv_reader_dataset(test_filenames,batch_size = batch_size)

# A small regression network: one hidden ReLU layer and a single linear output.
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',input_shape=[8]),
    keras.layers.Dense(1),
])
model.compile(loss="mean_squared_error", optimizer="sgd")
callbacks = [keras.callbacks.EarlyStopping(
    patience=5, min_delta=1e-2)]

# The datasets repeat indefinitely, so fit() must be told how many batches
# form one epoch. Derive the counts from the real split sizes instead of
# hard-coding them: the previous literal 11160 was a transposition of the
# actual training-set size (11610) and silently dropped samples per epoch.
history = model.fit(train_set,
                    validation_data = valid_set,
                    steps_per_epoch = len(x_train) // batch_size,
                    validation_steps = len(x_valid) // batch_size,
                    epochs = 10,
                    callbacks = callbacks)
 类似资料: