多线程是个坑爹的课题,有多线程就有锁。Python中有低级线程模块_thread、再封装一下就是threading、再来一下就是from concurrent.futures import ThreadPoolExecutor。其中涉及到的锁大概有Lock、RLock、Condition、Semaphore、BoundedSeamphore、Event。这么多看起来怪吓人的,其实这么多的锁全部只是由Lock演化出来的

Lock

Lock = _thread._allocate_lock 这个是直接由最低级别的_thread直接引用过来的。创建锁的时候是处于未被锁定的状态。这应该是最容易被理解和使用的一种锁了。刚学习的时候铁定写过类似下面这种示例。本质原因是python中的原子操作是针对单条指令。而a+=1被翻译成了多条指令。执行过程中任何时刻可能被打断

1
2
3
4
5
6
7
8
9
10
11
a = 0

def change():
global a
for i in range(100000):
a += 1

t = [threading.Thread(target=change) for i in range(100)]
[i.start() for i in t]
[i.join() for i in t]
print(a)

解决方法就是所有线程共享同一个锁,虽然锁有很多种,但是几乎都是这样暴露给用户使用的,这是大家都知道的。一个锁都好理解。如果有两个锁就不是那么好理解了,比如这个有点装逼的例子。交叉打印Hello和World

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
lock_a = Lock()
lock_b = Lock()

def hello():
for i in range(5):
lock_a.acquire()
time.sleep(0.1)
print("Hello")
lock_b.release()

def world():
for i in range(5):
lock_b.acquire()
time.sleep(0.1)
print("World")
lock_a.release()

Thread(target=hello).start()
Thread(target=world).start()

RLock

可重入锁允许线程获得锁之后该线程可以无数次的再次获得锁。看起来没什么用,大概也确实没啥用吧,不过考虑一下下面这种场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Change:
def __init__(self):
self.a = 1
self.b = 1
self.lock = RLock()

def adda(self):
with self.lock:
self.a += 1

def addb(self):
with self.lock:
self.b += 1

def addab(self):
with self.lock:
self.adda()
self.addb()

你有一个类。其中adda改变a,addb改变b.还提供方法addab同时调用前两者。如果没有可重入锁那么你将不得不把两个函数粘贴到addab下面。它的实现原理是获取锁的时候根据get_ident得到当前线程标识。如果和当前锁的标识一样则仅仅是给值加1,否则就尝试去获取锁。简要代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from threading import Lock, get_ident

class RLock:

def __init__(self):
self.lock = Lock()
self._owner = None
self._count = 0

def acquire(self):
me = get_ident()
if self._owner == me:
self._count += 1
return
rc = self.lock.acquire()
if rc:
self._owner = me
self._count = 1

__enter__ = acquire

def release(self):
if self._owner != get_ident():
raise RuntimeError()
self._count -= 1
if self._count == 0:
self._owner = None
self.lock.release()

def __exit__(self, exc_type, exc_val, exc_tb):
self.release()

Condition

被称为条件变量,这个应该是很少被使用的。它允许传入一个锁,当满足条件的时候触发通知(可以看一下queue模块的实现,当get不到数据的时候就执行wait等待,新加入数据的时候执行notify通知唤醒),这应该是它被称为条件变量的原因(感觉都有点牵强😕),和Lock以及RLock不同。这个东东主要是用来通知其他线程的。大概有两个方法wait(阻塞直到被通知),notify(唤醒wait)。
实现的思路是所有的线程共享同一个锁A,同时有一个共用的锁列表。每当调用wait的时候生成一个新的锁放入锁列表,然后获得该锁,再释放锁A(因为wait必须在获得锁A才能调用,如果不释放则其他线程无法获得锁A)。最绝的来了,再次获得新生成的锁造成死锁。由此该线程被挂起。直到其他获得锁A的线程执行notify操作。将wait的锁释放。
实现代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from collections import deque

class Condition:

def __init__(self):
self._lock = RLock()
self._waiters = deque()

def __enter__(self):
return self._lock.__enter__()

def __exit__(self, *args):
return self._lock.__exit__(*args)

def wait(self):
assert get_ident() == self._lock._owner
waiter = Lock()
self._waiters.append(waiter)

waiter.acquire()
self._lock.release()
waiter.acquire()
self._lock.acquire()
del waiter

def notify(self):
assert get_ident() == self._lock._owner
for waiter in self._waiters.copy():
waiter.release()
self._waiters.remove(waiter)

这个东东吧其实不太适合被直接使用。但是网上还是有人硬生生凑了一个消费者生产者的例子出来做演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import threading
import random
import time

class Producer(threading.Thread):
def __init__(self, integers, condition):
super(Producer, self).__init__()
self.integers = integers
self.condition = condition

def run(self):
for i in range(10):
integer = random.randint(0, 256)
with self.condition:
self.integers.append(integer)
self.condition.notify()
time.sleep(0.5)

class Consumer(threading.Thread):

def __init__(self, integers, condition):
super(Consumer, self).__init__()
self.integers = integers
self.condition = condition

def run(self):
with self.condition:
while True:
while self.integers:
integer = self.integers.pop()
print(integer)
self.condition.wait()

integers = []
condition = Condition()
Producer(integers, condition).start()
Consumer(integers, condition).start()

这个例子有个地方需要注意。notify和wait并不是一一对应的。不代表执行了notify,wait部分一定会被执行。所以如果消费者部分这样写会造成部分没有被处理

1
2
3
4
5
6
7
8
9
10
11
12
13
class Consumer(threading.Thread):

def __init__(self, integers, condition):
super(Consumer, self).__init__()
self.integers = integers
self.condition = condition

def run(self):
with self.condition:
while True:
self.condition.wait()
integer = self.integers.pop()
print(integer)

Semaphore & BoundedSeamphore

上面先介绍Condition是有原因的,因为多线程信号量和事件都是基于它生成的。信号量常用于限制对有限资源的访问。比如你有1000个线程,如果每一个线程都去创建数据库连接,那么数据库可能会崩。或者爬虫创建对网站的连接,太过凶残并不好,这个时候用信号量来处理就不错。

这里如果直接使用Lock来构建。那么一个锁同时只允许一个线程来获得显然是不达不到要求的。祭出刚才构建的Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Semaphore:
# value值用完了就被调用wait.只有notify能解除
def __init__(self, value=1):
self._lock = Condition()
self.value = value

def __enter__(self):
with self._lock:
if self.value == 0:
self._lock.wait()
else:
self.value -= 1

def __exit__(self, exc_type, exc_val, exc_tb):
self.value += 1
self._lock.notify()


class BoundedSemaphore(Semaphore):

def __init__(self, value=1):
super(BoundedSemaphore, self).__init__(value=value)

Event

Event和Condition差不多。区别就是上面提到的,使用Condition,如果在单线程,那么当notify的时候是无法触发wait的,再调用wait线程会被阻塞。但是Event就支持在单线程使用。提供两个主要方法,set和wait。但是它并不需要先获得锁。使用好像更方便一点,实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Event:
def __init__(self):
self._cond = Condition()
self._flag = False

def set(self):
with self._cond:
self._flag = True
self._cond.notify()

def clear(self):
with self._cond:
self._flag = False

def wait(self):
with self._cond:
signaled = self._flag
if not signaled:
signaled = self._cond.wait()
return signaled

如果把刚才那个生产者消费者换到它上面来,大概是这个样子了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import threading
import random
import time

class Producer(threading.Thread):
def __init__(self, integers, condition):
super(Producer, self).__init__()
self.integers = integers
self.condition = condition

def run(self):
for i in range(10):
integer = random.randint(0, 256)
self.integers.append(integer)
self.condition.set()
time.sleep(0.5)

class Consumer(threading.Thread):

def __init__(self, integers, condition):
super(Consumer, self).__init__()
self.integers = integers
self.condition = condition

def run(self):
while True:
while self.integers:
integer = self.integers.pop()
print(integer)
self.condition.wait()

integers = []
condition = Event()
Producer(integers, condition).start()
Consumer(integers, condition).start()

当然,没啥必要用这些比较低级的东东。能用通用高级数据结构就直接用,这样大家都比较好理解,总结就是只是给共享资源加锁用Lock、限制资源访问使用BoundedSeamphore,唤醒相关线程使用Event