Milvus 于 2019 年开源,主要用于存储、索引和管理通过深度神经网络和机器学习模型产生的海量向量数据。
Milvus 向量数据库专为向量查询与检索设计,能够为万亿级向量数据建立索引。与传统关系型数据库不同,Milvus 主要用于自下而上地处理非结构化数据向量。非结构化数据没有统一的预定义模型,因此可以转化为向量。
随着互联网不断发展,电子邮件、论文、物联网传感数据、社交媒体照片、蛋白质分子结构等非结构化数据已经变得越来越普遍。如果想要使用计算机来处理这些数据,需要使用 embedding 技术将这些数据转化为向量。随后,Milvus 会存储这些向量,并为其建立索引。Milvus 能够根据两个向量之间的距离来分析他们的相关性。如果两个向量十分相似,这说明向量所代表的源数据也十分相似。
向量又称为
embedding vector
,是指由embedding技术从离散变量(如xxx等各种非结构化数据)转变而来的连续向量。在数学表示上,向量是一个由浮点数或者二值型数据组成的 n 维数组。通过现代的向量转化技术,比如各种人工智能(AI)或者机器学习(ML)模型,可以将非结构化数据抽象为 n 维特征向量空间的向量。这样就可以采用最近邻算法(ANN)计算非结构化数据之间的相似度。
Collection
包含一组 entity,可以等价于关系型数据库系统(RDBMS)中的表。
Entity
包含一组 field。field 与实际对象相对应。field 可以是代表对象属性的结构化数据,也可以是代表对象特征的向量。primary key 是用于指代一个 entity 的唯一值。
你可以自定义 primary key,否则 Milvus 将会自动生成 primary key。请注意,目前 Milvus 不支持 primary key 去重,因此有可能在一个 collection 内出现 primary key 相同的 entity。
Field
Entity 的组成部分。Field 可以是结构化数据,例如数字和字符串,也可以是向量。
Milvus 2.0 现已支持标量字段过滤。
Segment
Milvus 在数据插入时通过合并数据自动创建的数据文件。一个 collection 可以包含多个 segment。一个 segment 可以包含多个 entity。在搜索中,Milvus 会搜索每个 segment,并返回合并后的结果。
Sharding
Shard 是指将数据写入操作分散到不同节点上,使 Milvus 能充分利用集群的并行计算能力进行写入。默认情况下单个 collection 包含 2 个分片(shard)。目前 Milvus 采用基于主键哈希的分片方式,未来将支持随机分片、自定义分片等更加灵活的分片方式。
Partition 的意义在于通过划定分区减少数据读取,而shard 的意义在于多台机器上并行写入操作。
Partition
把 collection 中的数据根据一定规则在物理存储上分成多个部分。这种对 collection 数据的划分就叫分区(partitioning)。每个 partition 可包含多个segment。
归一化
归一化指的是通过数学变换将向量的模长变为 1 的过程。如需使用点积计算向量相似度,则必须对向量作归一化处理。处理后点积与余弦相似度等价。
索引
索引基于原始数据构建,可以提高对 collection 数据搜索的速度。Milvus 支持多种索引类型。
向量
一种类型的 field,代表对象的特征。非结构化数据可以通过各种 AI 模型和 embedding 技术转化为向量。
目前,一个实体最多只能包含一个向量。
Milvus 在构建索引和查询向量时依赖 CPU 对 SIMD (Single Instruction Multiple Data) 扩展指令集合的支持。请确保运行 Milvus 的 CPU 至少支持以下一种 SIMD 指令集合:
使用 lscpu 命令以检查 CPU 是否支持特定 SIMD 指令集合:
lscpu | grep -e sse4_2 -e avx -e avx2 -e avx512
检查 Docker 及 Docker Compose 版本
因为官网推荐使用docker-compose安装运行,所以需要检查版本是否合适
docker info
确认 Docker 版本。建议使用 19.03 或以上版本。docker-compose version
确认 Docker Compose 版本。建议使用 1.25.1 或以上版本。因为我们生产环境是在专网,无法连接到互联网,所以需要离线安装
离线安装说白了就是在有网的机器上把需要的docker镜像下载下来,然后再把镜像导出,再上传到专网服务器
根据docker-compose.yml里面内容,把用到的镜像导出
docker save 镜像名:版本号 > xxx.tar #这里最好别用IMAGE ID 因为导入的时候还需要另外指定镜像名
下载到本地
下载下来后根据自己的方式把镜像包上传到专网服务器
我们是开发机器可以同时连到两个网,所以通过FTP再上传到专网服务器就好了
导入镜像
docker load -i xxx.tar #会自动加载镜像名称和版本号等内容
导入docker-compose.yml
把官网教程里下载的docker-compose.yml复制过来就行了
启动
docker-compose up -d
#查看启动状态
docker ps 或 docker-compose ps
#查看docker日志
docker logs 容器id
2.0.0
2.3.0.RELEASE
因为这里用的milvus版本是2.0.0的,但是Maven中央仓库中还没有这个版本的依赖,所以需要到GitHub上面把2.0版本的java-sdk下载到本地,然后编译到本地仓库
地址: https://github.com/milvus-io/milvus-sdk-java
下载下来后会有很多类找不到,这时候只需要clean install一下,有些类是编译之后才会有
如果嫌麻烦可以下我编译好了的:https://www.aliyundrive.com/s/3HZQ1VYaqKB·
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.0.0</version>
</dependency>
需要跟milvus交互都需要调用MilvusServiceClient,我这里的做法是把它定义成一个Bean,需要用到的地方依赖注入
@Configuration
public class MilvusConfig {
@Value("${milvus.host}")
private String host; //milvus所在服务器地址
@Value("${milvus.port}")
private Integer port; //milvus端口
@Bean
public MilvusServiceClient milvusServiceClient() {
ConnectParam connectParam = ConnectParam.newBuilder()
.withHost(host)
.withPort(port)
.build();
return new MilvusServiceClient(connectParam);
}
}
有了MilvusServiceClient后就可以为所欲为了!
下面介绍几个常用方法
用来存放这个集合需用到的参数
public class FaceArchive {
/**
* 集合名称(库名)
*/
public static final String COLLECTION_NAME = "face_archive";
/**
* 分片数量
*/
public static final Integer SHARDS_NUM = 8;
/**
* 分区数量
*/
public static final Integer PARTITION_NUM = 16;
/**
* 分区前缀
*/
public static final String PARTITION_PREFIX = "shards_";
/**
* 特征值长度
*/
public static final Integer FEATURE_DIM = 256;
/**
* 字段
*/
public static class Field {
/**
* 档案id
*/
public static final String ARCHIVE_ID = "archive_id";
/**
* 小区id
*/
public static final String ORG_ID = "org_id";
/**
* 档案特征值
*/
public static final String ARCHIVE_FEATURE = "archive_feature";
}
/**
* 通过组织id计算分区名称
* @param orgId
* @return
*/
public static String getPartitionName(Integer orgId) {
return PARTITION_PREFIX + (orgId % PARTITION_NUM);
}
}
判断集合是否已经存在
R<Boolean> response = milvusServiceClient.hasCollection(
HasCollectionParam.newBuilder()
.withCollectionName(collectionName)
.build());
返回值boolean类型,有(true)/无(false)
创建集合
FieldType archiveId = FieldType.newBuilder()
.withName(FaceArchive.Field.ARCHIVE_ID)
.withDescription("主键id")
.withDataType(DataType.Int64)
.withPrimaryKey(true)
.withAutoID(false)
.build();
FieldType orgId = FieldType.newBuilder()
.withName(FaceArchive.Field.ORG_ID)
.withDescription("组织id")
.withDataType(DataType.Int32)
.build();
FieldType archiveFeature = FieldType.newBuilder()
.withName(FaceArchive.Field.ARCHIVE_FEATURE)
.withDescription("档案特征值")
.withDataType(DataType.FloatVector)
.withDimension(FaceArchive.FEATURE_DIM)
.build();
CreateCollectionParam createCollectionReq = CreateCollectionParam.newBuilder()
.withCollectionName(FaceArchive.COLLECTION_NAME)
.withDescription("档案集合")
.withShardsNum(FaceArchive.SHARDS_NUM)
.addFieldType(archiveId)
.addFieldType(orgId)
.addFieldType(archiveFeature)
.build();
R<RpcStatus> response = milvusServiceClient.createCollection(createCollectionReq);
创建分区 ->
这里我理解的意思就跟关系型数据库分表一样的,在插入数据时指定插入到哪个分区,查询的时候也一样,这样可以在查询的时候减少数据量
R<RpcStatus> response = milvusServiceClient.createPartition(CreatePartitionParam.newBuilder()
.withCollectionName(collectionName) //集合名称
.withPartitionName(partitionName) //分区名称
.build());
我在这里的做法是先定义了分区总数PARTITION_NUM
, 然后循环建立分区,在查询或者插入的时候根据里面的某个值进行取模,分到对应的分区里面去
/**
* 创建分区
*/
private void createPartition() {
for (int i = 0; i < FaceArchive.PARTITION_NUM; i++) {
milvusService.createPartition(FaceArchive.COLLECTION_NAME, FaceArchive.PARTITION_PREFIX + i);
}
}
创建索引
/**
* 创建索引
*/
public R<RpcStatus> createIndex() {
R<RpcStatus> response = milvusServiceClient.createIndex(CreateIndexParam.newBuilder()
.withCollectionName(FaceArchive.COLLECTION_NAME)
.withFieldName(FaceArchive.Field.ARCHIVE_FEATURE)
.withIndexType(IndexType.IVF_FLAT)
.withMetricType(MetricType.IP)
//nlist 建议值为 4 × sqrt(n),其中 n 指 segment 最多包含的 entity 条数。
.withExtraParam("{\"nlist\":16384}")
.withSyncMode(Boolean.FALSE)
.build());
log.info("createIndex-------------------->{}", response.toString());
R<GetIndexBuildProgressResponse> idnexResp = milvusServiceClient.getIndexBuildProgress(
GetIndexBuildProgressParam.newBuilder()
.withCollectionName(FaceArchive.COLLECTION_NAME)
.build());
log.info("getIndexBuildProgress---------------------------->{}", idnexResp.toString());
return response;
}
调用 create_index() 方法后,Milvus 会为后续新增向量自动构建索引的任务。每当新增数据量达到一个完整的 segment 时即触发这一任务,Milvus 为新插入的向量构建索引。
新增向量的索引文件与前期构建的索引文件相互独立。
至于选择什么样的索引,见官方文档
数据插入
public boolean insert(List<MilvusArchiveDto> data) {
Map<Integer, List<MilvusArchiveDto>> map =
data.stream().filter(item -> ArrayUtil.isNotEmpty(item.getArcsoftFeature())).collect(Collectors.groupingBy(MilvusArchiveDto::getOrgId));
map.forEach((orgId, list) -> {
//插入数据
List<InsertParam.Field> fields = new ArrayList<>();
List<Long> archiveIds = Lists.newArrayList();
List<Integer> orgIds = Lists.newArrayList();
List<List<Float>> floatVectors = Lists.newArrayList();
for (MilvusArchiveDto dto : list) {
archiveIds.add(dto.getArchiveId());
orgIds.add(dto.getOrgId());
//虹软特征值转Float向量
floatVectors.add(MilvusUtil.arcsoftToFloat(dto.getArcsoftFeature()));
}
//档案ID
fields.add(new InsertParam.Field(FaceArchive.Field.ARCHIVE_ID, DataType.Int64, archiveIds));
//小区id
fields.add(new InsertParam.Field(FaceArchive.Field.ORG_ID, DataType.Int32, orgIds));
//特征值
fields.add(new InsertParam.Field(FaceArchive.Field.ARCHIVE_FEATURE, DataType.FloatVector, floatVectors));
//插入
InsertParam insertParam = InsertParam.newBuilder()
.withCollectionName(FaceArchive.COLLECTION_NAME)
.withPartitionName(FaceArchive.getPartitionName(orgId))
.withFields(fields)
.build();
R<MutationResult> insert = milvusClient.insert(insertParam);
log.info("插入:{}", insert);
});
return true;
}
这里就我自己用的插入代码,因为是按组织ID分区的,所以每个组织分一个组,然后再批量插入,其中的向量值是通过虹软人脸识别SDK计算出来的特征值转换成List,
因为虹软特征值本身就是归一化处理了的,只需要把字节转Float就行了
把集合加载到内存中(milvus查询前必须把数据加载到内存中)
public void loadCollection(String collectionName) {
R<RpcStatus> response = milvusServiceClient.loadCollection(LoadCollectionParam.newBuilder()
//集合名称
.withCollectionName(collectionName)
.build());
log.info("loadCollection------------->{}", response);
}
加载分区数据
public void loadPartitions(String collectionName, String partitionsName) {
R<RpcStatus> response = milvusServiceClient.loadPartitions(
LoadPartitionsParam
.newBuilder()
//集合名称
.withCollectionName(collectionName)
//需要加载的分区名称
.withPartitionNames(Lists.newArrayList(partitionsName))
.build()
);
log.info("loadCollection------------->{}", response);
}
释放集合(从内存中释放)
public void releaseCollection(String collectionName) {
R<RpcStatus> response = milvusServiceClient.releaseCollection(ReleaseCollectionParam.newBuilder()
.withCollectionName(collectionName)
.build());
log.info("releaseCollection------------->{}", response);
}
释放分区
public void releasePartition(String collectionName, String partitionsName) {
R<RpcStatus> response = milvusServiceClient.releasePartitions(ReleasePartitionsParam.newBuilder()
.withCollectionName(collectionName)
.addPartitionName(partitionsName)
.build());
log.info("releasePartition------------->{}", response);
}
删除数据
public void deleteEntity(String collectionName, String partitionName, String expr) {
R<MutationResult> response = milvusServiceClient.delete(
DeleteParam.newBuilder()
//集合名称
.withCollectionName(collectionName)
//分区名称
.withPartitionName(partitionName)
//条件 如: id == 1
.withExpr(expr)
.build()
);
log.info("deleteEntity------------->{}", response);
}
搜索
@Override
public SearchTallestSimilarityDto searchTallestSimilarity(byte[] arcsoftFeature, Integer orgId) {
List<Float> arcsoftToFloat = MilvusUtil.arcsoftToFloat(arcsoftFeature);
List<List<Float>> list = new ArrayList<>();
list.add(arcsoftToFloat);
SearchParam.Builder builder = SearchParam.newBuilder()
//集合名称
.withCollectionName(FaceArchive.COLLECTION_NAME)
//计算方式
// 欧氏距离 (L2)
// 内积 (IP)
.withMetricType(MetricType.IP)
//返回多少条结果
.withTopK(1)
//搜索的向量值
.withVectors(list)
//搜索的Field
.withVectorFieldName(FaceArchive.Field.ARCHIVE_FEATURE)
//https://milvus.io/cn/docs/v2.0.0/performance_faq.md
.withParams("{\"nprobe\":512}");
if (orgId != null) {
//如果只需要搜索某个分区的数据,则需要指定分区
builder
.withExpr(FaceArchive.Field.ORG_ID + " == " + orgId)
.withPartitionNames(Lists.newArrayList(FaceArchive.getPartitionName(orgId)));
}
R<SearchResults> search = milvusClient.search(builder.build());
if (search.getData() == null) return null;
SearchResultsWrapper wrapper = new SearchResultsWrapper(search.getData().getResults());
for (int i = 0; i < list.size(); ++i) {
List<SearchResultsWrapper.IDScore> scores = wrapper.GetIDScore(i);
if (scores.size() > 0) {
System.err.println(scores);
SearchResultsWrapper.IDScore idScore = scores.get(0);
return new SearchTallestSimilarityDto(idScore.getLongID(), idScore.getScore());
}
}
return null;
}
搜索是支持多个向量值一起搜的,但是我这里做的是搜索相似度最高的那一个,所以我只需要一个返回数据(返回数据是已经按相似度排序了的)
待补充…