目录
BeansDB 是豆瓣荣誉出品的分布式 key-value 存储系统,该系统是对经典的 Dynamo 的简化。
1)最终一致的(C),可能出现短时间内的数据不一致。
2)高可用的(A),部分节点出现故障不影响服务。
3)分布式的,伸缩性比较好(P)
BeansDB 在豆瓣内部有着广泛的使用,比如图片文件、小媒体文件、profile、properties 等等。Beansdb 不像 GFS 等分布式存储系统,一般不用于存储百兆以上单位的大数据。
主要特性包括:
高可用:通过多个可读写的用于备份实现高可用;
最终一致性:通过哈希树实现快速完整数据同步(短时间内数据可能不一致);
容易扩展:可以在不中断服务的情况下进行容量扩展;
高性能:异步 IO 和高性能的 Key Value 数据 Tokyo Cabinet;
可配置的可用性和一致性:通过N,W,R进行配置;
简单协议:Memcached 兼容协议,大量可用客户端。
BeansDB 代码由两部分组成:
1)C 代码实现的服务器端程序;
2)一些 Python 脚本实现的程序。
1、基于 tokyo cabinet 实现 key-value 存储功能。为了提高性能,在存储数据时,BeansDB 先将数据快速的存储到 tcmdb(内存 db )中,由独立线程异步的将 tcmdb 中的数据更新到 tchdb 中。可见,在非正常情况下,BeansDB 是有可能丢失数据的,这种情况就需要对等节点同步数据来修复。
1、BeansDB 是一个基于类 riak bitcask 存储结构的 kvstore,其中每台机器存储了全量的图片数据。(日志型key/value存储模型 Bitcask)
2、为了解决对等系统中的提交的一致性问题,BeansDB 采用版本号+修改的时间戳来标识数据。提交的策略是:高版本号覆盖低版本号,新数据覆盖旧数据。
3、BeansDB 使用 hashtree 来 diff 两个数据节点的数据一致性。Hashtree 中保存的hash数据会固化到磁盘上,同时为了提高性能,在内存中会完整的构造 hashtree 结构,hashtree 的更新也是由独立线程异步更新(在新的查询或者提交前会再次尝试更新 hashtree,保证 hashtree 的实时性,如果 hashtree 已是 valid,这个操作相当于空操作)。
4、BeansDB 对外提供兼容 memcached 协议的接口。BeansDB 额外提供了两个命令:get @xxx 用来返回 hastree 的状态,get ?xxx 用来返回对应 key 的 meta 信息,这两个命令辅助不同节点间 hashtree 的 diff 及数据同步。
小结:BeansDB 的服务器端程序并没有像 cassandra 等提供一套集数据存储、数据复制、数据校验、对等节点管理等分布式功能,它只是实现一个单机 key-value 存储 server 和 hashtree。从选择方面来说,如果使用如 tokyo tyrant 等现成的 key-value 存储 server,BeansDB 也不必自己再使用 memcached+tc 造遍轮子。而服务器端的 hashtree 只提供了存储,对等节点的 dif f需要由外部脚本完成。
相比于 C 的服务器端程序,BeansDB 的几个 Python 脚本程序看起来有些不够工业化。
proxy.py 这个代理程序使得 BeansDB 具有了分布式特点,不过这个脚本还是太朴素了些,它提供的功能如下:
1)客户端程序的请求都走到 proxy.py,由 proxy.py 将请求分发到相应的存储服务器程序。不像 Dynamo 中的复杂的分布式一致性 hash 策略,proxy.py 按照 trunk 手动分配数据的分布。比如,在桶数16时,DB = {“A:7900″:range(8), “B:7900”:range(8,16), “B:7900”:range(16)….} 的配置使得,节点 A 的数据分布在桶 0-7,节点 B 的数据分布在桶 8-15,节点 C 的数据分布在桶 0-15。和分布式一致性 hash 对比,这相当于虚拟节点个数是 16,而每个虚拟节点对应的物理存储节点是已知不变的。proxy.py 将请求的 key 取 hash 后模桶数,以得到该桶中对应的节点列表。这样做的好处是:不像分布式一致性hash那样,数据的分布情况不是透明的,不能够手动迁移数据。缺点是:这种手动策略不适合大型的分布式集群,添加删除机器需要人工干预。
2)proxy.py 当前提供的 NWR 是 N=3, R=1, W=1。读时只要从一个节点读到数据就返回,写时只要写到一个节点就表示写成功。这种策略虽然有些粗暴,不过也是最实时高效的,对于准确性要求不高的应用场景来说,也是可以接受的。
#!/usr/bin/env python
import time, math
import logging
import sys
from eventlet import api, greenio, coros, util, tpool
def fnv1a(s):
prime = 0x01000193
h = 0x811c9dc5
for c in s:
h ^= ord(c)
h = (h * prime) & 0xffffffff
return h
class Client(object):
hash_space = 1<<32
def __init__(self, servers={}, buckets_count=16, N=3, R=1, W=1):
self.socks = dict((addr, []) for addr in servers)
self.N = N
self.R = R
self.W = W
self.buckets_count = buckets_count
self.bucket_size = self.hash_space / buckets_count
self.servers = {}
self.server_buckets = {}
self.buckets = [[] for i in range(buckets_count)]
for s,bs in servers.items():
for b in bs:
self.buckets[b].append(s)
for b in range(self.buckets_count):
self.buckets[b].sort(key=lambda x:hash("%x::%s"%(b,x) * 2))
def pop_connection(self, addr):
try:
sock = self.socks[addr].pop()
except IndexError:
sock = None
if sock is None:
sock = greenio.socket.socket()
if ':' in addr:
addr, port = addr.split(":")
else:
port = 11211
try:
sock.connect((addr, int(port)))
except greenio.socket.error:
print >>sys.stderr, "connect to %s:%s failed" % (addr, port)
return
return sock
def push_connection(self, addr, sock):
self.socks[addr].append(sock)
def get_hosts_by_key(self, key):
hash = fnv1a(key)
b = hash / self.bucket_size
return self.buckets[b]
def _get(self, addr, key):
sock = self.pop_connection(addr)
if not sock:
return
try:
sock.send("get %s\r\n" % key)
reader = sock.makefile('r')
line = reader.readline()
r = None
if line.startswith("VALUE"):
_, key, flag, length = line.split(' ')
value = reader.read(int(length))
r = value, int(flag)
reader.read(2) # \r\n
line = reader.readline() # END\r\n
reader.close()
self.push_connection(addr, sock)
return r
except:
raise
def get(self, key):
for addr in self.get_hosts_by_key(key):
r = self._get(addr, key)
if r is not None:
return r
def _set(self, addr, key, value, flag, results=[]):
sock = self.pop_connection(addr)
if not sock:
return
reader = sock.makefile('r')
writer = sock.makefile('w')
writer.write("set %s %d %d %d\r\n" % (key, flag, 0, len(value)))
writer.write(value)
writer.write("\r\n")
writer.flush()
line = reader.readline()
r = line.startswith("STORED")
reader.close()
writer.close()
self.push_connection(addr, sock)
return r
def set(self, key, value, flag=0, rev=0):
rs = [self._set(addr, key, value, flag)
for addr in self.get_hosts_by_key(key)]
return rs.count(True) > 0
def delete(self, key):
pass
def test():
c = Client({"localhost:7901":range(16),
"localhost:7902":range(16),
"localhost:7903":range(16)})
print c.set('a', 'aaaa', 0)
print c.get('a')
def handler(store, sock):
reader = sock.makefile('r')
writer = sock.makefile('w')
def writeline(line):
writer.write(line)
writer.write('\r\n')
while True:
x = reader.readline()
if not x: break
args = x.split()
cmd = args[0]
st = time.time()
if cmd == 'get':
for key in args[1:]:
v = store.get(key)
if v is not None:
value, flag = v
writeline("VALUE %s %d %d" % (key, flag, len(value)))
writeline(value)
del value, v
writeline('END')
elif cmd == 'set':
key, flag, rev, bytes = args[1:5]
flag, rev, bytes = int(flag), int(rev), int(bytes)
buf = reader.read(bytes)
while len(buf) < bytes:
buf += reader.read(bytes - len(buf))
reader.read(2)
if store.set(key, buf, flag, rev):
writeline('STORED')
else:
writeline('NOT_STORED')
del buf
elif cmd == 'delete':
key = args[1]
v = store.delete(key)
noreply = len(args) > 3 and int(args[3]) or False
if not noreply:
writeline(v and 'DELETED' or 'NOT_FOUND')
elif cmd == 'stat':
writeline('END')
elif cmd == 'quit':
break
else:
writeline('CLIENT_ERROR')
t = time.time() - st
if t > 0.001:
logging.info(args)
print t, args
writer.flush()
api.sleep()
reader.close()
writer.close()
sock.close()
def main():
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-l", "--listen", dest="host", default="0.0.0.0",
help="the ip interface to bind")
parser.add_option("-p", "--port", default=7902, type=int,
help="which port to listen")
parser.add_option("-d", "--daemon", action="store_true",
help="run in daemon", default=False)
(options, args) = parser.parse_args()
cfg = {"localhost:7901":range(16),
"localhost:7902":range(16),
"localhost:7903":range(16)}
store = Client(cfg, 16)
print "server listening on %s:%s" % (options.host, options.port)
server = api.tcp_listener((options.host, options.port))
util.set_reuse_addr(server)
while True:
try:
new_sock, address = server.accept()
except KeyboardInterrupt:
break
api.spawn(handler, store, new_sock)
print 'close listener ...'
server.close()
if __name__ == '__main__':
main()
sync.py 脚本定时比较位于一个桶中的各存储节点的 hashtree 的 hash 是否相同。这个比较是从 hashtree 的根节点开始的,如果根节点相同就不需要继续比较,否则递归向下比较到hash值不同的叶子节点,diff 出 value 不同时,由高版本节点数据覆盖低版本的节点数据。
#!/usr/bin/python
import sys, os, os.path
from dbclient import Beansdb, db
def get_dir(s, dir):
def parse(line):
p,h,c = line.split(' ')
return p, (int(h), int(c))
return dict(parse(line) for line in
filter(None, (s.get(dir) or '').split('\n')))
def is_dir(d):
return len(d) == 16 and len([k for k in d if k.endswith('/')]) == 16
def mirror(src, dst, path):
s = get_dir(src, path)
d = get_dir(dst, path)
if s == d:
print path, src, dst, 'skipped'
return
if is_dir(s):
for k in sorted(s):
if s[k] != d.get(k):
#print path+k[0], 'mirror ', s[k], d.get(k)
mirror(src, dst, path+k[0])
elif is_dir(d):
for k in sorted(d):
mirror(dst, src, path+k[0])
elif not is_dir(s) and not is_dir(d):
sync_files(src, dst, path, s, d)
sync_files(dst, src, path, d, s)
else:
print path, src, '=>', dst, 'skipped'
def sync_files(src, dst, path, s, d):
for k in sorted(s.keys()):
if k not in d:
data = src.get(k)
if data is not None:
print path, k, s[k], d.get(k,(0,0)), src, "=>", dst, dst.set(k, data, s[k][1])
else:
print path, src, k, 'is None', src.delete(k)
elif s[k][0] != d[k][0]:
if s[k][1] > d[k][1]:
data = src.get(k)
if data is not None:
print path, k, s[k], d.get(k,(0,0)), src, "=>", dst, dst.set(k, data, s[k][1])
else:
print path, src, k, 'is None', src.delete(k)
elif s[k][1] == d[k][1]:
m1 = int((src.get('?'+k) or '0').split(' ')[-1])
m2 = int((dst.get('?'+k) or '0').split(' ')[-1])
print path, src, k, 'is broken', s[k], m1, d[k], m2
if m1 > m2:
dst.set(k, src.get(k))
elif m2 >= m1:
src.set(k, dst.get(k))
def stat(s):
st = {}
for d,h,c in [line.split(' ') for line in (s.get('@') or '').split('\n') if line]:
if len(d) != 2 and not d.endswith('/'):
return {}
try:
st[int(d[0],16)] = (h,int(c))
except:
pass
return st
def almost(a,b):
return abs(a-b) < 0.2*(abs(a)+abs(b))
def sync(db, start=0):
stats = {}
for n,s in db.servers.items():
stats[str(s)] = stat(s)
for b in range(start, db.buckets_count):
N = len(db.buckets[b])
for s in range(N)[::-1]:
src = db.buckets[b][s]
dst = db.buckets[b][(s+1)%N]
if not stats[str(src)] or not stats[str(dst)]:
continue
ss = stats[str(src)].get(b, (0,0))
ds = stats[str(dst)].get(b, (0,0))
if ss != ds:
print '%02x'%b,src,ss, dst, ds
mirror(src, dst, "@%0x"%b)
def lock(fd):
import fcntl, errno
try:
fcntl.lockf(fd, fcntl.LOCK_EX|fcntl.LOCK_NB)
except IOError, e:
if e.errno in (errno.EACCES, errno.EAGAIN):
print "There is an instance of", sys.argv[0], "running. Quit"
sys.exit(0)
else:
raise
def main():
import os
lock_file_path = '/tmp/lsync.lock'
fd = os.open(lock_file_path, os.O_CREAT|os.O_RDWR, 0660)
try:
lock(fd)
if len(sys.argv)>1:
sync(db, int(sys.argv[1]))
else:
sync(db)
finally:
os.close(fd)
if __name__ == "__main__":
main()