票价40元,需要单独拉出来单列一行.
之前一直很好奇像sanic或者aiohttp是如何实现并发的,其实从网上看无非就是使用了asyncio,更底层就是事件驱动,libevent之类的,但是纯从应用层来讲,这个会是怎么一回事呢,举个例子:
1 |
|
从上面代码来看,整个执行周期为3s,但是有个问题是:如果请求一个一个来,如何不阻塞呢??
1 |
|
如果按照例如eventlet的做法:
1 | procpool = GreenPool(size=poolsize) |
消息过来,直接抛给pool,并不会进行阻塞,并且有poolsize控制池大小,
额,这里是sanic源码中的一部分…
1 | # -*- coding: utf-8 -*- |
目前在开发的内部爬虫框架中,对于从消息队列中取任务部分,如下图所示,遇到了一些问题。
因为我们整个模型为async的,底层使用的asyncio,对于消息队列的客户端来讲,可选择有的pika和kombu,对于kombu,的确是一个不错的选择,相比pika callback的写法,的确封装的更为高级,写起来比较方便,另外官方支持与eventlet工作,例如openstack的nova底层使用的就是eventlet与kombu,但是kombu目前不支持asyncio,官方将于5.0版本提供支持,我很看好,题外话。
那么目前能够选择貌似只有pika了,官方示例,奈何实力不高,只能在callback的基础上加代码,此处分成两个版本,如下所示。
1 |
|
异步取来的任务,将其抛给线程池/进程池处理,那么上层应用者直接在get_list
和get_data
中进行处理,那么此处就有非常大的问题了,因为一个好好的异步模型活生生的给改成了有点同步的感觉了,如果我想在业务层执行异步任务,会发现在当前线程中无法get eventloop,哈哈,好尴尬。这点实际在tornado官方文档中描述如何执行同步代码中有提示,是不是和这个很类似.
1 |
|
上面的实在受不了,太烂了,所以此处还是要想办法给异步了,此处斜眼。
此前在看nameko中消费者处理时,他是使用eventlet.spawn方法开启一个新的协程进行处理,从而不阻塞当前loop,那么在asyncio中也一定有相应的方法,下面出场asyncio.async和asyncio.ensure_future方法,其实也是一个方法,asyncio.async将被放弃啦,所以所以介绍ensure_future方法。
1 |
|
由于使用的pika连接器是asyncio的,那么根据pika的官方文档描述,获取到的任务只有在完成的时候才会进行下发新的任务,
如果如上get_list
方法下面的使用者写的是同步代码,会导致效率非常地下,所以此处会强制提升业务代码至异步模型,貌似有点激进,所以暂时不更新.
额,几天没有更新了,经过几天的思考,目前采用多线程/多进程模型,为什么没有使用异步呢?我觉得可以从一下几方面总结:
先把第一个版本做稳定了,因为目前还是有一些问题的,因为分布式爬虫框架,程序异常退出以及退出迟迟没有在server端看到客户端下线,还要定位原因以及加强处理,等稳定后再加入timer等其他功能。
1 |
|
1 |
|
但是今天将介绍另外两种不同的写法,而这两种写法也在库中比较常见
1 | class App(object): |
为什么叫flask-like呢?因为我也没有想到好的名字来叫,另外这种写法也是在flask中见,因为他没有包装func(*args, **kwargs)这层,而是仅给func传递参数.
1 |
|
今天在同事的推荐下,看了下扇贝的sea代码,
没细看,突然看到了cached_property
的代码,这个让我突然想到了我们内部爬虫框架的cached_property
,当时我在写这部分代码的时候主要目的有点类似如下:
1 | class A(object): |
A
为一个类,然后B
有点类似A
的一个超集,平常使用的使用为实例化B
, 然后为了操作A
下面的方法,基本流程为:
1 | >>> b = B() |
当时设计的时候,并没有想到通过property这个将方法变成属性的方法,当时想我只需要初始化两次就行,如下:
1 |
|
但是后来我就意识到我这种设计有点low,因为其他同事调用的时候可能不会这么使用,另外因为B
下面还会有其他类似A
这种东西,例如:
1 | class B(object): |
难道每次使用的时候都拿到c这个实例吗,有点傻,所以参考了Flask
的cached_property
实现,改成了如上了流程.那么进入主题,cached_property
到底干了什么事情?
先看Flask
的实现:
1 |
|
再看sea
的实现:
1 |
|
唯一不同点应该就在于加了一个锁吧,那么抛去锁的部分,单纯讲cached_propery
的实现
1 |
|
学过Python的应该蛮清楚关于装饰器这个概念,当将cached_property加在b上时,就已经完成了cached_property类的实例化(看最后一个类版本的计算时间装饰器),那怎么传进去的呢?
1 |
|
调用的时候怎么个过程?看Python Document
1 | class Property(object): |
1 | def __get__(self, instance, cls=None): |
1 | class B(object): |
剩下自行理解吧..
1 | # 类版本的计算时间的装饰器 |
这里说的 AdjacencyList , 就是最常用来在关系数据库中表示树结构的,parent
方式:
id | name | parent |
---|---|---|
1 | 一 | null |
2 | 二 | 1 |
3 | 三 | 2 |
上面的数据, 表示的结构就是:
一
|- 二
|- 三
模型定义很好做:
1 | # -*- coding: utf-8 -*- |
这里不让parent
字段有null
, 而使用0
代替.
这个例子在关系上, 有一个纠结的地方, 因为 node
这个表, 它是自关联的, 所以如果想要children
和 parent_obj
这两个关系时:
1 | children = relationship('Node') |
呃, 尴尬了.
如果是两个表, 那么 SQLAlchemy 可以通过外键在哪张表这个信息, 来确定关系的方向:
1 | class Blog(BaseModel): |
因为外键在 Blog
中, 所以 Blog -> User
的 user_obj
是一个 N -> 1
关系.
反之, User -> Blog
的 blog_list
则是一个 1 -> N
的关系.
而自相关的 Node
无法直接判断方向, 所以 SQLAlchemy
会按 1 -> N
处理, 那么:
1 | children = relationship('Node') |
这两条之中, children
是正确的, 是我们想要的. 要定义 parent_obj
则需要在 relationship
中通过参数明确表示方向:
1 | parent_obj = relationship('Node', remote_side=[id]) |
这种方式就定义了一个, “到 id” 的 N -> 1
关系.
现在完整的模型定义是:
1 | class Node(BaseModel): |
查询方面没什么特殊的了, 不过我发现在自相关的模型关系, lazy
选项不起作用:
1 | children = relationship('Node', lazy="joined") |
都是无效的, 只有在查询时, 手动使用 options()
定义:
1 | n = session.query(Node).filter(Node.name==u'一')\ |
如果要一次查出多级的子节点:
1 | n = session.query(Node).filter(Node.name==u'一')\ |
多个 joinedload()
串连的话, 可以使用 joinedload_all()
来整合:
1 | from sqlalchemy.orm import joinedload_all |
在修改方面, 删除的话, 配置了 cascade
, 删除父节点, 则子节点也会自动删除:
1 | children = relationship('Node', lazy='joined', cascade='all') # 1 -> N |
如果只删除子节点, 那么 delete-orphan
选项就很好用了:
1 | children = relationship('Node', lazy='joined', cascade='all, delete-orphan') # 1 -> N |
每次使用mock都记不住,今天也是这样,但是这里出现了不一样的地方,先举个例子:
1 | from mock import Mock, patch |
上述看也没什么问题,但是此处讲一个Where to patch
的问题.
简单描述:
1 | a.py |
如上,a.py定义了一个SomeClass,然后b.py引入这个Class,然后在某个地方进行实例化此类,ok,如果要patch SomeClass这个类,要从a.py进行patch呢还是从b.py进行patch呢?
例如:
1 |
|
1 |
|
上述哪一种方法才能被正常mock??
引用外文的描述:
1 | Now we want to test some_function but we want to mock out SomeClass using patch. The problem is that when we import module b, which we will have to do then it imports SomeClass from module a. If we use patch to mock out a.SomeClass then it will have no effect on our test; module b already has a reference to the real SomeClass and it looks like our patching had no effect. |
翻译成中文(翻译不好-_-!)
1 |
|
- mock class staticmethod
1 |
|
- 如何mock一个异常
1 | with patch( |
- mock class method
1 |
|
创建一个m位的位数组(bitmap),先将所有的位数组初始化为0。然后选择k个不同的哈希函数。第i个哈希函数对应的字符串str哈希的结果记为h(i, str),且h(i, str)的范围要在0至m-1。如下图所示。
如何判断字符串是否存在呢?
字符串也经过h(i, str),h(2, str),h(3, str)…哈希映射,检查每一个映射到m位的位数组上是否为1,如果不全为1,则表示一定不存在,否则,不能说明完全存在,有误差率在里面,为什么不能说一定存在呢,没看懂,可看:BloomFilters
既然不能完全表示存在,那么如何计算这个误差率呢?一共有三个参数:k,m,n。
参数 | 表示 |
---|---|
k | 哈希个数 |
m | 位数组大小 |
n | 字符串个数 |
下图表示m/n的结果与k个哈希函数的选择出现的错误率表格。
如上,举例简单说明,如果声明一个为数组大小为2,只传入一个字符串,那么m/n为2,如果选择k个哈希,导致的容错率分别是1.39,0.393,0.400。ok,如果要存1亿个字符串,那么大概为多少呢?
简单计算:如果容错率要求为5.73e-06,那么m/n=32,如果n为1亿的话,那么m为32亿,32亿 / 8/ 1024/1024=381.4697265625MB内存。
还是相当可以的。关于更多的容错率,可以看BloomFilters。
Python实现:python-bloomfilter
C实现:pybloomfiltermmap
混合属性, 官方文档中称之为Hybrid Attributes
. 这种机制表现为, 一个属性, 在 类 和层面, 和实例 的层面, 其行为是不同的. 之所以需要关注这部分的差异, 原因源于 Python 上下文和 SQL 上下文的差异.
类 层面经常是作为 SQL 查询时的一部分, 它面向的是 SQL 上下文. 而 实例 是已经得到或者创建的结果, 它面向的是 Python 上下文.
定义模型的 Column() 就是一个典型的混合属性. 作为实例属性时, 是具体的对象值访问, 而作为类属性时, 则有构成 SQL 语句表达式的功能.
1 | class Interval(BaseModel): |
实例行为:
1 | ins = session.query(Interval).first() |
类行为:
1 | ins = session.query(Interval).filter(Interval.end - Interval.start > 10).first() |
这种机制其实一直在被使用, 但是可能大家都没有留意一个属性在类和实例上的区别.
如果属性需要被进一步封装, 那么就需要明确声明Hybrid Attributes
了:
1 | from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method |
setter
的定义同样使用对应的装饰器即可:
1 | class Interval(BaseModel): |
前面说的属性, 在类和实例上有不同行为, 可以看到, 在类上的行为, 其实就是生成 SQL 表达式时的行为. 上面的例子只是简单的运算, SQLAlchemy 可以自动处理好 Python 函数和 SQL 函数的区别. 但是如果是一些特性更强的 SQL 函数, 就需要手动指定了. 于时, 这时的情况变成, 实例行为是 Python 范畴的调用行为, 而类行为则是生成SQL 函数的相关表达式.
同时是前面的例子, 对于 length 的定义, 更严格上来说, 应该是取绝对值的.
1 | class Interval(BaseModel): |
但是, 如果使用了 Python 的abs()
函数, 在生成 SQL 表达式时显示有无法处理了. 所以, 需要手动定义:
1 | from sqlalchemy import func |
这样查询时就可以直接使用:
1 | ins = session.query(Interval).filter(Interval.length > 1).first() |
对应的 SQL :
1 | SELECT * |
总体上没有特别之处:
1 | class Account(BaseModel): |
查询时:
1 | user = session.query(User).first() |
这里涉及的东西都是 Python 自己的, 包括那个sum()
函数, 和SQL
没有关系.
如果想实现的是, 使用SQL
的sum()
函数, 取出指定用户的总账户金额数, 那么就要考虑把balance 作成表达式的形式:
1 | from sqlalchemy import select |
这样的话,User.balance
只是单纯的一个表达式了, 查询时指定字段:
1 | user = session.query(User, User.balance).first() |
注意, 如果写成:
1 | session.query(User.balance).first() |
意义就不再是”获取第一个用户的总金额”, 而变成”获取总金额的第一个”. 这里很坑吧.
像上面这样改, 实例层面就无法使用 balance 属性. 所以, 还是先前介绍的, 表达式可以单独处理:
1 | @hybrid_property |
定义了表达式的 balance , 这部分作为查询条件上当然也是可以的:
1 | user = session.query(User).filter(User.balance > 1).first() |
各种激烈的讨论都会经常的提到Python的多线程工作是多么的困难,指向被称为全局解释器锁阻碍了Python代码不能多线程同时执行。由于这个,如果你不是一个Python开发者以及你来自其他语言例如C++活着Java,线程模块表现出来的行为可能不是你所期望的那样。但是必须指出的是你仍可以写出并发或者并行的代码并使其表现出色,只要考虑全面。如果你还没有读到,我建议你看一下 Eqbal Quran’s 在Toptal博客上的article on concurrency and parallelism in Ruby。
在Python并发教程中,我们将会写出一个简单的Python脚本去下载来自Imgur排行靠前的图片,我们将从一个顺序下载的版本开始,或一次一个,作为先决条件,你将不得不注册一个应用在Imgur,如果你还没有创建一个Imgur账户,请先创建一个。
这个教程的脚本已经在Python3.4.2上测试通过,稍微作修改,就可以运行在Python2,urllib改动最大在这两个Python版本上。
让我们开始创建一个Python模块,并将其命名成”download.py”, 这个文件将会包含所有需要的函数去获取图片列表以及下载它们。我们将会分开这些函数为三个分离的函数。
第三个函数, “setup_download_dir”, 将会被用于创建下载保存目录如果不存在。
Imgur’s ApI要求HTTP请求头上携带客户端ID的”Authorization”标头,你可以从已经注册的Imgur的应用面板上查看客户端ID,请求返回结果会是JSON格式。我们可以使用Python的标准JSON库去解码。下载这些图片是一个非常简单的任务,只要我们获取到这些图片的URL以及将它们写入文件。
这个脚本看起来是这样的:
1 | import json |
接下来,我们将会写一个模块并使用这些函数去下载这些图片,一个接着一个,我们将会将其命名成”simple.py”,这个首先将会包含主函数,最初的Imgur图片下载器。这个模块将会从环境变量”IMGUR_CLIENT_ID”中接收Imgur的客户端ID,它将会在”setup_download_dir”中被调用去创建下载目标目录,最终,它会获取一个图片列表使用”get_links”函数,过滤所有的GIF和唱片URLs,然后使用”download_link”区下载以及保存每一张图片到硬盘中,这个”single.py”看起来如下:
1 | import logging |
在我的笔记本上,这个脚本下载91张图片使用了19.4秒,请注意这些数字将因你的网络环境而异。但是我们我们想要下载更多的图片呢?或者900张,而不是90张,每一张图片平均使用0.2秒,900张图片将会使用大约3分钟,那么9000张图片将会使用掉30分钟。好消息是通过并发或并行介绍,我们可以显著的加速。
所有后续的代码示例将只会导入新的和特定的导入语句,为了方便,这些Python脚本可以在这个GitHub仓库找到。
线程时最为熟知的方法实现Python并发与并行。线程通常由操作系统提供的功能。线程比进程更为轻量级,并且共用同一内存空间。
在我们的Python线程教程中,我们将会写一个新的模块代替”single.py”,这个模块将会创建一个8个线程的池,总共包含主线程一共9个线程。我选择8个线程,因为我的电脑有8个CPU以及每个工作线程运行在每个CPU核心上同时运行看起来会是一个好的选择,在练习上,这个数字是基于其他因素选择得应更为仔细,例如其他应用以及服务运行在同一台机器上。
这个和之前的一个非常相似,除了我们拥有了一个新类,DownloadWorker, 是Thread类下的一个子类,运行方法被重写了。它运行一个无限循环,在每一次的迭代上,它会调用”self.queue.get”去试着从线程安全的队列中获取一个URL,它会阻塞直到队列中有其他worker处理。一旦worker从队列中接收到一条,它接下来将会调用和之前脚本相同的”download_link”方法去下载图片以及保存到图片目录。在下载完成后,worker通知队列这个任务已经完成,这是非常重要的,因为队列跟踪有多少任务入队。然后调用”queue.join()”将会阻塞主线程如果workers没有通知它们已经完成了任务。
1 | from queue import Queue |
运行这个脚本在同一台机器上会更快的获取结果,只用了4.1秒!和之前的相比快了4.7倍。虽然这要快很多,但是值得提及的是由于GIL的原因同一时刻只有一个线程被执行,因此,这个代码是并发的但是不是并行的,相比之前快的原因在于这是一个IO绑定任务。处理器在下载这些图片时并不需要太耗费力气。大部分时间使用在等待网络上。这也是为什么线程可以提供大幅度的速度提升。处理器可以切换上下文在这些线程无论其中的哪一个准备去做一些任务。在Python或者任务其他带有GIL解释性语言使用线程模块实际上可能导致性能下降,如果你的代码是CPU绑定任务,例如解压文件,使用多线程模块将会使结果更慢,对于CPU绑定的任务来讲和真正的并行执行,我们将会使用multiprocessing模块。
事实上参考Python的实现,CPython,拥有GIL,不是所有的Python解释器都是这样。例如,IronPython,一个是使用.NET 框架实现的Python解释器,并不会有GIL,诸如Jython,基于Java实现的,你可以在这找到一系列Python解释器。
多进程模块比多线程模块更容易使用,因为我们不需要添加类似于线程示例的类。唯一改变的是主函数。
使用多进程我们可以创建一个进程池,使用提供的map方法,我们可以将列表中的URLs放入池中。将会产生8个新进程并使用每个进程并行下载这些图片,这是真正的并行。但它也带来的成本。脚本的整个内存被复制到各个子进程中。在这个简单的例子中,这不是什么大不了的事情,但是它可能很容易产生严重开销。
1 | from functools import partial |
虽然线程和进程脚本都非常适合运行在个人电脑上,如果你想运行在不同的机器上你应该怎么做呢?或者向上扩展增加更多的CPU。 对于长期运行的web项目来讲会是一个很好的用例。如果你有长时间运行任务,你不想在同一台机器上启动一堆需要运行的应用程序在多进程或多线程上。这将会降低你的应用性能对所有的你的用户,最好能够在另外一台或多台上运行这些任务。
对于这种任务一个出色的Python库是RQ,一个非常简单但是又非常功能强大的库,你首先需要确定函数和它的参数,pickles将会被调用,并将其加入到Redis列表中,声明工作排队是第一步,但是他并不会做任何事情,我们至少需要一个worker去监听这个工作队列。
第一步我们需要安装和运行Redis服务在你的个人电脑上,或者拥有访问Redis服务的权限,然后,只会在原有的代码基础上稍作改动,我们首先创建一个RQ队列实例并且连接到Redis服务上通过redis-py library,然后,代替刚才调用的”download_link”方法,我们使用“q.enqueue(download_link, download_dir, link)”。这个队列将函数作为它的第一个参数,然后其他参数将会带入到这个函数直到这个任务被执行。
最后一步我们需要启动这些worker,RQ提供了一系列的脚本去运行这些workers在默认的队列上,只需要在终端是那个执行”rqworker”然后它会启动一个worker监听这个默认的队列,请确保你目前的工作目录和这些脚本是在同一目录中,如果你想监听不同的队列,你可以运行 “rqworker queue_name”然后它就会监听倍命名的队列。关于RQ最出色的地方在于只要你连接到Redis,你可以运行任意多个worker在不同的机器上。这是非常容易扩展的。这个是RQ的版本:
1 | from redis import Redis |
然而,RQ不是唯一的Python工作队列解决方式,RQ是简单使用并且非常好的覆盖简单用例,如果需要其他高级的特性,其他工作队列,如Celery可以被使用。
如果你的代码是IO绑定的,multiprocessing和multithreading都适合你,多进程比多线程更容易使用,但是会占用更多的内存,如果你的代码是CPU绑定的,那么multiprocessing可能是更优的选择。尤其目标机器拥有多个CPU,对于web应用,如果想要扩展worker,RQ会是更好的选择。
此片文章翻译自:beginners-guide-to-concurrency-and-parallelism-in-python.
缺失模块。
1、请确保node版本大于6.2
2、在博客根目录(注意不是yilia根目录)执行以下命令:
npm i hexo-generator-json-content --save
3、在根目录_config.yml里添加配置:
jsonContent: meta: false pages: false posts: title: true date: true path: true text: false raw: false content: false slug: false updated: false comments: false link: false permalink: false excerpt: false categories: false tags: true