目前在开发的内部爬虫框架中,对于从消息队列中取任务部分,如下图所示,遇到了一些问题。
因为我们整个模型为async的,底层使用的asyncio,对于消息队列的客户端来讲,可选择有的pika和kombu,对于kombu,的确是一个不错的选择,相比pika callback的写法,的确封装的更为高级,写起来比较方便,另外官方支持与eventlet工作,例如openstack的nova底层使用的就是eventlet与kombu,但是kombu目前不支持asyncio,官方将于5.0版本提供支持,我很看好,题外话。
那么目前能够选择貌似只有pika了,官方示例,奈何实力不高,只能在callback的基础上加代码,此处分成两个版本,如下所示。
推入线程池/进程池
1 |
|
异步取来的任务,将其抛给线程池/进程池处理,那么上层应用者直接在get_list
和get_data
中进行处理,那么此处就有非常大的问题了,因为一个好好的异步模型活生生的给改成了有点同步的感觉了,如果我想在业务层执行异步任务,会发现在当前线程中无法get eventloop,哈哈,好尴尬。这点实际在tornado官方文档中描述如何执行同步代码中有提示,是不是和这个很类似.
1 |
|
异步处理
上面的实在受不了,太烂了,所以此处还是要想办法给异步了,此处斜眼。
此前在看nameko中消费者处理时,他是使用eventlet.spawn方法开启一个新的协程进行处理,从而不阻塞当前loop,那么在asyncio中也一定有相应的方法,下面出场asyncio.async和asyncio.ensure_future方法,其实也是一个方法,asyncio.async将被放弃啦,所以所以介绍ensure_future方法。
1 |
|
由于使用的pika连接器是asyncio的,那么根据pika的官方文档描述,获取到的任务只有在完成的时候才会进行下发新的任务,
如果如上get_list
方法下面的使用者写的是同步代码,会导致效率非常地下,所以此处会强制提升业务代码至异步模型,貌似有点激进,所以暂时不更新.
同步处理
额,几天没有更新了,经过几天的思考,目前采用多线程/多进程模型,为什么没有使用异步呢?我觉得可以从一下几方面总结:
- 关于消息队列客户端,没有异步支持的客户端,kazoo亦是如此,但是这两者都有gevent、eventlet的支持,为什么不使用呢?因为在看openstack官方论坛以及asyncio的发展趋势,更应该顺应技术发展,如果kombu明年支持了,可以自行再重新实现一遍,整体架构会比目前的更为成熟.
- 代理隧道数量有限,其实这个不应该考虑到框架层面上的,因为这个可以通过并发数(目前通过信号量控制)控制,目前没有达到必须使用异步的地步,去加速速度或者减轻资源的消耗.
- 满足目前的整体需求,另外使用同步的话整个爬虫团队更方便和他们熟悉的软件工具配合使用,例如chrome headless、selenium、splash等等,如果使用异步的话,还要把他run_in_executor中,还要在同步处理上进一步封装,也比较麻烦.
- 真正使用时遇到了其他的问题. 因为我们list和data任务推入同一个消息队列中,导致list任务会非常多导致data任务很难被消费,以及消息堆积导致最新消息不能被及时消费。由于消息队列是FIFO模型,由于生产速度大于消费速度,导致迟迟无法看到data结果,这一方面准备改成两个队列,list和data队列,data队列优先级提高,加速获取结果.
另外由于list任务生产速度过快,如何加速处理呢,可以使用多个客户端同时处理,由于有去重,那么就会加速消费速度.
先把第一个版本做稳定了,因为目前还是有一些问题的,因为分布式爬虫框架,程序异常退出以及退出迟迟没有在server端看到客户端下线,还要定位原因以及加强处理,等稳定后再加入timer等其他功能。