# Time-consuming task: listen to music
def music(name, duration=1):
    '''Simulate listening to one piece of music.

    Prints a progress message and then blocks the caller with
    ``time.sleep`` to stand in for the time the task takes.

    :param name: identifier of the track; formatted into the message.
    :param duration: seconds to sleep after printing. Defaults to 1,
        the value that used to be hard-coded.
    '''
    # Call-style print with a single argument is valid on both
    # Python 2 and Python 3.
    print('I am listening to music {0}'.format(name))
    time.sleep(duration)
# Time-consuming task: watch a movie
def movie(name, duration=5):
    '''Simulate watching one movie.

    Prints a progress message and then blocks the caller with
    ``time.sleep`` to stand in for the time the task takes.

    :param name: identifier of the movie; formatted into the message.
    :param duration: seconds to sleep after printing. Defaults to 5,
        the value that used to be hard-coded.
    '''
    # Call-style print with a single argument is valid on both
    # Python 2 and Python 3.
    print('I am watching movie {0}'.format(name))
    time.sleep(duration)
假如我现在要听10首音乐、看2部电影,那么我就有如下几种方案:
方案一:先一个个听完10首音乐,再一个个看完2部电影,顺序完成,代码如下:
1 2 3 4 5 6 7
# Single-threaded run: listen to 10 songs, then watch 2 movies, in order
@fn_timer
def single_thread():
    '''Run every music task and then every movie task, one at a time.

    Calls ``music`` with 0..9 followed by ``movie`` with 0..1 on the
    calling thread.
    '''
    # (task function, number of runs), processed strictly in this order.
    schedule = ((music, 10), (movie, 2))
    for task, runs in schedule:
        for index in range(runs):
            task(index)
让我们执行一下这段代码,输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
I am listening to music 0 I am listening to music 1 I am listening to music 2 I am listening to music 3 I am listening to music 4 I am listening to music 5 I am listening to music 6 I am listening to music 7 I am listening to music 8 I am listening to music 9 I am watching movie 0 I am watching movie 1 [finished function:single_thread in 20.14s]
# Multi-threaded run: listen to 10 songs and watch 2 movies
@fn_timer
def multi_thread():
    '''Run every music and movie task in its own thread.

    Creates one thread per task (10 calling ``music``, 2 calling
    ``movie``), starts them all, then joins each one so the function
    returns only after every task has finished.
    '''
    # ``target`` is the task function; ``args`` is the tuple of
    # positional arguments handed to it.
    threads = [
        threading.Thread(target=music, args=(i,)) for i in range(10)
    ]
    threads.extend(
        threading.Thread(target=movie, args=(i,)) for i in range(2)
    )

    for t in threads:
        # Mark as a daemon thread through the attribute;
        # Thread.setDaemon() is deprecated since Python 3.10.
        t.daemon = True
        t.start()
    for t in threads:
        t.join()
执行上述代码,运行结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
I am listening to music 0 I am listening to music 1 I am listening to music 2 I am listening to music 3 I am listening to music 4 I am listening to music 5 I am listening to music 6 I am listening to music 7 I am listening to music 8 I am listening to music 9 I am watching movie 0 I am watching movie 1 [finished function:multi_thread in 5.02s]
I am listening to music 0 I am listening to music 1 I am listening to music 2 I am listening to music 3 I am listening to music 4 I am listening to music 5 I am listening to music 6 I am listening to music 7 I am listening to music 8 I am listening to music 9 I am watching movie 0 I am watching movie 1 [finished function:use_pool in 6.12s]
# coding:utf-8 # 测试多线程 import threading import time from utils import fn_timer from multiprocessing.dummy import Pool import requests from utils import urls
# Application: download the content of several web pages on one thread
@fn_timer
def download_using_single_thread(urls):
    '''Request each URL in turn with ``requests.get``.

    :param urls: iterable of URLs to request.
    :return: list of the responses, in the same order as ``urls``.
    '''
    return [requests.get(link) for link in urls]
# Application: download the content of several web pages with threads
@fn_timer
def download_using_multi_thread(urls):
    '''Request every URL concurrently, one thread per URL.

    Each thread calls ``requests.get(url)``. The responses are not
    collected, so the function itself returns ``None``; it returns once
    every thread has been joined.

    :param urls: iterable of URLs to request.
    '''
    threads = [
        threading.Thread(target=requests.get, args=(url,)) for url in urls
    ]
    for t in threads:
        # Mark as a daemon thread through the attribute;
        # Thread.setDaemon() is deprecated since Python 3.10.
        t.daemon = True
        t.start()
    for t in threads:
        t.join()
# coding:utf-8 # 测试多进程 import os import time from multiprocessing import Process,Pool,Queue from utils import fn_timer import random
# A simple task
@fn_timer
def do_simple_task(task_name):
    '''Print the current process id and task name, sleep, and echo the name.

    :param task_name: label of the task; printed and returned unchanged.
    :return: ``task_name``.
    '''
    # Call-style print with a single argument is valid on both
    # Python 2 and Python 3.
    print('Run child process {0}, task name is: {1}'.format(os.getpid(), task_name))
    time.sleep(1.2)
    return task_name
# 2. Exercise a process pool
@fn_timer
def test_use_process_pool():
    '''Run seven named tasks through a process pool and print the results.

    Submits ``task0`` .. ``task6`` to ``do_simple_task`` with
    ``Pool.map_async``, closes the pool, waits for it to finish, and then
    prints the list of return values.
    '''
    # The number is the maximum count of child processes running at once.
    pool = Pool(5)
    # Names of the tasks to run.
    task_names = ['task{0}'.format(i) for i in range(7)]
    # Run the tasks concurrently; the return values are fetched from the
    # async result with get() after the pool has been joined.
    results = pool.map_async(do_simple_task, task_names)
    print('Many processes will start...')
    pool.close()
    pool.join()

    print('All processes end, results is: {0}'.format(results.get()))
def main():
    '''Call ``test_simple_multi_process`` and then ``test_use_process_pool``.

    Sample output recorded for each call is kept in the comments below.
    '''
    test_simple_multi_process()
    # Output:
    #   Process will start...
    #   Run child process 1524, task name is: task2
    #   Run child process 1728, task name is: task1
    #   [finished function:do_simple_task in 1.20s]
    #   [finished function:do_simple_task in 1.20s]
    #   Process end.
    #   [finished function:test_simple_multi_process in 1.34s]

    test_use_process_pool()
    # Output:
    #   Many processes will start...
    #   Run child process 7568, task name is: task0
    #   Run child process 7644, task name is: task1
    #   Run child process 7628, task name is: task2
    #   Run child process 7620, task name is: task3
    #   Run child process 7660, task name is: task4
    #   [finished function:do_simple_task in 1.20s]
    #   Run child process 7568, task name is: task5
    #   [finished function:do_simple_task in 1.20s]
    #   Run child process 7644, task name is: task6
    #   [finished function:do_simple_task in 1.20s]
    #   [finished function:do_simple_task in 1.20s]
    #   [finished function:do_simple_task in 1.20s]
    #   [finished function:do_simple_task in 1.20s]
    #   [finished function:do_simple_task in 1.20s]
    #   All processes end, results is: ['task0', 'task1', 'task2', 'task3', 'task4', 'task5', 'task6']
    #   [finished function:test_use_process_pool in 2.62s]


if __name__ == '__main__':
    main()
def main():
    '''Call ``test_communication_between_process``.

    Sample output recorded for the call is kept in the comments below.
    '''
    test_communication_between_process()
    # Output:
    #   Put value: A to queue.
    #   Get value: A from queue.
    #   Put value: B to queue.
    #   Get value: B from queue.
    #   Put value: C to queue.
    #   Get value: C from queue.
# coding:utf-8
# Coroutine (gevent) test

# Apply gevent's dynamic patch first: it replaces the standard library's
# thread/socket etc. with non-blocking versions. gevent recommends doing
# this before other modules (requests in particular) are imported, so
# the patch runs ahead of the remaining imports.
from gevent import monkey
monkey.patch_all()

import requests
import gevent
import utils
from utils import fn_timer
from gevent.pool import Pool
# Module-level requests session that sends a desktop-browser User-Agent.
session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36',
})
@fn_timer
def download_using_single_thread(urls):
    '''
    Download several web pages one after another.

    Each URL is requested through the module-level ``session``.

    :param urls: URLs of the web pages to download
    :return: list of responses, in the same order as ``urls``
    '''
    return [session.get(link) for link in urls]
# coding:utf-8 # 对比多线程、多进程和协程下载网页 import requests import utils from utils import fn_timer from multiprocessing.dummy import Pool as thread_pool from multiprocessing import Pool as process_pool from gevent.pool import Pool as gevent_pool from gevent import monkey # 打动态补丁,把标准库中的thread/socket等替换掉,让它们变成非阻塞的 monkey.patch_all()
# Module-level requests session that sends a desktop-browser User-Agent.
session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36',
})