python网络编程–池
进程池
为什么要有进程池?进程池的概念:
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?
在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
池的用法
# ThreadPoolExcutor
# ProcessPoolExcutor
# 创建一个池子
# tp = ThreadPoolExcutor(池中线程/进程的个数)
# 异步提交任务
# ret = tp.submit(函数,参数1,参数2....)
# 获取返回值
# ret.result()
# 在异步的执行完所有任务之后,主线程/主进程才开始执行的代码
# tp.shutdown() 阻塞 直到所有的任务都执行完毕
# map方法
# ret = tp.map(func,iterable) 迭代获取iterable中的内容,作为func的参数,让子线程来执行对应的任务
# for i in ret: 每一个都是任务的返回值
# 回调函数
# ret.add_done_callback(函数名)
# 要在ret对应的任务执行完毕之后,直接继续执行add_done_callback绑定的函数中的内容,并且ret的结果会作为参数返回给绑定的函数
concurrent.futures模块
ProcessPoolExecutor进程池的使用方法
submit + showdown:
import os import time import random from concurrent.futures import ProcessPoolExecutor def func(i): print('%s start'%i,os.getpid()) time.sleep(random.randint(1,3)) print('%s end'%i,os.getpid()) if __name__ == '__main__': p = ProcessPoolExecutor(5) # 创建进程池个数 for i in range(10): p.submit(func,i) # 提交任务,第一个参数是函数,剩下的全是参数(可以传多个参数) p.shutdown() # 关闭池之后不能继续提交任务,并且会阻塞,直到已经提交的任务完成 print('main --> ',os.getpid())
任务的参数 + 返回值
import os import time import random from concurrent.futures import ProcessPoolExecutor def func(i): print('%s start'%i,os.getpid()) time.sleep(random.randint(1,3)) print('%s end'%i,os.getpid()) return '%s * %s'%(i,os.getpid()) if __name__ == '__main__': p = ProcessPoolExecutor(5) ret_l = [] for i in range(10): ret = p.submit(func,i) ret_l.append(ret) for ret in ret_l: print('ret --> %s'%ret.result()) # 同步阻塞 print('main --> ',os.getpid())
线程池
线程池和进程池使用方法相同,唯一不同的是线程调用的是ThreadPoolExecutor,其余用法一样
用法示例:
进程池的使用
# !/usr/bin/evn python # -*- coding:utf-8 -*- import time import random from threading import Thread,current_thread from concurrent.futures import ThreadPoolExecutor def func(i): print('%i --> start'%i,current_thread().ident) time.sleep(random.random()) print('%i --> end'%i,current_thread().ident) return '%s ** %s'%(i,current_thread().ident) start_time = time.time() # 创建一个池子 tp = ThreadPoolExecutor(20) tp_l = [] for i in range(100): # 异步提交任务 ret = tp.submit(func,i) tp_l.append(ret) for i in tp_l: print('ret -->',i.result()) print('main --> ',current_thread().ident) end_time = time.time() print('time:', end_time - start_time)
map用法
from concurrent.futures import ThreadPoolExecutor def func(i): print('start', os.getpid()) time.sleep(random.randint(1,3)) print('end', os.getpid()) return '%s * %s'%(i,os.getpid()) tp = ThreadPoolExecutor(20) ret = tp.map(func,range(20))
回调函数
# !/usr/bin/evn python
# -*- coding:utf-8 -*-
import requests
from concurrent.futures import ThreadPoolExecutor
def get_page(url):
res = requests.get(url)
return {'url':url,'content':res.text}
def parserpage(ret):
dic = ret.result()
print(dic['url'])
tp = ThreadPoolExecutor(5)
url_lst = [
'http://www.baidu.com', # 3
'http://www.cnblogs.com', # 1
'http://www.douban.com', # 1
'http://www.tencent.com',
'http://www.cnblogs.com/Eva-J/articles/8306047.html',
'http://www.cnblogs.com/Eva-J/articles/7206498.html',
]
ret_l = []
for url in url_lst:
ret = tp.submit(get_page,url)
ret_l.append(ret)
ret.add_done_callback(parserpage) # 哪一个任务先完成,自身的返回值就先调用add_done中的函数