当前位置: 首页 > 软件库 > 大数据 > 数据存储 >

gmq

基于 golang 和 redis 实现的简易队列
授权协议 MIT
开发语言 Google Go JavaScript HTML/CSS
所属分类 大数据、 数据存储
软件类型 开源软件
地区 国产
投 递 者 轩辕瑞
操作系统 跨平台
开源组织
适用人群 未知
 软件概览

1. 概述

gmq是基于redis提供的特性,使用go语言开发的一个简单易用的队列;关于 redis 使用特性可以参考之前本人写过一篇很简陋的文章 Redis 实现队列gmq的灵感和设计是基于有赞延迟队列设计,文章内容清晰而且很好理解,但是没有提供源码,在文章的最后也提到了一些未来架构方向; gmq不是简单按照有赞延迟队列的设计实现功能,在它的基础上,做了一些修改和优化,主要如下:

  • 功能上
    • 多种任务模式,不单单只是延迟队列;例如:延迟队列,普通队列,优先级队列
  • 架构上:
    • 添加 job 由dispatcher调度分配各个bucket,而不是由timer
    • 每个bucket维护一个timer,而不是所有bucket一个timer
    • timer每次扫描bucket到期job时,会一次性返回多个到期job,而不是每次只返回一个job
    • timer的扫描时钟由bucket中下个job到期时间决定,而不是每秒扫描一次

2. 应用场景

  • 延迟任务
    • 延迟任务,例如用户下订单一直处于未支付状态,半个小时候自动关闭订单
  • 异步任务
    • 异步任务,一般用于耗时操作,例如群发邮件等批量操作
  • 超时任务
    • 规定时间内(TTR)没有执行完毕或程序被意外中断,则消息重新回到队列再次被消费,一般用于数据比较敏感,不容丢失的
  • 优先级任务
    • 当多个任务同时产生时,按照任务设定等级优先被消费,例如a,b两种类型的job,优秀消费a,然后再消费b

3. 安装

3.1 源码运行

配置文件位于gmq/conf.ini,可以根据自己项目需求修改配置

cd $GOPATH/src # 进入gopath/src目录
git clone https://github.com/wuzhc/gmq.git
cd gmq
go get -u -v github.com/kardianos/govendor # 如果有就不需要安装了
govendor sync -v # 如果很慢,可能需要翻墙
go run main.go start

3.2 执行文件运行

cd $GOPATH/src/gmq
# 编译成可执行文件
go build 
# 启动
./gmq start
# 停止
./gmq stop

# 守护进程模式启动,不输出日志到console
nohup ./gmq start >/dev/null 2>&1  &
# 守护进程模式下查看日志输出(配置文件conf.ini需要设置target_type=file,filename=gmq.log)
tail -f gmq.log

4. 客户端

目前只实现python,go,php语言的客户端的demo,参考:https://github.com/wuzhc/demo/tree/master/mq

运行

# php
# 生产者
php producer.php
# 消费者
php consumer.php

# python
# 生产者
python producer.py
# 消费者
python consumer.py

一条消息结构

{
    "id": "xxxx",	# 任务id,这个必须是一个唯一值,将作为redis的缓存键
    "topic": "xxx",  # topic是一组job的分类名,消费者将订阅topic来消费该分类下的job
    "body": "xxx",   # 消息内容
    "delay": "111",  # 延迟时间,单位秒
    "TTR": "11111",  # 执行超时时间,单位秒
    "status": 1,     # job执行状态,该字段由gmq生成
    "consumeNum":1,  # 被消费的次数,主要记录TTR>0时,被重复消费的次数,该字段由gmq生成
}

延迟任务

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '1800', // 单位秒,半个小时后执行
        'TTR'   => '0'
    ];

超时任务

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '100' // 100秒后还未得到消费者ack确认,则再次添加到队列,将再次被被消费
    ];

异步任务

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '0' 
    ];

优先级任务

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_A","topic_B","topic_C"], //优先消费topic_A,消费完后消费topic_B,最后再消费topic_C
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '0' 
    ];

5. gmq 流程图如下:

一个不规范的流程图

5.1 延迟时间 delay

  • 当 job.delay>0时,job 会被分配到 bucket 中,bucket 会有周期性扫描到期 job,如果到期,job 会被 bucket 移到ready queue,等待被消费
  • 当 job.delay=0 时,job 会直接加到ready queue,等待被消费

5.2 执行超时时间 TTR

参考第一个图的流程,当 job 被消费者读取后,如果job.TTR>0,即 job 设置了执行超时时间,那么 job 会在读取后会被添加到 TTRBucket(专门存放设置了超时时间的 job),并且设置job.delay = job.TTR,如果在 TTR 时间内没有得到消费者 ack 确认然后删除 job,job 将在 TTR 时间之后添加到ready queue,然后再次被消费(如果消费者在 TTR 时间之后才请求 ack,会得到失败的响应)

5.3 确认机制

主要和TTR的设置有关系,确认机制可以分为两种:

  • 当 job.TTR=0 时,消费者pop出 job 时,即会自动删除job pool中的 job 元数据
  • 当 job.TTR>0 时,即 job 执行超时时间,这个时间是指用户pop出 job 时开始到用户ack确认删除结束这段时间,如果在这段时间没有ACK,job 会被再次加入到ready queue,然后再次被消费,只有用户调用了ACK,才会去删除job pool中 job 元数据

6. web 监控

gmq提供了一个简单 web 监控平台(后期会提供根据 job.Id 追踪消息的功能),方便查看当前堆积任务数,默认监听端口为8000,例如:http://127.0.0.1:8000,界面如下: 后台模板来源于https://github.com/george518/PPGo_Job

7. 遇到问题

以下是开发遇到的问题,以及一些粗糙的解决方案

7.1 安全退出

如果强行中止gmq的运行,可能会导致一些数据丢失,例如下面一个例子:
gmq之timer定时器
如果发生上面的情况,就会出现 job 不在bucket中,也不在ready queue,这就出现了 job 丢失的情况,而且将没有任何机会去删除job pool中已丢失的 job,长久之后job pool可能会堆积很多的已丢失 job 的元数据;所以安全退出需要在接收到退出信号时,应该等待所有goroutine处理完手中的事情,然后再退出

7.1.1 gmq退出流程

gmq安全退出.png
首先gmq通过 context 传递关闭信号给dispatcher,dispatcher接收到信号会关闭dispatcher.closed,每个bucket会收到close信号,然后先退出timer检索,再退出bucket,dispatcher等待所有 bucket 退出后,然后退出

dispatcher退出顺序流程: timer -> bucket -> dispatcher

7.1.2 注意

不要使用kill -9 pid来强制杀死进程,因为系统无法捕获 SIGKILL 信号,导致 gmq 可能执行到一半就被强制中止,应该使用kill -15 pid,kill -1 pidkill -2 pid,各个数字对应信号如下:

7.2 智能定时器

  • 每一个bucket都会维护一个timer,不同于有赞设计,timer不是每秒轮询一次,而是根据bucket下一个 job 到期时间来设置timer的定时时间 ,这样的目的在于如果bucket没有 job 或 job 到期时间要很久才会发生,就可以减少不必要的轮询;
  • timer只有处理完一次业务后才会重置定时器,这样的目的在于可能出现上一个时间周期还没执行完毕,下一个定时事件又发生了
  • 如果到期的时间很相近,timer就会频繁重置定时器时间,就目前使用来说,还没出现什么性能上的问题

7.3 原子性问题

我们知道 redis 的命令是排队执行,在一个复杂的业务中可能会多次执行 redis 命令,如果在大并发的场景下,这个业务有可能中间插入了其他业务的命令,导致出现各种各样的问题;
redis 保证整个事务原子性和一致性问题一般用multi/execlua脚本gmq在操作涉及复杂业务时使用的是lua脚本,因为lua 脚本除了有multi/exec的功能外,还有Pipepining功能(主要打包命令,减少和redis server通信次数),下面是一个gmq定时器扫描 bucket 集合到期 job 的 lua 脚本:

-- 获取到期的50个job
local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)
local res = {}
for k,jobId in ipairs(jobIds) do 
	if k%2~=0 then
		local jobKey = string.format('%s:%s', ARGV[3], jobId)
		local status = redis.call('hget', jobKey, 'status')
		-- 检验job状态
		if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then
			-- 先移除集合中到期的job,然后到期的job返回给timer
			local isDel = redis.call('zrem', KEYS[1], jobId)
			if isDel == 1 then
				table.insert(res, jobId)
			end
		end
	end
end

local nextTime
-- 计算下一个job执行时间,用于设置timer下一个时钟周期
local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
if next(nextJob) == nil then
	nextTime = -1
else
	nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])
	if nextTime < 0 then
		nextTime = 1
	end
end

table.insert(res,1,nextTime)
return res

7.4 redis 连接池

可能一般 phper 写业务很少会接触到连接池,其实这是由 php 本身所决定他应用不大,当然在 php 的扩展swoole还是很有用处的
gmq的 redis 连接池是使用gomodule/redigo/redis自带连接池,它带来的好处是限制 redis 连接数,通过复用 redis 连接来减少开销,另外可以防止 tcp 被消耗完,这在生产者大量生成数据时会很有用

// gmq/mq/redis.go
Redis = &RedisDB{
    Pool: &redis.Pool{
        MaxIdle:     30,    // 最大空闲链接
        MaxActive:   10000, // 最大链接
        IdleTimeout: 240 * time.Second, // 空闲链接超时
        Wait:        true, // 当连接池耗尽时,是否阻塞等待
        Dial: func() (redis.Conn, error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))
            if err != nil {
                return nil, err
            }
            return c, nil
		},
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            if time.Since(t) < time.Minute {
            	return nil
            }
        	_, err := c.Do("PING")
        	return err
        },
    },
}

8. 注意问题

  • job.id 在job pool是唯一的,它将作为 redis 的缓存键;gmq不自动为 job 生成唯一 id 值是为了用户可以根据自己生成的 job.id 来追踪 job 情况,如果 job.id 是重复的,push 时会报重复 id 的错误
  • bucket 数量不是越多越好,一般来说,添加到ready queue的速度取决与 redis 性能,而不是 bucket 数量

9. 使用中可能出现的问题

9.1 客户端出现大量的 TIME_WAIT 状态,并且新的连接被拒绝

netstat -anp | grep 9503 | wc -l
tcp        0      0 10.8.8.188:41482        10.8.8.185:9503         TIME_WAIT   -

这个是正常现象,由 tcp 四次挥手可以知道,当接收到 LAST_ACK 发出的 FIN 后会处于TIME_WAIT状态,主动关闭方(客户端)为了确保被动关闭方(服务端)收到 ACK,会等待 2MSL 时间,这个时间是为了再次发送 ACK,例如被动关闭方可能因为接收不到 ACK 而重传 FIN;另外也是为了旧数据过期,不影响到下一个链接;如果要避免大量TIME_WAIT的连接导致 tcp 被耗尽,一般方法如下:

  • 使用长连接
  • 配置文件,网上很多教程,就是让系统尽快的回收TIME_WAIT状态的连接
  • 使用连接池,当连接池耗尽时,阻塞等待,直到回收再利用

10. 相关链接

  • GMQGroup于2017年创立,经菲律宾政府批准成立,正规合法企业,是一家集区块链数字资产交易、区块链生态产业、地产开发、旅游、投资管理等全球多元化企业集团。 GMQGroup的使命与愿景在于让每一个人了解数字货币资产,热爱区块链,热爱数字货币资产,助力优质区块链数字资产的流通及价值发现,推动区块链技术在商业应用上的逐步发展,促进实体经济发展。 根据The NextWeb(TNW)3月8日发布的

  • 2017-2019短短3年,区块链行业经历了其他行业数年,甚至数十年,都难以遇到的机遇与起伏。在激烈震荡的3年发展中,区块链行业逐渐从默默无名的技术创新,发展到了获得世界认可的热门应用。 据德勤2017年底的报告显示,2017年区块链项目总数接近9万个,比2016年新增2.6万,比2013年1.5万个更是超出将近6倍。新项目为人们带来了更多的优质数字资产,也令区块链行业的市值保持持续上升。截止20

  • 区块链行业正在逐步完善,我们由衷希望GMQ平台可以满足广大用户投资需求,助力实体经济发展,如今经过GMQ技术团队精心优化已正式推出GMQ Token(简称:GMQT)全球通用积分。 GMQT基于区块链强大的技术、采用通证经济模型,是ERC20标准代币,发行总量限定为5亿,总量恒定永不增发。GMQT未来将链接优秀的创新型数字资产项目和GMQ优质的用户,把GMQT建设成GMQ生态中的重要组成部分,共建

  • 区块链具有开放、协同、共享、防篡改、可追溯等基本特征,区块链在网络中建立点对点之间可靠的信任,使得价值传递过程去除了中介的干 扰,提高价值交互的效率并降低成本,成为构建价值互联网的基石。同时也是共建信用、重构价值、重构网络生态的一股重要力量。 随着“互联网+”的深入发展,大数据、云计算、互联网以及新一代的移动 通信、人工智能等技术发挥的作用日益突出,互联网所创造和承载的价值快速增长,如何保证价值在

  • 区块链被认为是继大型机、个人电脑、互联网、移动/社交网络之后计算范式的第5次颠覆式创新,是人类信用进化史上的全新里程碑,是从信息互联网时代过渡到价值互联网时代的引擎。2008年由中本聪(Satoshi Nakamoto)第一次提出了区块链的概念,在此之前虽然区块链经过了前几代的演进,但是这个期间均属于数字资产的发展阶段。直至2009比特币的诞生,开始标志着区块链技术给数字经济时代带来了巨变的曙光,

  • 钱包行业竞争激烈,进场的人越来越多,区块链钱包是区块链的一个超级入口,功能将不断发展完善,金融属性也将日趋强化,将成为多种资产一体化管理入口、DAPP应用入口、通证使用权、收益权等功能性入口,交易和理财服务也将不断加强完善。 对于钱包来说,DApp市场入口绝对是最具想象力的前景。随着公链的成熟,一些游戏类、金融类、社交类、泛娱乐类的DApp应用逐步发展起来。 基于此,区块链钱包能提供标准完善的AP

  • 2018年是区块链技术坚实发展的一年,是区块链技术由虚务实,从抽象概念走向真正实践;由繁向简,从复杂结构走向简单应用;由少向多,从少数极客研发走向世界创新、由少数人了解走向大众普及的一年。 我们站在2019看2018,作为老牌的区块链应用,钱包的发展是显著的,区块链钱包领域也会在2019结出新硕果以及迎来新趋势! 2018年全球使用区块链钱包的人数(钱包用户)迅速上涨。据相关数据显示,2018年Q

  • 真正的区块链技术对社会发展将具有正面价值:据某资深媒体消息,中央财经大学法学院教授邓建鹏近日谈及区块链政策监管时表示,目前政府的政策很明确,要将正常的区块链技术、区块链融资同其它打着此类旗号进行非法集资等违法违规行为区别开来。近年来,虽然区块链行业夹杂着一些金融风险,但是真正的区块链技术极具创新性,对社会发展将具有正面价值。 各个国家也纷纷部署区块链,各地也在大力助推区块链行业落地,各企业也纷纷入

  • GMQ致力于为用户提供更多符合当代需求的应用 区块链技术数字资产正在迅速走红,被普遍推崇为下一代全球信用认证和价值互联网的基础协议之一,数字资产预示了一个新时代的来临,这个新时代比如今的互联网更有颠覆性。这样的颠覆性可以说是件好事,特别是当颠覆对象是银行的时候,作为一种处于衰退状态的商业模式,虽然银行业进行了各种各样的微调,但几百年来基本上毫无变化。 而搭载区块链技术的数字货币,似乎给行业带来了新

  • GMQ Wallet致力于成为您专属的保险柜 每当进行数字资产投资时,我们不能仅仅依靠交易所来存储数字资产,我们更需要有个自己的数字钱包,来存储属于我们个人的数字资产,了解“区块链数字钱包”(以下简称钱包)的相关知识更是我们投资前就应该弄清楚的。“区块链数字钱包”就是未来世界的银行卡,认识和保护它就是在守卫我们自己的财产。 钱包作为区块链产业必不可少的一环,随着区块链产业的发展和扩张也呈现并行加速

  • GMQ Group加速区块链项目快速发展,让更多享人享受项目服务 区块链被视为第四次工业革命的重要组成部分,我们都知道每次工业革命都会带来人类生产力跳跃式的进步,人类的发展史其实就是一步生产力进步的历史。 由于网络中的所有节点都可以扮演“监督者”的身份,因此不必担心造假和欺诈的问题,沟通和获得信用的成本都会越来越低。现在经济上最大的成本就来自于信任,如果降低了信任的成本,那么整个经济效益就会大幅度

  •   近期,发现有不法分子利用修图软件伪造截图方式,在网络上恶意捏造,发布虚假信息恶意诽谤及诋毁GMQ交易平台(以下简称“交易中心”)名誉,从而谋取非法利益,请广大投资者予以识别。面对网络不实信息造谣,损害交易中心利益,GMQ交易平台坚决抵制恶意诽谤诈骗,并追究其网络诽谤违法犯罪责任。 GMQ交易平台获得菲律宾CEZA经济特区政府第一张金融服务与区块链数字资产交易所牌照,是经菲律宾政府批准成立的全球

  • 区块链技术是当今非常火而实用的一项技术,关系着个人或企业的资金管理,掌管着生存的命脉。在互联时代,由区块链技术的特性保障了存储、读取、执行整个过程的透明壳跟踪、不可篡改,真正实现安全、有效、利益最大化。GMQ区块链数字资产交易所就是一家资深专业的数字资产交易平台。 GMQ交易平台是全球领先的新生代数字资产交易平台,甄选全球最优质的区块链资产,为投资者提供有效的数字资产配置。平台拥有最先进的技术,采

  • 钱包行业竞争激烈,进场的人越来越多,区块链钱包是区块链的一个超级入口,功能将不断发展完善,金融属性也将日趋强化,将成为多种资产一体化管理入口、DApp应用入口、通证使用权、收益权等功能性入口,交易和理财服务也将不断加强完善。 目前来看区块链行业才刚刚起步,未来可能也会随着市场的发展产生不同的产业格局,钱包在行业中的价值作用也在逐渐凸显出来。管理数字资产,是数字货币钱包的基本功能。现在,GMQ Co

  •   近期,我司发现社会上有些个人或团伙有组织有预谋地拉拢团伙,通过捏造事实、发布虚假信息等方式对我司进行恶意诋毁,散播谣言。其行为不仅损害了我司的商业信誉和品牌形象,还变相进行敲诈勒索,严重扰乱金融秩序,误导广大不知情的投资者。GMQ交易平台为依法成立的区块链数字资产交易平台,拥有正规合法数字资产交易所经营牌照,一直以来以“公平、公正、公开”为原则,坚持“合法、合规”运营,绝不容忍上述行为继续存在

 相关资料
  • 本文向大家介绍ios实现简易队列,包括了ios实现简易队列的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了ios实现简易队列的具体代码,供大家参考,具体内容如下 满足一些特殊需求 接口部分(队列支持需求) 实现方法 测试 结果 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐喊教程。

  • 我需要一个简单的FIFO实现的队列来存储一堆整数(我不介意它是泛型实现)。 在或Trove/Guava库中已经为我烘焙了什么?

  • 问题内容: 我有一个内存限制的队列,其中多个线程将对象排队。通常,应该由处理队列中项目的单个读取器线程清空队列。 但是,队列可能已满。在这种情况下,我想将磁盘上的所有其他项保留在磁盘上,这将由另一个后台读取器线程处理,该线程将扫描目录中的此类文件并处理这些文件中的条目。我熟悉Active MQ,但是更喜欢轻量级的解决方案。如果未严格遵循“ FIFO”,则可以(因为保留的条目可能会乱序处理)。 有没

  • 本文向大家介绍基于ZooKeeper实现队列源码,包括了基于ZooKeeper实现队列源码的使用技巧和注意事项,需要的朋友参考一下 实现原理 先进先出队列是最常用的队列,使用Zookeeper实现先进先出队列就是在特定的目录下创建PERSISTENT_EQUENTIAL节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下Zookeeper的znode用于消息存储

  • 本文向大家介绍基于html+css+js实现简易计算器代码实例,包括了基于html+css+js实现简易计算器代码实例的使用技巧和注意事项,需要的朋友参考一下 使用html+css+js实现简易计算器, 效果图如下: html代码如下 CSS代码如下: JS代码如下: 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐喊教程。

  • 本文向大家介绍python基于mysql实现的简单队列以及跨进程锁实例详解,包括了python基于mysql实现的简单队列以及跨进程锁实例详解的使用技巧和注意事项,需要的朋友参考一下 通常在我们进行多进程应用开发的过程中,不可避免的会遇到多个进程访问同一个资源(临界资源)的状况,这时候必须通过加一个全局性的锁,来实现资源的同步访问(即:同一时间里只能有一个进程访问资源)。 举个例子如下: 假设我们