Ceph 针对 fio 测试工具提供了多种引擎,用于适应不同测量对象的情况。本文将介绍其中的 fio_ceph_objectstore 和 fio_ceph_message 引擎。
打开 DWITH_FIO,编译 fio_ceph_objectstore。
./do_cmake.sh -DWITH_FIO=ON
cd build
make fio_ceph_objectstore
把编译得到 libfio_ceph_objectstore.so 的目录加入动态链接库的环境变量。
一般,ceph 默认把库文件安装在 ceph/build/lib 目录下。
export LD_LIBRARY_PATH=/path/to/install/lib
使用 ceph/build/bin 目录下的 fio 工具测试,该工具在 make fio_ceph_objectstore 过程中安装在 bin 目录下。
注意:不要使用自己安装的 fio,以免版本问题发生错误。
运行以下命令,检测是否安装成功。
./fio --enghelp=libfio_ceph_objectstore.so
fio [options] [job options] <job files>
libfio_ceph_objectstore.so 引擎的 job 文件与 fio 内置引擎的 job 文件写法类似。需要注意的是,bluestore 配置项不是直接写在 job 中,而是写在另外的 conf 文件中。并且需要在 job 文件中指明 conf 文件的位置,通过 “conf” 选项。
[root@localhost fio-job]# cat ceph-bluestore.fio
[global]
ioengine=libfio_ceph_objectstore.so #engine 名称
conf=ceph-bluestore.conf #bluestore 配置文件路径
directory=/root/ceph/build/dev/osd0 #osd 路径
rw=randwrite #随机写
iodepth=16 #队列深度
time_based=1 #表示运行时间必须达到设置的 runtime 值,如果提前完成读取写入任务,那么便会循环重复执行,直至时间到达 runtime 值。
runtime=20s #运行时间
[bluestore]
nr_files=64 #
size=256m #总大小 256M
bs=64k #每次写入文件大小 64k
[root@localhost fio-job]# cat ceph-bluestore.conf
[global]
debug bluestore = 0/0
debug bluefs = 0/0
debug bdev = 0/0
debug rocksdb = 0/0
# spread objects over 8 collections
osd pool default pg num = 8
# increasing shards can help when scaling number of collections
osd op num shards = 5
[osd]
osd objectstore = bluestore
# use directory= option from fio job file
osd data = ${fio_dir} #这里的设置和 fio 中的 directory 保持一致
# log inside fio_dir
log file = ${fio_dir}/log
需要注意的是,测试使用的 fio 工具是随编译 objectstore 动态库一起安装的,安装目录为 /ceph/build/bin。如果使用自己安装的其他版本的 fio 工具,可能会报段错误。
[root@localhost fio-job]# /root/ceph/build/bin/fio ceph-bluestore.fio
bluestore: (g=0): rw=randwrite, bs=(R) 64.0KiB-64.0KiB, (W) 64.0KiB-64.0KiB, (T) 64.0KiB-64.0KiB, ioengine=ceph-os, iodepth=16
fio-3.10
Starting 1 process
bluestore: (groupid=0, jobs=1): err= 0: pid=20118: Sun Feb 7 13:59:12 2021
write: IOPS=1646, BW=103MiB/s (108MB/s)(2313MiB/22476msec) #写时 IOPS 和带宽(BW)总览
clat (usec): min=937, max=4160.9k, avg=6608.53, stdev=69534.14 #completion latency,“执行完 IO 的时间”
lat (usec): min=1296, max=4161.2k, avg=7117.14, stdev=72114.55 #总的延时,主要参考指标
clat percentiles (usec):
| 1.00th=[ 1352], 5.00th=[ 1745], 10.00th=[ 2073],
| 20.00th=[ 2638], 30.00th=[ 3064], 40.00th=[ 3490],
| 50.00th=[ 3884], 60.00th=[ 4293], 70.00th=[ 4621],
| 80.00th=[ 5014], 90.00th=[ 5473], 95.00th=[ 5997],
| 99.00th=[ 37487], 99.50th=[ 107480], 99.90th=[ 379585],
| 99.95th=[ 952108], 99.99th=[3808429]
bw ( KiB/s): min= 1532, max=183040, per=100.00%, avg=130515.64, stdev=61789.70, samples=36
iops : min= 23, max= 2860, avg=2039.08, stdev=965.59, samples=36
lat (usec) : 1000=0.01%
lat (msec) : 2=8.25%, 4=44.70%, 10=45.30%, 20=0.37%, 50=0.61%
lat (msec) : 100=0.26%, 250=0.36%, 500=0.08%, 750=0.01%, 1000=0.01%
cpu : usr=16.56%, sys=67.86%, ctx=4379, majf=0, minf=793
IO depths : 1=0.1%, 2=0.1%, 4=14.6%, 8=75.2%, 16=10.2%, 32=0.0%, >=64=0.0%
submit : 0=0.0%, 4=0.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0%
complete : 0=0.0%, 4=1.5%, 8=23.4%, 16=75.1%, 32=0.0%, 64=0.0%, >=64=0.0%
issued rwts: total=0,37004,0,0 short=0,0,0,0 dropped=0,0,0,0
latency : target=0, window=0, percentile=100.00%, depth=16
Run status group 0 (all jobs):
WRITE: bw=103MiB/s (108MB/s), 103MiB/s-103MiB/s (108MB/s-108MB/s), io=2313MiB (2425MB), run=22476-22476msec
首先介绍下 fio 的 engio 统一接口。具体代码如下。
struct ceph_ioengine : public ioengine_ops {
ceph_ioengine() : ioengine_ops({}) {
name = "ceph-os"; // engine 名称
version = FIO_IOOPS_VERSION; // fio engine版本
flags = FIO_DISKLESSIO; // 标志
setup = fio_ceph_os_setup; // 引擎初始化
queue = fio_ceph_os_queue; // 请求事件入队,队列深度等于 io_depth
commit = fio_ceph_os_commit; // 提交io请求,即读写数据
getevents = fio_ceph_os_getevents; // 获取完成请求的数量
event = fio_ceph_os_event; // 获取指定事件
cleanup = fio_ceph_os_cleanup; // 释放内存
open_file = fio_ceph_os_open; // 打开文件,用于打开要写入的目的地址(块设备、文件)
close_file = fio_ceph_os_close; // 关闭文件
io_u_init = fio_ceph_os_io_u_init; // io数据初始化
io_u_free = fio_ceph_os_io_u_free; // 释放 io 数据内存
options = ceph_options.data(); // 改引擎用到的配置选项的参数及其说明
option_struct_size = sizeof(struct Options);
}
};
fio_ceph_os_setup() 函数主要工作是初始化了 objectstore 实例和 cephcontext 实例,其主要流程如下:
1. 初始化 g_ceph_context 对象,
cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD, CODE_ENVIRONMENT_UTILITY, CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
common_init_finish(g_ceph_context);
2. 创建 objectstore 对象,根据 objectstore type 创建对应的存储引擎对象,一般是 bluestore 引擎。
os.reset(ObjectStore::create(g_ceph_context,
g_conf().get_val<std::string>("osd objectstore"),
g_conf().get_val<std::string>("osd data"),
g_conf().get_val<std::string>("osd journal")));
3. 创建并加载本地对象存储系统
int r = os->mkfs();
r = os->mount();
4. 创建 collection,Transaction
std::vector<Collection>& collections;
collections.emplace_back(pg, ch);
ObjectStore::Transaction t;
根据上述步骤,fio_ceph_os_setup() 把 g_ceph_context、objectstore、collections 封装在一个 engine 中,并把该 engine 包装在一个 job 对象。通过调用 job->engine->objectstore 可以调用 ceph 的本地存储方法。collections 就是一组对象的放置规则的集合,用于分散对象,里面保存了 pg、coll_t、CollectionHandle等。
目前,ceph 主要采用 bluestore 存储引擎,本文将对该引擎中的 mkfs、mount 等方法结合源码进行分析。
mkfs 作用是在磁盘第一次使用 bluestore 时,写入一些用户指定的配置到磁盘第一个块——超级块(大小可配置,一般为: BDEV_LABEL_BLOCK_SIZE 4096),这样后续使用该磁盘时,可以直接读取配置项。 之所以需要固化这些配置项,是因为 bluestore 使用不同的配置项对于磁盘数据的组织形式不同,如果前后两次上电使用不同的配置项访问磁盘数据有可能导致数据发生永久损坏。对已经 mkfs 过的磁盘再次使用该函数,则会对磁盘做一次 meta 数据检查。
以下为 LABEL_BLOCK 块中存储的数据,主要有 osd id、设备大小、生日时间、设备描述以及一组元数据 map。
/// label for block device
struct bluestore_bdev_label_t {
uuid_d osd_uuid; ///< osd uuid
uint64_t size; ///< device size
utime_t btime; ///< birth time
string description; ///< device description
map<string,string> meta; ///< {read,write}_meta() content from ObjectStore
...
};
mkfs() 主要是初始化 bluestore_bdev_label_t,并写入到磁盘的第一个块中,同时建立块设备链并预分配指定大小的空间。
int BlueStore::mkfs() {
...
{
// 如果之前已经 mkfs 过了,则只做 fsck 检查
r = read_meta("mkfs_done", &done);
...
r = fsck(cct->_conf->bluestore_fsck_on_mkfs_deep);
...
return r; // idempotent
}
// 向/osd-data-path/block写入元数据 type ,设为 bluestore
{
...
r = read_meta("type", &type);
if (r == 0) {
if (type != "bluestore") {
derr << __func__ << " expected bluestore, but type is " << type << dendl;
return -EIO;
}
} else {
r = write_meta("type", "bluestore");
if (r < 0)
return r;
}
}
freelist_type = "bitmap";
//打开设备目录/osd-data-path/
r = _open_path();
if (r < 0)
return r;
//打开/创建设备目录下的/osd-data-path/fsid
r = _open_fsid(true);
if (r < 0)
goto out_path_fd;
//锁定fsid
r = _lock_fsid();
if (r < 0)
goto out_close_fsid;
//读取fsid,若没有,则生成 fsid
r = _read_fsid(&old_fsid);
if (r < 0 || old_fsid.is_zero()) {
if (fsid.is_zero()) {
fsid.generate_random(); //随机生成 fsid
dout(1) << __func__ << " generated fsid " << fsid << dendl;
} else {
dout(1) << __func__ << " using provided fsid " << fsid << dendl;
}
// we'll write it later.
} else {
if (!fsid.is_zero() && fsid != old_fsid) {
derr << __func__ << " on-disk fsid " << old_fsid
<< " != provided " << fsid << dendl;
r = -EINVAL;
goto out_close_fsid;
}
fsid = old_fsid;
}
//在/osd-data-path/目录下创建 block 文件,并把它链接到真正的 bluestore_block_path,尝试预分配 bluestore_block_size 大小的空间。
r = _setup_block_symlink_or_file("block", cct->_conf->bluestore_block_path,
cct->_conf->bluestore_block_size,
cct->_conf->bluestore_block_create);
if (r < 0)
goto out_close_fsid;
//若设有多个磁盘,用作 wal 和 db 设备,则继续创建 block.wal 和 block.db 链接,并预分配空间。
if (cct->_conf->bluestore_bluefs) {
r = _setup_block_symlink_or_file("block.wal", cct->_conf->bluestore_block_wal_path,
cct->_conf->bluestore_block_wal_size,
cct->_conf->bluestore_block_wal_create);
if (r < 0)
goto out_close_fsid;
r = _setup_block_symlink_or_file("block.db", cct->_conf->bluestore_block_db_path,
cct->_conf->bluestore_block_db_size,
cct->_conf->bluestore_block_db_create);
if (r < 0)
goto out_close_fsid;
}
//创建并打开 BlockDevice,其类型有pmem,kernel,ust-nvme。ceph有自己的一套块设备操作方式,例如 kernel 设备使用 libaio 直接操作,越过了文件系统。
r = _open_bdev(true);
if (r < 0)
goto out_close_fsid;
// choose min_alloc_size
if (cct->_conf->bluestore_min_alloc_size) {
min_alloc_size = cct->_conf->bluestore_min_alloc_size;
} else {
ceph_assert(bdev);
if (bdev->is_rotational()) {
min_alloc_size = cct->_conf->bluestore_min_alloc_size_hdd;
} else {
min_alloc_size = cct->_conf->bluestore_min_alloc_size_ssd;
}
}
//验证块设备大小是否足够启用 bluefs
_validate_bdev();
// make sure min_alloc_size is power of 2 aligned.
if (!isp2(min_alloc_size)) {
...
goto out_close_bdev;
}
// 启用 cephfs 及其 db,用来存储元数据,一般是 rocksdb
r = _open_db(true);
if (r < 0)
goto out_close_bdev;
...
// 记录 kv_backend 数据库类型
r = write_meta("kv_backend", cct->_conf->bluestore_kvbackend);
if (r < 0)
goto out_close_fm;
// 记录是否采用 bluefs 代替文件系统,基本都采用
r = write_meta("bluefs", stringify(bluefs ? 1 : 0));
if (r < 0)
goto out_close_fm;
// 更新 fsid
if (fsid != old_fsid) {
r = _write_fsid();
if (r < 0) {
derr << __func__ << " error writing fsid: " << cpp_strerror(r) << dendl;
goto out_close_fm;
}
}
if (out_of_sync_fm.fetch_and(0)) {
_sync_bluefs_and_fm();
}
out_close_fm:
_close_fm();
out_close_db:
_close_db();
out_close_bdev:
_close_bdev();
out_close_fsid:
_close_fsid();
out_path_fd:
_close_path();
if (r == 0 &&
cct->_conf->bluestore_fsck_on_mkfs) {
int rc = fsck(cct->_conf->bluestore_fsck_on_mkfs_deep);
if (rc < 0)
return rc;
if (rc > 0) {
derr << __func__ << " fsck found " << rc << " errors" << dendl;
r = -EIO;
}
}
if (r == 0) {
// indicate success by writing the 'mkfs_done' file
r = write_meta("mkfs_done", "yes");
}
if (r < 0) {
derr << __func__ << " failed, " << cpp_strerror(r) << dendl;
} else {
dout(0) << __func__ << " success" << dendl;
}
return r;
}
在 mkfs 中,我们注意到创建了 BlueFS 文件系统,其调用链:mkfs() -> _open_db() -> _open_bluefs()。
这里简单阐述下 BlueFS 的由来。其实 ceph 本身为了避开复杂耗时的 Linux 原生文件系统,特意使用 libaio+directIO 的方式操控裸盘,把数据直接通过 libaio 的 api 读写操作。这样就产生了一个问题,我在读取之前写入的对象时,要如何知道去磁盘的哪一个范围读取,因此就需要另一份元数据用来保存对象的位置、大小等信息。ceph 采用 RocksDB 来保存该信息,但是 RocksDB 本身是在内存中的,不支持直接裸盘持久化。因此 ceph 又设计了 BlueFS 文件系统用来支撑 RocksDB。中国人有句古话:来都来了。既然文件系统来都来了,咱们也不能让它闲着(只负责持久化 RocksDB),干脆直接给所有存储对象来个索引、inode、日志 等信息,方便快速检索、校验文件。
对象数据 -> libaio+directIO -> 磁盘。
【对象数据的元数据+操作日志+rocksdb 日志】 -> rocksdb -> bluefs -> 磁盘。由于 rocksdb 运行在内存中,所以速度极快。
注意 bluefs 的超级块位于 bluestore 的超级块的后面,大小也是 4kb,原因是一般直接读取到内存中,对磁盘速度要求不高,就放在速度最慢的数据盘中。因此对于一块数据盘,一般来说前 8 kb 都是固定的超级块信息(第一个4k存储 bluestore_bdev_label_t)。
这里给出 bluefs 超级块代码:
struct bluefs_super_t {
uuid_d uuid; ///< unique to this bluefs instance
uuid_d osd_uuid; ///< matches the osd that owns us
uint64_t version;
uint32_t block_size;
bluefs_fnode_t log_fnode; // 记录文件系统日志的文件
bluefs_super_t()
: version(0),
block_size(4096) { }
uint64_t block_mask() const {
return ~((uint64_t)block_size - 1);
}
void encode(bufferlist& bl) const;
void decode(bufferlist::const_iterator& p);
void dump(Formatter *f) const;
static void generate_test_instances(list<bluefs_super_t*>& ls);
};
总结:mkfs 固化的配置项:objectstore 类型、fsid、kv_backend(kvDB 类型)、bluefs、freelist_type 等。
_mount 操作主要是加载或验证 mkfs 中固化的信息以及一起其他准备工作。只有当 mount 操作完成后,才能使用 bluestore 存储引擎。
int BlueStore::_mount(bool kv_only, bool open_db) {
...
// fsck 元数据校验
if (cct->_conf->bluestore_fsck_on_mount) {
int rc = fsck(cct->_conf->bluestore_fsck_on_mount_deep);
if (rc < 0)
return rc;
if (rc > 0) {
derr << __func__ << " fsck found " << rc << " errors" << dendl;
return -EIO;
}
}
// 打开 /path 目录
int r = _open_path();
if (r < 0)
return r;
// 打开 fsid 文件
r = _open_fsid(false);
if (r < 0)
goto out_path;
// 读取 fsid 号
r = _read_fsid(&fsid);
if (r < 0)
goto out_fsid;
// 锁定 fsid 文件
r = _lock_fsid();
if (r < 0)
goto out_fsid;
// 打开 /path/block 设备,创建并初始化 BlockDevice,检查 label 信息,
r = _open_bdev(false);
if (r < 0)
goto out_fsid;
// 是否启用 kvdb,一般是 rocksdb
if (open_db) {
// 打开 kvdb,若其为 rocksdb,则 mount bluefs。创建 FreelistManager 和 allocator
r = _open_db_and_around(false);
} else {
// we can bypass db open exclusively in case of kv_only mode
ceph_assert(kv_only);
r = _open_db(false, true);
if (r < 0)
goto out_bdev;
}
// 如果只需要 kvdb 的数据,则不需要做后续步骤
if (kv_only)
return 0;
// changes:
// - super: added ondisk_format
// - super: added min_readable_ondisk_format
// - super: added min_compat_ondisk_format
// - super: added min_alloc_size
// - super: removed min_min_alloc_size
r = _upgrade_super();
if (r < 0) {
goto out_db;
}
// 加载所有 collection 至内存
r = _open_collections();
if (r < 0)
goto out_db;
// 加载日志
r = _reload_logger();
if (r < 0)
goto out_coll;
// 开启 kvdb 同步、终止线程
_kv_start();
r = _deferred_replay();
if (r < 0)
goto out_stop;
mempool_thread.init();
if (!per_pool_stat_collection &&
cct->_conf->bluestore_fsck_quick_fix_on_mount == true) {
dout(1) << __func__ << " quick-fix on mount" << dendl;
_fsck_on_open(FSCK_SHALLOW, true);
//reread statfs
//FIXME minor: replace with actual open/close?
_open_statfs();
_check_legacy_statfs_alert();
}
mounted = true;
return 0;
out_stop:
_kv_stop();
out_coll:
_flush_cache();
out_db:
_close_db_and_around();
out_bdev:
_close_bdev();
out_fsid:
_close_fsid();
out_path:
_close_path();
return r;
}
fio_ceph_objectsore 内部采用 ceph 的 objectstore api 实现 io 请求,其请求事件的提交实际在 fio_ceph_os_queue() 函数中完成,fio_ceph_os_commit() 则是一个空函数。
enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u)
{
...
//写操作
if (u->ddir == DDIR_WRITE) {
//构建要写对象的 bufferlist,从 io_u 中获取长度和内容信息
bufferlist bl;
bl.push_back(buffer::copy(reinterpret_cast<char*>(u->xfer_buf),
u->xfer_buflen ) );
//初始化属性集合、属性映射
map<string,bufferptr> attrset;
map<string, bufferlist> omaps;
// enqueue a write transaction on the collection's handle
ObjectStore::Transaction t;
//填写 attrset、omaps,根据设置文件中的参数
// fill attrs if any
if (o->oi_attr_len_high) {
ceph_assert(o->oi_attr_len_high >= o->oi_attr_len_low);
// fill with the garbage as we do not care of the actual content...
job->one_for_all_data.set_length(
ceph::util::generate_random_number(
o->oi_attr_len_low, o->oi_attr_len_high));
attrset["_"] = job->one_for_all_data;
}
...
//注册写事务
t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags);
...
if (rmkeys.size()) {
ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
t.omap_rmkeys(coll.cid, pgmeta_oid, rmkeys);
}
if (omaps.size()) {
ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
t.omap_setkeys(coll.cid, pgmeta_oid, omaps);
}
//注册事务提交完成后的回调函数
t.register_on_commit(new UnitComplete(u));
//提交写事务
os->queue_transaction(coll.ch,
std::move(t));
return FIO_Q_QUEUED;
}
//读操作,不涉及改写磁盘数据,不需要事务保证acid特性,直接调用 objectstore api
if (u->ddir == DDIR_READ) {
// ObjectStore reads are synchronous, so make the call and return COMPLETED
bufferlist bl;
//读取指定对象的指定长度,放入 bl 中
int r = os->read(coll.ch, object.oid, u->offset, u->xfer_buflen, bl);
...
return FIO_Q_COMPLETED;
}
derr << "WARNING: Only DDIR_READ and DDIR_WRITE are supported!" << dendl;
u->error = -EINVAL;
td_verror(td, u->error, "xfer");
return FIO_Q_COMPLETED;
}
ceph 中使用事务来保证数据库的 ACID 特性。
这里简单罗列 bluestore 能够保证事务的原因:
对于读操作,因为不涉及改写数据内容,所以可以不通过事务来读取数据,直接使用 objectstore->read() 方法来读取。而写操作比较复杂,接下来通过 queue_transaction() 方法,深入探究 bluestore 写流程。
写操作步骤:
1. 初始化事务
ObjectStore::Transaction t;
2. 获取或者新建一个 collection
auto pg = spg_t{pg_t{i, pool}};
coll_t cid(pg);
bool exists = os->collection_exists(cid);
auto ch = exists ? os->open_collection(cid) : os->create_new_collection(cid);
3. 创建 object
ghobject_t oid(hobject_t(name, "", CEPH_NOSNAP, coll.pg.ps(), coll.pg.pool(), ""));
4. 写入事务
t.write(cid, oid, 0, bl.length(), bl); // bl 是 data 实体
5. 执行事务
os->queue_transaction(ch, std::move(t));
int BlueStore::queue_transactions(
CollectionHandle& ch,
vector<Transaction>& tls,
TrackedOpRef op,
ThreadPool::TPHandle *handle)
{
...
// 把事务中注册的三类回调函数取出来
list<Context *> on_applied, on_commit, on_applied_sync;
ObjectStore::Transaction::collect_contexts(
tls, &on_applied, &on_commit, &on_applied_sync);
// 获取操作队列 OpSequencer,用来保证按顺序执行操作
Collection *c = static_cast<Collection*>(ch.get());
OpSequencer *osr = c->osr.get();
dout(10) << __func__ << " ch " << c << " " << c->cid << dendl;
// 创建 TransContext 并关联回调函数
// prepare
TransContext *txc = _txc_create(static_cast<Collection*>(ch.get()), osr, &on_commit);
for (vector<Transaction>::iterator p = tls.begin(); p != tls.end(); ++p) {
txc->bytes += (*p).get_num_bytes();
// !!!重要,下文继续深入
_txc_add_transaction(txc, &(*p));
}
_txc_calc_cost(txc);
// 记录 onode,更新共享 blob
_txc_write_nodes(txc, txc->t);
// journal deferred items
if (txc->deferred_txn) {
txc->deferred_txn->seq = ++deferred_seq;
bufferlist bl;
encode(*txc->deferred_txn, bl);
string key;
get_deferred_key(txc->deferred_txn->seq, &key);
txc->t->set(PREFIX_DEFERRED, key, bl);
}
// 把 _txc_add_transactio() 中分配或者释放的磁盘空间写入 kvdb
_txc_finalize_kv(txc, txc->t);
if (handle)
handle->suspend_tp_timeout();
auto tstart = mono_clock::now();
throttle_bytes.get(txc->cost);
if (txc->deferred_txn) {
// ensure we do not block here because of deferred writes
if (!throttle_deferred_bytes.get_or_fail(txc->cost)) {
dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive"
<< dendl;
++deferred_aggressive;
deferred_try_submit();
{
// wake up any previously finished deferred events
std::lock_guard l(kv_lock);
kv_cond.notify_one();
}
throttle_deferred_bytes.get(txc->cost);
--deferred_aggressive;
}
}
auto tend = mono_clock::now();
if (handle)
handle->reset_tp_timeout();
logger->inc(l_bluestore_txc);
// execute (start) 写操作事务的入口:状态机
// 重要,后续分析
_txc_state_proc(txc);
// we're immediately readable (unlike FileStore)
for (auto c : on_applied_sync) {
c->complete(0);
}
// 执行先前注册的回调函数
if (!on_applied.empty()) {
if (c->commit_queue) {
c->commit_queue->queue(on_applied);
} else {
finisher.queue(on_applied);
}
}
log_latency("submit_transact",
l_bluestore_submit_lat,
mono_clock::now() - start,
cct->_conf->bluestore_log_op_age);
log_latency("throttle_transact",
l_bluestore_throttle_lat,
tend - tstart,
cct->_conf->bluestore_log_op_age);
return 0;
}
_txc_add_transaction() 中 根据 Transaction::Op -> op 标志位来执行具体操作,同时创建了对象的 Onode 实例,例如 Transaction::OP_WRITE 则调用 _write() 方法。
int BlueStore::_write(TransContext *txc,
CollectionRef& c,
OnodeRef& o,
uint64_t offset, size_t length,
bufferlist& bl,
uint32_t fadvise_flags)
{
r = _do_write(txc, c, o, offset, length, bl, fadvise_flags);
// 把此对象 OnodeRef 加入 Onode 数组
txc->write_onode(o);
}
_do_write() 中分配一个 WriteContext ,初始化压缩、是否延迟写等设置,确认 extent_map 对应的逻辑地址装载到 kvdb中。
_do_write_data() 判断对象是否可以落在一个 min_alloc_size 内。若可以,则直接小写;若不行,则拆分对象后,分别执行大小写。
int BlueStore::_do_write(
TransContext *txc,
CollectionRef& c,
OnodeRef o,
uint64_t offset,
uint64_t length,
bufferlist& bl,
uint32_t fadvise_flags)
{
...
WriteContext wctx;
//设置压缩参数,使用 BufferIO 或者 Libaio,一般是 Libaio
_choose_write_options(c, o, fadvise_flags, &wctx);
// ensure that a range of the map is loaded
o->extent_map.fault_range(db, offset, length);
// 切割并块对齐, _do_write_small 或 _do_write_big
// | headoffset - headlength | middle offset - middlelength | tailoffset - taillength|
// write_small write_big wirte_small
_do_write_data(txc, c, o, offset, length, bl, &wctx);
// 分配磁盘空间,执行写操作
r = _do_alloc_write(txc, c, o, &wctx);
...
// gc,回收 extent_map 中旧的映射 extents
if (wctx.extents_to_gc.empty() || wctx.extents_to_gc.range_start() > offset || wctx.extents_to_gc.range_end() < offset + length) {
benefit = gc.estimate(offset, length, o->extent_map, wctx.old_extents, min_alloc_size);
}
...
// NB: _wctx_finish() will empty old_extents
// so we must do gc estimation before that
_wctx_finish(txc, c, o, &wctx);
...
}
}
这里简单介绍下 RMW(Read Modify Write)和 COW(Copy-On-Write)。
RMW:指当覆盖写发生时,如果本次改写的内容不足一个磁盘大小,那么需要先将对应的块读取,然后将待修改的内容与原先的内容合并,最后将更新后的块重新写入原先的位置。
COW:指当覆盖写发生时,不是直接更新磁盘对应位置的已有内容,而是重新在磁盘上分配一块新的空间,用于存放本次新写入的内容,这个过程也被称为写时重定向。
这里的小写就对应了 RMW,大写对应了 COW。
// 小写 (ToT)
// 小写(非对齐写)需要额外考虑以下几种情况:
// (1)能否直接跳过执行 COW。一般是对应的 Extent 被压缩过,因为此时执行 RMW 代价太大,所以直接使用 COW,即新建 Blob。
// (2)能否直接复用已有的 Extent 的 unused 块。
// (3)是否执行 WAL 写。使用 RMW 操作需要使用 WAl 来避免数据写坏。
void BlueStore::_do_write_small(
TransContext *txc,
CollectionRef &c,
OnodeRef o,
uint64_t offset, uint64_t length,
bufferlist::iterator& blp,
WriteContext *wctx)
{
...
// 寻找可复用 blob
// Look for an existing mutable blob we can use.
auto begin = o->extent_map.extent_map.begin();
auto end = o->extent_map.extent_map.end();
auto ep = o->extent_map.seek_lextent(offset);
if (ep != begin) {
--ep;
if (ep->blob_end() <= offset) {
++ep;
}
}
...
// 没怎么看懂,结合网上资料,略微介绍下 T_T 。找到 blob 中为完全使用的并且可以放得下的 min_alloc_block,先按块对齐读取已经写入的部分,与要写入的对象合并后,再在无数据的部分置0。这中属于 RMW,需要先写入缓存,后续 submit 时写到磁盘上,这也被称为延迟写。
// | min_alloc_block block对齐 |
// ||..0000..|object-to-write|object-on-disk(read) ||
// | |
do{
...
int r = _do_read(c.get(), o, dctx.off + dctx.used, dctx.tail_read, tail_bl, 0);
_buffer_cache_write(txc, b0, dctx.b_off, bl, wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
b0->dirty_blob().calc_csum(dctx.b_off, bl);
Extent* le = o->extent_map.set_lextent(c, dctx.off,dctx.off - dctx.blob_start, dctx.used, b0, &wctx->old_extents);
// in fact this is a no-op for big writes but left here to maintain
// uniformity and avoid missing after some refactor.
b0->dirty_blob().mark_used(le->blob_offset, le->length);
txc->statfs_delta.stored() += le->length;
...
// 块对齐的情况下,blob 直接有未使用的空间可以用来放置对象,通过 wctx->write() 交给 _do_alloc_write() 处理。
// | block对齐 |
// ||..0000..| object-to-write ||
// | |
o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length,false, false);
...
}while(any_change)
// 申请一个新的 blob,根据 alloc_len 补0,补成一个最小可写块。通过 wctx->write() 交给 _do_alloc_write() 处理。
// new blob.
BlobRef b = c->new_blob();
uint64_t b_off = p2phase<uint64_t>(offset, alloc_len);
uint64_t b_off0 = b_off;
_pad_zeros(&bl, &b_off0, block_size);
o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length,
min_alloc_size != block_size, // use 'unused' bitmap when alloc granularity
// doesn't match disk one only
true);
...
}
// 大写,也没怎么看懂 (T_T)
// 这里根据参考资料简单介绍其流程。
// 新建 Blob 并确定 Blob 目标长度,从原始数据中截取对应位置数据,更新Cache。
// 生成一个写条目,将其加入对应的 WriteContext,
// 生成一个新的 LExtent,更新 logic_offset、blob_offset、length等,并关联 Blob
void BlueStore::_do_write_big(
TransContext *txc,
CollectionRef &c,
OnodeRef o,
uint64_t offset, uint64_t length,
bufferlist::iterator& blp,
WriteContext *wctx)
{
BlobRef b;
...
uint32_t l = std::min(max_bsize, length);
...
// 判断是否做延迟写 deferrIO
if (prefer_deferred_size_snapshot &&
l <= prefer_deferred_size_snapshot * 2) {
...
}
// 尝试寻找可以复用的 blob,simpleIO
if (prev_ep->blob->can_reuse_blob(min_alloc_size, max_bsize,offset - prev_ep->blob_start(),&l)) {
b = prev_ep->blob;
}
...
// 找不到就申请新 blob,simpleIO
if (b == nullptr) {
b = c->new_blob();
b_off = 0;
new_blob = true;
}
bufferlist t;
blp.copy(l, t);
wctx->write(offset, b, l, b_off, t, b_off, l, false, new_blob);
}
根据上述分析,可以知道:_do_write_data() 进行大写和小写,将数据放在WriteContext,此时数据还在内存。
_do_alloc_write() 首先根据配置 COMPRESSION_ALGORITHM、bluestore_compression_required_ratio 选择压缩算法并对对象执行压缩。这里涉及到的写入方式有两种:defferIO 和 directIO。
directIO 调用 bdev->aio_write() 写入数据到内存,后续使用 submit 到 DATA 盘。根据块设备的不同,分别调用其对应的异步写函数。
defferIO 在 RMW 覆盖写时会用到,数据会先写入 WAL 盘,后续再同步到 DATA 盘。这是为了保证数据的一致性。
int BlueStore::_do_alloc_write(
TransContext *txc,
CollectionRef coll,
OnodeRef o,
WriteContext *wctx)
{
...
// 分配磁盘空间
prealloc_left = shared_alloc.a->allocate(need, min_alloc_size, need, 0, &prealloc);
// 把分配的磁盘空间地址 pextent 加入 blob
dblob.allocated(p2align(b_off, min_alloc_size), final_length, extents);
...
// 把分配的虚拟空间地址 lextent 加入 extentmap,和 blob 关联
Extent *le = o->extent_map.set_lextent(coll, wi.logical_offset,
b_off + (wi.b_off0 - wi.b_off),
wi.length0,
wi.b,
nullptr);
...
if (l->length() <= prefer_deferred_size.load()) {
// 延迟写
bluestore_deferred_op_t *op = _get_deferred_op(txc);
op->op = bluestore_deferred_op_t::OP_WRITE;
int r = b->get_blob().map(
b_off, l->length(),
[&](uint64_t offset, uint64_t length) {
op->extents.emplace_back(bluestore_pextent_t(offset, length));
return 0;
});
ceph_assert(r == 0);
op->data = *l;
} else {
// 直接写
b->get_blob().map_bl(
b_off, *l,
[&](uint64_t offset, bufferlist& t) {
bdev->aio_write(offset, t, &txc->ioc, false);
});
}
...
}
bluestore 具有数据库的 ACID 特性,主要通过状态机来实现,在 _txc_state_proc() 方法中开启。
int BlueStore::queue_transactions(
CollectionHandle& ch,
vector<Transaction>& tls,
TrackedOpRef op,
ThreadPool::TPHandle *handle) {
...
_txc_write_nodes(txc, txc->t);
...
throttle.try_start_transaction(*db, *txc, tstart);
...
// 执行事务,进入状态机
_txc_state_proc(txc);
...
}
void BlueStore::_txc_state_proc(TransContext *txc) {
while (true) {
switch (txc->get_state()) {
//
case TransContext::STATE_PREPARE:
if (txc->ioc.has_pending_aios()) {
txc->set_state(TransContext::STATE_AIO_WAIT);
txc->had_ios = true;
_txc_aio_submit(txc);
return;
}
// ** fall-thru **
//
case TransContext::STATE_AIO_WAIT: {
mono_clock::duration lat = throttle.log_state_latency(
*txc, logger, l_bluestore_state_aio_wait_lat);
if (ceph::to_seconds<double>(lat) >= cct->_conf->bluestore_log_op_age) {
dout(0) << __func__ << " slow aio_wait, txc = " << txc
<< ", latency = " << lat
<< dendl;
}
}
_txc_finish_io(txc); // may trigger blocked txc's too
return;
case TransContext::STATE_IO_DONE:
ceph_assert(ceph_mutex_is_locked(txc->osr->qlock)); // see _txc_finish_io
if (txc->had_ios) {
++txc->osr->txc_with_unstable_io;
}
throttle.log_state_latency(*txc, logger, l_bluestore_state_io_done_lat);
txc->set_state(TransContext::STATE_KV_QUEUED);
if (cct->_conf->bluestore_sync_submit_transaction) {
if (txc->last_nid >= nid_max ||
txc->last_blobid >= blobid_max) {
dout(20) << __func__
<< " last_{nid,blobid} exceeds max, submit via kv thread"
<< dendl;
} else if (txc->osr->kv_committing_serially) {
dout(20) << __func__ << " prior txc submitted via kv thread, us too"
<< dendl;
// note: this is starvation-prone. once we have a txc in a busy
// sequencer that is committing serially it is possible to keep
// submitting new transactions fast enough that we get stuck doing
// so. the alternative is to block here... fixme?
} else if (txc->osr->txc_with_unstable_io) {
dout(20) << __func__ << " prior txc(s) with unstable ios "
<< txc->osr->txc_with_unstable_io.load() << dendl;
} else if (cct->_conf->bluestore_debug_randomize_serial_transaction &&
rand() % cct->_conf->bluestore_debug_randomize_serial_transaction
== 0) {
dout(20) << __func__ << " DEBUG randomly forcing submit via kv thread"
<< dendl;
} else {
_txc_apply_kv(txc, true);
}
}
{
std::lock_guard l(kv_lock);
kv_queue.push_back(txc);
if (!kv_sync_in_progress) {
kv_sync_in_progress = true;
kv_cond.notify_one();
}
if (txc->get_state() != TransContext::STATE_KV_SUBMITTED) {
kv_queue_unsubmitted.push_back(txc);
++txc->osr->kv_committing_serially;
}
if (txc->had_ios)
kv_ios++;
kv_throttle_costs += txc->cost;
}
return;
case TransContext::STATE_KV_SUBMITTED:
_txc_committed_kv(txc);
// ** fall-thru **
case TransContext::STATE_KV_DONE:
throttle.log_state_latency(*txc, logger, l_bluestore_state_kv_done_lat);
if (txc->deferred_txn) {
txc->set_state(TransContext::STATE_DEFERRED_QUEUED);
_deferred_queue(txc);
return;
}
txc->set_state(TransContext::STATE_FINISHING);
break;
case TransContext::STATE_DEFERRED_CLEANUP:
throttle.log_state_latency(*txc, logger, l_bluestore_state_deferred_cleanup_lat);
txc->set_state(TransContext::STATE_FINISHING);
// ** fall-thru **
case TransContext::STATE_FINISHING:
throttle.log_state_latency(*txc, logger, l_bluestore_state_finishing_lat);
_txc_finish(txc);
return;
default:
derr << __func__ << " unexpected txc " << txc
<< " state " << txc->get_state_name() << dendl;
ceph_abort_msg("unexpected txc state");
return;
}
}
}
fio_ceph_os_getevents() 函数可以通过检查 io_u->flag 是否等于 IO_U_F_FLIGHT 来判断是否完成 io 操作,计算一组任务中完成的个数,一组任务的数量为配置文件中的 io_depth。
int fio_ceph_os_getevents(thread_data* td, unsigned int min,
unsigned int max, const timespec* t)
{
auto job = static_cast<Job*>(td->io_ops_data);
unsigned int events = 0;
io_u* u = NULL;
unsigned int i = 0;
// loop through inflight ios until we find 'min' completions
do {
//#define io_u_qiter(q, io_u, i) \
// for (i = 0; i < (q)->nr && (io_u = (q)->io_us[i]); i++)
//以下代码功能是对 io_u_all 进行轮询,检查是否有完成 io 操作的,如果有,就 event++,当 event 大于最小值时跳出循环。
io_u_qiter(&td->io_u_all, u, i) {
if (!(u->flags & IO_U_F_FLIGHT))
continue;
// 在 UnitComplete() 回调函数中,设置了数据写入完成后把 engine_data 置为1,因此这里只需要判断 engine_data 是否为1就可以知道是否操作完成
if (u->engine_data) {
u->engine_data = nullptr;
job->events[events] = u;
events++;
}
}
if (events >= min)
break;
usleep(100);
} while (1);
return events;
}
其他函数或是比较简单,或是为空函数,这里不再介绍。
用于测试 ceph 网络通信模块,可以设置多种通信方式,如 async+posix 、async+spdk 等。
本安装教程旨在介绍如何在【 CentOS7 + n版 Ceph】中安装 libfio_ceph_messenger.so,其他更高版本可直接参考官方 README.md 文件。
从高版本的 /src/test/fio 合并代码至n版:可直接覆盖拷贝该文件夹,建议使用 o 版。
拷贝后,文件夹中将多出以下文件:
fio_ceph_messenger.cc
ceph-messenger.fio
ceph-messenger.conf
ring_buffer.h
并且 CMakeList.txt 和 README.md 做了修改。
修改 ring_buffer.h 源码
第11行修改为:
#define __cacheline_aligned __attribute__((__aligned__()))
编译并安装动态库
./do_cmake.sh -DWITH_FIO=ON
cd build
make fio_ceph_messenger
添加动态库路径,或者直接拷贝至动态库
export LD_LIBRARY_PATH=/path/to/install/lib
或者
cp /root/ceph/src/build/lib/libfio_ceph_messenger /usr/lib64
使用 ceph/build/bin 目录下的 fio 工具测试,该工具在 make fio_ceph_objectstore 过程中安装在 bin 目录下。
注意:不要使用自己安装的 fio,以免版本问题发生错误。
运行以下命令,检测是否安装成功。
./fio --enghelp=libfio_ceph_messenger.so
fio [options] [job options] <job files>
messenger 引擎支持在两个主机上(一个客户端,另一个服务端)运行测试。在服务器上注释掉客户机部分,在客户机上注释掉服务器部分,在客户机上指定正确的主机名,这样就完成了。
[root@localhost fio-job]# cat ceph-messenger.fio
[global]
bs=4k # 每次写操作大小
size=100m # 总写入数据量
iodepth=128 # 队列深度,即同时写操作最大数
ioengine=libfio_ceph_messenger.so
#ceph_conf_file=ceph-messenger.conf
# In order to select protocol explicitly add 'v1:' or 'v2:' prefix.
# By default v2 is used.
hostname=127.0.0.1 # 服务端节点
port=5555
ms_type=async+posix # or async+dpdk or async+rdma
[client] # 本机为客户端
receiver=0
rw=write
[server] # 本机为服务端
receiver=1
rw=read
[root@localhost fio-job]# cat ceph-messenger.conf
[global]
ms_type=async+posix
ms_crc_data=false
ms_crc_header=false
ms_dispatch_throttle_bytes=0
debug_ms=0/0
[root@localhost fio]# /root/ceph/build/bin/fio ./ceph-messenger.fio
client: (g=0): rw=write, bs=(R) 4096B-4096B, (W) 4096B-4096B, (T) 4096B-4096B, ioengine=ceph-msgr, iodepth=128
server: (g=0): rw=read, bs=(R) 4096B-4096B, (W) 4096B-4096B, (T) 4096B-4096B, ioengine=ceph-msgr, iodepth=128
fio-3.15
Starting 2 processes
...
client: (groupid=0, jobs=1): err= 0: pid=11370: Mon Mar 8 14:46:25 2021
write: IOPS=15.6k, BW=61.0MiB/s (63.0MB/s)(100MiB/1639msec)
slat (nsec): min=1952, max=2469.6k, avg=3372.66, stdev=23256.41
clat (usec): min=1717, max=14411, avg=8186.12, stdev=915.04
lat (usec): min=1721, max=14413, avg=8189.50, stdev=914.66
clat percentiles (usec):
| 1.00th=[ 6128], 5.00th=[ 7242], 10.00th=[ 7373], 20.00th=[ 7504],
| 30.00th=[ 7963], 40.00th=[ 8029], 50.00th=[ 8094], 60.00th=[ 8094],
| 70.00th=[ 8160], 80.00th=[ 8717], 90.00th=[ 9241], 95.00th=[ 9634],
| 99.00th=[11207], 99.50th=[12780], 99.90th=[13698], 99.95th=[14091],
| 99.99th=[14353]
bw ( KiB/s): min=61632, max=64542, per=100.00%, avg=62715.33, stdev=1591.05, samples=3
iops : min=15408, max=16135, avg=15678.67, stdev=397.47, samples=3
lat (msec) : 2=0.01%, 4=0.25%, 10=96.20%, 20=3.54%
cpu : usr=2.63%, sys=3.91%, ctx=2274, majf=0, minf=20
IO depths : 1=0.1%, 2=0.1%, 4=0.1%, 8=0.1%, 16=0.1%, 32=0.1%, >=64=99.8%
submit : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0%
complete : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.1%
issued rwts: total=0,25600,0,0 short=0,0,0,0 dropped=0,0,0,0
latency : target=0, window=0, percentile=100.00%, depth=128
server: (groupid=0, jobs=1): err= 0: pid=11371: Mon Mar 8 14:46:25 2021
read: IOPS=15.5k, BW=60.6MiB/s (63.6MB/s)(100MiB/1649msec)
slat (nsec): min=40, max=20225, avg=72.88, stdev=267.29
clat (usec): min=3285, max=19524, avg=8242.99, stdev=1133.26
lat (usec): min=3285, max=19524, avg=8243.06, stdev=1133.26
clat percentiles (usec):
| 1.00th=[ 6915], 5.00th=[ 7373], 10.00th=[ 7439], 20.00th=[ 7767],
| 30.00th=[ 7963], 40.00th=[ 8029], 50.00th=[ 8094], 60.00th=[ 8160],
| 70.00th=[ 8160], 80.00th=[ 8717], 90.00th=[ 8848], 95.00th=[ 9634],
| 99.00th=[11863], 99.50th=[13698], 99.90th=[19530], 99.95th=[19530],
| 99.99th=[19530]
bw ( KiB/s): min=60830, max=64792, per=100.00%, avg=62376.67, stdev=2119.03, samples=3
iops : min=15207, max=16198, avg=15594.00, stdev=529.94, samples=3
lat (msec) : 4=0.29%, 10=96.64%, 20=3.07%
cpu : usr=0.00%, sys=3.22%, ctx=2860, majf=0, minf=8
IO depths : 1=0.1%, 2=0.1%, 4=0.1%, 8=0.1%, 16=0.1%, 32=0.1%, >=64=99.8%
submit : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0%
complete : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.1%
issued rwts: total=25600,0,0,0 short=0,0,0,0 dropped=0,0,0,0
latency : target=0, window=0, percentile=100.00%, depth=128
Run status group 0 (all jobs):
READ: bw=60.6MiB/s (63.6MB/s), 60.6MiB/s-60.6MiB/s (63.6MB/s-63.6MB/s), io=100MiB (105MB), run=1649-1649msec
WRITE: bw=61.0MiB/s (63.0MB/s), 61.0MiB/s-61.0MiB/s (63.0MB/s-63.0MB/s), io=100MiB (105MB), run=1639-1639msec
ceph-bench 是一个用来测试单个 osd 或者单个存储节点的存储性能。
1.拷贝代码
git clone https://github.com/Daemonxiao/ceph-bench.git
2.安装依赖
yum install gcc gcc-c++ jsoncpp-devel libradospp-devel gperftools-devel
3.安装
cd ceph-bench
make
./main [test-pool] [mode=host|osd] <host name|osd name> <-d secs> <-t threads> <-b block>
test-pool: 测试池名称,避免集群中已有的池名称冲突
mode: 选择测试针对单个节点还是单个 osd
host name: 指定节点名称
osd name: 指定 osd 名称
-d: 测试时长,默认10秒,单位秒
-t: 线程数,默认1
-b: 每次写入的大小,默认4096 字节,单位 byte
[root@localhost ceph-bench]# ./main pool-test osd osd.0
[Settings]
pool name: pool-test
mode: osd
specific_bench_item: osd.0
threads: 1
duration: 10
block size: 4096
prepare test pool: pool-test #测试池名称
Finding object names
Benching osd osd.0 #测试的对象
min latency 0.615187 ms #最小时延
max latency 365.483 ms #最大时延
>= 0.6 ms: 7% #### cnt=757 #每次写操作时延分布
>= 0.7 ms: 48% ############################## cnt=4774
>= 0.8 ms: 17% ########## cnt=1707
>= 0.9 ms: 8% #### cnt=793
>= 1 ms: 13% ######## cnt=1351
>= 2 ms: 1% # cnt=182
>= 3 ms: 1% cnt=130
>= 4 ms: 0% cnt=72
>= 5 ms: 0% cnt=13
>= 6 ms: 0% cnt=6
>= 7 ms: 0% cnt=1
>= 10 ms: 0% cnt=11
>= 20 ms: 0% cnt=2
>= 30 ms: 0% cnt=1
>= 90 ms: 0% cnt=1
>= 300 ms: 0% cnt=1
Average iops: 980.195 #平均iops
Average latency: 1.02021 ms #平均时延
Total writes: 9802 #总写入次数
cleaning...
Exiting successfully.