生产者消费者模型
生产者消费者模型,简单来说就是为了达到某件是的平衡。
比如生产食物和顾客消费食物,生产是批量的生产,顾客吃食物只能一个一个来吃,使用该模型可以限制生产的量,当量达到最高上限时暂停生产,等待消费者消费再接着进行生产。这样做的好处是:可以根据情况调节生产者消费者个数使之维持平衡,节省内容空间。
概念帖子:生产者消费者模型
什么是生产者消费者模型
# 什么是生产者消费者模型
# 把一个产生数据并且处理数据的过程解耦
# 让生产的数据的过程和处理数据的过程达到一个工作效率上的平衡
# 中间的容器,在多进程中我们使用队列或者可被join的队列,做到控制数据的量
# 当数据过剩的时候,队列的大小会控制这生产者的行为
# 当数据严重不足的时候,队列会控制消费者的行为
# 并且我们还可以通过定期检查队列中元素的个数来调节生产者消费者的个数
# 比如说:一个爬虫,或者一个web程序的server端
# 爬虫
# 请求网页的平均时间是0.3s
# 处理网页代码的时候是0.003s
# 100倍,每启动100个线程生产数据
# 启动一个线程来完成处理数据
# web程序的server端
# 每秒钟有6w条请求
# 一个服务每s中只能处理2000条
# 先写一个web程序,只负责一件事情,就是接收请求,然后把请求放到队列中
# 再写很多个server端,从队列中获取请求,然后处理,然后返回结果
例子:
# !/usr/bin/evn python
# -*- coding:utf-8 -*-
import os
import time
import random
from multiprocessing import Process,Queue
def producer(q,name,food):
print('生产者已启动',os.getpid())
for i in range(20): # 需要生产20个食物
time.sleep(random.random())
fd = '%s%s'%(food,i)
q.put(fd)
print('%s生产了一个%s'%(name,food))
def consumer(q,name):
print('消费者已启动')
while True:
food = q.get()
if not food:break # 没有食物了,退出
time.sleep(random.random())
print('%s吃了%s'%(name,food))
def cp(c_count,p_count):
q = Queue(10) # 队列中的个数;限制有10个食物时停止生产,之后吃一个生产一个直到生产结束
for i in range(c_count):
Process(target=consumer, args=(q, 'baiz')).start()
p_l = []
for i in range(p_count):
p1 = Process(target=producer, args=(q, 'wuye', '烤红薯'))
p1.start()
p_l.append(p1)
for p in p_l:p.join()
for i in range(c_count):
q.put(None) # 食物生产完毕时放入None提示消费者没有食物可以结束了
if __name__ == '__main__':
cp(1,2)
生产者消费者模型实现的爬虫例子:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os
import re
import requests
from multiprocessing import Process,Queue
heads = {}
heads['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
def producer(q,url,i):
print('生产者线程 %s 已启动:'%i,os.getpid())
response = requests.get(url,headers=heads)
q.put(response.text)
def consumer(q):
print('消费者线程已启动:',os.getpid())
while True:
s = q.get()
if not s:break
com = re.compile(
'<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>\d+).*?<span class="title">(?P<title>.*?)</span>'
'.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)评价</span>', re.S)
ret = com.finditer(s)
for i in ret:
print({
"id": i.group("id"),
"title": i.group("title"),
"rating_num": i.group("rating_num"),
"comment_num": i.group("comment_num")}
)
if __name__ == '__main__':
count = 0
q = Queue(3)
p_l = []
for i in range(10):
url = 'https://movie.douban.com/top250?start=%s&filter='%count
count += 25
p = Process(target=producer,args=(q,url,i,))
p.start()
p_l.append(p)
Process(target=consumer, args=(q,)).start()
for c in p_l:c.join()
q.put(None)