脚本专栏 
首页 > 脚本专栏 > 浏览文章

基于golang的简单分布式延时队列服务的实现

(编辑:jimmy 日期: 2025/1/9 浏览:3 次 )

一、引言

背景

我们在做系统时,很多时候是处理实时的任务,请求来了马上就处理,然后立刻给用户以反馈。但有时也会遇到非实时的任务,比如确定的时间点发布重要公告。或者需要在用户做了一件事情的X分钟/Y小时后,EG:

“PM:我们需要在这个用户通话开始10分钟后给予提醒给他们发送奖励”

对其特定动作,比如通知、发券等等。一般我接触到的解决方法中在比较小的服务里都会自己维护一个backend,但是随着这种backend和server增多,这种方法很大程度和本身业务耦合在一起,所以这时需要一个延时队列服务。

名词解释

topic_list队列:每一个来的延时请求都应该又一个延时主题参考kafka,在逻辑上划分出一个队列出来每个业务分开处理;

topic_info队列:每一个队列topic都存在一个新的队列里,每次扫描topic信息检测新的topic建立与销毁管理服务协程数量;

offset:当前消费的进度;

new_offset:新消费的进度,预备更迭offset;

topic_offset_lock:分布式锁。

二、设计目标

 功能清单

1、延时信息添加接口基于http调用

2、拥有存储队列特性,可保存近3天内的队列消费数据

3、提供消费功能

4、延时通知

性能指标

预计接口的调用量:单秒单类任务数3500,多秒单类任务数1300

压测结果:

简单压测

wrk写入qps:259.3s 写入9000条记录 单线程 无并发

触发性能/准确率:单秒1000,在测试机无延长。单秒3000时,偶尔出现1-2秒延迟。受内存和cpu影响。

三、系统设计

交互流程

时序图

基于golang的简单分布式延时队列服务的实现 "color: #ff0000">五、缓存设计

目前使用全缓存模式

key设计:

topic管理list key: XX:DELAY_TOPIC_LIST type:list

topic_list key: XX:DELAY_SIMPLE_TOPIC_TASK-%s(根据topic分key) type:zset

topic_info key: XX:DELAY_REALL_TOPIC_TASK-%s(根据topic分key) type:hash

topic_offset key: XX:DELAY_TOPIC_OFFSET-%s(根据topic分key) type:string

topic_lock key: xx:DELAY_TOPIC_RELOAD_LOCK-%s(根据topic分key) type:string

六、接口设计

delay.task.addv1 (延时队列添加v1)

请求示例

curl -d 
'{
  "topic": "xxx", 								// 业务topic
  "timing_moment": ,							    // 单位秒,要定时时刻
  "content": "{}"								// 消息体,json串
}'
'http://127.0.0.1:xxxx/delay/task/add'

返回示例

{
  "dm_error": 0,
  "error_msg": "操作成功",
  "task_id":112345465765
}

pull回调方式返回(v2不再支持)

请求示例

curl -d 
'{
  "topic": "xxxx", 								// 业务topic
  "task_id":1324568798765							// taskid,选填,有则返回特定消息
}'
'http://127.0.0.1:xxxx/delay/task/pull'

返回示例

{
  "dm_error": 0,
  "error_msg": "操作成功"
  "content":"{"\xxx"\}"
}

delay.task.addv2 (延时队列添加v2)

请求示例

curl -d 
'{
  "topic": "xxx", 						// 业务topic
  "timing_moment": ,						// 单位秒,要定时时刻
  "content": "{                        // 消息内容(json string)
	"sn":"message.call",                  // 服务发现名字(或为配置服务名)
	"url":"/ev/tp/xxxx",                  // 回调url
	"xxx":"xxx"                       // 其他字段
  }"
}'
'http://127.0.0.1:xxxx/delay/task/add'

示例

curl -d '{
  "topic":"xxxx_push",
  "content":"{
    "uid":"111111",
    "sn":"other.server",
    "url":"/xxxx/callback",
    "msg_type":"gift",
  }",
  "timing_moment":1565700615
}' 
http://127.0.0.1:xxxx/delay/task/add

返回示例

{
  "dm_error": 0,
  "error_msg": "操作成功",
  "task_id":112345465765
}

七、MQ设计(v2不再支持)

关于kafka消费方式返回:

topic: delay_base_push

固定返回格式
{
  "topic": "xxxx",								// 业务topic
  "content": "{}"								// 单条生产消息content
}

八、其他设计

唯一号设计

调用存储模块,利用redis的自增结合逻辑生成唯一号具体逻辑如下:

func (c *CacheManager) OperGenTaskid() (uint64, error) {
	now := time.Now().Unix()
	key := c.getDelayTaskIdKey()
	reply, err := c.DelayRds.Do("INCR", key)
	if err != nil {
		log.Errorf("genTaskid INCR key:%s, error:%s", key, err)
		return 0, err
	}
	version := reply.(int64)
	if version == 1 {
    //默认认为1秒能创建100个任务
		c.DelayRds.Expire(key, time.Duration(100)*time.Second)
	}
	incrNum := version % 10000
	taskId := (uint64(now)*10000 + uint64(incrNum))
	log.Debugf("genTaskid INCR key:%s, taskId:%d", key, taskId)
	return taskId, nil
}

分布式锁设计

func (c *CacheManager) SetDelayTopicLock(ctx context.Context, topic string) (bool, error) {
	key := c.getDelayTopicReloadLockKey(topic)
	reply, err := c.DelayRds.Do("SET", key, "lock", "NX", "EX", 2)
	if err != nil {
		log.Errorf("SetDelayTopicLock SETNX key:%s, cal:%v, error:%s", key, "lock", err)
		return false, err
	}
	if reply == nil {
		return false, nil
	}
	log.Debugf("SetDelayTopicLock SETNXEX topic:%s lock:%d", topic, false)
	return true, nil
}

九、设计考虑

健壮性

熔断策略:

基于golang的简单分布式延时队列服务的实现 "htmlcode">

for {
  time.Sleep(time.Second)
  fmt.Println("test")
}

2、time.Tick函数:

t1:=time.Tick(3*time.Second)
for {
  select {
  case <-t1:
    fmt.Println("test")
  }
}

3、其中Tick定时任务,也可以先使用time.Ticker函数获取Ticker结构体,然后进行阻塞监听信息,这种方式可以手动选择停止定时任务,在停止任务时,减少对内存的浪费。

t:=time.NewTicker(time.Second)
for {
  select {
  case <-t.C:
    fmt.Println("test")
    t.Stop()
  }
}

在最开始以为sleep是单独处理直接停掉了这个协程,所以第一版用的也是sleep,但是在收集资料后发现这几种方式都创建了timer,并加入了定时任务处理协程。实际上这两个函数产生的timer都放入了同一个timer堆(golang时间轮),都在定时任务处理协程中等待被处理。Tick,Sleep,time.After函数都使用的timer结构体,都会被放在同一个协程中统一处理,这样看起来使用Tick,Sleep并没有什么区别。实际上是有区别的,本文不是讨论golang定时执行任务time.sleep和time.tick的优劣,以后会在后续文章进行探讨。使用channel阻塞协程完成定时任务比较灵活,可以结合select设置超时时间以及默认执行方法,而且可以设置timer的主动关闭,所以,建议使用time.Tick完成定时任务。

2、存储模块问题

目前是全缓存,没有DB参与,首先redis(codis)的高可用是个问题,在熔断之后采取“不作为”的判断也是有问题的,所以对未来展望,首先是:

1·单机的数据结构使用多时间轮。为了减少数据的路程,将load数据的过程异步加载到机器,减少网络io所造成的时间损耗。同时也是减少对redis的依赖

2·引入ZooKeeper或者添加集群备份,leader。保证集群中至少有两台机器load一个topic的数据,leader可以协调消费保证高可用

上一篇:Go语言正则表达式的使用详解
下一篇:图文详解go语言反射实现原理
一句话新闻
高通与谷歌联手!首款骁龙PC优化Chrome浏览器发布
高通和谷歌日前宣布,推出首次面向搭载骁龙的Windows PC的优化版Chrome浏览器。
在对骁龙X Elite参考设计的初步测试中,全新的Chrome浏览器在Speedometer 2.1基准测试中实现了显著的性能提升。
预计在2024年年中之前,搭载骁龙X Elite计算平台的PC将面世。该浏览器的提前问世,有助于骁龙PC问世就获得满血表现。
谷歌高级副总裁Hiroshi Lockheimer表示,此次与高通的合作将有助于确保Chrome用户在当前ARM兼容的PC上获得最佳的浏览体验。