当前位置: 首页 > 工具软件 > Aerospike > 使用案例 >

Aerospike之路

淳于宏伯
2023-12-01

Docker

重要概念

镜像:理解为某个系统
容器:镜像运行时的实体

环境准备

安装(mac系统,且已安装brew)

brew docker

基本操作

# 查看、删除镜像
docker images
docker rmi 镜像名
# 查看、删除容器
docker ps -a
dcokder container kill 容器id

Aerospike

官方文档
博客参考

#aerospike-server

安装

docker pull aerospike/aerospike-server
docker run -d --name as_server -p 3000:3000 

*配置文件

进程号存储文件、工作目录、各种端口、副本、过期时间、日志路径、数据持久化路径、

#java 操作

入门

一个基本入门,添加包含两个bin的记录到as中,后面的很多变形都是基于这个架子的修改,像策略的引入等

public class AsQuickStartClient {
    public static void main(String[] args) {
        //1.创建连接,集群需用Host配置多个地址
        AerospikeClient client = new AerospikeClient("localhost", 3000);

        //2.创建key
        Key key = new Key("test", "set1", "key1");

        //3.创建bin
        Bin phone = new Bin("phone", 1008611);
        Bin name = new Bin("name", "zone");

        //4.根据key添加或修改
        client.put(null, key, phone, name);

        //5.根据key查询
        Record record = client.get(null, key);
        System.out.println(record);

        //6.关闭连接
        client.close();
    }
}

简单添加或更新

public class AsCreateOrUpdateClient {
    public static void main(String[] args) {
        AerospikeClient client = new AerospikeClient( "localhost", 3000);
        Key key = new Key("test", "set1", "key1");

        //写入策略
        WritePolicy writePolicy = new WritePolicy();
        //...具体策略下文

        Bin age = new Bin("age",17);

        client.put(writePolicy, key, age);
        Record record = client.get(null, key);
        System.out.println(record);
        //ttl
        System.out.println(record.getTimeToLive());

        client.close();
    }
}
  • 是否保存原生的key

    writePolicy.sendKey = true;
    
  • 设置record的过期时间 单位是秒

    writePolicy.expiration = 0;
    

    -2代表修改时不更新过期时间、-1永不过期、0由配置文件决定,默认30d、大于0代表多少秒过期

  • 版本,默认每次更新都加1

    //设置预期版本和预期策略,预期和实际不符合策略时会抛出异常
    writePolicy.generation = 1;
    writePolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
    

    NONE表示无限制 EXPECT_GEN_GT表示预期大于实际则成功 EXPECT_GEN_EQUAL表示预期等于实际则成功

  • 记录存在处理

    //REPLACE_ONLY、UPDATE_ONLY都需要保证记录存在,否则Key not found异常
    writePolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
    //UPDATE、REPLACE是当记录不存在时能创建,且替换可能比更新更具优势UPDAATE
    writePolicy.recordExistsAction = RecordExistsAction.REPLACE;
    //CREATE_ONLY仅创建,存在则报Key already exists异常
    
  • 字符串和数字的更新还可通过拼接append/prepend或加减法add,而不是put

    //年龄是累加的效果
    client.add(writePolicy,key,age);
    
  • 单纯的过期时间更新可用touch函数代替put函数

复杂的添加或更新

public class AsOperationClient {
    public static void main(String[] args) {
        AerospikeClient client = new AerospikeClient( "localhost", 3000);
        Key key = new Key("test", "set1", "key1");
        
        //...具体见下文
        
        client.close();
    }
}

Operation

	Bin age = new Bin("age",17);
	Bin name = new Bin("name","zone");
	Bin hobby = new Bin("hobby", new String[]{"study", "painting", "basketball"});
	
	//operate
	Record record1 = client.operate(null, key, Operation.put(name), Operation.get(), Operation.put(hobby), Operation.put(age));
	System.out.println("获取中间的记录:" + record1);
	Record record2 = client.operate(null, key,Operation.get("hobby"), Operation.get("age"));
	System.out.println("获取指定字段:" + record2);

ListOperation

   List<Integer> list = new ArrayList<>();
   list.add(0);
   list.add(1);
   list.add(2);
   System.out.println(list);
   Bin listBin = new Bin("listBin", list);
   Record record = client.operate(null, key
           ,Operation.put(listBin)
           ,ListOperation.getByRankRange("listBin",0, ListReturnType.VALUE)
           ,ListOperation.increment("listBin",0)
           ,ListOperation.insert("listBin", 0, Value.get(3))
   );
   System.out.println("处理结果:" + record);
   System.out.println("最终结果:" + client.get(null, key));

MapOperation

        Map<String,Integer> hotWord = new HashMap<>();
        hotWord.put("卧槽", 90);
        hotWord.put("扑街", 50);
        hotWord.put("你好", 10);
        Bin hw = new Bin("hw", hotWord);
        client.put(null,key,hw);
        System.out.println("开始的结果" + client.get(null, key));
        Record record = client.operate(null,key
        , MapOperation.put(MapPolicy.Default,"hw", Value.get("扑街"), Value.get(100))
        , MapOperation.increment(MapPolicy.Default,"hw", Value.get("你好"), Value.get(100))
        , MapOperation.getByRankRange("hw",0, MapReturnType.KEY_VALUE));
        System.out.println("处理的结果" + record);

更新的时候,map、list的修改操作会返回集合大小、其他默认返回null,get则返回这个bin,且返回的结果会根据bin进行分组(例如上面的record既返回函数修改结果,也返回查询结果)封装为一个列表;

删除

 public class AsDeleteClient {
    public static void main(String[] args) {
        AerospikeClient client = new AerospikeClient( "localhost", 3000);
        Key key = new Key("ns1","set1","key1");
        //...下文
        client.close();
    }
}

删除记录

boolean result = client.delete(null, key);

但是该删除并没有把磁盘的也删除,重启后会恢复

彻底删除记录

// 通过设置过期时间为1
WritePolicy policy = new WritePolicy();
policy.expiration = 1;
client.touch(policy,key);

成功的版本是4.3.1,用最新的不行,可能是配置的原因

删除bin

//删除---将bin值更新为null,当它是记录中唯一的一个bin时,会删除整个记录。
Bin hw = Bin.asNull("hw");
client.put(null, key, hw);

简单查询

public class AsReadClient {
    public static void main(String[] args) {
        AerospikeClient client = new AerospikeClient( "localhost", 3000);
        Key key = new Key("test","set1","key1");

        //自定义读取策略--例如防止超时
        Policy policy = new Policy();
        policy.socketTimeout = 300;

        //判断记录是否存在--只需查看索引
        boolean exists = client.exists(policy, key);
        System.out.format("1.Exists: %s \n", exists);

        //仅获取元数据
        Record recordMetada = client.getHeader(policy, key);
        System.out.format("2.RecordMetada: %s \n", recordMetada);

        //获取整个记录信息
        Record record = client.get(policy, key);
        System.out.format("3.Record: %s \n", record);

        //指定bin
        Record hwRecord = client.get(policy, key, "hw");
        System.out.format("4.hwRecord: %s \n", hwRecord);

        client.close();
    }
}

上面很多方法都是针对单个key,即一条记录,可使用其他构造方法,key可作为数组传入,获取多个记录

⚡️指定key的同时指定bin,aerospike-server也是需要低版本,才能使用

    List<BatchRead> brs = new ArrayList<>();
    //true代表读取全部
    brs.add(new BatchRead(key2, true));
    //读取指定bin
    brs.add(new BatchRead(key1, new String[]{"hw"}));
    //根据key指定不同的bin
    client.get(null, brs);
    brs.forEach(item -> {
        System.out.println(item.record);
    });

为避免版本问题,一下测试均为低版本

表达式查询

client.query()、 client.scanAll();

*lua脚本学习

注意lua的作用就跟redis中使用lua的作用是差不多的,上文提到的Operation无法实现lua,例如实现先判断后修改的需求(也就是不能实现一个逻辑),Operation是能实现多个操作,但是他不是一段逻辑
待更

*异步操作

上面的操作都是基于同步方式,下面简单举例异步的写入方式

public class AsSyncClient {
    public static void main(String[] args) throws InterruptedException {
        //异步的设置
        ClientPolicy policy = new ClientPolicy();
        EventPolicy ep = new EventPolicy();
        policy.eventLoops = new NioEventLoops(ep, 4);
        AerospikeClient client = new AerospikeClient(policy, "localhost", 3000);
        
        //结果接收
        WriteListener writeListener = new WriteListener() {
            @Override
            public void onSuccess(Key key) {
                System.out.println("写入成功");
                System.out.println(key.toString());
            }
            @Override
            public void onFailure(AerospikeException exception) {
                System.out.println("写入异常");
                System.out.println(exception.toString());
            }
        };

        Key key = new Key("ns1", "set_sync", "key1");
        ArrayList<String> list = new ArrayList<>();
        list.add("wocao");
        list.add("你好");
        Bin bin = new Bin("str",list);

        //开始准备异步写入
        EventLoop eventLoop = policy.eventLoops.next();
        client.put(eventLoop, writeListener, null, key, bin);

        TimeUnit.SECONDS.sleep(5);
        client.close();
        policy.eventLoops.close();
    }
}

aerospike-tools

安装

参考:官方文档

docker pull aerospike/aerospike-tools
docker run -ti  aerospike/aerospike-tools aql --host 172.17.0.2

ip地址是根据docker inspect -f '{{.NetworkSettings.IPAddress}}' as_server获得的,不过一样都是172.17.0.2,其中as_server就是上面aerospike-server的容器名

配置命令

# 查看全部配置
get all 
# 设置输出格式 json or table
set output json

⚡️命名空间已经默认配置在配置文件,而不是由你创建

crud命令

# 查看命名空间、集
show namespaces;
show sets
# 查询
select * from test.set1;
# 插入或修改,pk字段是必须的,不区分大小写(索引--可理解为主键),但bin区分,例如Key="value"和key="value"是两个不同的bin
insert into test.set1 (pk,key) values("index","value");
# 删除
delete from test.set1 where pk="index"

重要概念

Aerospike

分布式、高可用的NoSQL数据库;可以存到固态硬盘,简单省钱;面向行的数据库;

分三层:客户端(AS API)、

数据模型组成

区别于传统数据库,不需要定义表的模式(create table),这是因为存放k-v的Bin,数据类型无需指定,由值确定。

  • namespace数据库:命名空间,包括记录+索引+策略(数据物理存储方式、记录有多少副本、)
  • set表:集
  • Record记录:主键(Key)+元数据(版本、ttl、上次更新时间)+多个Bin
  • Key主键
  • Bin列:存放k-v

每个记录都是由多个k-v构成,其中一个k-v是PK:Key(而且这个字段是不区分大小写的,当你使用tools命令插入一条数据时,他是必须携带一个PK的),也就是存放主键的,他是唯一的,这个PK就是Key,如果使用java客户端添加数据的话,需要Policy.sendKey = true才能看到

索引指向set

digest摘要是通过key和setName计算出来的

集群:命名空间会被分为4096个逻辑分区,然后均匀分布到各个节点中,且记录是通过哈希均匀映射到这些分区的
数据复制和同步:副本,类似es,将分区副本放在其它不同节点。

 类似资料: