Skip to content

Python 协程和异步编程

协程(Coroutine)是 Python 中用于实现并发编程的重要概念,而异步编程(Async Programming)则是利用协程实现高效 I/O 操作的编程范式。本章节将详细介绍 Python 中的协程和异步编程概念、语法和使用方法。

什么是协程?

协程是一种特殊的函数,它可以在执行过程中暂停并在将来的某个时间点恢复执行。与线程不同,协程的切换是由程序员控制的,而不是由操作系统调度的。

协程的主要特点包括:

  1. 轻量级:协程比线程更轻量,创建和切换的开销更小
  2. 单线程:协程在单线程中运行,避免了线程安全问题
  3. 协作式:协程之间是协作式的,需要主动让出控制权
  4. 高效 I/O:协程特别适合处理 I/O 密集型任务

协程的发展历程

Python 中的协程发展经历了以下几个阶段:

  1. Python 2.5+:通过 yield 语句实现简单的协程
  2. Python 3.3:引入 yield from 语句,简化协程的实现
  3. Python 3.4:引入 asyncio 模块,提供了异步 I/O 支持
  4. Python 3.5:引入 asyncawait 关键字,使协程的语法更加简洁

基础协程:使用 yield

在 Python 3.5 之前,协程主要通过 yield 语句实现。

简单协程示例

python
# 简单协程示例

def simple_coroutine():
    """简单的协程函数"""
    print("协程开始")
    value = yield
    print(f"收到值: {value}")
    value = yield "协程产生的值"
    print(f"收到另一个值: {value}")
    yield "协程结束"

# 创建协程对象
coro = simple_coroutine()

# 启动协程(到第一个 yield 处)
print("启动协程")
next(coro)

# 发送值并继续执行(到下一个 yield 处)
print("发送第一个值")
result = coro.send("Hello")
print(f"协程返回值: {result}")

# 发送另一个值并继续执行
print("发送第二个值")
result = coro.send("World")
print(f"协程返回值: {result}")

# 尝试继续执行(协程结束,抛出 StopIteration)
try:
    print("继续执行协程")
    next(coro)
except StopIteration:
    print("协程结束")

协程的状态

协程对象有四种状态:

  1. CREATED:协程已创建但未启动
  2. RUNNING:协程正在执行
  3. SUSPENDED:协程在 yield 处暂停
  4. CLOSED:协程执行完毕

可以使用 inspect 模块查看协程的状态:

python
import inspect

def simple_coroutine():
    value = yield
    return value

coro = simple_coroutine()
print(f"协程状态: {inspect.getcoroutinestate(coro)}")  # CREATED

next(coro)
print(f"协程状态: {inspect.getcoroutinestate(coro)}")  # SUSPENDED

try:
    coro.send(42)
except StopIteration as e:
    print(f"协程返回值: {e.value}")

print(f"协程状态: {inspect.getcoroutinestate(coro)}")  # CLOSED

协程的应用:生成器作为协程

python
# 生成器作为协程

def countdown(n):
    """倒计时协程"""
    while n > 0:
        yield n
        n -= 1

# 使用协程
print("倒计时开始")
for i in countdown(5):
    print(f"{i}...")
print("倒计时结束")

# 另一个示例:平均值计算协程
def average():
    """计算平均值的协程"""
    total = 0
    count = 0
    average = None
    while True:
        value = yield average
        if value is None:
            break
        total += value
        count += 1
        average = total / count

# 使用平均值计算协程
avg_coro = average()
next(avg_coro)  # 启动协程

print(f"平均值: {avg_coro.send(10)}")  # 10.0
print(f"平均值: {avg_coro.send(20)}")  # 15.0
print(f"平均值: {avg_coro.send(30)}")  # 20.0

# 结束协程
try:
    avg_coro.send(None)
except StopIteration:
    pass

异步编程:使用 asyncio

Python 3.4 引入了 asyncio 模块,提供了异步 I/O 支持。Python 3.5 引入了 asyncawait 关键字,使异步编程的语法更加简洁。

基本概念

  1. 事件循环(Event Loop):负责调度和执行协程的核心组件
  2. 协程函数(Coroutine Function):使用 async def 定义的函数
  3. 协程对象(Coroutine Object):协程函数的返回值
  4. 任务(Task):包装协程对象,使其可以被事件循环调度
  5. Future:表示异步操作的最终结果

基本语法

python
import asyncio

# 定义协程函数
async def hello():
    """简单的协程函数"""
    print("Hello")
    await asyncio.sleep(1)  # 暂停 1 秒,模拟 I/O 操作
    print("World")

# 运行协程
async def main():
    """主协程函数"""
    print("主协程开始")
    await hello()  # 等待 hello 协程完成
    print("主协程结束")

# 启动事件循环
if __name__ == "__main__":
    asyncio.run(main())  # Python 3.7+

运行结果

主协程开始
Hello
World
主协程结束

异步编程的核心组件

1. 事件循环

事件循环是异步编程的核心,负责调度和执行协程。

python
import asyncio

async def task1():
    """任务 1"""
    print("任务 1 开始")
    await asyncio.sleep(1)
    print("任务 1 结束")
    return "任务 1 结果"

async def task2():
    """任务 2"""
    print("任务 2 开始")
    await asyncio.sleep(0.5)
    print("任务 2 结束")
    return "任务 2 结果"

async def main():
    """主协程"""
    print("主协程开始")
    
    # 创建任务
    t1 = asyncio.create_task(task1())
    t2 = asyncio.create_task(task2())
    
    # 等待任务完成
    result1 = await t1
    result2 = await t2
    
    print(f"任务 1 结果: {result1}")
    print(f"任务 2 结果: {result2}")
    print("主协程结束")

if __name__ == "__main__":
    asyncio.run(main())

2. 任务(Task)

任务是包装协程对象的高级概念,使其可以被事件循环调度。

python
import asyncio

async def task_func():
    """任务函数"""
    print("任务开始")
    await asyncio.sleep(1)
    print("任务结束")
    return "任务结果"

async def main():
    """主协程"""
    # 创建任务
    task = asyncio.create_task(task_func())
    print(f"任务状态: {task.get_name()}, {task.done()}")
    
    # 等待任务完成
    result = await task
    print(f"任务状态: {task.get_name()}, {task.done()}")
    print(f"任务结果: {result}")

if __name__ == "__main__":
    asyncio.run(main())

3. Future

Future 表示异步操作的最终结果,通常由库函数返回。

python
import asyncio

async def main():
    """主协程"""
    # 创建 Future 对象
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    
    # 模拟异步操作
    def set_result():
        future.set_result("Future 结果")
    
    # 安排回调
    loop.call_later(1, set_result)
    
    # 等待 Future 完成
    print("等待 Future 完成")
    result = await future
    print(f"Future 结果: {result}")

if __name__ == "__main__":
    asyncio.run(main())

异步编程的高级特性

1. 并发执行多个任务

使用 asyncio.gather() 并发执行多个任务:

python
import asyncio
import time

async def task(name, delay):
    """带延迟的任务"""
    print(f"任务 {name} 开始,延迟 {delay} 秒")
    await asyncio.sleep(delay)
    print(f"任务 {name} 结束")
    return f"任务 {name} 结果"

async def main():
    """主协程"""
    start_time = time.time()
    
    # 并发执行多个任务
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3)
    )
    
    end_time = time.time()
    print(f"所有任务完成,耗时 {end_time - start_time:.2f} 秒")
    print(f"任务结果: {results}")

if __name__ == "__main__":
    asyncio.run(main())

2. 超时控制

使用 asyncio.wait_for() 设置任务的超时时间:

python
import asyncio

async def long_running_task():
    """长时间运行的任务"""
    print("开始长时间运行的任务")
    await asyncio.sleep(3)
    print("长时间运行的任务完成")
    return "任务结果"

async def main():
    """主协程"""
    try:
        # 设置 2 秒超时
        result = await asyncio.wait_for(long_running_task(), timeout=2)
        print(f"任务结果: {result}")
    except asyncio.TimeoutError:
        print("任务超时")

if __name__ == "__main__":
    asyncio.run(main())

3. 取消任务

使用 task.cancel() 取消任务:

python
import asyncio

async def cancelable_task():
    """可取消的任务"""
    try:
        print("任务开始")
        for i in range(5):
            print(f"工作中... {i}")
            await asyncio.sleep(1)
        print("任务完成")
        return "任务结果"
    except asyncio.CancelledError:
        print("任务被取消")
        raise

async def main():
    """主协程"""
    # 创建任务
    task = asyncio.create_task(cancelable_task())
    
    # 等待 2 秒后取消任务
    await asyncio.sleep(2)
    print("取消任务")
    task.cancel()
    
    try:
        result = await task
        print(f"任务结果: {result}")
    except asyncio.CancelledError:
        print("捕获到任务取消异常")

if __name__ == "__main__":
    asyncio.run(main())

4. 异步上下文管理器

使用 async with 语句创建异步上下文管理器:

python
import asyncio

class AsyncContextManager:
    """异步上下文管理器"""
    
    async def __aenter__(self):
        """进入上下文"""
        print("进入异步上下文")
        await asyncio.sleep(0.5)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """退出上下文"""
        print("退出异步上下文")
        await asyncio.sleep(0.5)

async def main():
    """主协程"""
    print("主协程开始")
    
    async with AsyncContextManager():
        print("在异步上下文中")
        await asyncio.sleep(1)
    
    print("主协程结束")

if __name__ == "__main__":
    asyncio.run(main())

5. 异步迭代器

使用 async for 语句创建异步迭代器:

python
import asyncio

class AsyncIterator:
    """异步迭代器"""
    
    def __init__(self, start, end):
        """初始化"""
        self.start = start
        self.end = end
    
    def __aiter__(self):
        """返回异步迭代器对象"""
        self.current = self.start
        return self
    
    async def __anext__(self):
        """返回下一个值"""
        if self.current >= self.end:
            raise StopAsyncIteration
        value = self.current
        self.current += 1
        await asyncio.sleep(0.5)  # 模拟 I/O 操作
        return value

async def main():
    """主协程"""
    print("开始异步迭代")
    async for num in AsyncIterator(1, 5):
        print(f"异步迭代得到: {num}")
    print("异步迭代结束")

if __name__ == "__main__":
    asyncio.run(main())

异步 I/O 操作

异步编程特别适合处理 I/O 密集型任务,如网络请求、文件操作等。

1. 异步网络请求

使用 aiohttp 库进行异步 HTTP 请求:

python
import asyncio
import aiohttp

async def fetch(session, url):
    """异步获取 URL 内容"""
    async with session.get(url) as response:
        return await response.text()

async def main():
    """主协程"""
    urls = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com"
    ]
    
    async with aiohttp.ClientSession() as session:
        # 并发执行多个请求
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, content in zip(urls, results):
            print(f"URL: {url}, 内容长度: {len(content)}")

if __name__ == "__main__":
    asyncio.run(main())

2. 异步文件操作

使用 aiofiles 库进行异步文件操作:

python
import asyncio
import aiofiles

async def read_file(filename):
    """异步读取文件"""
    async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
        content = await f.read()
    return content

async def write_file(filename, content):
    """异步写入文件"""
    async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
        await f.write(content)

async def main():
    """主协程"""
    # 写入文件
    await write_file('example.txt', 'Hello, Async File!')
    print("文件写入成功")
    
    # 读取文件
    content = await read_file('example.txt')
    print(f"文件内容: {content}")

if __name__ == "__main__":
    asyncio.run(main())

异步编程的最佳实践

1. 使用 asyncio.run() 运行主协程

Python 3.7+ 提供了 asyncio.run() 函数,用于运行主协程:

python
import asyncio

async def main():
    """主协程"""
    print("Hello")
    await asyncio.sleep(1)
    print("World")

if __name__ == "__main__":
    asyncio.run(main())  # 推荐使用

2. 避免阻塞操作

在异步代码中,应避免使用阻塞操作,如 time.sleep()、同步文件 I/O 等。应使用对应的异步版本,如 asyncio.sleep()aiofiles 等。

python
import asyncio
import time

async def bad_example():
    """不好的示例:使用阻塞操作"""
    print("开始")
    time.sleep(1)  # 阻塞操作,会暂停整个事件循环
    print("结束")

async def good_example():
    """好的示例:使用异步操作"""
    print("开始")
    await asyncio.sleep(1)  # 异步操作,只会暂停当前协程
    print("结束")

async def main():
    """主协程"""
    await asyncio.gather(
        bad_example(),
        good_example()
    )

if __name__ == "__main__":
    asyncio.run(main())

3. 使用 async with 和 async for

对于支持异步上下文管理和异步迭代的对象,应使用 async withasync for 语句:

python
import asyncio
import aiohttp

async def main():
    """主协程"""
    # 正确:使用 async with
    async with aiohttp.ClientSession() as session:
        async with session.get("https://www.example.com") as response:
            content = await response.text()
            print(f"内容长度: {len(content)}")

if __name__ == "__main__":
    asyncio.run(main())

4. 合理使用任务和 Future

对于需要并发执行的任务,应使用 asyncio.create_task() 创建任务。对于需要手动控制的异步操作,应使用 Future 对象。

python
import asyncio

async def task1():
    """任务 1"""
    await asyncio.sleep(1)
    return "任务 1 结果"

async def task2():
    """任务 2"""
    await asyncio.sleep(2)
    return "任务 2 结果"

async def main():
    """主协程"""
    # 创建任务
    t1 = asyncio.create_task(task1())
    t2 = asyncio.create_task(task2())
    
    # 等待任务完成
    result1 = await t1
    result2 = await t2
    
    print(f"任务 1 结果: {result1}")
    print(f"任务 2 结果: {result2}")

if __name__ == "__main__":
    asyncio.run(main())

5. 处理异常

在异步代码中,应妥善处理异常:

python
import asyncio

async def error_task():
    """会抛出异常的任务"""
    await asyncio.sleep(1)
    raise ValueError("任务执行失败")

async def main():
    """主协程"""
    try:
        await error_task()
    except ValueError as e:
        print(f"捕获到异常: {e}")

if __name__ == "__main__":
    asyncio.run(main())

异步编程的实际应用

示例 1:异步 Web 服务器

使用 aiohttp 库创建异步 Web 服务器:

python
from aiohttp import web
import asyncio

async def handle(request):
    """处理 HTTP 请求"""
    name = request.match_info.get('name', "Anonymous")
    await asyncio.sleep(0.5)  # 模拟 I/O 操作
    return web.Response(text=f"Hello, {name}!")

async def main():
    """主协程"""
    app = web.Application()
    app.add_routes([
        web.get('/', handle),
        web.get('/{name}', handle)
    ])
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    print("服务器启动,监听端口 8080")
    
    # 保持服务器运行
    while True:
        await asyncio.sleep(3600)

if __name__ == "__main__":
    asyncio.run(main())

示例 2:异步数据库操作

使用 asyncpg 库进行异步 PostgreSQL 数据库操作:

python
import asyncio
import asyncpg

async def main():
    """主协程"""
    # 连接数据库
    conn = await asyncpg.connect(
        host='localhost',
        port=5432,
        user='postgres',
        password='password',
        database='test'
    )
    
    try:
        # 创建表
        await conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT NOT NULL UNIQUE
            )
        ''')
        print("表创建成功")
        
        # 插入数据
        await conn.execute('''
            INSERT INTO users (name, email) VALUES ($1, $2)
            ON CONFLICT (email) DO NOTHING
        ''', 'Alice', 'alice@example.com')
        print("数据插入成功")
        
        # 查询数据
        users = await conn.fetch('SELECT * FROM users')
        print("查询结果:")
        for user in users:
            print(f"ID: {user['id']}, 姓名: {user['name']}, 邮箱: {user['email']}")
    finally:
        # 关闭连接
        await conn.close()

if __name__ == "__main__":
    asyncio.run(main())

示例 3:异步爬虫

使用 aiohttp 库创建异步爬虫:

python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch_page(session, url):
    """异步获取页面内容"""
    try:
        async with session.get(url) as response:
            if response.status == 200:
                return await response.text()
            return None
    except Exception as e:
        print(f"获取 {url} 失败: {e}")
        return None

async def parse_page(url, content):
    """解析页面内容"""
    if not content:
        return []
    
    soup = BeautifulSoup(content, 'html.parser')
    links = []
    
    # 提取所有链接
    for a in soup.find_all('a', href=True):
        href = a['href']
        # 处理相对路径
        if href.startswith('http'):
            links.append(href)
        elif href.startswith('/'):
            base_url = '/'.join(url.split('/')[:3])
            links.append(base_url + href)
    
    return links

async def crawl(url, max_depth=2, current_depth=1):
    """异步爬取网站"""
    if current_depth > max_depth:
        return
    
    print(f"爬取: {url} (深度: {current_depth})")
    
    async with aiohttp.ClientSession() as session:
        # 获取页面内容
        content = await fetch_page(session, url)
        
        # 解析页面
        links = await parse_page(url, content)
        
        # 去重并过滤
        unique_links = list(set(links))
        filtered_links = [link for link in unique_links if 'example.com' in link]
        
        print(f"从 {url} 找到 {len(filtered_links)} 个链接")
        
        # 递归爬取子链接
        if current_depth < max_depth:
            tasks = [crawl(link, max_depth, current_depth + 1) for link in filtered_links[:5]]  # 限制数量
            await asyncio.gather(*tasks)

async def main():
    """主协程"""
    start_url = "https://www.example.com"
    await crawl(start_url, max_depth=2)

if __name__ == "__main__":
    asyncio.run(main())

总结

协程和异步编程是 Python 中实现并发编程的重要技术,特别适合处理 I/O 密集型任务。本章节介绍了:

  1. 协程的基本概念:使用 yield 实现的基础协程
  2. 异步编程的核心组件:事件循环、协程函数、任务、Future
  3. 异步编程的语法async defawait 关键字
  4. 异步编程的高级特性:并发执行、超时控制、取消任务、异步上下文管理器、异步迭代器
  5. 异步 I/O 操作:网络请求、文件操作
  6. 异步编程的最佳实践:避免阻塞操作、使用 async withasync for、合理使用任务和 Future、处理异常
  7. 异步编程的实际应用:Web 服务器、数据库操作、爬虫

掌握协程和异步编程,可以大大提高 Python 程序的并发性能,特别是在处理大量 I/O 操作的场景中。