在进程1中:
from sqlalchemy.dialects import postgresql
# 序列化查询语句
query_sql = str(query.statement.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
在进程2中:
from sqlalchemy import text, select
# 反序列化查询语句并执行
# 使用 `from_statement` 方法以获取完整的ORM对象,而非仅仅通过 `session.execute(text(query_sql))` 得到Row元组
objects = session.execute(
select(MyTable).execution_options(yield_per=100).from_statement(text(query_sql)),
execution_options={'yield_per': 100},
).scalars()
# 对大型查询结果使用服务器端游标逐批读取
for obj in objects.yield_per(100):
...
上述解决方案并不完美,因为objects
是sqlalchemy.engine.result.ScalarResult
类型,它不像原始的query
那样具有.count()
方法,也没有rowcount
属性。如何高效地(通过服务器端查询)获取结果计数?
一种选择是在进程1中获取结果计数,并将其作为int
类型与query_sql
字符串一起传递给进程2。但这会产生竞态条件(计数与查询结果不同步),因此我更倾向于在同一会话中同时获取两者。