
[1005] Pika is not thread-safe

瞿文柏
2023-12-01

The conclusion first: "Pika is not thread safe. Use a BlockingConnection per-thread." In other words, every thread should create and use its own BlockingConnection.

Related issue: https://github.com/pika/pika/issues/1237

Example 1: connection created outside the threads, channel created inside each thread

Source: https://github.com/pika/pika/issues/1237

# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wu

import pika
import threading

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))


def loop1():
    channel = connection.channel()
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(1)


def loop2():
    channel = connection.channel()
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(2)


threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

Output

1
Exception in thread LoopThread:
Traceback (most recent call last):
  File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/wu/Work/youmi/projects/ym-crawler-ccs/dataparser/test12.py", line 19, in loop2
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
    self._flush_output()
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: IndexError('pop from an empty deque')

Example 2: connection and channel both created outside the threads

# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wu

import pika
import threading

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
channel = connection.channel()


def loop1():
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(1)


def loop2():
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(2)


threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

Output

Exception in thread LoopThread:
Traceback (most recent call last):
  File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/Users/wu/.pyenv/versions/3.7.5/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/wu/Work/youmi/projects/ym-crawler-ccs/dataparser/test12.py", line 13, in loop1
    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
    self._flush_output()
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "/Users/wu/.pyenv/versions/ym-crawler-ccs/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: AssertionError(('_AsyncTransportBase._produce() tx buffer size underflow', -24, 1))

Example 3: connection and channel both created inside each thread

# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wu

import pika
import threading


def loop1():
    connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
    channel = connection.channel()

    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(1)


def loop2():
    connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
    channel = connection.channel()

    channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(2)


threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

Output

2
1

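As the output shows, creating the connection and channel inside each thread works, and the order in which the threads finish is not fixed. There is a drawback, though: we cannot afford to open a new connection and channel every time a thread runs a task. Each new connection means another TCP connection and AMQP handshake, which wastes CPU time.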

Example 4: guarding the shared channel with a lock

# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 18:24
# @Author : wu

import pika
import threading
import time

lock = threading.Lock()

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
channel = connection.channel()


def loop1():
    with lock:
        channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
        print(1)
        time.sleep(2)


def loop2():
    with lock:
        channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
        print(2)


threading.Thread(target=loop1, name='LoopThread').start()
threading.Thread(target=loop2, name='LoopThread').start()

Output

1
2

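With the lock in place the code also runs correctly, but the threads no longer do their job. loop2 clearly runs only after loop1 has finished and is blocked by it, including loop1's 2-second sleep. Everything under `with lock` in loop1 behaves as a single atomic unit, and the whole unit is locked. This differs from Python's GIL: the GIL is acquired and released at points the interpreter chooses, whereas this lock serializes exactly the block of code you place under it.

Clearly, locking like this defeats the purpose of multithreading, and it is not what we want.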
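The point that only the code under the lock is serialized can be checked without a broker. The sketch below is not from the original post: `fake_publish` is a hypothetical stand-in for `channel.basic_publish()`, and the barrier exists only to prove that the two threads really do run side by side outside the locked region.

```python
import threading

lock = threading.Lock()
barrier = threading.Barrier(2)   # both threads must arrive here at the same time
published = []                   # hypothetical stand-in for the broker
inside = 0                       # threads currently inside the locked region
max_inside = 0                   # highest value `inside` ever reached


def fake_publish(body):
    """Hypothetical stand-in for channel.basic_publish()."""
    published.append(body)


def worker(n):
    global inside, max_inside
    # Unlocked region: the barrier only opens when both threads are here,
    # so passing it shows that they run concurrently.
    barrier.wait(timeout=5)
    with lock:
        # Locked region: one thread at a time.
        inside += 1
        max_inside = max(max_inside, inside)
        fake_publish(f"message {n}")
        inside -= 1


threads = [threading.Thread(target=worker, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(published), max_inside)
```

If the `barrier.wait()` call were moved under `with lock`, the first thread would hold the lock while waiting for a second thread that can never get in, and the wait would time out. Keeping the locked block small limits how much of the work is serialized, but it does not change the advice quoted at the top of this post to use one BlockingConnection per thread.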

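Example 5: using thread-local variables

About thread-local variables: threading.local() from the threading module creates an object whose attributes belong entirely to the thread that sets them (hence "thread-local variables"). Because each thread works only on its own copy, which is private to that thread, data synchronization problems are ruled out at the root.

Thread pool + thread-local variables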
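A minimal, broker-free sketch of that behaviour, using only the standard library. The thread names and the `seen` dictionary are made up for the illustration; the barrier makes every thread write before any thread reads, so a shared variable would have been overwritten by then.

```python
import threading

local = threading.local()
barrier = threading.Barrier(3)
seen = {}  # thread name -> value that thread read back


def worker(value):
    # Every thread writes the same attribute name on the same object...
    local.value = value
    # ...then waits until all three threads have written...
    barrier.wait(timeout=5)
    # ...and still reads back only what it stored itself.
    seen[threading.current_thread().name] = local.value


threads = [
    threading.Thread(target=worker, args=(i,), name=f"t{i}") for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never assigned local.value, so the attribute does not exist here.
print(seen, hasattr(local, "value"))
```

This is why the `hasattr(local, 'channel')` check in the example that follows is true at most once per thread rather than once per process.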

# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 15:37
# @Author : wu
import pika
import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()


def init():
    c = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
    channel = c.channel()
    return channel


def loop1(n):
    if not hasattr(local, 'channel'):
        channel = init()
        # current_thread() replaces the deprecated currentThread() alias
        thread_id = threading.current_thread().ident
        print(f'Thread {thread_id}: created channel')
        local.channel = channel
    local.channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(n, end='\n')


with ThreadPoolExecutor(max_workers=5) as t:
    for i in range(10):
        t.submit(loop1, i)

Output

Thread 123145590636544: created channel
2
5
6
7
8
9
Thread 123145580126208: created channel
Thread 123145574871040: created channel
1
0
Thread 123145611657216: created channel
Thread 123145601146880: created channel
4
3

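As the output shows, the pool is configured with max_workers=5, so there are at most five threads. Each thread has its own thread-local variable and does not affect the others, so each one has to create its own Pika channel, five in total here. The benefit is that the number of channels created never exceeds max_workers: a pool thread that finishes one task goes on to run other tasks as the same thread, and its thread-local variable already holds the channel, so the channel is reused.

More importantly, each thread creates its own channel (and the connection behind it) and the threads do not affect one another, so this is safe. We get thread safety without creating a connection and channel for every task. All that is left to do is choose a sensible thread pool size.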
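The same idea can also be written with the `initializer` argument of ThreadPoolExecutor (available since Python 3.7), which runs once in each worker thread before it takes any task. The sketch below is a variation, not the original author's code: `FakeChannel` is a hypothetical stand-in so that it runs without a broker, and in real use `init_worker` would presumably open a `pika.BlockingConnection` and call `.channel()` on it, as `init()` does above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()
created = []                     # one entry per channel that was created
created_lock = threading.Lock()


class FakeChannel:
    """Hypothetical stand-in for a Pika channel; records instead of publishing."""

    def __init__(self):
        self.sent = []

    def basic_publish(self, exchange, routing_key, body):
        self.sent.append(body)


def init_worker():
    # Runs once per worker thread, in that thread, before its first task.
    local.channel = FakeChannel()
    with created_lock:
        created.append(local.channel)


def publish(n):
    # Each task uses the channel that belongs to the thread running it.
    local.channel.basic_publish(exchange='', routing_key='tasks', body=f'message {n}')
    return n


with ThreadPoolExecutor(max_workers=5, initializer=init_worker) as pool:
    results = list(pool.map(publish, range(10)))

total_sent = sum(len(channel.sent) for channel in created)
print(len(created) <= 5, total_sent)
```

Compared with the `hasattr` check, this moves the setup out of the task function. The number of channels is still bounded by max_workers, and it may be lower, because the pool only starts as many threads as it needs.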

Example 6: using deferToThread

# -*- coding: utf-8 -*-
# @Time   : 2020/11/5 15:37
# @Author : wu
import pika
import threading

from twisted.internet import reactor
from twisted.internet.threads import deferToThread

local = threading.local()


def init():
    c = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672'))
    channel = c.channel()
    return channel


def loop1(n):
    # current_thread() replaces the deprecated currentThread() alias
    thread_id = threading.current_thread().ident
    if not hasattr(local, 'channel'):
        channel = init()
        print(f'Thread {thread_id}: created channel')
        local.channel = channel
    local.channel.basic_publish(exchange='', routing_key='tasks', body='Hello World!')
    print(n, end='\n')
    return f'Thread {thread_id}: published successfully'


def pprint(res):
    print(res)


def run():
    for i in range(50):
        d = deferToThread(loop1, i)
        d.addCallback(pprint)


if __name__ == '__main__':
    run()
    reactor.run()

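This example is essentially the same as the previous one, except that concurrent.futures.ThreadPoolExecutor is replaced with Twisted's reactor and deferToThread.

This approach runs the reactor's event loop. deferToThread runs the function in a thread from the reactor's thread pool and returns a Deferred, and the callback added to that Deferred receives the result once the call succeeds. This is another way of working asynchronously.

Summary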

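  • Pika is not thread-safe; every thread should use its own BlockingConnection.
  • To avoid creating a connection for every task in multithreaded code, combine a thread pool with threading.local(): the thread pool avoids creating threads over and over, and threading.local() avoids creating Pika connections over and over. See Example 5.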

Source: http://www.jayden5.cn/2020/11/24/pika-%E7%BA%BF%E7%A8%8B%E4%B8%8D%E5%AE%89%E5%85%A8/
