psycopg2.ProgrammingError: execute cannot be used while an asynchronous query is underway
从日志中可以分析导致异常的原因,主要集中在以下几个问题点:
异常概述
psycopg2.ProgrammingError: execute cannot be used while an asynchronous query is underway
是核心异常。这表明:一个异步数据库查询正在运行,而另一个查询尝试在同一连接或事务中被执行,从而导致冲突。这种情况通常发生在使用 SQLAlchemy 和 psycopg2 驱动的异步操作场景下。
根因分析
从栈追踪和代码结构分析,可以识别几个潜在问题:
- 异步任务冲突
- 异常提示是
execute cannot be used while an asynchronous query is underway
。这通常是因为多个任务或线程共享同一个数据库连接,而其中有异步查询未完成时,另一个查询试图使用这个连接。 - 日志中的线程,特别是
[Thread-892]
和[Thread-890]
看起来彼此是并发运行的,它们之间可能共享了同一个数据库会话(Session)或连接池。 - 在异步环境中,每个任务都需要独立的数据库连接,而不是跨任务共享。
- SQLAlchemy 和 psycopg2 的使用方式
- 日志显示使用的是 SQLAlchemy ORM 和 Psycopg2:
File "/app/api/.venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1963, in _exec_single_context
- Psycopg2 支持两种模式:同步和异步模式。如果异步模式没有正确配置,例如:同时执行
async
和普通execute
查询,会导致此类冲突。
- 连接池的配置问题
- 日志中显示异常是在执行一个 SQL 查询:
SQL: SELECT end_users.id AS end_users_id ...
- SQLAlchemy 在默认情况下通过连接池维护数据库连接。如果连接池存在不足(例如池大小过小),导致多个任务或线程竞争同一连接,可能发生类似的问题。
- 同时,如果连接池未处理好异步上下文的分配,比如未启用适配
asyncpg
,也可能导致连接竞争。
- 当前代码生命周期问题
- 异常涉及访问用户对象
current_user
的tenant_id
属性:
tenant_id = extract_tenant_id(current_user) return user.tenant_id
- 很可能在获取
tenant_id
属性时,触发 ORM 查询加载属性,该查询被发往数据库时又出现了连接冲突。 - 如果
current_user
在不同线程或异步任务中被共享,而未正确绑定独立事务或连接,会导致此类问题。
解决方案
短期解决
- 检查连接池的配置
- 增大 SQLAlchemy 的连接池大小。
- 确保
Session
在任务间隔离:
from sqlalchemy.orm import sessionmaker SessionLocal = sessionmaker(bind=engine)
每个线程或异步任务应确保使用独立的会话实例。
- 改用异步驱动
- 如果你的应用需要异步查询,使用
asyncpg
驱动替换psycopg2
。 - 启用 SQLAlchemy 的异步支持:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/dbname") async_session = sessionmaker(engine, class_=AsyncSession)
- 上下文管理
- 使用
contextvars
或类似机制,在异步任务中确保独立的用户上下文:
from contextvars import ContextVar current_user = ContextVar("current_user")
长期优化
- 任务隔离
检查对共享资源的访问规则,例如:
- 不同异步任务应独立使用数据库连接。
- 为异步任务显式创建独立的事务。
- 代码审查
- 在栈追踪暴露的路径中,检查
Session
和engine
的生命周期管理。 - 确保数据库对象(如
tenant_id
属性的加载)在正确的上下文中加载。
- 连接模式优化
考虑将所有查询转换为异步模式,特别是长时间查询场景:
async with async_session() as session: async with session.begin(): result = await session.execute(statement)
- 监控连接池状态
- 使用 SQLAlchemy 的调试工具检查连接池状态。
- 在高并发场景下增强池性能,例如适当调整
max_overflow
和pool_size
。
总结
异常的关键在于多任务间对数据库连接的竞争。一些任务可能执行了长时间挂起的异步查询,而其他任务尝试使用同一连接导致冲突。通过隔离任务间的数据库上下文、优化连接池配置或改为纯异步操作,可以有效解决这些问题。
关注公众号:程序新视界,一个让你软实力、硬技术同步提升的平台
除非注明,否则均为程序新视界原创文章,转载必须以链接形式标明本文链接