Appearance
Python MySQL 数据库操作
MySQL 是一种流行的关系型数据库管理系统,而 Python 提供了多种库来连接和操作 MySQL 数据库。本章节将详细介绍如何在 Python 中使用 MySQL 数据库。
准备工作
安装 MySQL 数据库
首先,需要在系统上安装 MySQL 数据库。可以从 MySQL 官方网站 下载并安装适合您操作系统的版本。
安装 Python MySQL 驱动
Python 中常用的 MySQL 驱动包括:
- mysql-connector-python:MySQL 官方提供的驱动
- PyMySQL:纯 Python 实现的 MySQL 驱动
- MySQLdb:C 语言实现的 MySQL 驱动(仅支持 Python 2)
本章节主要介绍 mysql-connector-python 和 PyMySQL 的使用。
安装 mysql-connector-python
bash
pip install mysql-connector-python安装 PyMySQL
bash
pip install PyMySQL创建测试数据库和表
在开始之前,需要创建一个测试数据库和表:
- 启动 MySQL 服务
- 登录 MySQL 命令行客户端
- 执行以下 SQL 语句:
sql
-- 创建数据库
CREATE DATABASE IF NOT EXISTS test_db;
-- 使用数据库
USE test_db;
-- 创建表
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL UNIQUE,
age INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 插入测试数据
INSERT INTO users (name, email, age) VALUES
('Alice', 'alice@example.com', 30),
('Bob', 'bob@example.com', 25),
('Charlie', 'charlie@example.com', 35);
-- 查看数据
SELECT * FROM users;使用 mysql-connector-python
连接数据库
python
import mysql.connector
# 连接数据库
try:
conn = mysql.connector.connect(
host="localhost", # 数据库主机地址
user="root", # 数据库用户名
password="password", # 数据库密码
database="test_db" # 数据库名称
)
if conn.is_connected():
print("成功连接到数据库")
# 获取数据库服务器信息
db_info = conn.get_server_info()
print(f"数据库服务器版本: {db_info}")
# 获取当前数据库名称
cursor = conn.cursor()
cursor.execute("SELECT DATABASE()")
db_name = cursor.fetchone()[0]
print(f"当前数据库: {db_name}")
cursor.close()
except mysql.connector.Error as e:
print(f"连接数据库失败: {e}")
finally:
if 'conn' in locals() and conn.is_connected():
conn.close()
print("数据库连接已关闭")执行 SQL 查询
python
import mysql.connector
# 连接数据库
conn = mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="test_db"
)
# 创建游标对象
cursor = conn.cursor()
try:
# 执行查询
cursor.execute("SELECT * FROM users")
# 获取所有结果
results = cursor.fetchall()
print("查询结果:")
for row in results:
print(f"ID: {row[0]}, 姓名: {row[1]}, 邮箱: {row[2]}, 年龄: {row[3]}, 创建时间: {row[4]}")
# 获取单个结果
cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
single_result = cursor.fetchone()
if single_result:
print(f"\n单个查询结果: ID: {single_result[0]}, 姓名: {single_result[1]}")
# 获取部分结果
cursor.execute("SELECT * FROM users")
partial_results = cursor.fetchmany(2)
print("\n部分查询结果:")
for row in partial_results:
print(f"ID: {row[0]}, 姓名: {row[1]}")
except mysql.connector.Error as e:
print(f"执行查询失败: {e}")
finally:
# 关闭游标和连接
cursor.close()
conn.close()
print("数据库连接已关闭")执行 SQL 插入
python
import mysql.connector
# 连接数据库
conn = mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="test_db"
)
# 创建游标对象
cursor = conn.cursor()
try:
# 插入单条数据
sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
val = ("David", "david@example.com", 28)
cursor.execute(sql, val)
# 提交事务
conn.commit()
print(f"插入成功,影响行数: {cursor.rowcount}")
print(f"插入的记录 ID: {cursor.lastrowid}")
# 插入多条数据
sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
vals = [
("Eve", "eve@example.com", 22),
("Frank", "frank@example.com", 33),
("Grace", "grace@example.com", 27)
]
cursor.executemany(sql, vals)
# 提交事务
conn.commit()
print(f"批量插入成功,影响行数: {cursor.rowcount}")
# 验证插入结果
cursor.execute("SELECT * FROM users ORDER BY id DESC LIMIT 4")
results = cursor.fetchall()
print("\n最新插入的记录:")
for row in results:
print(f"ID: {row[0]}, 姓名: {row[1]}, 邮箱: {row[2]}, 年龄: {row[3]}")
except mysql.connector.Error as e:
# 回滚事务
conn.rollback()
print(f"插入失败: {e}")
finally:
# 关闭游标和连接
cursor.close()
conn.close()
print("数据库连接已关闭")执行 SQL 更新
python
import mysql.connector
# 连接数据库
conn = mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="test_db"
)
# 创建游标对象
cursor = conn.cursor()
try:
# 更新数据
sql = "UPDATE users SET age = %s WHERE name = %s"
val = (31, "Alice")
cursor.execute(sql, val)
# 提交事务
conn.commit()
print(f"更新成功,影响行数: {cursor.rowcount}")
# 验证更新结果
cursor.execute("SELECT * FROM users WHERE name = %s", ("Alice",))
result = cursor.fetchone()
if result:
print(f"更新后的数据: ID: {result[0]}, 姓名: {result[1]}, 年龄: {result[3]}")
# 批量更新
sql = "UPDATE users SET age = age + 1 WHERE id > %s"
val = (3,)
cursor.execute(sql, val)
# 提交事务
conn.commit()
print(f"批量更新成功,影响行数: {cursor.rowcount}")
# 验证批量更新结果
cursor.execute("SELECT * FROM users WHERE id > %s", (3,))
results = cursor.fetchall()
print("\n批量更新后的数据:")
for row in results:
print(f"ID: {row[0]}, 姓名: {row[1]}, 年龄: {row[3]}")
except mysql.connector.Error as e:
# 回滚事务
conn.rollback()
print(f"更新失败: {e}")
finally:
# 关闭游标和连接
cursor.close()
conn.close()
print("数据库连接已关闭")执行 SQL 删除
python
import mysql.connector
# 连接数据库
conn = mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="test_db"
)
# 创建游标对象
cursor = conn.cursor()
try:
# 删除数据
sql = "DELETE FROM users WHERE id = %s"
val = (6,)
cursor.execute(sql, val)
# 提交事务
conn.commit()
print(f"删除成功,影响行数: {cursor.rowcount}")
# 批量删除
sql = "DELETE FROM users WHERE id > %s"
val = (4,)
cursor.execute(sql, val)
# 提交事务
conn.commit()
print(f"批量删除成功,影响行数: {cursor.rowcount}")
# 验证删除结果
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
print("\n删除后的数据:")
for row in results:
print(f"ID: {row[0]}, 姓名: {row[1]}, 邮箱: {row[2]}, 年龄: {row[3]}")
except mysql.connector.Error as e:
# 回滚事务
conn.rollback()
print(f"删除失败: {e}")
finally:
# 关闭游标和连接
cursor.close()
conn.close()
print("数据库连接已关闭")使用 PyMySQL
PyMySQL 是另一个流行的 MySQL 驱动,使用方法与 mysql-connector-python 类似。
连接数据库
python
import pymysql
# 连接数据库
try:
conn = pymysql.connect(
host="localhost", # 数据库主机地址
user="root", # 数据库用户名
password="password", # 数据库密码
database="test_db", # 数据库名称
charset="utf8mb4", # 字符集
cursorclass=pymysql.cursors.DictCursor # 使用字典游标
)
if conn:
print("成功连接到数据库")
# 获取数据库服务器信息
print(f"数据库服务器版本: {conn.server_version}")
# 获取当前数据库名称
cursor = conn.cursor()
cursor.execute("SELECT DATABASE()")
db_name = cursor.fetchone()['DATABASE()']
print(f"当前数据库: {db_name}")
cursor.close()
except pymysql.Error as e:
print(f"连接数据库失败: {e}")
finally:
if 'conn' in locals() and conn:
conn.close()
print("数据库连接已关闭")执行 SQL 查询
python
import pymysql
# 连接数据库
conn = pymysql.connect(
host="localhost",
user="root",
password="password",
database="test_db",
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor
)
# 创建游标对象
cursor = conn.cursor()
try:
# 执行查询
cursor.execute("SELECT * FROM users")
# 获取所有结果
results = cursor.fetchall()
print("查询结果:")
for row in results:
print(f"ID: {row['id']}, 姓名: {row['name']}, 邮箱: {row['email']}, 年龄: {row['age']}, 创建时间: {row['created_at']}")
# 获取单个结果
cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
single_result = cursor.fetchone()
if single_result:
print(f"\n单个查询结果: ID: {single_result['id']}, 姓名: {single_result['name']}")
# 获取部分结果
cursor.execute("SELECT * FROM users")
partial_results = cursor.fetchmany(2)
print("\n部分查询结果:")
for row in partial_results:
print(f"ID: {row['id']}, 姓名: {row['name']}")
except pymysql.Error as e:
print(f"执行查询失败: {e}")
finally:
# 关闭游标和连接
cursor.close()
conn.close()
print("数据库连接已关闭")执行 SQL 插入、更新和删除
PyMySQL 的插入、更新和删除操作与 mysql-connector-python 类似,只是导入的模块不同。
使用 SQLAlchemy ORM
SQLAlchemy 是 Python 中流行的 ORM(对象关系映射)库,它提供了一种更高级的方式来操作数据库,将数据库表映射为 Python 类。
安装 SQLAlchemy
bash
pip install SQLAlchemy
pip install pymysql # SQLAlchemy 需要底层驱动基本使用
python
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
# 创建基类
Base = declarative_base()
# 定义用户模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False)
email = Column(String(100), nullable=False, unique=True)
age = Column(Integer)
created_at = Column(DateTime, default=datetime.now)
def __repr__(self):
return f"<User(id={self.id}, name='{self.name}', email='{self.email}', age={self.age})>"
# 创建数据库引擎
engine = create_engine('mysql+pymysql://root:password@localhost/test_db?charset=utf8mb4')
# 创建表(如果不存在)
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
try:
# 查询所有用户
users = session.query(User).all()
print("所有用户:")
for user in users:
print(user)
# 根据条件查询
user = session.query(User).filter_by(id=1).first()
print(f"\nID 为 1 的用户: {user}")
# 添加新用户
new_user = User(name="David", email="david@example.com", age=28)
session.add(new_user)
session.commit()
print(f"\n添加新用户: {new_user}")
# 更新用户
user = session.query(User).filter_by(name="Alice").first()
if user:
user.age = 31
session.commit()
print(f"\n更新后的用户: {user}")
# 删除用户
user = session.query(User).filter_by(name="David").first()
if user:
session.delete(user)
session.commit()
print(f"\n删除用户: {user}")
# 再次查询所有用户
users = session.query(User).all()
print("\n所有用户:")
for user in users:
print(user)
except Exception as e:
session.rollback()
print(f"操作失败: {e}")
finally:
# 关闭会话
session.close()
print("会话已关闭")数据库连接池
在生产环境中,通常使用数据库连接池来管理数据库连接,提高性能和可靠性。
使用 PyMySQL 连接池
python
import pymysql
from pymysql import cursors
from DBUtils.PooledDB import PooledDB
# 创建连接池
pool = PooledDB(
creator=pymysql, # 使用 pymysql 作为连接器
maxconnections=10, # 连接池最大连接数
mincached=2, # 初始化时创建的空闲连接数
maxcached=5, # 连接池中最大空闲连接数
maxshared=3, # 共享连接的最大数量
blocking=True, # 连接池满时是否阻塞等待
maxusage=None, # 一个连接最多被重复使用的次数
setsession=[], # 开始会话前执行的命令列表
ping=0, # ping 服务器的频率,0 表示不 ping
host='localhost', # 数据库主机地址
user='root', # 数据库用户名
password='password', # 数据库密码
database='test_db', # 数据库名称
charset='utf8mb4', # 字符集
cursorclass=cursors.DictCursor # 使用字典游标
)
# 从连接池获取连接
def get_connection():
return pool.connection()
# 使用连接池
def test_connection_pool():
# 获取连接
conn = get_connection()
cursor = conn.cursor()
try:
# 执行查询
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
print("查询结果:")
for row in results:
print(f"ID: {row['id']}, 姓名: {row['name']}, 邮箱: {row['email']}")
except pymysql.Error as e:
print(f"执行查询失败: {e}")
finally:
# 关闭游标和连接(连接会被归还到连接池)
cursor.close()
conn.close()
print("连接已归还到连接池")
# 测试连接池
if __name__ == "__main__":
# 多次执行查询,测试连接池
for i in range(3):
print(f"\n测试 {i+1}:")
test_connection_pool()使用 SQLAlchemy 连接池
SQLAlchemy 内置了连接池功能,默认情况下会自动使用连接池。
python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 创建数据库引擎,配置连接池
engine = create_engine(
'mysql+pymysql://root:password@localhost/test_db?charset=utf8mb4',
pool_size=10, # 连接池大小
max_overflow=20, # 连接池最大溢出连接数
pool_timeout=30, # 连接超时时间
pool_recycle=1800, # 连接回收时间
echo=False # 是否打印 SQL 语句
)
# 创建会话
Session = sessionmaker(bind=engine)
# 使用会话
def test_sqlalchemy_pool():
session = Session()
try:
# 执行查询
result = session.execute("SELECT * FROM users")
users = result.fetchall()
print("查询结果:")
for user in users:
print(f"ID: {user[0]}, 姓名: {user[1]}, 邮箱: {user[2]}")
except Exception as e:
print(f"操作失败: {e}")
finally:
# 关闭会话
session.close()
print("会话已关闭")
# 测试连接池
if __name__ == "__main__":
# 多次执行查询,测试连接池
for i in range(3):
print(f"\n测试 {i+1}:")
test_sqlalchemy_pool()实际应用示例
示例 1:用户管理系统
python
import pymysql
from pymysql import cursors
class UserManager:
"""用户管理类"""
def __init__(self, host, user, password, database):
"""初始化数据库连接"""
self.host = host
self.user = user
self.password = password
self.database = database
self.conn = None
self.cursor = None
def connect(self):
"""连接数据库"""
try:
self.conn = pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
charset="utf8mb4",
cursorclass=cursors.DictCursor
)
self.cursor = self.conn.cursor()
print("成功连接到数据库")
return True
except pymysql.Error as e:
print(f"连接数据库失败: {e}")
return False
def disconnect(self):
"""断开数据库连接"""
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
print("数据库连接已关闭")
def get_all_users(self):
"""获取所有用户"""
try:
self.cursor.execute("SELECT * FROM users")
return self.cursor.fetchall()
except pymysql.Error as e:
print(f"查询失败: {e}")
return []
def get_user_by_id(self, user_id):
"""根据 ID 获取用户"""
try:
self.cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
return self.cursor.fetchone()
except pymysql.Error as e:
print(f"查询失败: {e}")
return None
def get_user_by_email(self, email):
"""根据邮箱获取用户"""
try:
self.cursor.execute("SELECT * FROM users WHERE email = %s", (email,))
return self.cursor.fetchone()
except pymysql.Error as e:
print(f"查询失败: {e}")
return None
def add_user(self, name, email, age):
"""添加用户"""
try:
sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
self.cursor.execute(sql, (name, email, age))
self.conn.commit()
return True, "添加成功"
except pymysql.Error as e:
self.conn.rollback()
return False, f"添加失败: {e}"
def update_user(self, user_id, name=None, email=None, age=None):
"""更新用户信息"""
try:
# 构建更新语句
update_fields = []
update_values = []
if name:
update_fields.append("name = %s")
update_values.append(name)
if email:
update_fields.append("email = %s")
update_values.append(email)
if age is not None:
update_fields.append("age = %s")
update_values.append(age)
if not update_fields:
return False, "没有需要更新的字段"
sql = f"UPDATE users SET {', '.join(update_fields)} WHERE id = %s"
update_values.append(user_id)
self.cursor.execute(sql, update_values)
self.conn.commit()
if self.cursor.rowcount == 0:
return False, "用户不存在"
return True, "更新成功"
except pymysql.Error as e:
self.conn.rollback()
return False, f"更新失败: {e}"
def delete_user(self, user_id):
"""删除用户"""
try:
sql = "DELETE FROM users WHERE id = %s"
self.cursor.execute(sql, (user_id,))
self.conn.commit()
if self.cursor.rowcount == 0:
return False, "用户不存在"
return True, "删除成功"
except pymysql.Error as e:
self.conn.rollback()
return False, f"删除失败: {e}"
# 测试用户管理系统
if __name__ == "__main__":
# 创建用户管理器
user_manager = UserManager(
host="localhost",
user="root",
password="password",
database="test_db"
)
# 连接数据库
if user_manager.connect():
# 获取所有用户
print("所有用户:")
users = user_manager.get_all_users()
for user in users:
print(f"ID: {user['id']}, 姓名: {user['name']}, 邮箱: {user['email']}, 年龄: {user['age']}")
# 添加用户
print("\n添加用户:")
success, message = user_manager.add_user("David", "david@example.com", 28)
print(f"添加结果: {success}, {message}")
# 更新用户
print("\n更新用户:")
success, message = user_manager.update_user(1, age=31)
print(f"更新结果: {success}, {message}")
# 删除用户
print("\n删除用户:")
success, message = user_manager.delete_user(5)
print(f"删除结果: {success}, {message}")
# 再次获取所有用户
print("\n所有用户:")
users = user_manager.get_all_users()
for user in users:
print(f"ID: {user['id']}, 姓名: {user['name']}, 邮箱: {user['email']}, 年龄: {user['age']}")
# 断开连接
user_manager.disconnect()示例 2:博客系统数据库操作
python
import pymysql
from pymysql import cursors
from datetime import datetime
class BlogManager:
"""博客管理类"""
def __init__(self, host, user, password, database):
"""初始化数据库连接"""
self.host = host
self.user = user
self.password = password
self.database = database
self.conn = None
self.cursor = None
def connect(self):
"""连接数据库"""
try:
self.conn = pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
charset="utf8mb4",
cursorclass=cursors.DictCursor
)
self.cursor = self.conn.cursor()
# 创建博客表(如果不存在)
self._create_tables()
print("成功连接到数据库")
return True
except pymysql.Error as e:
print(f"连接数据库失败: {e}")
return False
def _create_tables(self):
"""创建表"""
try:
# 创建博客表
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS blogs (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(200) NOT NULL,
content TEXT NOT NULL,
author_id INT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (author_id) REFERENCES users(id)
)
""")
self.conn.commit()
except pymysql.Error as e:
self.conn.rollback()
print(f"创建表失败: {e}")
def disconnect(self):
"""断开数据库连接"""
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
print("数据库连接已关闭")
def create_blog(self, title, content, author_id):
"""创建博客"""
try:
sql = "INSERT INTO blogs (title, content, author_id) VALUES (%s, %s, %s)"
self.cursor.execute(sql, (title, content, author_id))
self.conn.commit()
return True, "创建成功"
except pymysql.Error as e:
self.conn.rollback()
return False, f"创建失败: {e}"
def get_blog(self, blog_id):
"""获取博客"""
try:
sql = """
SELECT b.*, u.name as author_name
FROM blogs b
LEFT JOIN users u ON b.author_id = u.id
WHERE b.id = %s
"""
self.cursor.execute(sql, (blog_id,))
return self.cursor.fetchone()
except pymysql.Error as e:
print(f"查询失败: {e}")
return None
def get_all_blogs(self):
"""获取所有博客"""
try:
sql = """
SELECT b.*, u.name as author_name
FROM blogs b
LEFT JOIN users u ON b.author_id = u.id
ORDER BY b.created_at DESC
"""
self.cursor.execute(sql)
return self.cursor.fetchall()
except pymysql.Error as e:
print(f"查询失败: {e}")
return []
def get_blogs_by_author(self, author_id):
"""根据作者获取博客"""
try:
sql = """
SELECT b.*, u.name as author_name
FROM blogs b
LEFT JOIN users u ON b.author_id = u.id
WHERE b.author_id = %s
ORDER BY b.created_at DESC
"""
self.cursor.execute(sql, (author_id,))
return self.cursor.fetchall()
except pymysql.Error as e:
print(f"查询失败: {e}")
return []
def update_blog(self, blog_id, title=None, content=None):
"""更新博客"""
try:
# 构建更新语句
update_fields = []
update_values = []
if title:
update_fields.append("title = %s")
update_values.append(title)
if content:
update_fields.append("content = %s")
update_values.append(content)
if not update_fields:
return False, "没有需要更新的字段"
sql = f"UPDATE blogs SET {', '.join(update_fields)} WHERE id = %s"
update_values.append(blog_id)
self.cursor.execute(sql, update_values)
self.conn.commit()
if self.cursor.rowcount == 0:
return False, "博客不存在"
return True, "更新成功"
except pymysql.Error as e:
self.conn.rollback()
return False, f"更新失败: {e}"
def delete_blog(self, blog_id):
"""删除博客"""
try:
sql = "DELETE FROM blogs WHERE id = %s"
self.cursor.execute(sql, (blog_id,))
self.conn.commit()
if self.cursor.rowcount == 0:
return False, "博客不存在"
return True, "删除成功"
except pymysql.Error as e:
self.conn.rollback()
return False, f"删除失败: {e}"
# 测试博客管理系统
if __name__ == "__main__":
# 创建博客管理器
blog_manager = BlogManager(
host="localhost",
user="root",
password="password",
database="test_db"
)
# 连接数据库
if blog_manager.connect():
# 创建博客
print("创建博客:")
success, message = blog_manager.create_blog(
title="Python 入门指南",
content="这是一篇关于 Python 入门的博客文章...",
author_id=1
)
print(f"创建结果: {success}, {message}")
# 获取所有博客
print("\n所有博客:")
blogs = blog_manager.get_all_blogs()
for blog in blogs:
print(f"ID: {blog['id']}, 标题: {blog['title']}, 作者: {blog['author_name']}, 创建时间: {blog['created_at']}")
# 获取单个博客
if blogs:
blog_id = blogs[0]['id']
print(f"\n博客详情 (ID: {blog_id}):")
blog = blog_manager.get_blog(blog_id)
if blog:
print(f"标题: {blog['title']}")
print(f"作者: {blog['author_name']}")
print(f"内容: {blog['content']}")
print(f"创建时间: {blog['created_at']}")
# 更新博客
if blogs:
blog_id = blogs[0]['id']
print(f"\n更新博客 (ID: {blog_id}):")
success, message = blog_manager.update_blog(
blog_id=blog_id,
title="Python 入门指南(更新版)",
content="这是一篇关于 Python 入门的博客文章,已经更新..."
)
print(f"更新结果: {success}, {message}")
# 删除博客
if blogs:
blog_id = blogs[0]['id']
print(f"\n删除博客 (ID: {blog_id}):")
success, message = blog_manager.delete_blog(blog_id)
print(f"删除结果: {success}, {message}")
# 再次获取所有博客
print("\n所有博客:")
blogs = blog_manager.get_all_blogs()
for blog in blogs:
print(f"ID: {blog['id']}, 标题: {blog['title']}, 作者: {blog['author_name']}, 创建时间: {blog['created_at']}")
# 断开连接
blog_manager.disconnect()数据库操作的最佳实践
1. 使用参数化查询
避免 SQL 注入攻击:使用参数化查询,不要直接拼接 SQL 语句。
python
# 不安全的做法
user_input = "1' OR '1'='1"
sql = f"SELECT * FROM users WHERE id = {user_input}" # 危险!
# 安全的做法
sql = "SELECT * FROM users WHERE id = %s"
cursor.execute(sql, (user_input,)) # 安全2. 正确处理事务
使用 try-except-rollback:在执行修改操作时,使用事务确保数据一致性。
python
try:
# 执行修改操作
cursor.execute(sql, values)
# 提交事务
conn.commit()
except Exception as e:
# 回滚事务
conn.rollback()
print(f"操作失败: {e}")3. 使用数据库连接池
提高性能:在生产环境中使用数据库连接池管理连接。
4. 合理使用索引
优化查询性能:为常用的查询字段创建索引。
5. 避免使用 SELECT *
减少网络传输:只查询需要的字段。
python
# 不好的做法
cursor.execute("SELECT * FROM users")
# 好的做法
cursor.execute("SELECT id, name, email FROM users")6. 处理 NULL 值
正确处理 NULL:在代码中处理可能的 NULL 值。
7. 关闭数据库连接
释放资源:使用完数据库连接后及时关闭。
8. 记录错误日志
便于排查问题:记录数据库操作的错误日志。
总结
本章节介绍了 Python 中操作 MySQL 数据库的多种方法,包括:
- 使用 mysql-connector-python:MySQL 官方提供的驱动
- 使用 PyMySQL:纯 Python 实现的 MySQL 驱动
- 使用 SQLAlchemy ORM:高级对象关系映射库
- 数据库连接池:提高性能和可靠性
- 实际应用示例:用户管理系统和博客系统
- 最佳实践:安全、性能和可靠性方面的建议
掌握 Python 中的 MySQL 数据库操作,对于开发需要持久化存储的应用程序非常重要。通过合理选择和使用数据库操作方法,可以提高应用程序的性能、可靠性和安全性。