Kaapi: V1.0: Database optimization #817
base: main
Changes from all commits
d0de1ce
367f258
6523910
f8c7ee7
47329e3
6f98242
bb2a56a
3bf260b
7a33dc5
fbc3452
67e2e14
9b9e636
914ff2c
backend/app/alembic/versions/058_v1_query_optimization.py (new file, +224 lines)

```python
"""v1.0 query optimization: project_id + composite indexes, drop is_deleted

Revision ID: 058
Revises: 057
Create Date: 2026-05-05 12:00:00.000000

Bundles three coordinated changes for v1.0 lock:

1. Single-column `project_id` btree indexes on every table-mapped model
   that filters by project_id (the dominant tenant filter).
   organization_id-only access is rare and intentionally deferred.
   Tables already covered by a leading-column index are skipped:
     - openai_assistant: UNIQUE(project_id, assistant_id) leads with project_id
     - batch_job: ix_batch_job_project_id (migration 036)

2. Composite + partial indexes for hot list/pagination paths matching:
       WHERE project_id = ? [AND deleted_at IS NULL] ORDER BY <ts> DESC
   Plus a small partial index `ix_evaluation_run_processing` for the
   cron polling queries that filter by (type, status='processing')
   without an organization_id predicate.

3. Drop the redundant `is_deleted` boolean from every table that also
   carries `deleted_at`. `deleted_at IS NULL` becomes the single source
   of truth for soft-delete: same query cost when paired with a partial
   index, preserves audit timestamp, no dual-write drift.
   Affected tables: openai_assistant, apikey, document,
   openai_conversation, fine_tuning, model_evaluation.

Execution model:
    Phase A (transactional): backfill deleted_at where is_deleted was true
        but deleted_at was never set, then drop the is_deleted columns.
    Phase B (autocommit_block): CREATE INDEX CONCURRENTLY for every index
        so no AccessExclusiveLock is taken on hot tables.
"""

import sqlalchemy as sa
from alembic import op


revision = "058"
down_revision = "057"
branch_labels = None
depends_on = None


# Tables that currently carry both `is_deleted` and `deleted_at`.
IS_DELETED_TABLES = [
    "openai_assistant",
    "apikey",
    "document",
    "openai_conversation",
    "fine_tuning",
    "model_evaluation",
]


# Single-column FK / multi-tenant filter indexes (P0).
# (table_name, column_name, index_name)
FK_INDEXES: list[tuple[str, str, str]] = [
    # project_id across tables that filter by tenant
    ("apikey", "project_id", "ix_apikey_project_id"),
    ("credential", "project_id", "ix_credential_project_id"),
    ("collection", "project_id", "ix_collection_project_id"),
    ("collection_jobs", "project_id", "ix_collection_jobs_project_id"),
    ("document", "project_id", "ix_document_project_id"),
    ("evaluation_dataset", "project_id", "ix_evaluation_dataset_project_id"),
    ("evaluation_run", "project_id", "ix_evaluation_run_project_id"),
    ("file", "project_id", "ix_file_project_id"),
    ("fine_tuning", "project_id", "ix_fine_tuning_project_id"),
    ("job", "project_id", "ix_job_project_id"),
    ("llm_call", "project_id", "ix_llm_call_project_id"),
    ("llm_chain", "project_id", "ix_llm_chain_project_id"),
    ("model_evaluation", "project_id", "ix_model_evaluation_project_id"),
    ("openai_conversation", "project_id", "ix_openai_conversation_project_id"),
    ("stt_result", "project_id", "ix_stt_result_project_id"),
    ("stt_sample", "project_id", "ix_stt_sample_project_id"),
    ("tts_result", "project_id", "ix_tts_result_project_id"),
    ("user_project", "project_id", "ix_user_project_project_id"),
    # Other un-indexed FKs surfaced by the audit
    ("apikey", "user_id", "ix_apikey_user_id"),
    ("collection_jobs", "collection_id", "ix_collection_jobs_collection_id"),
    (
        "doc_transformation_job",
        "source_document_id",
        "ix_doc_transformation_job_source_document_id",
    ),
    (
        "doc_transformation_job",
        "transformed_document_id",
        "ix_doc_transformation_job_transformed_document_id",
    ),
    ("evaluation_run", "dataset_id", "ix_evaluation_run_dataset_id"),
]


# Composite + partial indexes (P1). (index_name, body_after_INDEX_NAME, schema)
# `schema` is the unquoted PG schema for downgrade DROP INDEX, or None for
# the default (public) schema. The upgrade body already names the schema
# inline in its ON clause; the field exists so downgrade doesn't have to
# string-sniff it back out.
COMPOSITE_INDEXES: list[tuple[str, str, str | None]] = [
    (
        "ix_document_project_inserted_at_active",
        'ON "document" ("project_id", "inserted_at" DESC) WHERE "deleted_at" IS NULL',
        None,
    ),
    (
        "ix_openai_conversation_project_inserted_at_active",
        'ON "openai_conversation" ("project_id", "inserted_at" DESC) WHERE "deleted_at" IS NULL',
        None,
    ),
    (
        "ix_openai_conversation_ancestor_project_inserted_at_active",
        'ON "openai_conversation" ("ancestor_response_id", "project_id", "inserted_at" DESC) WHERE "deleted_at" IS NULL',
        None,
    ),
    (
        "ix_openai_conversation_response_project_active",
        'ON "openai_conversation" ("response_id", "project_id") WHERE "deleted_at" IS NULL',
        None,
    ),
    (
        "ix_collection_jobs_project_status_inserted_at",
        'ON "collection_jobs" ("project_id", "status", "inserted_at" DESC)',
        None,
    ),
    (
        "ix_evaluation_run_org_project_type_inserted_at",
        'ON "evaluation_run" ("organization_id", "project_id", "type", "inserted_at" DESC)',
        None,
    ),
    # Partial index for cron polling queries that filter by
    # (type, status='processing') without an organization_id predicate
    # (crud/evaluations/cron_utils.py and crud/evaluations/processing.py).
    # The composite above leads with organization_id and does not serve
    # these unscoped scans.
    (
        "ix_evaluation_run_processing",
        'ON "evaluation_run" ("type", "batch_job_id") WHERE "status" = \'processing\'',
        None,
    ),
    (
        "ix_evaluation_dataset_org_project_type_inserted_at",
        'ON "evaluation_dataset" ("organization_id", "project_id", "type", "inserted_at" DESC)',
        None,
    ),
    (
        "ix_model_evaluation_document_project_updated_at",
        'ON "model_evaluation" ("document_id", "project_id", "updated_at" DESC) WHERE "deleted_at" IS NULL',
        None,
    ),
    (
        "ix_model_config_active_provider_name",
        'ON "global"."model_config" ("is_active", "provider", "model_name")',
        "global",
    ),
    (
        "ix_collection_project_active",
        'ON "collection" ("project_id") WHERE "deleted_at" IS NULL',
        None,
    ),
    # Composite FK indexes that match the actual query shape
    (
        "ix_fine_tuning_document_project",
        'ON "fine_tuning" ("document_id", "project_id")',
        None,
    ),
    (
        "ix_model_evaluation_fine_tuning_project",
        'ON "model_evaluation" ("fine_tuning_id", "project_id")',
        None,
    ),
    # Partial index for active-key listing on apikey
    (
        "ix_apikey_project_active",
        'ON "apikey" ("project_id") WHERE "deleted_at" IS NULL',
        None,
    ),
]


def upgrade():
    # Phase A (transactional): preserve audit timestamp, drop redundant column.
    for table in IS_DELETED_TABLES:
        op.execute(
            f"UPDATE {table} "
            f"SET deleted_at = NOW() "
            f"WHERE is_deleted = TRUE AND deleted_at IS NULL"
        )
        op.drop_column(table, "is_deleted")

    # Phase B (autocommit): CONCURRENTLY index creation. Each statement
    # runs in its own implicit transaction, required by the CONCURRENTLY
    # variant.
    with op.get_context().autocommit_block():
        for table, column, index in FK_INDEXES:
            op.execute(
                f'CREATE INDEX CONCURRENTLY IF NOT EXISTS "{index}" '
                f'ON "{table}" ("{column}")'
            )
        for index, body, _schema in COMPOSITE_INDEXES:
            op.execute(f'CREATE INDEX CONCURRENTLY IF NOT EXISTS "{index}" {body}')


def downgrade():
    with op.get_context().autocommit_block():
        for index, _body, schema in COMPOSITE_INDEXES:
            qualified = f'"{schema}"."{index}"' if schema else f'"{index}"'
            op.execute(f"DROP INDEX CONCURRENTLY IF EXISTS {qualified}")
        for _table, _column, index in FK_INDEXES:
            op.execute(f'DROP INDEX CONCURRENTLY IF EXISTS "{index}"')

    for table in IS_DELETED_TABLES:
        op.add_column(
            table,
            sa.Column(
                "is_deleted",
                sa.Boolean(),
                nullable=False,
                server_default=sa.text("false"),
                comment="Soft delete flag",
            ),
        )
        op.execute(f"UPDATE {table} SET is_deleted = TRUE WHERE deleted_at IS NOT NULL")
```
backend/app/alembic/versions/059_drop_redundant_indexes.py (new file, +98 lines)

```python
"""drop redundant indexes superseded by 058 composites

Revision ID: 059
Revises: 058
Create Date: 2026-05-05 14:00:00.000000

Drops indexes that are now redundant after migration 058 added the
real composite/partial indexes that match actual query shapes:

ix_project_name
    Subsumed by uq_project_name_org_id (name is leading column).
    No code path queries Project.name without organization_id.

ix_credential_provider
    Subsumed by uq_credential_org_project_provider. All four CRUD
    paths in crud/credentials.py filter (org, project, provider) — never
    provider alone.

ix_openai_conversation_previous_response_id
    Zero query consumers; previous_response_id is read but never
    filtered on in any WHERE clause.

ix_openai_conversation_response_id
    Superseded by ix_openai_conversation_response_project_active
    (project-scoped partial), which exactly matches CRUD predicates
    in crud/openai_conversation.py:get_conversation_by_response_id.

ix_openai_conversation_ancestor_response_id
    Superseded by
    ix_openai_conversation_ancestor_project_inserted_at_active, which
    matches the (ancestor_response_id, project_id) + ORDER BY shape
    used in crud/openai_conversation.py:get_conversation_by_ancestor_id
    and the /responses thread reconstruction path.

idx_file_type
    Low cardinality (4 enum values) and the only consumer in
    crud/file.py:147 always pairs file_type with (organization_id,
    project_id). idx_file_org_project covers the query; an extra
    in-memory filter on file_type is cheaper than a second index hit.

idx_eval_run_status_org / idx_eval_run_status_project
    Both lead with low-cardinality status. Real CRUD queries lead with
    (organization_id, project_id, type), now covered by
    ix_evaluation_run_org_project_type_inserted_at.

Uses DROP INDEX CONCURRENTLY so no AccessExclusiveLock is taken.
Downgrade recreates the original indexes (also concurrently) so the
schema can be restored bit-for-bit if needed.
"""

from alembic import op


revision = "059"
down_revision = "058"
branch_labels = None
depends_on = None


# (index_name, recreate_sql_body)
# recreate_sql_body is "ON \"<table>\" (<columns>)" used by downgrade only.
INDEXES_TO_DROP: list[tuple[str, str]] = [
    ("ix_project_name", 'ON "project" ("name")'),
    ("ix_credential_provider", 'ON "credential" ("provider")'),
    (
        "ix_openai_conversation_previous_response_id",
        'ON "openai_conversation" ("previous_response_id")',
    ),
    (
        "ix_openai_conversation_response_id",
        'ON "openai_conversation" ("response_id")',
    ),
    (
        "ix_openai_conversation_ancestor_response_id",
        'ON "openai_conversation" ("ancestor_response_id")',
    ),
    ("idx_file_type", 'ON "file" ("file_type")'),
    (
        "idx_eval_run_status_org",
        'ON "evaluation_run" ("status", "organization_id")',
    ),
    (
        "idx_eval_run_status_project",
        'ON "evaluation_run" ("status", "project_id")',
    ),
]


def upgrade():
    with op.get_context().autocommit_block():
        for index_name, _body in INDEXES_TO_DROP:
            op.execute(f'DROP INDEX CONCURRENTLY IF EXISTS "{index_name}"')


def downgrade():
    with op.get_context().autocommit_block():
        for index_name, body in INDEXES_TO_DROP:
            op.execute(f'CREATE INDEX CONCURRENTLY IF NOT EXISTS "{index_name}" {body}')
```
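Redundancy claims like the ones in this docstring can be cross-checked against live usage counters before the drop lands. A rough sketch (placeholder DSN; `pg_stat_user_indexes.idx_scan` is cumulative since the last statistics reset, so a quiet window can understate real usage):

```python
# Sketch: report scan counts for the indexes slated for removal.
import sqlalchemy as sa

engine = sa.create_engine("postgresql+psycopg2://user:pass@localhost/kaapi")

usage = sa.text(
    """
    SELECT indexrelname, idx_scan
    FROM pg_stat_user_indexes
    WHERE indexrelname = ANY(:names)
    ORDER BY idx_scan
    """
)

candidates = [
    "ix_project_name",
    "ix_credential_provider",
    "ix_openai_conversation_previous_response_id",
    "ix_openai_conversation_response_id",
    "ix_openai_conversation_ancestor_response_id",
    "idx_file_type",
    "idx_eval_run_status_org",
    "idx_eval_run_status_project",
]

with engine.connect() as conn:
    for name, scans in conn.execute(usage, {"names": candidates}):
        # A persistent zero over a representative traffic window supports
        # the "zero query consumers" argument; nonzero counts deserve a
        # second look before DROP INDEX CONCURRENTLY runs.
        print(f"{name}: {scans} scans")
```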
Comment on lines +89 to +98
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
python - <<'PY'
import ast
from pathlib import Path

path = Path("backend/app/alembic/versions/059_drop_redundant_indexes.py")
tree = ast.parse(path.read_text())
for node in tree.body:
    if isinstance(node, ast.FunctionDef):
        if node.returns is None:
            print(f"{node.name}: missing return type annotation")
PY
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 157

Add explicit return type hints to migration functions. The `upgrade()` and `downgrade()` functions in migration 059 are missing `-> None` return annotations.

♻️ Suggested patch

```diff
-def upgrade():
+def upgrade() -> None:
     with op.get_context().autocommit_block():
         for index_name, _body in INDEXES_TO_DROP:
             op.execute(f'DROP INDEX CONCURRENTLY IF EXISTS "{index_name}"')

-def downgrade():
+def downgrade() -> None:
     with op.get_context().autocommit_block():
         for index_name, body in INDEXES_TO_DROP:
             op.execute(f'CREATE INDEX CONCURRENTLY IF NOT EXISTS "{index_name}" {body}')
```

🧰 Tools
🪛 OpenGrep (1.20.0)

[ERROR] 92-92: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.
(coderabbit.sql-injection.python-fstring-execute)

[ERROR] 98-98: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.
(coderabbit.sql-injection.python-fstring-execute)
🧩 Analysis chain

🏁 Script executed:

```bash
find backend/app/alembic/versions -name "058_v1_query_optimization.py" -type f
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 133

🏁 Script executed:

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 137

🏁 Script executed:

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 10237
Add type hints to `upgrade()` and `downgrade()` functions, and redesign migration phases to prevent unrecoverable partial failures.

The `upgrade()` and `downgrade()` functions are missing return type annotations (`-> None`), violating the type hints guideline.

More critically, `autocommit_block()` at line 195 commits Phase A changes (UPDATE + drop_column) before Phase B index creation begins. If any `CREATE INDEX CONCURRENTLY` fails, the revision is not recorded as complete but the `is_deleted` columns are already dropped. A rerun then fails in `drop_column()` with a missing-column error, leaving the migration unrecoverable without manual intervention. Wrap the Phase A operations in conditional checks (e.g., a `column_exists()` guard before each drop) or consolidate transaction semantics so that either all changes commit together or all roll back on any failure.
🪛 OpenGrep (1.20.0)
[ERROR] 202-202: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.
(coderabbit.sql-injection.python-fstring-execute)
🤖 Prompt for AI Agents