python生产消费者线程

队列

在Python中,队列是最常用的线程间的通信方法,因为它是线程安全的

from queue import Queue

# 创建队列
#   -- 限制队中最多有 maxsize 个元素
#   -- 如果省略参数,默认元素个数无限制
q = Queue(100)
q1 = Queue()

# 元素入队
q.put(1)
q.put(True)
q.put('abc')

# 队列的大小
print(q.qsize())

# 判断队满
print(q.full())

# 判断队空
print(q.empty())

# 元素出队
#    注意:如果队空,取元素时,会陷入阻塞状态,知道再往队中加入数据为止【***】
while not q.empty():
    print(q.get())

创建线程

方式1


from threading import Thread

def add(n1, n2):
    print('结果为:' + n1 + n2)

def main():
    # 创建一个线程
    #   -- target 函数的名称
    #   -- args 以元组的形式,传入函数所需的参数
    t = Thread(target=add, args=(1, 2,))
    # 开启线程
    t.start()

if __name__ == '__main__':
    main()

方式2

  1. 通过继承 Thread类 创建线程的步骤
    (1) 定义一个类
    (2) 继承 Thread类
    (3) 重写 run() 方法
    (4) 在 run() 方法中写逻辑代码

  2. 注意事项
    (1) 子类继承 Thread类 后,实例化对象时,会自动执行父类中的 run()方法
    所以我们可以重写 run(),然后在 run() 中执行我们自己的代码
    (2) 一个子类继承了 Thread类,那么在对线程执行任何其他操作之前
    它必须确保已调用基类的构造函数
    – 比如:传参时,需要调用的父类的构造函数

from threading import Thread

class MyThread(Thread):
    # 构造函数
    def __init__(self, n1, n2):
        # 调用父类的构造函数:第一种方法
        # threading.Thread.__init__(self)
        # 调用父类的构造函数:第二种方法
        super().__init__()
        self.n1 = n1
        self.n2 = n2
    # 重写 run() 方法
    def run(self):
        print('线程的名称:' + self.name)
        print(self.n1 + self.n2)

def main():
    # 实例化对象的过程,就是在创建线程
    t1 = MyThread(1, 1)
    # 设置线程的名称
    t1.setName('t1')
    # 开启线程
    t1.start()

if __name__ == '__main__':
    main()

锁的使用

一定要保证相关的线程使用的是同一把锁,否则加锁操作无意义

# 加锁之前
# ----------------------------------------------------------
from threading import Thread

num = 0 # 声明共享资源

def Jia():
    # 标注使用共享的资源
    global num
    # 主逻辑代码
    for i in range(10000000):
        num+=1
    print(num)

def main():
    # 创建线程
    t1 = Thread(target=Jia)
    t2 = Thread(target=Jia)
    t3 = Thread(target=Jia)
    # 开启线程
    t1.start()
    t2.start()
    t3.start()

if __name__ == '__main__':
    main()

# 加锁之后
# ----------------------------------------------------------
from threading import Thread
from threading import Lock

lock = Lock() # 声明锁,要保证相关的线程使用的是同一把锁
num = 0 # 声明共享资源

def Jia(lock):
    # 加锁
    lock.acquire()
    # 标注使用共享的资源
    global num
    # 主逻辑代码
    for i in range(10000000):
        num+=1
    print(num)
    # 释放锁
    lock.release()

def main():
    # 创建线程
    t1 = Thread(target=Jia, args=(lock,))
    t2 = Thread(target=Jia, args=(lock,))
    t3 = Thread(target=Jia, args=(lock,))
    # 开启线程
    t1.start()
    t2.start()
    t3.start()

if __name__ == '__main__':
    main()

Thread.join()

作用:阻塞当前所在的线程,只有当执行 join() 的线程结束之后,才会解除阻塞
分析下面的代码:
阻塞前:在主线程中有一句print(‘结束了’),本意是想要在fn函数执行完之后,再输出结束了,但是因为主线程和t1线程是同步的,他们在同时执行,所以print(‘结束了’)的输出位置不一定是最后面,可能是在fn执行一半的时候就输出结束了
阻塞后:t1线程调用了join(),阻塞了当前所在线程,即阻塞了主线程,所以主线程需要等t1线程结束后才可以继续执行主线程的内容,故实现了print(‘结束了’)在fn执行完后在输出内容的需求

# 阻塞前:也就是不调用 join()
# ----------------------------------------------------------
import time
from threading import Thread

def fn():
    for i in range(10):
        print(i)
    time.sleep(1.5)

def main():
    t1 = Thread(target=fn)
    t1.start()
    print('结束了')

if __name__ == '__main__':
    main()


# 阻塞后:调用了 join()
# ----------------------------------------------------------
import time
from threading import Thread

def fn():
    for i in range(10):
        print(i)
    time.sleep(1.5)

def main():
    t1 = Thread(target=fn)
    t1.start()
    t1.join()
    print('结束了')

if __name__ == '__main__':
    main()

守护进程

  1. 进程分为主进程、守护进程、非守护进程

  2. 守护、非守护是相对于主进程 而言的

  3. 守护进程,可以理解为不重要的进程,当主进程结束后,守护进程会强制结束

  4. 非守护进程,是比守护进程重要的进程,当主进程结束后,守护进程不会被强制结束

    # t1进程是非守护进程:t1进程会陷入死循环
    # ----------------------------------------------------------
    from threading import Thread
    

def fn():
while True:
print(1)

def main():
t1 = Thread(target=fn)
t1.start()
print(‘结束了’)

if name == ‘main‘:
main()

```python
# t1进程是守护进程:t1进程会因为主进程的结束,被强制结束
# ----------------------------------------------------------
from threading import Thread

def fn():
    while True:
        print(1)

def main():
    t1 = Thread(target=fn)
    t1.start()
    t1.setDaemon(True) # 设置为True时,说明此进程是"守护进程"【默认是False】
    print('结束了')

if __name__ == '__main__':
    main()

队列在线程之间的通信

# Queue.join()
'''
    当生产者生产结束时,先阻塞生产者线程,只有当消费者发出已经消费完队中产品时,才解除阻塞
'''

# Queue.task_done()
'''
    消费者消费一个队中的产品,就向生产者发送一次信息
    当消费完队中信息之后,也向生产者发送信息,并发出已经消费完的提示,提示生产者可以解除生产者线程的阻塞了
'''

生产者与消费者模式

阶段1:消费者线程的阻塞

from queue import Queue
from threading import Thread

# 生产者
def produce(q):
    for i in range(1, 11):
        q.put(i)
        print(f'生产产品——{i}')

# 消费者
def consumer(q):
    while True:
        tmp = q.get()
        print(f'消费产品——{tmp}')

# 主进程
def main():
    q = Queue()
    pro = Thread(target=produce,  args=(q,))
    con = Thread(target=consumer, args=(q,))
    pro.start()
    con.start()

if __name__ == '__main__':
    main()

在主线程中创建并开启生产者线程和消费者线程,生产者共生产10个产品
生产者生产产品的同时,消费者在调用q.get()方法消费产品,当生产者把产品全部生产完之后,生产者线程结束,消费者继续调用q.get()方法消费产品,当没有产品可以消费时,消费者再调用q.get()时,会导致消费者线程进入阻塞状态,直到再往里面加数据为止,但是生产者已经把产品生产完,不会再生产了,所以消费者线程会一直处于阻塞状态

阶段2:产品消费不完

from queue import Queue
from threading import Thread

# 生产者
def produce(q):
    for i in range(1, 11):
        q.put(i)
        print(f'生产产品——{i}')

# 消费者
def consumer(q):
    while True:
        tmp = q.get()
        print(f'消费产品——{tmp}')

# 主进程
def main():
    q = Queue()
    pro = Thread(target=produce,  args=(q,))
    con = Thread(target=consumer, args=(q,))
    con.setDaemon(True) # 设置守护线程
    pro.start()
    con.start()

if __name__ == '__main__':
    main()

针对阶段1的代码,只添加了一行代码,将消费者线程为 “守护线程”即可
分析
当生产者将产品全部生产完,生产者线程结束,然后主线程也结束了,接着消费者线程作为守护线程被强制退出,解决了消费者线程阻塞的问题
但是,由下图可看到,虽然解决了消费者线程阻塞的问题,但是消费者本次只消费了5个产品,生产者所生产的产品没有被消费完,这个问题请看阶段3

阶段3:小完美的代码

from queue import Queue
from threading import Thread

# 生产者
def produce(q):
    for i in range(1, 11):
        q.put(i)
        print(f'生产产品——{i}')
    q.join() # 阻塞生产者线程,只有接收到消费者发送来的已经消费了最后一个产品的时候,才解除阻塞

# 消费者
def consumer(q):
    while True:
        tmp = q.get()
        print(f'消费产品——{tmp}')
        q.task_done() # 向生产者发送消息,告诉生产者我已经消费了一个产品

# 主进程
def main():
    q = Queue()
    pro = Thread(target=produce,  args=(q,))
    con = Thread(target=consumer, args=(q,))
    con.setDaemon(True)
    pro.start()
    con.start()

if __name__ == '__main__':
    main()

q.join()
q.task_done()
分析:

当生产者将产品全部生产完,生产者线程因为执行了q.join()而被阻塞,只有接收到消费者发送来的已经消费了最后一个产品的时候,才解除阻塞
而消费者线程会边消费产品,边执行q.task_done()给生产者线程发送消息,直到消费完全部的产品时,在给生产者发送消息时,会通知生产者已经消费完全部的产品
此时生产者接收到消费完全部产品的信息,阻塞被解除,生产者线程结束
然后主线程结束
再接着,由于消费者线程的守护线程,被强制关闭

阶段4:有关线程执行顺序的问题

from queue import Queue
from threading import Thread

# 生产者
def produce(q):
    for i in range(1, 11):
        q.put(i)
        print(f'生产产品——{i}')
    q.join() # 阻塞生产者线程,只有接收到消费者发送来的已经消费了最后一个产品的时候,才解除阻塞

# 消费者
def consumer(q):
    while True:
        tmp = q.get()
        print(f'消费产品——{tmp}')
        q.task_done() # 向生产者发送消息,告诉生产者我已经消费了一个产品

# 主进程
def main():
    q = Queue()
    pro = Thread(target=produce,  args=(q,))
    con = Thread(target=consumer, args=(q,))
    con.setDaemon(True)
    pro.start()
    con.start()
    print('结束了')

if __name__ == '__main__':
    main()

与阶段3相比,仅在主线程中添加一行输出语句
分析
我们想要的是两个子线程结束之后,再打印输出生产者和消费者全部结束了呀!!!,但是很明显,结果不是这样的,下面开始分析
程序中有1个主线程、2个子线程,三者会同时执行,所以主线程中的输出语句的执行时间是随机的,故输出的位置也是随机的
解决方法:阻塞当前线程,也就是阻塞主线程,见阶段5

阶段5:线程执行顺序问题的解决

from queue import Queue
from threading import Thread

# 生产者
def produce(q):
    for i in range(1, 11):
        q.put(i)
        print(f'生产产品——{i}')
    q.join() # 阻塞生产者线程,只有接收到消费者发送来的已经消费了最后一个产品的时候,才解除阻塞

# 消费者
def consumer(q):
    while True:
        tmp = q.get()
        print(f'消费产品——{tmp}')
        q.task_done() # 向生产者发送消息,告诉生产者我已经消费了一个产品

# 主进程
def main():
    q = Queue()
    pro = Thread(target=produce,  args=(q,))
    con = Thread(target=consumer, args=(q,))
    con.setDaemon(True)
    pro.start()
    con.start()
    pro.join() # 阻塞当前所在的线程
    print('结束了')

if __name__ == '__main__':
    main()

与阶段4相比,仅添加一句代码,以达到阻塞主线程的需求
分析:
程序中有1个主线程、2个子线程,三者会同时执行
主线程中执行到pro.join()时,当前线程被阻塞,也即主线程被阻塞,知道生产完全部产品,消费完全部产品,生产者线程结束
主线程才被解除阻塞
然后主线程结束,消费者线程被强制结束

阶段6:一个生产则,两个消费者。A和B,A既是消费者 也是B的生产者

产者线程循环生产数据,然后将数据放入输入队列中。消费者A线程从输入队列中取出数据进行处理,然后将处理结果放入输出队列中。消费者B线程从输出队列中取出数据进行处理,并打印处理结果。
需要注意的是,在消费者A线程和消费者B线程中,使用 queue.Queue.task_done() 方法通知队列任务已完成。这样可以保证队列中的任务完成情况得到通知,避免出现问题。同时,在主线程中,使用 queue.Queue.join() 方法等待输入队列和输出队列为空,以保证所有任务都已完成。

import threading
import time
import queue


# 生产者线程
class ProducerThread(threading.Thread):
    def __init__(self, queue_in, queue_out):
        threading.Thread.__init__(self)
        self.queue_in = queue_in
        self.queue_out = queue_out

    def run(self):
        while True:
            # 生产数据
            data = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            print("生产者生产的数据:", data)

            # 将数据放入输入队列中
            self.queue_in.put(data)
            time.sleep(1)


# 消费者A线程
class ConsumerAThread(threading.Thread):
    def __init__(self, queue_in, queue_out):
        threading.Thread.__init__(self)
        self.queue_in = queue_in
        self.queue_out = queue_out

    def run(self):
        while True:
            # 从输入队列中取出数据
            data = self.queue_in.get()

            # 处理数据
            result = "消费者处理 A: " + data

            # 将处理结果放入输出队列中
            self.queue_out.put(result)
            print("消费者A处理的数据:", result)

            # 通知队列任务已完成
            self.queue_in.task_done()


# 消费者B线程
class ConsumerBThread(threading.Thread):
    def __init__(self, queue_in, queue_out):
        threading.Thread.__init__(self)
        self.queue_in = queue_in
        self.queue_out = queue_out

    def run(self):
        while True:
            # 从输出队列中取出数据
            data = self.queue_out.get()

            # 处理数据
            result = "由消费者处理 B: " + data

            # 打印处理结果
            print("消费者B处理的数据:", result)

            # 通知队列任务已完成
            self.queue_out.task_done()


# 创建输入队列和输出队列
queue_in = queue.Queue()
queue_out = queue.Queue()

# 创建生产者线程、消费者A线程和消费者B线程
producer_thread = ProducerThread(queue_in, queue_out)
consumer_a_thread = ConsumerAThread(queue_in, queue_out)
consumer_b_thread = ConsumerBThread(queue_in, queue_out)

# 启动线程
producer_thread.start()
consumer_a_thread.start()
consumer_b_thread.start()
# 等待输入队列为空
queue_in.join()
# 等待输出队列为空
queue_out.join()

  目录