Appearance
Python 协程和异步编程
协程(Coroutine)是 Python 中用于实现并发编程的重要概念,而异步编程(Async Programming)则是利用协程实现高效 I/O 操作的编程范式。本章节将详细介绍 Python 中的协程和异步编程概念、语法和使用方法。
什么是协程?
协程是一种特殊的函数,它可以在执行过程中暂停并在将来的某个时间点恢复执行。与线程不同,协程的切换是由程序员控制的,而不是由操作系统调度的。
协程的主要特点包括:
- 轻量级:协程比线程更轻量,创建和切换的开销更小
- 单线程:协程在单线程中运行,避免了线程安全问题
- 协作式:协程之间是协作式的,需要主动让出控制权
- 高效 I/O:协程特别适合处理 I/O 密集型任务
协程的发展历程
Python 中的协程发展经历了以下几个阶段:
- Python 2.5+:通过
yield语句实现简单的协程 - Python 3.3:引入
yield from语句,简化协程的实现 - Python 3.4:引入
asyncio模块,提供了异步 I/O 支持 - Python 3.5:引入
async和await关键字,使协程的语法更加简洁
基础协程:使用 yield
在 Python 3.5 之前,协程主要通过 yield 语句实现。
简单协程示例
python
# 简单协程示例
def simple_coroutine():
"""简单的协程函数"""
print("协程开始")
value = yield
print(f"收到值: {value}")
value = yield "协程产生的值"
print(f"收到另一个值: {value}")
yield "协程结束"
# 创建协程对象
coro = simple_coroutine()
# 启动协程(到第一个 yield 处)
print("启动协程")
next(coro)
# 发送值并继续执行(到下一个 yield 处)
print("发送第一个值")
result = coro.send("Hello")
print(f"协程返回值: {result}")
# 发送另一个值并继续执行
print("发送第二个值")
result = coro.send("World")
print(f"协程返回值: {result}")
# 尝试继续执行(协程结束,抛出 StopIteration)
try:
print("继续执行协程")
next(coro)
except StopIteration:
print("协程结束")协程的状态
协程对象有四种状态:
- CREATED:协程已创建但未启动
- RUNNING:协程正在执行
- SUSPENDED:协程在 yield 处暂停
- CLOSED:协程执行完毕
可以使用 inspect 模块查看协程的状态:
python
import inspect
def simple_coroutine():
value = yield
return value
coro = simple_coroutine()
print(f"协程状态: {inspect.getcoroutinestate(coro)}") # CREATED
next(coro)
print(f"协程状态: {inspect.getcoroutinestate(coro)}") # SUSPENDED
try:
coro.send(42)
except StopIteration as e:
print(f"协程返回值: {e.value}")
print(f"协程状态: {inspect.getcoroutinestate(coro)}") # CLOSED协程的应用:生成器作为协程
python
# 生成器作为协程
def countdown(n):
"""倒计时协程"""
while n > 0:
yield n
n -= 1
# 使用协程
print("倒计时开始")
for i in countdown(5):
print(f"{i}...")
print("倒计时结束")
# 另一个示例:平均值计算协程
def average():
"""计算平均值的协程"""
total = 0
count = 0
average = None
while True:
value = yield average
if value is None:
break
total += value
count += 1
average = total / count
# 使用平均值计算协程
avg_coro = average()
next(avg_coro) # 启动协程
print(f"平均值: {avg_coro.send(10)}") # 10.0
print(f"平均值: {avg_coro.send(20)}") # 15.0
print(f"平均值: {avg_coro.send(30)}") # 20.0
# 结束协程
try:
avg_coro.send(None)
except StopIteration:
pass异步编程:使用 asyncio
Python 3.4 引入了 asyncio 模块,提供了异步 I/O 支持。Python 3.5 引入了 async 和 await 关键字,使异步编程的语法更加简洁。
基本概念
- 事件循环(Event Loop):负责调度和执行协程的核心组件
- 协程函数(Coroutine Function):使用
async def定义的函数 - 协程对象(Coroutine Object):协程函数的返回值
- 任务(Task):包装协程对象,使其可以被事件循环调度
- Future:表示异步操作的最终结果
基本语法
python
import asyncio
# 定义协程函数
async def hello():
"""简单的协程函数"""
print("Hello")
await asyncio.sleep(1) # 暂停 1 秒,模拟 I/O 操作
print("World")
# 运行协程
async def main():
"""主协程函数"""
print("主协程开始")
await hello() # 等待 hello 协程完成
print("主协程结束")
# 启动事件循环
if __name__ == "__main__":
asyncio.run(main()) # Python 3.7+运行结果
主协程开始
Hello
World
主协程结束异步编程的核心组件
1. 事件循环
事件循环是异步编程的核心,负责调度和执行协程。
python
import asyncio
async def task1():
"""任务 1"""
print("任务 1 开始")
await asyncio.sleep(1)
print("任务 1 结束")
return "任务 1 结果"
async def task2():
"""任务 2"""
print("任务 2 开始")
await asyncio.sleep(0.5)
print("任务 2 结束")
return "任务 2 结果"
async def main():
"""主协程"""
print("主协程开始")
# 创建任务
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
# 等待任务完成
result1 = await t1
result2 = await t2
print(f"任务 1 结果: {result1}")
print(f"任务 2 结果: {result2}")
print("主协程结束")
if __name__ == "__main__":
asyncio.run(main())2. 任务(Task)
任务是包装协程对象的高级概念,使其可以被事件循环调度。
python
import asyncio
async def task_func():
"""任务函数"""
print("任务开始")
await asyncio.sleep(1)
print("任务结束")
return "任务结果"
async def main():
"""主协程"""
# 创建任务
task = asyncio.create_task(task_func())
print(f"任务状态: {task.get_name()}, {task.done()}")
# 等待任务完成
result = await task
print(f"任务状态: {task.get_name()}, {task.done()}")
print(f"任务结果: {result}")
if __name__ == "__main__":
asyncio.run(main())3. Future
Future 表示异步操作的最终结果,通常由库函数返回。
python
import asyncio
async def main():
"""主协程"""
# 创建 Future 对象
loop = asyncio.get_running_loop()
future = loop.create_future()
# 模拟异步操作
def set_result():
future.set_result("Future 结果")
# 安排回调
loop.call_later(1, set_result)
# 等待 Future 完成
print("等待 Future 完成")
result = await future
print(f"Future 结果: {result}")
if __name__ == "__main__":
asyncio.run(main())异步编程的高级特性
1. 并发执行多个任务
使用 asyncio.gather() 并发执行多个任务:
python
import asyncio
import time
async def task(name, delay):
"""带延迟的任务"""
print(f"任务 {name} 开始,延迟 {delay} 秒")
await asyncio.sleep(delay)
print(f"任务 {name} 结束")
return f"任务 {name} 结果"
async def main():
"""主协程"""
start_time = time.time()
# 并发执行多个任务
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3)
)
end_time = time.time()
print(f"所有任务完成,耗时 {end_time - start_time:.2f} 秒")
print(f"任务结果: {results}")
if __name__ == "__main__":
asyncio.run(main())2. 超时控制
使用 asyncio.wait_for() 设置任务的超时时间:
python
import asyncio
async def long_running_task():
"""长时间运行的任务"""
print("开始长时间运行的任务")
await asyncio.sleep(3)
print("长时间运行的任务完成")
return "任务结果"
async def main():
"""主协程"""
try:
# 设置 2 秒超时
result = await asyncio.wait_for(long_running_task(), timeout=2)
print(f"任务结果: {result}")
except asyncio.TimeoutError:
print("任务超时")
if __name__ == "__main__":
asyncio.run(main())3. 取消任务
使用 task.cancel() 取消任务:
python
import asyncio
async def cancelable_task():
"""可取消的任务"""
try:
print("任务开始")
for i in range(5):
print(f"工作中... {i}")
await asyncio.sleep(1)
print("任务完成")
return "任务结果"
except asyncio.CancelledError:
print("任务被取消")
raise
async def main():
"""主协程"""
# 创建任务
task = asyncio.create_task(cancelable_task())
# 等待 2 秒后取消任务
await asyncio.sleep(2)
print("取消任务")
task.cancel()
try:
result = await task
print(f"任务结果: {result}")
except asyncio.CancelledError:
print("捕获到任务取消异常")
if __name__ == "__main__":
asyncio.run(main())4. 异步上下文管理器
使用 async with 语句创建异步上下文管理器:
python
import asyncio
class AsyncContextManager:
"""异步上下文管理器"""
async def __aenter__(self):
"""进入上下文"""
print("进入异步上下文")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""退出上下文"""
print("退出异步上下文")
await asyncio.sleep(0.5)
async def main():
"""主协程"""
print("主协程开始")
async with AsyncContextManager():
print("在异步上下文中")
await asyncio.sleep(1)
print("主协程结束")
if __name__ == "__main__":
asyncio.run(main())5. 异步迭代器
使用 async for 语句创建异步迭代器:
python
import asyncio
class AsyncIterator:
"""异步迭代器"""
def __init__(self, start, end):
"""初始化"""
self.start = start
self.end = end
def __aiter__(self):
"""返回异步迭代器对象"""
self.current = self.start
return self
async def __anext__(self):
"""返回下一个值"""
if self.current >= self.end:
raise StopAsyncIteration
value = self.current
self.current += 1
await asyncio.sleep(0.5) # 模拟 I/O 操作
return value
async def main():
"""主协程"""
print("开始异步迭代")
async for num in AsyncIterator(1, 5):
print(f"异步迭代得到: {num}")
print("异步迭代结束")
if __name__ == "__main__":
asyncio.run(main())异步 I/O 操作
异步编程特别适合处理 I/O 密集型任务,如网络请求、文件操作等。
1. 异步网络请求
使用 aiohttp 库进行异步 HTTP 请求:
python
import asyncio
import aiohttp
async def fetch(session, url):
"""异步获取 URL 内容"""
async with session.get(url) as response:
return await response.text()
async def main():
"""主协程"""
urls = [
"https://www.example.com",
"https://www.python.org",
"https://www.github.com"
]
async with aiohttp.ClientSession() as session:
# 并发执行多个请求
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, content in zip(urls, results):
print(f"URL: {url}, 内容长度: {len(content)}")
if __name__ == "__main__":
asyncio.run(main())2. 异步文件操作
使用 aiofiles 库进行异步文件操作:
python
import asyncio
import aiofiles
async def read_file(filename):
"""异步读取文件"""
async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
content = await f.read()
return content
async def write_file(filename, content):
"""异步写入文件"""
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
await f.write(content)
async def main():
"""主协程"""
# 写入文件
await write_file('example.txt', 'Hello, Async File!')
print("文件写入成功")
# 读取文件
content = await read_file('example.txt')
print(f"文件内容: {content}")
if __name__ == "__main__":
asyncio.run(main())异步编程的最佳实践
1. 使用 asyncio.run() 运行主协程
Python 3.7+ 提供了 asyncio.run() 函数,用于运行主协程:
python
import asyncio
async def main():
"""主协程"""
print("Hello")
await asyncio.sleep(1)
print("World")
if __name__ == "__main__":
asyncio.run(main()) # 推荐使用2. 避免阻塞操作
在异步代码中,应避免使用阻塞操作,如 time.sleep()、同步文件 I/O 等。应使用对应的异步版本,如 asyncio.sleep()、aiofiles 等。
python
import asyncio
import time
async def bad_example():
"""不好的示例:使用阻塞操作"""
print("开始")
time.sleep(1) # 阻塞操作,会暂停整个事件循环
print("结束")
async def good_example():
"""好的示例:使用异步操作"""
print("开始")
await asyncio.sleep(1) # 异步操作,只会暂停当前协程
print("结束")
async def main():
"""主协程"""
await asyncio.gather(
bad_example(),
good_example()
)
if __name__ == "__main__":
asyncio.run(main())3. 使用 async with 和 async for
对于支持异步上下文管理和异步迭代的对象,应使用 async with 和 async for 语句:
python
import asyncio
import aiohttp
async def main():
"""主协程"""
# 正确:使用 async with
async with aiohttp.ClientSession() as session:
async with session.get("https://www.example.com") as response:
content = await response.text()
print(f"内容长度: {len(content)}")
if __name__ == "__main__":
asyncio.run(main())4. 合理使用任务和 Future
对于需要并发执行的任务,应使用 asyncio.create_task() 创建任务。对于需要手动控制的异步操作,应使用 Future 对象。
python
import asyncio
async def task1():
"""任务 1"""
await asyncio.sleep(1)
return "任务 1 结果"
async def task2():
"""任务 2"""
await asyncio.sleep(2)
return "任务 2 结果"
async def main():
"""主协程"""
# 创建任务
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
# 等待任务完成
result1 = await t1
result2 = await t2
print(f"任务 1 结果: {result1}")
print(f"任务 2 结果: {result2}")
if __name__ == "__main__":
asyncio.run(main())5. 处理异常
在异步代码中,应妥善处理异常:
python
import asyncio
async def error_task():
"""会抛出异常的任务"""
await asyncio.sleep(1)
raise ValueError("任务执行失败")
async def main():
"""主协程"""
try:
await error_task()
except ValueError as e:
print(f"捕获到异常: {e}")
if __name__ == "__main__":
asyncio.run(main())异步编程的实际应用
示例 1:异步 Web 服务器
使用 aiohttp 库创建异步 Web 服务器:
python
from aiohttp import web
import asyncio
async def handle(request):
"""处理 HTTP 请求"""
name = request.match_info.get('name', "Anonymous")
await asyncio.sleep(0.5) # 模拟 I/O 操作
return web.Response(text=f"Hello, {name}!")
async def main():
"""主协程"""
app = web.Application()
app.add_routes([
web.get('/', handle),
web.get('/{name}', handle)
])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
print("服务器启动,监听端口 8080")
# 保持服务器运行
while True:
await asyncio.sleep(3600)
if __name__ == "__main__":
asyncio.run(main())示例 2:异步数据库操作
使用 asyncpg 库进行异步 PostgreSQL 数据库操作:
python
import asyncio
import asyncpg
async def main():
"""主协程"""
# 连接数据库
conn = await asyncpg.connect(
host='localhost',
port=5432,
user='postgres',
password='password',
database='test'
)
try:
# 创建表
await conn.execute('''
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE
)
''')
print("表创建成功")
# 插入数据
await conn.execute('''
INSERT INTO users (name, email) VALUES ($1, $2)
ON CONFLICT (email) DO NOTHING
''', 'Alice', 'alice@example.com')
print("数据插入成功")
# 查询数据
users = await conn.fetch('SELECT * FROM users')
print("查询结果:")
for user in users:
print(f"ID: {user['id']}, 姓名: {user['name']}, 邮箱: {user['email']}")
finally:
# 关闭连接
await conn.close()
if __name__ == "__main__":
asyncio.run(main())示例 3:异步爬虫
使用 aiohttp 库创建异步爬虫:
python
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def fetch_page(session, url):
"""异步获取页面内容"""
try:
async with session.get(url) as response:
if response.status == 200:
return await response.text()
return None
except Exception as e:
print(f"获取 {url} 失败: {e}")
return None
async def parse_page(url, content):
"""解析页面内容"""
if not content:
return []
soup = BeautifulSoup(content, 'html.parser')
links = []
# 提取所有链接
for a in soup.find_all('a', href=True):
href = a['href']
# 处理相对路径
if href.startswith('http'):
links.append(href)
elif href.startswith('/'):
base_url = '/'.join(url.split('/')[:3])
links.append(base_url + href)
return links
async def crawl(url, max_depth=2, current_depth=1):
"""异步爬取网站"""
if current_depth > max_depth:
return
print(f"爬取: {url} (深度: {current_depth})")
async with aiohttp.ClientSession() as session:
# 获取页面内容
content = await fetch_page(session, url)
# 解析页面
links = await parse_page(url, content)
# 去重并过滤
unique_links = list(set(links))
filtered_links = [link for link in unique_links if 'example.com' in link]
print(f"从 {url} 找到 {len(filtered_links)} 个链接")
# 递归爬取子链接
if current_depth < max_depth:
tasks = [crawl(link, max_depth, current_depth + 1) for link in filtered_links[:5]] # 限制数量
await asyncio.gather(*tasks)
async def main():
"""主协程"""
start_url = "https://www.example.com"
await crawl(start_url, max_depth=2)
if __name__ == "__main__":
asyncio.run(main())总结
协程和异步编程是 Python 中实现并发编程的重要技术,特别适合处理 I/O 密集型任务。本章节介绍了:
- 协程的基本概念:使用
yield实现的基础协程 - 异步编程的核心组件:事件循环、协程函数、任务、Future
- 异步编程的语法:
async def、await关键字 - 异步编程的高级特性:并发执行、超时控制、取消任务、异步上下文管理器、异步迭代器
- 异步 I/O 操作:网络请求、文件操作
- 异步编程的最佳实践:避免阻塞操作、使用
async with和async for、合理使用任务和 Future、处理异常 - 异步编程的实际应用:Web 服务器、数据库操作、爬虫
掌握协程和异步编程,可以大大提高 Python 程序的并发性能,特别是在处理大量 I/O 操作的场景中。