Skip to content

Python 连接与配置

安装 PyMilvus

基础安装

bash
# 安装最新版本
pip install pymilvus

# 安装指定版本
pip install pymilvus==2.4.0

# 升级现有安装
pip install --upgrade pymilvus

安装模型库(可选)

如果需要使用内置的嵌入模型:

bash
pip install pymilvus[model]

验证安装

python
import pymilvus

print(f"PyMilvus 版本: {pymilvus.__version__}")

# 测试导入核心模块
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection
print("所有模块导入成功!")

建立连接

基本连接

python
from pymilvus import connections

# 连接到本地 Milvus 服务
connections.connect(
    alias="default",           # 连接别名
    host="localhost",          # Milvus 服务器地址
    port="19530"               # Milvus 服务端口
)

print("连接成功!")

带认证的连接

如果 Milvus 启用了认证:

python
from pymilvus import connections

connections.connect(
    alias="default",
    host="localhost",
    port="19530",
    user="username",           # 用户名
    password="password",       # 密码
    secure=False               # 是否使用 TLS
)

连接到 Zilliz Cloud

python
from pymilvus import connections

connections.connect(
    alias="default",
    uri="https://your-cluster.zillizcloud.com",  # 集群地址
    token="your-api-key"                          # API 密钥
)

连接池配置

python
from pymilvus import connections

connections.connect(
    alias="default",
    host="localhost",
    port="19530",
    # 连接池配置
    pool_size=10,              # 连接池大小
    wait_timeout=5,            # 等待超时时间(秒)
    try_connect=True           # 尝试连接
)

连接管理

断开连接

python
from pymilvus import connections

# 断开指定连接
connections.disconnect("default")

# 断开所有连接
connections.disconnect()

检查连接状态

python
from pymilvus import connections

# 检查连接是否存在
if connections.has_connection("default"):
    print("连接存在")
else:
    print("连接不存在")

# 获取所有连接别名
all_aliases = connections.list_connections()
print(f"所有连接: {all_aliases}")

配置参数详解

连接参数

参数类型必填默认值说明
aliasstr-连接别名,用于区分多个连接
hoststr-Milvus 服务器地址
portstr/int-Milvus 服务端口
userstr-用户名(启用认证时)
passwordstr-密码(启用认证时)
tokenstr-API 令牌(Zilliz Cloud)
uristr-完整连接 URI
secureboolFalse是否使用 TLS 加密
pool_sizeint10连接池大小
wait_timeoutint5等待超时时间(秒)

客户端配置

python
from pymilvus import connections, Config

# 设置全局配置
Config.set_log_level("DEBUG")  # 设置日志级别: DEBUG, INFO, WARNING, ERROR

连接最佳实践

使用上下文管理器

python
from pymilvus import connections, Collection

class MilvusConnection:
    def __init__(self, alias="default", host="localhost", port="19530"):
        self.alias = alias
        self.host = host
        self.port = port
    
    def __enter__(self):
        connections.connect(
            alias=self.alias,
            host=self.host,
            port=self.port
        )
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        connections.disconnect(self.alias)

# 使用示例
with MilvusConnection() as conn:
    collection = Collection("my_collection")
    # 执行操作...

单例模式连接管理

python
from pymilvus import connections
import threading

class MilvusClient:
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            with cls._lock:
                if not cls._instance:
                    cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self, host="localhost", port="19530"):
        if not hasattr(self, 'initialized'):
            self.host = host
            self.port = port
            self.alias = "default"
            self.initialized = True
    
    def connect(self):
        if not connections.has_connection(self.alias):
            connections.connect(
                alias=self.alias,
                host=self.host,
                port=self.port
            )
        return self
    
    def disconnect(self):
        if connections.has_connection(self.alias):
            connections.disconnect(self.alias)

# 使用示例
client = MilvusClient().connect()

健康检查

检查服务器状态

python
from pymilvus import utility

# 获取服务器版本
version = utility.get_server_version()
print(f"Milvus 版本: {version}")

# 检查服务器是否健康
is_healthy = utility.health()
print(f"服务器健康状态: {is_healthy}")

完整连接测试示例

python
from pymilvus import connections, utility

def test_connection(host="localhost", port="19530"):
    try:
        # 建立连接
        connections.connect(
            alias="test",
            host=host,
            port=port
        )
        
        # 检查版本
        version = utility.get_server_version()
        print(f"✓ 成功连接到 Milvus {version}")
        
        # 检查健康状态
        if utility.health():
            print("✓ 服务器健康状态良好")
        
        # 获取集合列表
        collections = utility.list_collections()
        print(f"✓ 现有集合数量: {len(collections)}")
        
        return True
        
    except Exception as e:
        print(f"✗ 连接失败: {e}")
        return False
    finally:
        # 断开连接
        if connections.has_connection("test"):
            connections.disconnect("test")

# 运行测试
if __name__ == "__main__":
    test_connection()

常见问题

连接超时

python
from pymilvus import connections

# 增加超时时间
connections.connect(
    alias="default",
    host="localhost",
    port="19530",
    wait_timeout=30  # 增加到 30 秒
)

连接被拒绝

检查以下几点:

  1. Milvus 服务是否已启动
  2. 防火墙是否放行了 19530 端口
  3. 主机地址和端口是否正确
bash
# 检查 Milvus 服务状态
docker ps | grep milvus

# 测试端口连通性
telnet localhost 19530

认证失败

python
from pymilvus import connections, MilvusException

try:
    connections.connect(
        alias="default",
        host="localhost",
        port="19530",
        user="root",
        password="Milvus"  # 默认密码
    )
except MilvusException as e:
    print(f"认证失败: {e}")

下一步

完成连接配置后,你可以:

  1. 创建集合并定义 Schema
  2. 插入向量数据
  3. 执行向量搜索