分享

用redis实现有优先级的"celery"

Oner 发表于 2016-2-24 11:37:59 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 11312
本帖最后由 Oner 于 2016-2-24 11:41 编辑
问题导读:
1.  celery可以处理异步任务,为什么还需要redis?
2.  redis优先队列是怎么回事?
3.  怎么实现任务动态切换?
4.  怎么实现多进程化?






1. 需求背景
对于异步任务处理,相信很多人首选celery,不了解celery的,可以查看celery简介。的确,celery处理异步任务非常强悍,使用简单,支持各种并发。但是,大家来看看我所遇到的一个应用场景:每次后台上传一个游戏母包,然后对这个母包处理(添加某种标识,比如id)生成多个游戏子包,其中有一些id号的包是要求尽快的处理的,剩下的可以闲时处理。这里就对要把一个母包分成两个任务来处理,其中一个是优先处理的,另一个是闲时处理。

2. 方案初探
对于上面的场景,最先想到的方案是,把每个母包处理任务分成优先和闲时两个celery任务队列分别处理,分别单独配给cpu资源(土豪的话给多一台机器也行)专门处理。大家估计也想到这种做法的弊端了,这样无法有效使用资源,当优先任务队列没有任务时,闲时任务队列却满载,显然这种设计方案不是很好。

那么有没有更好的处理方案呢?试想如果任务可以按优先级别在队列中排队就好了。显然celery并没有提供优先队列这种机制,那么我们只能自己实现一个celery一样的异步事件队列,并且支持优先级的队列。这时候显然想到的是redis,点击这里查看redis命令参考

3. redis优先队列
redis中提供了BLPOP,RPUSH(RLPOP,LPSUH)这些队列操作。
来看看BLOOP的介绍:
BLPOP key [key …] timeout
BLPOP 是列表的阻塞式(blocking)弹出原语。
它是 LPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP命令阻塞,直到等待超时,或有另一个客户端对给定 key 的任意一个执行 LPUSH 或 RPUSH 命令为止。
当给定多个 key 参数时,按参数 key的先后顺序依次检查各个列表,弹出第一个非空列表的头元素(这是就是实现优先级的关键)。
那么我们可以设置两个key,一个表示优先任务的key,姑且叫priority_task,另一个闲时任务的key,就叫normal_task。在添加任务时,把对应任务所要必备参数添加的对应的key值队列即可。具体如下:
[mw_shl_code=java,true]priority_task = {  # 优先任务
    'id_list': [1, 2, 3],  # 对应要生成子包id列表
    'root_package_id': 10086  # 母包数据表索引id
}
redis.rpush('priority_task', json.dumps(priority_task))

normal_task = {  # 普通任务
    'id_list': [5, 6, 7, 8],  # 对应要生成子包id列表
    'root_package_id': 10086  # 母包数据表索引id
}
redis.rpush('normal_task', json.dumps(normal_task))[/mw_shl_code]
成功入队后,接下来就是不断从队列中取出任务,然后对应处理,大概代码如下:
[mw_shl_code=java,true]while 1:
    # 监听任务,没有打包任务则阻塞
    key, task = redis.blpop(['priority_task', 'normal_task'])
    deal_task(key, json.loads(task))[/mw_shl_code]

4. 任务动态切换
上面实现保证了每次从队列取出的任务都是优先级别最高的,但是存在着问题,比如当前正在处理闲时任务,可是这个闲时任务可能要处理200+个包,这时候队列中又来了一个优先任务,那么这个优先任务必须等待之前的闲时任务处理完成才能开始处理,这显然不是我们想要的,那么我们能挂起当前正在处理的闲时任务,先去处理优先任务吗。显然是可以的,就是一个最简单的协程:函数调用。只需要在闲时任务处理完每个子包后,检查优先任务队列是否有元素,有则调用函数先处理优先任务,等优先任务完成后,再继续处理闲时任务。

处理函数大概如下:
[mw_shl_code=java,true]def deal_task(key, task):  # 任务处理函数
    id_list = task['id_list']  # 要生成的子包id
    for id in id_list:
        do something....   # 生成对应的id子包
        if key == 'normal_key':  # 如果当前是闲时任务
            while redis.llen('priority_key') > 0:  # 检查是否有优先任务,有则获取并执行
                priority_task = redis.lpop('priority_key')
                if priority_task:
                    deal_task('priority_key', json.loads(priority_task))  # 执行优先任务处理[/mw_shl_code]
以上就是实现一个单进程处理异步优先任务队列的全过程。

5. 多进程化
上面实现都是单进程处理的,为了提升处理效率,我们可以开多个进程提升并发量,这里建议使用supervisor来管理你的这些进程。这里需要注意:
  • 多进程处理临界资源,如果没有相关临界资源的竞争那最好,如果有,那么你必须考虑怎么处理,一般是用队列顺序化。
  • supervisor持久化进程数据库链接,会导致数据库虽然已经断开连接,但是进程并不知晓,当进程再次执行数据库查询时就会出错,mysql一般会报一个gone away的错误。
    注:还可以用进程池异步处理。

6. 最后
以上是本人的处理方案,如果有更好的建议记得留下宝贵的意见(>▽<)。

原文连接:点击链接 作者:XYM

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条