目录
1、Python3 采集 GoBeansproxy 日志路由数据(SET/DELETE)存储到 MySQL
2、Python3 实现 GoBeansDB 集群移除节点后的数据副本平衡
自动随机 sharding,只需调整 GoBeansproxy 配置文件 proxy.yaml 提供的 NWR(R ≤ N ≥ W),举个例子即 N=3, R=1, W=1。与 GoBeansproxy 的 route.yaml 配置文件中的 buckets 参数手动分配无关。
N=3, R=1, W=1
N:一份数据(或一个 key-value)最终保证有 3 副本数据一致,根据 W 的配置可能出现短时间内的数据不一致;
R:读时只要从一个节点读到数据就返回(GoBeansproxy 不记录每个 key 具体路由,而是从各个节点去查找,只要从一个节点读到数据就返回);
W:写时需要写到一个节点就表示写成功(GoBeansproxy 不记录写入 key 的具体路由)。
新增或移除节点只需更新 route.yaml 配置文件,重启 GoBeansproxy 即可。因为 GoBeansproxy 不记录路由,所以只要保证集群中至少有一份数据(或一个 key-value)即可正常使用。
如果加入的节点本身就有数据,通过集群也可正常使用访问等操作,当然这是特殊情况。
一是集群新增节点不会影响固定副本的变化。
二是分布式架构模式下,如果移除集群任意一节点,而且不采取任何措施补充一份该节点数据,就会造成该节点的数据在集群中丢失一个副本,原因就是 GoBeansproxy 不记录数据路由,不能像 MongoDB 分片那样自平衡。
所以需要借助 Python 脚本管理,保证集群内固定副本(N)的平衡。
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
'''===========================================================
@Project -> File :GoBeansDB -> gobeansdb_balance_remove_node.py
@IDE :PyCharm
@Author :Mr. Wufei
@Date :2019/11/1 15:45
@Desc :Python3 实现 GoBeansDB 集群移除节点后的数据副本平衡
============================================================'''
import mysqlconn
import struct
import socket
import python3_libmc_beansdb as plb
class BalanceReplica(object):
def __init__(self, mysql_conn_dict, gobeansproxy_conn_list):
"""
:param mysql_conn_dict: 一个包含 MySQL 连接所需信息的字典(格式:{'host': '127.0.0.1', 'port': 3306, 'user': 'test', 'passwd': 'test', 'db': 'test'})
:param gobeansproxy_conn_list: GoBeansproxy 连接的 IP:PORT 列表(格式:['ip1:port1', 'ip2:port2'])
"""
self.__mysql_conn_dict = mysql_conn_dict
self.__gobeansproxy_conn_list = gobeansproxy_conn_list
def _parse_ip_int(self, ip_str):
"""
将 IP 转换为 int 类型
:param ip_str: 一个字符串 IP
:return: int 类型 IP
"""
ip_int = struct.unpack('!I', socket.inet_aton(ip_str))[0]
return ip_int
def _parse_ip_str(self, ip_int):
"""
将 int 类型 IP 转换为字符串类型
:param ip_int: 一个 int 类型 IP
:return: 字符串 IP
"""
ip_str = socket.inet_ntoa(struct.pack('!I', ip_int))
return ip_str
def _construct_sql(self, gobeansdb_old_ip, gobeansdb_port, gobeansdb_new_ip):
"""
构造 SQL
:param gobeansdb_old_ip: 被移除 GoBeansDB 的 IP
:param gobeansdb_port: 被移除 GoBeansDB 的 PORT
:param gobeansdb_new_ip: 接收数据的新 GoBeansDB 的 IP
:return: 两个标准 SQL 语句
"""
old_int_ip = self._parse_ip_int(gobeansdb_old_ip)
new_int_ip = self._parse_ip_int(gobeansdb_new_ip)
sql_update = '''
update gobeansproxy_record
set gobeansdb_ip = %d
where gobeansdb_ip = %d
and gobeansdb_port = %d
''' %(new_int_ip, old_int_ip, gobeansdb_port)
sql_select = '''
select
request_key
from gobeansproxy_record
where gobeansdb_ip = %d
and method_status = 1
and gobeansdb_port = %d
''' %(new_int_ip, gobeansdb_port)
return sql_update, sql_select
def _mysql_operations(self, mysql_conn_dict, sql_update, sql_select):
"""
MySQL 操作
:param mysql_conn_dict: 一个包含 MySQL 连接所需信息的字典
:param sql_update: 一个可执行的更新 SQL 语句
:param sql_select: 一个可执行的查询 SQL 语句
:return: 一个或多个图片分布式存储路径(即图片在 BeansDB/GoBeansDB 的 key)的集合(用集合主要是为了去重,因为可能存在被 DELETE 的 key)
"""
request_key_set = set()
with mysqlconn.MySqlDB(host = mysql_conn_dict['host'], port = mysql_conn_dict['port'], user = mysql_conn_dict['user'], passwd = mysql_conn_dict['passwd'], db = mysql_conn_dict['db']) as mysqldb:
update_status = mysqldb.execute(sql_update)
if update_status == 1:
mysqldb.execute(sql_select)
for request_key_dict in mysqldb:
request_key_set.add(request_key_dict['request_key'])
else:
return update_status
return request_key_set
def _get_multi_list(self, ips_port_list, image_keys_list):
"""
获取所有(一个或多个)给定 key 的值
:param ips_port_list: GoBeansproxy 连接的 IP:PORT 列表(格式:['ip1:port1', 'ip2:port2'])
:param image_keys_list: 一个或多个图片分布式存储路径(即图片在 BeansDB/GoBeansDB 的 key)的列表(格式:[key1, key2, key3])
:return: GoBeansDB 一个或多个 key-value 的字典(格式:{key1 : value1, key2 : value2 , key3 : value3})【针对图片存储,上限为 720,最优为 100】
"""
mc = plb.InteractiveConn(ips_port_list)
image_keys_values_dict = mc.get_multi_list(image_keys_list)
return image_keys_values_dict
def _set_multi_dict(self, ips_port_list, image_keys_values_dict):
"""
GoBeansDB 同时设置一个或多个 key-value
:param ips_port_list: GoBeansDB 连接的 IP:PORT 列表(格式:['ip1:port1', 'ip2:port2'])
:param image_keys_values_dict: GoBeansDB 一个或多个 key-value 的字典(格式:{key1 : value1, key2 : value2 , key3 : value3})
:return: set_status(True/False)
"""
mc = plb.InteractiveConn(ips_port_list)
set_status = mc.set_multi_dict(image_keys_values_dict)
return set_status
def data_migration(self, gobeansdb_old_ip, gobeansdb_port, gobeansdb_new_ip):
"""
可调用数据迁移操作函数
:param gobeansdb_old_ip: 被移除 GoBeansDB 的 IP
:param gobeansdb_port: 被移除 GoBeansDB 的 PORT
:param gobeansdb_new_ip: 接收数据的新 GoBeansDB 的 IP
:return:
"""
sql_update, sql_select = self._construct_sql(gobeansdb_old_ip, gobeansdb_port, gobeansdb_new_ip)
request_key_set = self._mysql_operations(self.__mysql_conn_dict, sql_update, sql_select)
if request_key_set == 0:
return 'Do not repeat this operation if additional nodes are to be programmed.'
gobeansdb_conn_new_str = str(gobeansdb_new_ip) + ':' + str(gobeansdb_port)
gobeansdb_conn_new_list = [gobeansdb_conn_new_str]
image_keys_list = list(request_key_set)
list_len = len(image_keys_list)
print('当前数据迁移 key-value 数量:', list_len)
# image_keys_values_dict 针对图片存储,上限为 720,最优为 100,当长度大于 360 时每次 100 循环执行
if list_len > 360:
# 取整除,列表切片准则
exactly_divisible = list_len // 100
list_num = 0
while list_num <= exactly_divisible:
slice_start = list_num * 100
# 判断是否为最后一次循环,当为最后一次是 stop 即为列表长度
if list_num == exactly_divisible:
slice_stop = list_len
else:
slice_stop = list_num * 100 + 100
# 连接BeansDB/GoBeansDB,获取所有(一个或多个)给定 key 的值
image_keys_values_dict = self._get_multi_list(self.__gobeansproxy_conn_list, image_keys_list[slice_start: slice_stop])
# 连接BeansDB/GoBeansDB,同时设置一个或多个 key-value(原理还是相当于依次执行 set)
set_status = self._set_multi_dict(gobeansdb_conn_new_list, image_keys_values_dict)
if not set_status:
# 如果为 False,不是整个 image_keys_values_dict 执行失败,而是个别 set 的值不存在,可忽略(因为可通过 Python 分析日志找出 set 失败的key)
# 当然,如果数据量小,可不用采取批量形式,set_key_value 即可
print('Data Migration Fail, Value does not exist.')
else:
print('>>> list_len: %d; slice_start: %d; slice_stop: %d' % (list_len, slice_start, slice_stop))
list_num += 1
else:
# 连接BeansDB/GoBeansDB,获取所有(一个或多个)给定 key 的值
image_keys_values_dict = self._get_multi_list(self.__gobeansproxy_conn_list, image_keys_list)
# 连接BeansDB/GoBeansDB,同时设置一个或多个 key-value
set_status = self._set_multi_dict(gobeansdb_conn_new_list, image_keys_values_dict)
if not set_status:
print('Data Migration Fail', image_keys_list)
else:
print('>>> list_len: %d' % (list_len))
return 'Data Migration Successful'
"""
if __name__ == '__main__':
mysql_xxxx_gobeansdb_dict = {'host': 'xx.xx.2.35', 'port': 3701, 'user': 'xxxx_test', 'passwd': 'xxxxtest', 'db': 'xxxx_gobeansdb'}
gobeansproxy_ips_port_list = ['xx.xx.3.36:7905']
br = BalanceReplica(mysql_xxxx_gobeansdb_dict, gobeansproxy_ips_port_list)
result = br.data_migration('xx.xx.2.35', 7981, 'xx.xx.3.36')
print(result)
"""
当前数据迁移 key-value 数量: 7392
>>> list_len: 7392; slice_start: 0; slice_stop: 100
>>> list_len: 7392; slice_start: 100; slice_stop: 200
>>> list_len: 7392; slice_start: 200; slice_stop: 300
>>> list_len: 7392; slice_start: 300; slice_stop: 400
>>> list_len: 7392; slice_start: 400; slice_stop: 500
>>> list_len: 7392; slice_start: 500; slice_stop: 600
>>> list_len: 7392; slice_start: 600; slice_stop: 700
>>> list_len: 7392; slice_start: 700; slice_stop: 800
>>> list_len: 7392; slice_start: 800; slice_stop: 900
>>> list_len: 7392; slice_start: 900; slice_stop: 1000
>>> list_len: 7392; slice_start: 1000; slice_stop: 1100
>>> list_len: 7392; slice_start: 1100; slice_stop: 1200
>>> list_len: 7392; slice_start: 1200; slice_stop: 1300
>>> list_len: 7392; slice_start: 1300; slice_stop: 1400
>>> list_len: 7392; slice_start: 1400; slice_stop: 1500
>>> list_len: 7392; slice_start: 1500; slice_stop: 1600
>>> list_len: 7392; slice_start: 1600; slice_stop: 1700
>>> list_len: 7392; slice_start: 1700; slice_stop: 1800
>>> list_len: 7392; slice_start: 1800; slice_stop: 1900
>>> list_len: 7392; slice_start: 1900; slice_stop: 2000
>>> list_len: 7392; slice_start: 2000; slice_stop: 2100
>>> list_len: 7392; slice_start: 2100; slice_stop: 2200
>>> list_len: 7392; slice_start: 2200; slice_stop: 2300
>>> list_len: 7392; slice_start: 2300; slice_stop: 2400
>>> list_len: 7392; slice_start: 2400; slice_stop: 2500
>>> list_len: 7392; slice_start: 2500; slice_stop: 2600
>>> list_len: 7392; slice_start: 2600; slice_stop: 2700
>>> list_len: 7392; slice_start: 2700; slice_stop: 2800
>>> list_len: 7392; slice_start: 2800; slice_stop: 2900
>>> list_len: 7392; slice_start: 2900; slice_stop: 3000
>>> list_len: 7392; slice_start: 3000; slice_stop: 3100
>>> list_len: 7392; slice_start: 3100; slice_stop: 3200
>>> list_len: 7392; slice_start: 3200; slice_stop: 3300
>>> list_len: 7392; slice_start: 3300; slice_stop: 3400
>>> list_len: 7392; slice_start: 3400; slice_stop: 3500
>>> list_len: 7392; slice_start: 3500; slice_stop: 3600
>>> list_len: 7392; slice_start: 3600; slice_stop: 3700
>>> list_len: 7392; slice_start: 3700; slice_stop: 3800
>>> list_len: 7392; slice_start: 3800; slice_stop: 3900
>>> list_len: 7392; slice_start: 3900; slice_stop: 4000
>>> list_len: 7392; slice_start: 4000; slice_stop: 4100
>>> list_len: 7392; slice_start: 4100; slice_stop: 4200
>>> list_len: 7392; slice_start: 4200; slice_stop: 4300
>>> list_len: 7392; slice_start: 4300; slice_stop: 4400
>>> list_len: 7392; slice_start: 4400; slice_stop: 4500
>>> list_len: 7392; slice_start: 4500; slice_stop: 4600
>>> list_len: 7392; slice_start: 4600; slice_stop: 4700
>>> list_len: 7392; slice_start: 4700; slice_stop: 4800
>>> list_len: 7392; slice_start: 4800; slice_stop: 4900
>>> list_len: 7392; slice_start: 4900; slice_stop: 5000
>>> list_len: 7392; slice_start: 5000; slice_stop: 5100
>>> list_len: 7392; slice_start: 5100; slice_stop: 5200
>>> list_len: 7392; slice_start: 5200; slice_stop: 5300
>>> list_len: 7392; slice_start: 5300; slice_stop: 5400
>>> list_len: 7392; slice_start: 5400; slice_stop: 5500
>>> list_len: 7392; slice_start: 5500; slice_stop: 5600
>>> list_len: 7392; slice_start: 5600; slice_stop: 5700
>>> list_len: 7392; slice_start: 5700; slice_stop: 5800
>>> list_len: 7392; slice_start: 5800; slice_stop: 5900
>>> list_len: 7392; slice_start: 5900; slice_stop: 6000
>>> list_len: 7392; slice_start: 6000; slice_stop: 6100
>>> list_len: 7392; slice_start: 6100; slice_stop: 6200
>>> list_len: 7392; slice_start: 6200; slice_stop: 6300
>>> list_len: 7392; slice_start: 6300; slice_stop: 6400
>>> list_len: 7392; slice_start: 6400; slice_stop: 6500
>>> list_len: 7392; slice_start: 6500; slice_stop: 6600
>>> list_len: 7392; slice_start: 6600; slice_stop: 6700
>>> list_len: 7392; slice_start: 6700; slice_stop: 6800
>>> list_len: 7392; slice_start: 6800; slice_stop: 6900
>>> list_len: 7392; slice_start: 6900; slice_stop: 7000
>>> list_len: 7392; slice_start: 7000; slice_stop: 7100
>>> list_len: 7392; slice_start: 7100; slice_stop: 7200
>>> list_len: 7392; slice_start: 7200; slice_stop: 7300
>>> list_len: 7392; slice_start: 7300; slice_stop: 7392
Data Migration Successful