当前位置: 首页 > 工具软件 > BeansDB > 使用案例 >

BeansDB

薄瑞
2023-12-01

目录

BeansDB是什么❓

BeansDB 的 CAP 特点表现为❓

BeansDB 功能组成❓

服务器端程序实现的功能❓

核心的 Python 脚本提供的功能❓

1、proxy.py

2、sync.py


BeansDB是什么❓

BeansDB 是豆瓣荣誉出品的分布式 key-value 存储系统,该系统是对经典的 Dynamo 的简化。

BeansDB 的 CAP 特点表现为❓

1)最终一致的(C),可能出现短时间内的数据不一致。

2)高可用的(A),部分节点出现故障不影响服务。

3)分布式的,伸缩性比较好(P)

BeansDB 在豆瓣内部有着广泛的使用,比如图片文件、小媒体文件、profile、properties 等等。Beansdb 不像 GFS 等分布式存储系统,一般不用于存储百兆以上单位的大数据。

主要特性包括:

  • 高可用:通过多个可读写的用于备份实现高可用;

  • 最终一致性:通过哈希树实现快速完整数据同步(短时间内数据可能不一致);

  • 容易扩展:可以在不中断服务的情况下进行容量扩展;

  • 高性能:异步 IO 和高性能的 Key Value 数据 Tokyo Cabinet

  • 可配置的可用性和一致性:通过N,W,R进行配置;

  • 简单协议:Memcached 兼容协议,大量可用客户端。

BeansDB 功能组成❓

BeansDB 代码由两部分组成:

1)C 代码实现的服务器端程序;

2)一些 Python 脚本实现的程序。

服务器端程序实现的功能❓

1、基于 tokyo cabinet 实现 key-value 存储功能。为了提高性能,在存储数据时,BeansDB 先将数据快速的存储到 tcmdb(内存 db )中,由独立线程异步的将 tcmdb 中的数据更新到 tchdb 中。可见,在非正常情况下,BeansDB 是有可能丢失数据的,这种情况就需要对等节点同步数据来修复。

1、BeansDB 是一个基于类 riak bitcask 存储结构的 kvstore,其中每台机器存储了全量的图片数据。(日志型key/value存储模型 Bitcask

  • Bitcask 有以下优点:读写低时延,高写入吞吐量,能处理大规模数据集而性能不会显著下降, 数据持久化更好, 不用担心crash会导致数据丢失,通过简单的rsync就能在线备份数据,还能恢复被错误覆盖的数据.算法简单,代码容易维护和调优,在大数据量和高负载下的性能容易估计。
  • Bitcask 也有缺点,它要求所有key信息全部放入内存,,在启动时一次性载入。这对内存索引的效率提出了非常高的要求。beansdb 中改进的 HashTree 有更好的空间效率,它根据key的特点进行了重新编码,大大降低了空间消耗,每条记录平均只需 20 字节, 其中包括 key 、版本、hash、位置等信息,这样一台 8G 内存的服务器可以存储 4亿条记录。如果记录平均大小为 1k 的文本,则能存 400G,如果是平均大小为 100k 的图片,则能存40T。
  • 启动时间也是 Bitcask 算法的缺点,目前在一分钟大概能载入 5 千万条记录的索引,还有进一步优化的空间。如果是意外 crash 后的重启,时间会稍微长一点,视数据量的大小而定,一般也不会超过 10 分钟。
  • 日志结构存储的另一个问题是会有空洞,beansdb 支持外部控制的在线垃圾回收过程,可以安排在夜里进行。通常在硬盘不是太紧张的情况下,几个月进行一次垃圾回收就可以了。

2、为了解决对等系统中的提交的一致性问题,BeansDB 采用版本号+修改的时间戳来标识数据。提交的策略是:高版本号覆盖低版本号,新数据覆盖旧数据。

3、BeansDB 使用 hashtree 来 diff 两个数据节点的数据一致性。Hashtree 中保存的hash数据会固化到磁盘上,同时为了提高性能,在内存中会完整的构造 hashtree 结构,hashtree 的更新也是由独立线程异步更新(在新的查询或者提交前会再次尝试更新 hashtree,保证 hashtree 的实时性,如果 hashtree 已是 valid,这个操作相当于空操作)。

4、BeansDB 对外提供兼容 memcached 协议的接口。BeansDB 额外提供了两个命令:get @xxx 用来返回 hastree 的状态,get ?xxx 用来返回对应 key 的 meta 信息,这两个命令辅助不同节点间 hashtree 的 diff 及数据同步。

小结:BeansDB 的服务器端程序并没有像 cassandra 等提供一套集数据存储、数据复制、数据校验、对等节点管理等分布式功能,它只是实现一个单机 key-value 存储 server 和 hashtree。从选择方面来说,如果使用如 tokyo tyrant 等现成的 key-value 存储 server,BeansDB 也不必自己再使用 memcached+tc 造遍轮子。而服务器端的 hashtree 只提供了存储,对等节点的 dif f需要由外部脚本完成。

核心的 Python 脚本提供的功能❓

相比于 C 的服务器端程序,BeansDB 的几个 Python 脚本程序看起来有些不够工业化。

1、proxy.py

proxy.py 这个代理程序使得 BeansDB 具有了分布式特点,不过这个脚本还是太朴素了些,它提供的功能如下:

1)客户端程序的请求都走到 proxy.py,由 proxy.py 将请求分发到相应的存储服务器程序。不像 Dynamo 中的复杂的分布式一致性 hash 策略,proxy.py 按照 trunk 手动分配数据的分布。比如,在桶数16时,DB = {“A:7900″:range(8), “B:7900”:range(8,16), “B:7900”:range(16)….} 的配置使得,节点 A 的数据分布在桶 0-7,节点 B 的数据分布在桶 8-15,节点 C 的数据分布在桶 0-15。和分布式一致性 hash 对比,这相当于虚拟节点个数是 16,而每个虚拟节点对应的物理存储节点是已知不变的。proxy.py 将请求的 key 取 hash 后模桶数,以得到该桶中对应的节点列表。这样做的好处是:不像分布式一致性hash那样,数据的分布情况不是透明的,不能够手动迁移数据。缺点是:这种手动策略不适合大型的分布式集群,添加删除机器需要人工干预。

2)proxy.py 当前提供的 NWR 是 N=3, R=1, W=1。读时只要从一个节点读到数据就返回,写时只要写到一个节点就表示写成功。这种策略虽然有些粗暴,不过也是最实时高效的,对于准确性要求不高的应用场景来说,也是可以接受的。

#!/usr/bin/env python

import time, math
import logging
import sys
from eventlet import api, greenio, coros, util, tpool

def fnv1a(s):
    prime = 0x01000193
    h = 0x811c9dc5
    for c in s:
        h ^= ord(c)
        h = (h * prime) & 0xffffffff
    return h

class Client(object):
    hash_space = 1<<32
    def __init__(self, servers={}, buckets_count=16, N=3, R=1, W=1):
        self.socks = dict((addr, []) for addr in servers)
        self.N = N
        self.R = R
        self.W = W
        self.buckets_count = buckets_count
        self.bucket_size = self.hash_space / buckets_count
        self.servers = {}
        self.server_buckets = {}
        self.buckets = [[] for i in range(buckets_count)]
        for s,bs in servers.items():
            for b in bs:
                self.buckets[b].append(s)
        for b in range(self.buckets_count):
            self.buckets[b].sort(key=lambda x:hash("%x::%s"%(b,x) * 2))

    def pop_connection(self, addr):
        try:
            sock = self.socks[addr].pop()
        except IndexError:
            sock = None
        if sock is None:
            sock = greenio.socket.socket()
            if ':' in addr:
                addr, port = addr.split(":")
            else:
                port = 11211
            try:
                sock.connect((addr, int(port)))
            except greenio.socket.error:
                print >>sys.stderr, "connect to %s:%s failed" % (addr, port)
                return
        return sock

    def push_connection(self, addr, sock):
        self.socks[addr].append(sock)

    def get_hosts_by_key(self, key):
        hash = fnv1a(key)
        b = hash / self.bucket_size
        return self.buckets[b]

    def _get(self, addr, key):
        sock = self.pop_connection(addr)
        if not sock:
            return
        try:
            sock.send("get %s\r\n" % key)
            reader = sock.makefile('r')
            line = reader.readline()
            r  = None
            if line.startswith("VALUE"):
                _, key, flag, length = line.split(' ')
                value = reader.read(int(length))
                r = value, int(flag)
                reader.read(2) # \r\n
                line = reader.readline() # END\r\n
            reader.close()
            self.push_connection(addr, sock)
            return r
        except:
            raise

    def get(self, key):
        for addr in self.get_hosts_by_key(key):
            r = self._get(addr, key)
            if r is not None:
                return r

    def _set(self, addr, key, value, flag, results=[]):
        sock = self.pop_connection(addr)
        if not sock:
            return
        reader = sock.makefile('r')
        writer = sock.makefile('w')
        writer.write("set %s %d %d %d\r\n" % (key, flag, 0, len(value)))
        writer.write(value)
        writer.write("\r\n")
        writer.flush()
        line = reader.readline()
        r = line.startswith("STORED")
        reader.close()
        writer.close()
        self.push_connection(addr, sock)
        return r

    def set(self, key, value, flag=0, rev=0):
        rs = [self._set(addr, key, value, flag)
                for addr in self.get_hosts_by_key(key)]
        return rs.count(True) > 0

    def delete(self, key):
        pass

def test():
    c = Client({"localhost:7901":range(16),
                "localhost:7902":range(16),
                "localhost:7903":range(16)})
    print c.set('a', 'aaaa', 0)
    print c.get('a')

def handler(store, sock):
    reader = sock.makefile('r')
    writer = sock.makefile('w')

    def writeline(line):
        writer.write(line)
        writer.write('\r\n')

    while True:
        x = reader.readline()
        if not x: break

        args = x.split()
        cmd = args[0]

        st = time.time()
        if cmd == 'get':
            for key in args[1:]:
                v = store.get(key)
                if v is not None:
                    value, flag = v
                    writeline("VALUE %s %d %d" % (key, flag, len(value)))
                    writeline(value)
                    del value, v
            writeline('END')

        elif cmd == 'set':
            key, flag, rev, bytes = args[1:5]
            flag, rev, bytes = int(flag), int(rev), int(bytes)

            buf = reader.read(bytes)
            while len(buf) < bytes:
                buf += reader.read(bytes - len(buf))
            reader.read(2)
            if store.set(key, buf, flag, rev):
                writeline('STORED')
            else:
                writeline('NOT_STORED')
            del buf

        elif cmd == 'delete':
            key = args[1]
            v = store.delete(key)
            noreply = len(args) > 3 and int(args[3]) or False
            if not noreply:
                writeline(v and 'DELETED' or 'NOT_FOUND')

        elif cmd == 'stat':
            writeline('END')

        elif cmd == 'quit':
            break

        else:
            writeline('CLIENT_ERROR')

        t = time.time() - st
        if t > 0.001:
            logging.info(args)
            print t, args

        writer.flush()
        api.sleep()

    reader.close()
    writer.close()
    sock.close()


def main():
    from optparse import OptionParser
    parser = OptionParser()
    parser.add_option("-l", "--listen", dest="host", default="0.0.0.0",
            help="the ip interface to bind")
    parser.add_option("-p", "--port", default=7902, type=int,
            help="which port to listen")
    parser.add_option("-d", "--daemon", action="store_true",
            help="run in daemon", default=False)

    (options, args) = parser.parse_args()

    cfg = {"localhost:7901":range(16),
            "localhost:7902":range(16),
            "localhost:7903":range(16)}
    store = Client(cfg, 16)

    print "server listening on %s:%s" % (options.host, options.port)
    server = api.tcp_listener((options.host, options.port))
    util.set_reuse_addr(server)

    while True:
        try:
            new_sock, address = server.accept()
        except KeyboardInterrupt:
            break
        api.spawn(handler, store, new_sock)

    print 'close listener ...'
    server.close()

if __name__ == '__main__':
    main()

2、sync.py

sync.py 脚本定时比较位于一个桶中的各存储节点的 hashtree 的 hash 是否相同。这个比较是从 hashtree 的根节点开始的,如果根节点相同就不需要继续比较,否则递归向下比较到hash值不同的叶子节点,diff 出 value 不同时,由高版本节点数据覆盖低版本的节点数据。

#!/usr/bin/python

import sys, os, os.path
from dbclient import Beansdb, db

def get_dir(s, dir):
    def parse(line):
        p,h,c = line.split(' ')
        return p, (int(h), int(c))
    return dict(parse(line) for line in
                filter(None, (s.get(dir) or '').split('\n')))

def is_dir(d):
    return len(d) == 16 and len([k for k in d if k.endswith('/')]) == 16

def mirror(src, dst, path):
    s = get_dir(src, path)
    d = get_dir(dst, path)
    if s == d:
        print path, src, dst, 'skipped'
        return
    if is_dir(s):
        for k in sorted(s):
            if s[k] != d.get(k):
                #print path+k[0], 'mirror ', s[k], d.get(k)
                mirror(src, dst, path+k[0])
    elif is_dir(d):
        for k in sorted(d):
            mirror(dst, src, path+k[0])
    elif not is_dir(s) and not is_dir(d):
        sync_files(src, dst, path, s, d)
        sync_files(dst, src, path, d, s)
    else:
        print path, src, '=>', dst, 'skipped'

def sync_files(src, dst, path, s, d):
    for k in sorted(s.keys()):
        if k not in d:
            data = src.get(k)
            if data is not None:
                print path, k, s[k], d.get(k,(0,0)), src, "=>", dst, dst.set(k, data, s[k][1])
            else:
                print path, src, k, 'is None', src.delete(k)
        elif s[k][0] != d[k][0]:
            if s[k][1] > d[k][1]:
                data = src.get(k)
                if data is not None:
                    print path, k, s[k], d.get(k,(0,0)), src, "=>", dst, dst.set(k, data, s[k][1])
                else:
                    print path, src, k, 'is None', src.delete(k)
            elif s[k][1] == d[k][1]:
                m1 = int((src.get('?'+k) or '0').split(' ')[-1])
                m2 = int((dst.get('?'+k) or '0').split(' ')[-1])
                print path, src, k, 'is broken', s[k], m1, d[k], m2
                if m1 > m2:
                    dst.set(k, src.get(k))
                elif m2 >= m1:
                    src.set(k, dst.get(k))

def stat(s):
    st = {}
    for d,h,c in [line.split(' ') for line in (s.get('@') or '').split('\n') if line]:
        if len(d) != 2 and not d.endswith('/'):
            return {}
        try:
            st[int(d[0],16)] = (h,int(c))
        except:
            pass
    return st

def almost(a,b):
    return abs(a-b) < 0.2*(abs(a)+abs(b))

def sync(db, start=0):
    stats = {}
    for n,s in db.servers.items():
        stats[str(s)] = stat(s)
    for b in range(start, db.buckets_count):
        N = len(db.buckets[b])
        for s in range(N)[::-1]:
            src = db.buckets[b][s]
            dst = db.buckets[b][(s+1)%N]
            if not stats[str(src)] or not stats[str(dst)]:
                continue
            ss = stats[str(src)].get(b, (0,0))
            ds = stats[str(dst)].get(b, (0,0))
            if ss != ds:
                print '%02x'%b,src,ss, dst, ds
                mirror(src, dst, "@%0x"%b)

def lock(fd):
    import fcntl, errno
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX|fcntl.LOCK_NB)
    except IOError, e:
        if e.errno in (errno.EACCES, errno.EAGAIN):
            print "There is an instance of", sys.argv[0], "running. Quit"
            sys.exit(0)
        else:
            raise

def main():
    import os
    lock_file_path = '/tmp/lsync.lock'
    fd = os.open(lock_file_path, os.O_CREAT|os.O_RDWR, 0660)
    try:
        lock(fd)
        if len(sys.argv)>1:
            sync(db, int(sys.argv[1]))
        else:
            sync(db)
    finally:
        os.close(fd)

if __name__ == "__main__":
    main()

 

 类似资料:

相关阅读

相关文章

相关问答