diff --git a/backend/app/api/deps.py b/backend/app/api/deps.py index 526c5877b..bcebcab5f 100644 --- a/backend/app/api/deps.py +++ b/backend/app/api/deps.py @@ -15,6 +15,7 @@ from app.core.config import settings from app.core.db import engine from app.core.security import api_key_manager +from app.core.telemetry import set_request_log_context from app.crud.organization import validate_organization from app.crud.project import validate_project from app.models import ( @@ -44,16 +45,25 @@ def get_db() -> Generator[Session, None, None]: def _set_tenant_span_attributes(auth_context: AuthContext) -> None: - """Tag the active OTel span with tenant context so traces in Sentry can be - filtered by user / org / project IDs.""" + """Tag the active OTel span and log context with tenant info after auth. + + Sets org/project on: + - OTel span → Sentry traces filterable by tenant + - log context → every log record in this request carries org_id/project_id + - Sentry scope → tags on all events for this request + """ span = trace.get_current_span() - if not span.is_recording(): - return - span.set_attribute("user.id", str(auth_context.user.id)) - if auth_context.organization: - span.set_attribute("tenant.org_id", auth_context.organization.id) - if auth_context.project: - span.set_attribute("tenant.project_id", auth_context.project.id) + if span.is_recording(): + span.set_attribute("user.id", str(auth_context.user.id)) + if auth_context.organization: + span.set_attribute("tenant.org_id", auth_context.organization.id) + if auth_context.project: + span.set_attribute("tenant.project_id", auth_context.project.id) + + set_request_log_context( + org_id=auth_context.organization.id if auth_context.organization else None, + project_id=auth_context.project.id if auth_context.project else None, + ) def _authenticate_with_jwt(session: Session, token: str) -> AuthContext: diff --git a/backend/app/api/routes/credentials.py b/backend/app/api/routes/credentials.py index be75b5b98..e2d338761 
100644 --- a/backend/app/api/routes/credentials.py +++ b/backend/app/api/routes/credentials.py @@ -115,7 +115,7 @@ def update_credential( _current_user: AuthContextDep, ): if not creds_in or not creds_in.provider or not creds_in.credential: - logger.error( + logger.warning( f"[update_credential] Invalid input | organization_id: {_current_user.organization_.id}, project_id: {_current_user.project_.id}" ) raise HTTPException( @@ -256,7 +256,7 @@ def update_credential_by_org_project( _current_user: AuthContextDep, ): if not creds_in or not creds_in.provider or not creds_in.credential: - logger.error( + logger.warning( f"[update_credential_by_org_project] Invalid input | organization_id: {org_id}, project_id: {project_id}" ) raise HTTPException( diff --git a/backend/app/api/routes/model_evaluation.py b/backend/app/api/routes/model_evaluation.py index c22c78508..21e8556fa 100644 --- a/backend/app/api/routes/model_evaluation.py +++ b/backend/app/api/routes/model_evaluation.py @@ -143,7 +143,7 @@ def evaluate_models( # even though the client will be initialized separately inside the background task if not request.fine_tuning_ids: - logger.error( + logger.warning( f"[evaluate_model] No fine tuning IDs provided | project_id:{current_user.project_.id}" ) raise HTTPException(status_code=400, detail="No fine-tuned job IDs provided") diff --git a/backend/app/api/routes/organization.py b/backend/app/api/routes/organization.py index 079e1a508..c3988016e 100644 --- a/backend/app/api/routes/organization.py +++ b/backend/app/api/routes/organization.py @@ -70,7 +70,6 @@ def read_organization( """ org = get_organization_by_id(session=session, org_id=org_id) if org is None: - logger.error(f"[read_organization] Organization not found | org_id={org_id}") raise HTTPException(status_code=404, detail="Organization not found") return APIResponse.success_response(org) @@ -87,9 +86,6 @@ def update_organization( ) -> APIResponse[OrganizationPublic]: org = get_organization_by_id(session=session, 
org_id=org_id) if org is None: - logger.error( - f"[update_organization] Organization not found | 'org_id': {org_id}" - ) raise HTTPException(status_code=404, detail="Organization not found") org_data = org_in.model_dump(exclude_unset=True) @@ -115,9 +111,6 @@ def update_organization( def delete_organization(session: SessionDep, org_id: int) -> APIResponse[None]: org = get_organization_by_id(session=session, org_id=org_id) if org is None: - logger.error( - f"[delete_organization] Organization not found | 'org_id': {org_id}" - ) raise HTTPException(status_code=404, detail="Organization not found") session.delete(org) diff --git a/backend/app/api/routes/project.py b/backend/app/api/routes/project.py index 71fcf50ee..64d65c1ae 100644 --- a/backend/app/api/routes/project.py +++ b/backend/app/api/routes/project.py @@ -66,7 +66,6 @@ def read_project(*, session: SessionDep, project_id: int): """ project = get_project_by_id(session=session, project_id=project_id) if project is None: - logger.error(f"[read_project] Project not found | project_id={project_id}") raise HTTPException(status_code=404, detail="Project not found") return APIResponse.success_response(project) @@ -81,7 +80,6 @@ def read_project(*, session: SessionDep, project_id: int): def update_project(*, session: SessionDep, project_id: int, project_in: ProjectUpdate): project = get_project_by_id(session=session, project_id=project_id) if project is None: - logger.error(f"[update_project] Project not found | project_id={project_id}") raise HTTPException(status_code=404, detail="Project not found") project_data = project_in.model_dump(exclude_unset=True) @@ -106,7 +104,6 @@ def update_project(*, session: SessionDep, project_id: int, project_in: ProjectU def delete_project(session: SessionDep, project_id: int): project = get_project_by_id(session=session, project_id=project_id) if project is None: - logger.error(f"[delete_project] Project not found | project_id={project_id}") raise HTTPException(status_code=404, 
detail="Project not found") session.delete(project) diff --git a/backend/app/api/routes/threads.py b/backend/app/api/routes/threads.py index 90302e5f0..3bdd8619c 100644 --- a/backend/app/api/routes/threads.py +++ b/backend/app/api/routes/threads.py @@ -65,7 +65,7 @@ def validate_thread(client: OpenAI, thread_id: str) -> tuple[bool, str]: if runs.data and len(runs.data) > 0: latest_run = runs.data[0] if latest_run.status in ["queued", "in_progress", "requires_action"]: - logger.error( + logger.warning( f"[validate_thread] Thread ID {mask_string(thread_id)} is currently {latest_run.status}." ) return ( @@ -305,7 +305,7 @@ async def threads( ) client, success = configure_openai(credentials) if not success: - logger.error( + logger.warning( f"[threads] OpenAI API key not configured for this organization. | organization_id: {_current_user.organization_.id}, project_id: {request.get('project_id')}" ) return APIResponse.failure_response( @@ -379,7 +379,7 @@ async def threads_sync( # Configure OpenAI client client, success = configure_openai(credentials) if not success: - logger.error( + logger.warning( f"[threads_sync] OpenAI API key not configured for this organization. | organization_id: {_current_user.organization_.id}, project_id: {request.get('project_id')}" ) return APIResponse.failure_response( @@ -447,7 +447,7 @@ async def start_thread( # Configure OpenAI client client, success = configure_openai(credentials) if not success: - logger.error( + logger.warning( f"[start_thread] OpenAI API key not configured for this organization. 
| project_id: {_current_user.project_.id}" ) return APIResponse.failure_response( @@ -501,9 +501,6 @@ async def get_thread( result = get_thread_result(db, thread_id) if not result: - logger.error( - f"[get_thread] Thread result not found for ID: {mask_string(thread_id)} | org_id: {_current_user.organization_.id}" - ) raise HTTPException(404, "thread not found") status = result.status or ("success" if result.response else "processing") diff --git a/backend/app/api/routes/users.py b/backend/app/api/routes/users.py index ba13a6c1c..247a522c1 100644 --- a/backend/app/api/routes/users.py +++ b/backend/app/api/routes/users.py @@ -50,7 +50,7 @@ def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any: ) def create_user_endpoint(*, session: SessionDep, user_in: UserCreate) -> Any: if get_user_by_email(session=session, email=user_in.email): - logger.error( + logger.warning( f"[create_user_endpoint] Attempting to create user with existing email | email: {user_in.email}" ) raise HTTPException( @@ -80,7 +80,7 @@ def update_user_me( if user_in.email: existing_user = get_user_by_email(session=session, email=user_in.email) if existing_user and existing_user.id != current_user.id: - logger.error( + logger.warning( f"[update_user_me] Attempting to update user with existing email | email: {user_in.email}, user_id: {current_user.id}" ) raise HTTPException( @@ -125,7 +125,7 @@ def read_user_me(current_user_dep: AuthContextDep) -> Any: def delete_user_me(session: SessionDep, current_user_dep: AuthContextDep) -> Any: current_user = current_user_dep.user if current_user.is_superuser: - logger.error( + logger.warning( f"[delete_user_me] Attempting to delete superuser account by itself | user_id: {current_user.id}" ) raise HTTPException( @@ -147,7 +147,7 @@ def register_user(session: SessionDep, user_in: UserRegister) -> Any: This endpoint allows the registration of a new user and is accessible only by a superuser. 
""" if get_user_by_email(session=session, email=user_in.email): - logger.error( + logger.warning( f"[register_user] Attempting to create user with existing email | email: {user_in.email}" ) raise HTTPException( @@ -190,7 +190,6 @@ def update_user_endpoint( ) -> Any: db_user = session.get(User, user_id) if not db_user: - logger.error(f"[update_user_endpoint] User not found | user_id: {user_id}") raise HTTPException( status_code=404, detail="The user with this id does not exist in the system", @@ -199,7 +198,7 @@ def update_user_endpoint( if user_in.email: existing_user = get_user_by_email(session=session, email=user_in.email) if existing_user and existing_user.id != user_id: - logger.error( + logger.warning( f"[update_user_endpoint] Attempting to update user with existing email | email: {user_in.email}, user_id: {user_id}" ) raise HTTPException( @@ -219,11 +218,10 @@ def delete_user( ) -> Message: user = session.get(User, user_id) if not user: - logger.error(f"[delete_user] User not found | user_id: {user_id}") raise HTTPException(status_code=404, detail="User not found") if user == current_user.user: - logger.error( + logger.warning( f"[delete_user] Attempting to delete self by superuser | user_id: {current_user.user.id}" ) raise HTTPException( diff --git a/backend/app/celery/celery_app.py b/backend/app/celery/celery_app.py index 2448e2913..14ecd090f 100644 --- a/backend/app/celery/celery_app.py +++ b/backend/app/celery/celery_app.py @@ -6,6 +6,7 @@ task_failure, task_postrun, task_prerun, + setup_logging, worker_process_init, ) from kombu import Exchange, Queue @@ -14,17 +15,22 @@ from app.core.logger import configure_logging from app.core.sentry_filters import before_send_transaction_filter -configure_logging(service_name="kaapi-celery") - logger = logging.getLogger(__name__) _telemetry_initialized = False _sentry_initialized = False _flush_hook_registered = False +@setup_logging.connect +def configure_celery_logging(**_: object) -> None: + 
configure_logging(service_name="kaapi-celery") + + def _initialize_worker_observability() -> None: global _telemetry_initialized, _sentry_initialized, _flush_hook_registered + configure_logging(service_name="kaapi-celery") + if settings.SENTRY_DSN and not _sentry_initialized: import sentry_sdk from sentry_sdk.integrations.celery import CeleryIntegration diff --git a/backend/app/core/batch/client.py b/backend/app/core/batch/client.py index 115df6217..b6a08f993 100644 --- a/backend/app/core/batch/client.py +++ b/backend/app/core/batch/client.py @@ -69,7 +69,7 @@ def from_credentials( ) if not credentials: - logger.error( + logger.warning( f"[from_credentials] Gemini credentials not found | " f"org_id: {org_id}, project_id: {project_id}" ) @@ -80,7 +80,7 @@ def from_credentials( api_key = credentials.get("api_key") if not api_key: - logger.error( + logger.warning( f"[from_credentials] Invalid Gemini credentials (missing api_key) | " f"org_id: {org_id}, project_id: {project_id}" ) diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 6e46112de..9328a865d 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -137,6 +137,8 @@ def AWS_S3_BUCKET(self) -> str: LOG_DIR: str = os.path.join(os.path.dirname(os.path.dirname(__file__)), "logs") OTEL_ENABLED: bool = False OTEL_SERVICE_NAME: str = "kaapi-backend" + BACKEND_SERVICE_NAME: str = "kaapi-backend" + CRON_SERVICE_NAME: str = "kaapi-cron" # Celery Configuration CELERY_WORKER_CONCURRENCY: int | None = None diff --git a/backend/app/core/finetune/evaluation.py b/backend/app/core/finetune/evaluation.py index 560a4c752..09ad38938 100644 --- a/backend/app/core/finetune/evaluation.py +++ b/backend/app/core/finetune/evaluation.py @@ -64,7 +64,7 @@ def load_labels_and_prompts(self) -> None: label_col = "label" if "label" in df.columns else None if not query_col or not label_col: - logger.error( + logger.warning( "[ModelEvaluator.load_labels_and_prompts] CSV must " "contain a 'label' column 
and one of: " f"{possible_query_columns}" diff --git a/backend/app/core/finetune/preprocessing.py b/backend/app/core/finetune/preprocessing.py index 52d652ee1..ff080c50c 100644 --- a/backend/app/core/finetune/preprocessing.py +++ b/backend/app/core/finetune/preprocessing.py @@ -101,7 +101,7 @@ def _load_dataframe(self): self.label_col = "label" if "label" in df.columns else None if not self.query_col or not self.label_col: - logger.error( + logger.warning( f"[DataPreprocessor] Dataset does not contain a 'label' column and one of: {possible_query_columns}" ) raise ValueError( diff --git a/backend/app/core/logger.py b/backend/app/core/logger.py index fd2abb3dd..8a1ffe441 100644 --- a/backend/app/core/logger.py +++ b/backend/app/core/logger.py @@ -1,7 +1,12 @@ import logging import os +from collections.abc import Iterator +from contextlib import contextmanager +from contextvars import ContextVar from logging.handlers import RotatingFileHandler + from asgi_correlation_id import correlation_id + from app.core.config import settings LOG_DIR = settings.LOG_DIR @@ -14,11 +19,16 @@ "%(asctime)s - [%(service_name)s] - [%(correlation_id)s] - " "%(levelname)s - %(name)s - %(message)s" ) +_service_name_context: ContextVar[str | None] = ContextVar( + "kaapi_service_name", default=None +) class CorrelationIdFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: record.correlation_id = correlation_id.get() or "N/A" + if record.name == "uvicorn.error": + record.name = "uvicorn" return True @@ -27,11 +37,45 @@ def __init__(self, service_name: str) -> None: super().__init__() self._service_name = service_name + def set_default_service_name(self, service_name: str) -> None: + self._service_name = service_name + def filter(self, record: logging.LogRecord) -> bool: - record.service_name = self._service_name + record.service_name = _service_name_context.get() or self._service_name return True +@contextmanager +def log_service_name(service_name: str) -> 
Iterator[None]: + token = _service_name_context.set(service_name) + try: + yield + finally: + _service_name_context.reset(token) + + +def _set_service_name_on_existing_filters( + root_logger: logging.Logger, service_name: str +) -> None: + for handler in root_logger.handlers: + for handler_filter in handler.filters: + if isinstance(handler_filter, ServiceNameFilter): + handler_filter.set_default_service_name(service_name) + + +def _configure_uvicorn_loggers() -> None: + for logger_name in ("uvicorn", "uvicorn.error"): + uvicorn_logger = logging.getLogger(logger_name) + uvicorn_logger.handlers.clear() + uvicorn_logger.propagate = True + uvicorn_logger.disabled = False + + access_logger = logging.getLogger("uvicorn.access") + access_logger.handlers.clear() + access_logger.propagate = False + access_logger.disabled = True + + logging.getLogger("LiteLLM").setLevel(logging.WARNING) logging.getLogger("opentelemetry").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) @@ -42,13 +86,17 @@ def filter(self, record: logging.LogRecord) -> bool: def configure_logging(service_name: str | None = None) -> None: root_logger = logging.getLogger() + resolved_service_name = service_name or settings.BACKEND_SERVICE_NAME + if getattr(root_logger, "_kaapi_logging_configured", False): + _set_service_name_on_existing_filters(root_logger, resolved_service_name) + root_logger._kaapi_service_name = resolved_service_name + _configure_uvicorn_loggers() return root_logger.setLevel(LOGGING_LEVEL) formatter = logging.Formatter(LOGGING_FORMAT) - resolved_service_name = service_name or settings.OTEL_SERVICE_NAME stream_handler = logging.StreamHandler() stream_handler.setFormatter(formatter) @@ -66,3 +114,5 @@ def configure_logging(service_name: str | None = None) -> None: root_logger.addHandler(stream_handler) root_logger.addHandler(file_handler) root_logger._kaapi_logging_configured = True + root_logger._kaapi_service_name = resolved_service_name + 
_configure_uvicorn_loggers() diff --git a/backend/app/core/middleware.py b/backend/app/core/middleware.py index 153a41947..1032e3eba 100644 --- a/backend/app/core/middleware.py +++ b/backend/app/core/middleware.py @@ -2,9 +2,13 @@ import time import sentry_sdk +from asgi_correlation_id import correlation_id from fastapi import Request, Response from opentelemetry import trace +from app.core.config import settings +from app.core.logger import log_service_name + logger = logging.getLogger("http_request_logger") @@ -19,6 +23,14 @@ def _resolve_http_route(request: Request) -> str: async def http_request_logger(request: Request, call_next) -> Response: + if request.url.path.startswith(f"{settings.API_V1_STR}/cron/"): + with log_service_name(settings.CRON_SERVICE_NAME): + return await _log_http_request(request, call_next) + + return await _log_http_request(request, call_next) + + +async def _log_http_request(request: Request, call_next) -> Response: start_time = time.time() method = request.method raw_path = request.url.path @@ -32,6 +44,8 @@ async def http_request_logger(request: Request, call_next) -> Response: if sentry_sdk.get_client().is_active(): sentry_sdk.set_tag("http.method", method) sentry_sdk.set_tag("http.request.method", method) + if request_id := correlation_id.get(): + sentry_sdk.set_tag("correlation_id", request_id) try: response = await call_next(request) diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py index 11d2c888b..597c9708d 100644 --- a/backend/app/core/providers.py +++ b/backend/app/core/providers.py @@ -65,7 +65,7 @@ def validate_provider(provider: str) -> Provider: return Provider(provider.lower()) except ValueError: supported = ", ".join(p.value for p in Provider) - logger.error( + logger.warning( f"[validate_provider] Unsupported provider | provider: {provider}, supported_providers: {supported}" ) raise ValueError( @@ -89,7 +89,7 @@ def validate_provider_credentials(provider: str, credentials: Dict[str, str]) -> if 
missing_fields := [ field for field in required_fields if field not in credentials ]: - logger.error( + logger.warning( f"[validate_provider_credentials] Missing required fields | provider: {provider}, missing_fields: {', '.join(missing_fields)}" ) raise ValueError( diff --git a/backend/app/core/sentry_filters.py b/backend/app/core/sentry_filters.py index 30bc60923..6712803bd 100644 --- a/backend/app/core/sentry_filters.py +++ b/backend/app/core/sentry_filters.py @@ -5,7 +5,9 @@ _SQL_OR_CONNECT = re.compile(r"^(select|insert|update|delete|connect)\b", re.IGNORECASE) _HTTP_SEND_RECEIVE = re.compile(r"http (send|receive)$", re.IGNORECASE) _DB_QUERY_SPAN = re.compile(r"^db\.query$", re.IGNORECASE) -_BARE_HTTP_METHOD = re.compile(r"^(GET|HEAD|OPTIONS)$", re.IGNORECASE) +_BARE_HTTP_METHOD = re.compile( + r"^(GET|HEAD|OPTIONS|POST|PUT|PATCH|DELETE|TRACE|CONNECT)$", re.IGNORECASE +) _NOISE_PATH = re.compile( r"(^/health/?$|^/robots\.txt$|^/favicon\.ico$|^/wp-admin|^/wp-login|^/xmlrpc\.php$)", re.IGNORECASE, diff --git a/backend/app/core/telemetry.py b/backend/app/core/telemetry.py index f267afb55..ce39702bc 100644 --- a/backend/app/core/telemetry.py +++ b/backend/app/core/telemetry.py @@ -56,6 +56,34 @@ def _emit_sentry_metric( logger.debug("[_emit_sentry_metric] Failed to emit %s (%s)", name, metric_type) +def set_request_log_context( + org_id: int | None = None, + project_id: int | None = None, +) -> None: + """Attach org/project to the current request's log context and Sentry scope. + + Call once per authenticated request (from the auth dependency). All subsequent + log records in this request will carry org_id and project_id automatically + via LogContextFilter — no need to add them to individual log statements. 
+ """ + current = _log_context_var.get() or {} + payload = dict(current) + if org_id is not None: + payload["org_id"] = str(org_id) + if project_id is not None: + payload["project_id"] = str(project_id) + _log_context_var.set(payload) + + try: + if sentry_sdk.get_client().is_active(): + if org_id is not None: + sentry_sdk.set_tag("tenant.org_id", str(org_id)) + if project_id is not None: + sentry_sdk.set_tag("tenant.project_id", str(project_id)) + except Exception: + pass + + @contextmanager def log_context( *, tag: str | None = None, **fields: str | int | float | bool | None diff --git a/backend/app/core/util.py b/backend/app/core/util.py index 56a05dbc1..25c06d909 100644 --- a/backend/app/core/util.py +++ b/backend/app/core/util.py @@ -40,5 +40,5 @@ def configure_openai(credentials: dict) -> tuple[OpenAI, bool]: client = OpenAI(api_key=credentials["api_key"]) return client, True except Exception as e: - logger.error(f"Failed to configure OpenAI client: {str(e)}") + logger.warning(f"Failed to configure OpenAI client: {str(e)}") return None, False diff --git a/backend/app/crud/assistants.py b/backend/app/crud/assistants.py index c32f820da..a079b17a2 100644 --- a/backend/app/crud/assistants.py +++ b/backend/app/crud/assistants.py @@ -62,12 +62,12 @@ def fetch_assistant_from_openai(assistant_id: str, client: OpenAI) -> OpenAIAssi assistant = client.beta.assistants.retrieve(assistant_id=assistant_id) return assistant except openai.NotFoundError as e: - logger.error( + logger.warning( f"[fetch_assistant_from_openai] Assistant not found: {mask_string(assistant_id)} | {e}" ) raise HTTPException(status_code=404, detail="Assistant not found in OpenAI.") except openai.OpenAIError as e: - logger.error( + logger.warning( f"[fetch_assistant_from_openai] OpenAI API error while retrieving assistant {mask_string(assistant_id)}: {e}" ) raise HTTPException(status_code=502, detail=f"OpenAI API error: {e}") @@ -83,13 +83,13 @@ def verify_vector_store_ids_exist( try: 
openai_client.vector_stores.retrieve(vector_store_id) except openai.NotFoundError: - logger.error(f"Vector store ID {vector_store_id} not found in OpenAI.") + logger.warning(f"Vector store ID {vector_store_id} not found in OpenAI.") raise HTTPException( status_code=400, detail=f"Vector store ID {vector_store_id} not found in OpenAI.", ) except openai.OpenAIError as e: - logger.error(f"Failed to verify vector store ID {vector_store_id}: {e}") + logger.warning(f"Failed to verify vector store ID {vector_store_id}: {e}") raise HTTPException( status_code=502, detail=f"Error verifying vector store ID {vector_store_id}: {str(e)}", @@ -178,7 +178,7 @@ def create_assistant( existing = get_assistant_by_id(session, assistant.assistant_id, project_id) if existing: - logger.error( + logger.warning( f"[create_assistant] Assistant with ID {mask_string(assistant.assistant_id)} already exists. | project_id: {project_id}" ) raise HTTPException( @@ -209,7 +209,7 @@ def update_assistant( ) -> Assistant: existing_assistant = get_assistant_by_id(session, assistant_id, project_id) if not existing_assistant: - logger.error( + logger.warning( f"[update_assistant] Assistant {mask_string(assistant_id)} not found | project_id: {project_id}" ) raise HTTPException(status_code=404, detail="Assistant not found.") @@ -227,7 +227,7 @@ def update_assistant( add_ids = set(assistant_update.vector_store_ids_add or []) remove_ids = set(assistant_update.vector_store_ids_remove or []) if conflicting_ids := add_ids & remove_ids: - logger.error( + logger.warning( f"[update_assistant] Conflicting vector store IDs in add/remove: {conflicting_ids} | project_id: {project_id}" ) raise HTTPException( diff --git a/backend/app/crud/config/version.py b/backend/app/crud/config/version.py index e1335d171..9b5642f88 100644 --- a/backend/app/crud/config/version.py +++ b/backend/app/crud/config/version.py @@ -65,7 +65,7 @@ def create_or_raise(self, version_create: ConfigVersionUpdate) -> ConfigVersion: validated_blob = 
ConfigBlob.model_validate(merged_config) except ValidationError as e: validation_errors = e.errors() - logger.error( + logger.warning( f"[ConfigVersionCrud.create_or_raise] Validation failed | " f"{{'config_id': '{self.config_id}', 'error_count': {len(validation_errors)}, " f"'fields': {['.'.join(str(part) for part in err['loc']) for err in validation_errors]}}}" @@ -282,7 +282,7 @@ def _validate_config_type_unchanged( old_type = "text" if new_type is None: - logger.error( + logger.warning( f"[ConfigVersionCrud._validate_config_type_unchanged] Missing type field | " f"{{'config_id': '{self.config_id}', 'old_type': {old_type}, 'new_type': {new_type}}}" ) diff --git a/backend/app/crud/credentials.py b/backend/app/crud/credentials.py index 6853c455a..1d23ff587 100644 --- a/backend/app/crud/credentials.py +++ b/backend/app/crud/credentials.py @@ -22,7 +22,7 @@ def set_creds_for_org( created_credentials = [] if not creds_add.credential: - logger.error( + logger.warning( f"[set_creds_for_org] No credentials provided | project_id: {project_id}" ) raise HTTPException(400, "No credentials provided") @@ -31,7 +31,7 @@ def set_creds_for_org( try: validate_provider_credentials(provider, credentials) except ValueError as e: - logger.error( + logger.warning( f"[set_creds_for_org] Validation error | project_id: {project_id}, provider: {provider}, error: {str(e)}" ) raise HTTPException(status_code=400, detail=str(e)) @@ -197,7 +197,7 @@ def update_creds_for_org( try: validate_provider_credentials(creds_in.provider, credential_data) except ValueError as e: - logger.error( + logger.warning( f"[update_creds_for_org] Validation error | organization_id: {org_id}, project_id: {project_id}, provider: {creds_in.provider}, error: {str(e)}" ) raise HTTPException(status_code=400, detail=str(e)) diff --git a/backend/app/crud/document/document.py b/backend/app/crud/document/document.py index 35e4d86fb..84f2898a1 100644 --- a/backend/app/crud/document/document.py +++ 
b/backend/app/crud/document/document.py @@ -48,9 +48,8 @@ def read_many( try: raise ValueError(f"Negative skip: {skip}") except ValueError as err: - logger.error( + logger.warning( f"[DocumentCrud.read_many] Invalid skip value | {{'project_id': {self.project_id}, 'skip': {skip}, 'error': '{str(err)}'}}", - exc_info=True, ) raise statement = statement.offset(skip) @@ -60,9 +59,8 @@ def read_many( try: raise ValueError(f"Negative limit: {limit}") except ValueError as err: - logger.error( + logger.warning( f"[DocumentCrud.read_many] Invalid limit value | {{'project_id': {self.project_id}, 'limit': {limit}, 'error': '{str(err)}'}}", - exc_info=True, ) raise statement = statement.limit(limit + 1) @@ -112,9 +110,8 @@ def update(self, document: Document): try: raise PermissionError(error) except PermissionError as err: - logger.error( + logger.warning( f"[DocumentCrud.update] Permission error | {{'doc_id': '{document.id}', 'error': '{str(err)}'}}", - exc_info=True, ) raise document.updated_at = now() diff --git a/backend/app/crud/evaluations/batch.py b/backend/app/crud/evaluations/batch.py index 13fb9a50b..4181910b8 100644 --- a/backend/app/crud/evaluations/batch.py +++ b/backend/app/crud/evaluations/batch.py @@ -41,7 +41,7 @@ def fetch_dataset_items(langfuse: Langfuse, dataset_name: str) -> list[dict[str, try: dataset = langfuse.get_dataset(dataset_name) except Exception as e: - logger.error( + logger.warning( f"[fetch_dataset_items] Failed to fetch dataset | dataset={dataset_name} | {e}" ) raise ValueError(f"Dataset '{dataset_name}' not found: {e}") diff --git a/backend/app/crud/evaluations/dataset.py b/backend/app/crud/evaluations/dataset.py index 806238a00..0aaf5a2e9 100644 --- a/backend/app/crud/evaluations/dataset.py +++ b/backend/app/crud/evaluations/dataset.py @@ -83,9 +83,8 @@ def create_evaluation_dataset( except IntegrityError as e: session.rollback() - logger.error( - f"[create_evaluation_dataset] Database integrity error creating dataset | name={name} | {e}", 
- exc_info=True, + logger.warning( + f"[create_evaluation_dataset] Duplicate dataset name | name={name} | {e}", ) raise HTTPException( status_code=409, diff --git a/backend/app/crud/evaluations/embeddings.py b/backend/app/crud/evaluations/embeddings.py index d21f186cc..d689e7567 100644 --- a/backend/app/crud/evaluations/embeddings.py +++ b/backend/app/crud/evaluations/embeddings.py @@ -166,7 +166,9 @@ def parse_embedding_results(raw_results: list[dict[str, Any]]) -> list[dict[str, # Handle errors in batch processing if response.get("error"): error_msg = response["error"].get("message", "Unknown error") - logger.error(f"Trace {trace_id} had error: {error_msg}") + logger.warning( + f"[parse_embedding_results] Trace {trace_id} had error: {error_msg}" + ) continue # Extract the response body diff --git a/backend/app/crud/evaluations/langfuse.py b/backend/app/crud/evaluations/langfuse.py index 7c9edbe99..be5da4eb3 100644 --- a/backend/app/crud/evaluations/langfuse.py +++ b/backend/app/crud/evaluations/langfuse.py @@ -268,7 +268,7 @@ def upload_item(item: dict[str, str], duplicate_num: int, question_id: str) -> b ) return True except Exception as e: - logger.error( + logger.warning( f"[upload_dataset_to_langfuse] Failed to upload item | " f"duplicate={duplicate_num + 1} | " f"question={item['question'][:50]}... 
| {e}" @@ -390,8 +390,8 @@ def fetch_trace_scores_from_langfuse( try: dataset_run = langfuse.api.datasets.get_run(dataset_name, run_name) except Exception as e: - logger.error( - f"[fetch_trace_scores_from_langfuse] Failed to get run | " + logger.warning( + f"[fetch_trace_scores_from_langfuse] Run not found in Langfuse | " f"dataset={dataset_name} | run={run_name} | error={e}" ) raise ValueError( diff --git a/backend/app/crud/evaluations/processing.py b/backend/app/crud/evaluations/processing.py index 472390801..607e29e49 100644 --- a/backend/app/crud/evaluations/processing.py +++ b/backend/app/crud/evaluations/processing.py @@ -93,7 +93,7 @@ def _extract_batch_error_message( error_msg = "Batch completed with errors but could not parse error file" except Exception as e: - logger.error( + logger.warning( f"[_extract_batch_error_message] Failed to extract errors | batch_job_id={batch_job.id} | {e}", exc_info=True, ) @@ -181,7 +181,7 @@ def parse_evaluation_output( # Handle errors in batch processing if response.get("error"): error_msg = response["error"].get("message", "Unknown error") - logger.error( + logger.warning( f"[parse_evaluation_output] Item had error | item_id={item_id} | {error_msg}" ) generated_output = f"ERROR: {error_msg}" @@ -503,7 +503,7 @@ async def process_completed_embedding_batch( ) except Exception as e: # Log error but don't fail the evaluation - logger.error( + logger.warning( f"[process_completed_embedding_batch] {log_prefix} Failed to update Langfuse traces with scores | {e}", exc_info=True, ) @@ -623,7 +623,7 @@ async def check_and_process_evaluation( } elif embedding_status in ["failed", "expired", "cancelled"]: - logger.error( + logger.warning( f"[check_and_process_evaluation] {log_prefix} Embedding batch failed | provider_batch_id={embedding_batch_job.provider_batch_id} | {embedding_batch_job.error_message}" ) # Mark as completed without embeddings diff --git a/backend/app/crud/fine_tuning.py b/backend/app/crud/fine_tuning.py index 
61a0ccc6a..ade3768a1 100644 --- a/backend/app/crud/fine_tuning.py +++ b/backend/app/crud/fine_tuning.py @@ -93,7 +93,7 @@ def fetch_by_id(session: Session, job_id: int, project_id: int) -> Fine_Tuning: ).one_or_none() if job is None: - logger.error( + logger.warning( f"[fetch_by_id]Fine-tune job not found: job_id={job_id}, project_id={project_id}" ) raise HTTPException(status_code=404, detail="Job not found") diff --git a/backend/app/crud/model_evaluation.py b/backend/app/crud/model_evaluation.py index 51fa7a486..2255b88af 100644 --- a/backend/app/crud/model_evaluation.py +++ b/backend/app/crud/model_evaluation.py @@ -27,7 +27,7 @@ def create_model_evaluation( fine_tuning_job = fetch_by_id(session, request.fine_tuning_id, project_id) if fine_tuning_job.fine_tuned_model and fine_tuning_job.test_data_s3_object is None: - logger.error( + logger.warning( f"[create_model_evaluation] No fine tuned model or test data found for the given fine tuning ID | fine_tuning_id={request.fine_tuning_id}, project_id={project_id}" ) raise HTTPException(404, "Fine tuned model not found") @@ -68,9 +68,6 @@ def fetch_by_eval_id( ).one_or_none() if model_eval is None: - logger.error( - f"[fetch_by_id]Model evaluation not found for eval_id={eval_id}, project_id={project_id}" - ) raise HTTPException(status_code=404, detail="Model evaluation not found") logger.info( @@ -96,9 +93,6 @@ def fetch_eval_by_doc_id( model_evals = session.exec(query).all() if not model_evals: - logger.error( - f"[fetch_eval_by_doc_id]Model evaluation not found for document_id={document_id}, project_id={project_id}" - ) raise HTTPException(status_code=404, detail="Model evaluation not found") logger.info( @@ -134,7 +128,7 @@ def fetch_top_model_by_doc_id( top_model = model_eval if not top_model: - logger.error( + logger.warning( f"[fetch_top_model_by_doc_id]No model evaluation found with populated score for document_id={document_id}, project_id={project_id}" ) raise HTTPException(status_code=404, detail="No top model 
found") diff --git a/backend/app/crud/organization.py b/backend/app/crud/organization.py index e27ddc9f8..be7970c5a 100644 --- a/backend/app/crud/organization.py +++ b/backend/app/crud/organization.py @@ -43,13 +43,13 @@ def validate_organization(session: Session, org_id: int) -> Organization: """ organization = get_organization_by_id(session, org_id) if not organization: - logger.error( + logger.warning( f"[validate_organization] Organization not found | 'org_id': {org_id}" ) raise HTTPException(404, "Organization not found") if not organization.is_active: - logger.error( + logger.warning( f"[validate_organization] Organization is not active | 'org_id': {org_id}" ) raise HTTPException(status_code=403, detail="Organization is not active") diff --git a/backend/app/crud/project.py b/backend/app/crud/project.py index 570a3b266..67dd64be4 100644 --- a/backend/app/crud/project.py +++ b/backend/app/crud/project.py @@ -17,7 +17,7 @@ def create_project(*, session: Session, project_create: ProjectCreate) -> Projec project_name=project_create.name, ) if project: - logger.error( + logger.warning( f"[create_project] Project already exists | 'project_id': {project.id}, 'name': {project.name}" ) raise HTTPException(409, "Project already exists") @@ -58,13 +58,13 @@ def validate_project(session: Session, project_id: int) -> Project: """ project = get_project_by_id(session=session, project_id=project_id) if not project: - logger.error( + logger.warning( f"[validate_project] Project not found | 'project_id': {project_id}" ) raise HTTPException(404, "Project not found") if not project.is_active: - logger.error( + logger.warning( f"[validate_project] Project is not active | 'project_id': {project_id}" ) raise HTTPException(404, "Project is not active") diff --git a/backend/app/crud/rag/open_ai.py b/backend/app/crud/rag/open_ai.py index cdae82440..2ae36f4f1 100644 --- a/backend/app/crud/rag/open_ai.py +++ b/backend/app/crud/rag/open_ai.py @@ -55,9 +55,8 @@ def __call__(self, resource, 
retries=1): ) return except OpenAIError as err: - logger.error( + logger.warning( f"[ResourceCleaner.call] OpenAI error during cleanup | {{'cleaner_type': '{self}', 'resource': '{resource}', 'error': '{str(err)}'}}", - exc_info=True, ) logger.warning( diff --git a/backend/app/crud/stt_evaluations/batch.py b/backend/app/crud/stt_evaluations/batch.py index a68d35d7d..6d0ba2a84 100644 --- a/backend/app/crud/stt_evaluations/batch.py +++ b/backend/app/crud/stt_evaluations/batch.py @@ -160,7 +160,7 @@ def _upload_to_gemini(sample: STTSample) -> _UploadResult: gemini_file_names.append(result.file_name) else: failed_samples.append((result.sample, result.error)) - logger.error( + logger.warning( f"[start_stt_evaluation_batch] Failed to upload to Gemini | " f"sample_id: {result.sample.id}, error: {result.error}" ) diff --git a/backend/app/crud/stt_evaluations/dataset.py b/backend/app/crud/stt_evaluations/dataset.py index 2c559fb10..76c56c2c5 100644 --- a/backend/app/crud/stt_evaluations/dataset.py +++ b/backend/app/crud/stt_evaluations/dataset.py @@ -82,7 +82,7 @@ def create_stt_dataset( except IntegrityError as e: session.rollback() if "uq_evaluation_dataset_name_org_project" in str(e): - logger.error( + logger.warning( f"[create_stt_dataset] Dataset name already exists | name: {name}" ) raise HTTPException( diff --git a/backend/app/crud/tts_evaluations/dataset.py b/backend/app/crud/tts_evaluations/dataset.py index c81007e92..64a5a83f2 100644 --- a/backend/app/crud/tts_evaluations/dataset.py +++ b/backend/app/crud/tts_evaluations/dataset.py @@ -75,7 +75,7 @@ def create_tts_dataset( except IntegrityError as e: session.rollback() if "uq_evaluation_dataset_name_org_project" in str(e): - logger.error( + logger.warning( f"[create_tts_dataset] Dataset name already exists | name: {name}" ) raise HTTPException( diff --git a/backend/app/main.py b/backend/app/main.py index cb965fe4d..1abacf461 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -25,7 +25,7 @@ # Load 
environment variables load_environment() -configure_logging() +configure_logging(service_name=settings.BACKEND_SERVICE_NAME) if settings.SENTRY_DSN: @@ -52,7 +52,7 @@ ], ) -setup_telemetry() +setup_telemetry(service_name=settings.BACKEND_SERVICE_NAME) def custom_generate_unique_id(route: APIRoute) -> str: diff --git a/backend/app/services/collections/providers/registry.py b/backend/app/services/collections/providers/registry.py index 10d07d451..5195c4b4f 100644 --- a/backend/app/services/collections/providers/registry.py +++ b/backend/app/services/collections/providers/registry.py @@ -63,7 +63,7 @@ def get_llm_provider( raise ValueError("OpenAI credentials not configured for this project.") client = OpenAI(api_key=credentials["api_key"]) else: - logger.error( + logger.warning( f"[get_llm_provider] Unsupported provider type requested: {provider}" ) raise ValueError(f"Provider '{provider}' is not supported.") diff --git a/backend/app/services/evaluations/evaluation.py b/backend/app/services/evaluations/evaluation.py index 55653b253..9f78b9e37 100644 --- a/backend/app/services/evaluations/evaluation.py +++ b/backend/app/services/evaluations/evaluation.py @@ -266,7 +266,7 @@ def get_evaluation_with_scores( ) return eval_run, None except Exception as e: - logger.error( + logger.warning( f"[get_evaluation_with_scores] Error loading traces from S3: {e} | " f"evaluation_id={evaluation_id}", exc_info=True, diff --git a/backend/app/services/evaluations/validators.py b/backend/app/services/evaluations/validators.py index 61d0c3b06..c99774683 100644 --- a/backend/app/services/evaluations/validators.py +++ b/backend/app/services/evaluations/validators.py @@ -172,5 +172,5 @@ def parse_csv_items(csv_content: bytes) -> list[dict[str, str]]: except HTTPException: raise except Exception as e: - logger.error(f"[parse_csv_items] Failed to parse CSV | {e}", exc_info=True) + logger.warning(f"[parse_csv_items] Failed to parse CSV | {e}") raise HTTPException(status_code=422, 
detail=f"Invalid CSV file: {e}") diff --git a/backend/app/services/llm/chain/chain.py b/backend/app/services/llm/chain/chain.py index ad0503675..2cd80708b 100644 --- a/backend/app/services/llm/chain/chain.py +++ b/backend/app/services/llm/chain/chain.py @@ -125,7 +125,7 @@ def execute( on_block_completed(block._index, result) if not result.success: - logger.error( + logger.warning( f"[LLMChain.execute] Block {block._index} failed: {result.error} | " f"job_id={self._context.job_id}" ) diff --git a/backend/app/services/llm/chain/executor.py b/backend/app/services/llm/chain/executor.py index a7bde2799..795130e39 100644 --- a/backend/app/services/llm/chain/executor.py +++ b/backend/app/services/llm/chain/executor.py @@ -104,7 +104,7 @@ def _handle_error(self, error: str) -> dict: error=error or "Unknown error occurred", metadata=self._request.request_metadata, ) - logger.error( + logger.warning( f"[_handle_error] Chain execution failed | " f"chain_id={self._context.chain_id}, job_id={self._context.job_id}, error={error}" ) diff --git a/backend/app/services/llm/jobs.py b/backend/app/services/llm/jobs.py index e797040a2..e85c67a61 100644 --- a/backend/app/services/llm/jobs.py +++ b/backend/app/services/llm/jobs.py @@ -59,6 +59,33 @@ tracer = trace.get_tracer(__name__) +def _set_traceability_attributes( + span: trace.Span, + *, + job_id: UUID | str | None = None, + llm_call_id: UUID | str | None = None, + chain_id: UUID | str | None = None, + trace_id: str | None = None, + project_id: int | None = None, + organization_id: int | None = None, + task_id: str | None = None, +) -> None: + if job_id is not None: + span.set_attribute("llm.job_id", str(job_id)) + if llm_call_id is not None: + span.set_attribute("llm.call_id", str(llm_call_id)) + if chain_id is not None: + span.set_attribute("llm.chain_id", str(chain_id)) + if trace_id is not None: + span.set_attribute("kaapi.trace_id", trace_id) + if project_id is not None: + span.set_attribute("kaapi.project_id", project_id) + 
if organization_id is not None: + span.set_attribute("kaapi.organization_id", organization_id) + if task_id is not None: + span.set_attribute("celery.task_id", task_id) + + def _execute_provider_call( *, func, @@ -91,15 +118,19 @@ def start_job( project_id=project_id, organization_id=organization_id, ), tracer.start_as_current_span("llm.start_job") as span: - span.set_attribute("kaapi.project_id", project_id) - span.set_attribute("kaapi.organization_id", organization_id) - trace_id = correlation_id.get() or "N/A" + _set_traceability_attributes( + span, + project_id=project_id, + organization_id=organization_id, + trace_id=trace_id, + ) + job_crud = JobCrud(session=db) job = job_crud.create( job_type=JobType.LLM_API, trace_id=trace_id, project_id=project_id ) - span.set_attribute("llm.job_id", str(job.id)) + _set_traceability_attributes(span, job_id=job.id) logger.info( f"[start_job] Created job | job_id={job.id}, status={job.status}, project_id={project_id}" @@ -126,7 +157,7 @@ def start_job( status_code=500, detail="Internal server error while executing LLM call" ) - span.set_attribute("celery.task_id", str(task_id)) + _set_traceability_attributes(span, task_id=str(task_id)) logger.info( f"[start_job] Job scheduled for LLM call | job_id={job.id}, project_id={project_id}, task_id={task_id}" ) @@ -149,7 +180,14 @@ def start_chain_job( job_id=job.id, project_id=project_id, organization_id=organization_id, - ): + ), tracer.start_as_current_span("llm.chain.start_job") as span: + _set_traceability_attributes( + span, + job_id=job.id, + trace_id=trace_id, + project_id=project_id, + organization_id=organization_id, + ) logger.info( f"[start_chain_job] Created job | job_id={job.id}, status={job.status}, project_id={project_id}" ) @@ -163,6 +201,8 @@ def start_chain_job( organization_id=organization_id, ) except Exception as e: + span.record_exception(e) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) logger.error( f"[start_chain_job] Error starting Celery 
task: {str(e)} | job_id={job.id}, project_id={project_id}", exc_info=True, @@ -174,6 +214,7 @@ def start_chain_job( detail="Internal server error while executing LLM chain job", ) + _set_traceability_attributes(span, task_id=str(task_id)) logger.info( f"[start_chain_job] Job scheduled for LLM chain job | job_id={job.id}, project_id={project_id}, task_id={task_id}" ) @@ -193,6 +234,13 @@ def handle_job_error( with tracer.start_as_current_span("llm.send_callback") as cb_span: cb_span.set_attribute("callback.url", callback_url) cb_span.set_attribute("callback.status", "failure") + _set_traceability_attributes( + cb_span, + job_id=job_id, + trace_id=correlation_id.get(), + project_id=project_id, + organization_id=organization_id, + ) send_callback( callback_url=callback_url, data=callback_response.model_dump(), @@ -415,11 +463,19 @@ def execute_llm_call( config_blob: ConfigBlob | None = None llm_call_id: UUID | None = None + trace_id = correlation_id.get() try: with Session(engine) as session: with tracer.start_as_current_span("llm.resolve_config") as cfg_span: - cfg_span.set_attribute("llm.job_id", str(job_id)) + _set_traceability_attributes( + cfg_span, + job_id=job_id, + chain_id=chain_id, + trace_id=trace_id, + project_id=project_id, + organization_id=organization_id, + ) cfg_span.set_attribute("llm.config.is_stored", config.is_stored_config) if config.is_stored_config: cfg_span.set_attribute("llm.config.id", str(config.id)) @@ -440,7 +496,14 @@ def execute_llm_call( query.input.content.value = interpolated with tracer.start_as_current_span("llm.guardrails.input") as guard_span: - guard_span.set_attribute("llm.job_id", str(job_id)) + _set_traceability_attributes( + guard_span, + job_id=job_id, + chain_id=chain_id, + trace_id=trace_id, + project_id=project_id, + organization_id=organization_id, + ) query, input_error, guardrail_direct_response = apply_input_guardrails( config_blob=config_blob, query=query, @@ -497,7 +560,14 @@ def execute_llm_call( ) with 
tracer.start_as_current_span("llm.create_call_record") as create_span: - create_span.set_attribute("llm.job_id", str(job_id)) + _set_traceability_attributes( + create_span, + job_id=job_id, + chain_id=chain_id, + trace_id=trace_id, + project_id=project_id, + organization_id=organization_id, + ) create_span.set_attribute( "llm.provider", str(completion_config.provider) ) @@ -520,7 +590,7 @@ def execute_llm_call( chain_id=chain_id, ) llm_call_id = llm_call.id - create_span.set_attribute("llm.call_id", str(llm_call_id)) + _set_traceability_attributes(create_span, llm_call_id=llm_call_id) logger.info( f"[execute_llm_call] Created LLM call record | " f"llm_call_id={llm_call_id}, job_id={job_id}" @@ -572,6 +642,15 @@ def execute_llm_call( ai_span_name = f"chat {model_name}" if model_name else f"chat {provider_name}" with tracer.start_as_current_span(ai_span_name) as ai_span: ai_span.set_attribute("sentry.op", "gen_ai.chat") + _set_traceability_attributes( + ai_span, + job_id=job_id, + llm_call_id=llm_call_id, + chain_id=chain_id, + trace_id=trace_id, + project_id=project_id, + organization_id=organization_id, + ) if completion_type: ai_span.set_attribute("completion_type", completion_type) set_gen_ai_request_attributes( @@ -589,6 +668,15 @@ def execute_llm_call( with tracer.start_as_current_span( "llm.provider.execute" ) as provider_span: + _set_traceability_attributes( + provider_span, + job_id=job_id, + llm_call_id=llm_call_id, + chain_id=chain_id, + trace_id=trace_id, + project_id=project_id, + organization_id=organization_id, + ) provider_span.set_attribute("llm.provider", provider_name) provider_span.set_attribute( "llm.operation.name", "provider.execute" @@ -638,8 +726,15 @@ def execute_llm_call( with tracer.start_as_current_span( "llm.update_call_record" ) as update_span: - update_span.set_attribute("llm.call_id", str(llm_call_id)) - update_span.set_attribute("llm.job_id", str(job_id)) + _set_traceability_attributes( + update_span, + job_id=job_id, + 
llm_call_id=llm_call_id, + chain_id=chain_id, + trace_id=trace_id, + project_id=project_id, + organization_id=organization_id, + ) try: update_llm_call_response( session, @@ -684,7 +779,15 @@ def execute_llm_call( with tracer.start_as_current_span( "llm.guardrails.output" ) as out_guard_span: - out_guard_span.set_attribute("llm.job_id", str(job_id)) + _set_traceability_attributes( + out_guard_span, + job_id=job_id, + llm_call_id=llm_call_id, + chain_id=chain_id, + trace_id=trace_id, + project_id=project_id, + organization_id=organization_id, + ) result, output_error = apply_output_guardrails( config_blob=config_blob, result=result, @@ -749,6 +852,14 @@ def execute_job( project_id=project_id, organization_id=organization_id, ): + _set_traceability_attributes( + trace.get_current_span(), + job_id=job_uuid, + trace_id=correlation_id.get(), + project_id=project_id, + organization_id=organization_id, + task_id=task_id, + ) logger.info( f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, callback_url {callback_url_str}" ) @@ -791,7 +902,14 @@ def execute_job( with tracer.start_as_current_span("llm.send_callback") as cb_span: cb_span.set_attribute("callback.url", callback_url_str) cb_span.set_attribute("callback.status", "success") - cb_span.set_attribute("llm.job_id", str(job_uuid)) + _set_traceability_attributes( + cb_span, + job_id=job_uuid, + trace_id=correlation_id.get(), + project_id=project_id, + organization_id=organization_id, + task_id=task_id, + ) send_callback( callback_url=callback_url_str, data=callback_response.model_dump(), @@ -872,6 +990,14 @@ def execute_chain_job( organization_id=organization_id, total_blocks=len(request.blocks), ): + _set_traceability_attributes( + trace.get_current_span(), + job_id=job_uuid, + trace_id=correlation_id.get(), + project_id=project_id, + organization_id=organization_id, + task_id=task_id, + ) logger.info( f"[execute_chain_job] Starting chain execution | " f"job_id={job_uuid}, 
total_blocks={len(request.blocks)}" @@ -889,6 +1015,9 @@ def execute_chain_job( configs=[block.model_dump(mode="json") for block in request.blocks], ) chain_uuid = chain_record.id + _set_traceability_attributes( + trace.get_current_span(), chain_id=chain_uuid + ) logger.info( f"[execute_chain_job] Created chain record | " @@ -949,7 +1078,7 @@ def execute_chain_job( error=str(e), ) except Exception as update_err: - logger.error( + logger.warning( f"[execute_chain_job] Failed to update chain status: {update_err} | " f"chain_id={chain_uuid}", exc_info=True, diff --git a/backend/app/services/llm/providers/eai.py b/backend/app/services/llm/providers/eai.py index ba5de8614..81ce6d3ca 100644 --- a/backend/app/services/llm/providers/eai.py +++ b/backend/app/services/llm/providers/eai.py @@ -301,7 +301,7 @@ def execute( except ValueError as e: error_message = f"Input validation error: {str(e)}" - logger.error( + logger.warning( f"[ElevenlabsAIProvider.execute] {error_message} | provider={provider_name}", exc_info=True, ) diff --git a/backend/app/services/llm/providers/oai.py b/backend/app/services/llm/providers/oai.py index 758f4bc7d..a93bd76c7 100644 --- a/backend/app/services/llm/providers/oai.py +++ b/backend/app/services/llm/providers/oai.py @@ -141,7 +141,7 @@ def execute( from app.utils import handle_openai_error error_message = handle_openai_error(e) - logger.error( + logger.warning( f"[OpenAIProvider.execute] OpenAI API error: {error_message} | provider={completion_config.provider}", exc_info=True, ) diff --git a/backend/app/services/llm/providers/sai.py b/backend/app/services/llm/providers/sai.py index 7d2e4fd8d..f4a6cc5e7 100644 --- a/backend/app/services/llm/providers/sai.py +++ b/backend/app/services/llm/providers/sai.py @@ -283,7 +283,7 @@ def execute( except ValueError as e: error_message = f"Input validation error: {str(e)}" - logger.error( + logger.warning( f"[SarvamAIProvider.execute] {error_message} | provider={provider_name}", exc_info=True, ) diff --git 
a/backend/app/services/response/response.py b/backend/app/services/response/response.py index 681606406..2c021649d 100644 --- a/backend/app/services/response/response.py +++ b/backend/app/services/response/response.py @@ -146,7 +146,7 @@ def generate_response( except openai.OpenAIError as e: error_message = handle_openai_error(e) - logger.error( + logger.warning( f"[process_response_task] OpenAI API error: {error_message}", exc_info=True, ) @@ -225,7 +225,7 @@ def process_response( assistant = get_assistant_by_id(session, assistant_id, project_id) if not assistant: - logger.error( + logger.warning( f"[process_response_task] Assistant not found: " f"assistant_id={mask_string(assistant_id)}, project_id={project_id}" ) diff --git a/backend/app/services/stt_evaluations/batch_job.py b/backend/app/services/stt_evaluations/batch_job.py index 69648dc21..05e77b4c1 100644 --- a/backend/app/services/stt_evaluations/batch_job.py +++ b/backend/app/services/stt_evaluations/batch_job.py @@ -66,7 +66,7 @@ def execute_batch_submission( ) if not samples: - logger.error( + logger.warning( f"[execute_batch_submission] No samples found | " f"run_id: {run_id}, dataset_id: {dataset_id}" ) diff --git a/backend/app/services/stt_evaluations/metric_job.py b/backend/app/services/stt_evaluations/metric_job.py index 3d1e272a7..0b0f612c8 100644 --- a/backend/app/services/stt_evaluations/metric_job.py +++ b/backend/app/services/stt_evaluations/metric_job.py @@ -119,7 +119,7 @@ def execute_metric_computation( all_scores.append(scores) scored_count += 1 except Exception as e: - logger.error( + logger.warning( f"[execute_metric_computation] Metric calculation failed | " f"result_id: {result.id}, error: {e}", exc_info=True, diff --git a/backend/app/services/tts_evaluations/batch_job.py b/backend/app/services/tts_evaluations/batch_job.py index fc89500a9..38bd5107a 100644 --- a/backend/app/services/tts_evaluations/batch_job.py +++ b/backend/app/services/tts_evaluations/batch_job.py @@ -84,7 +84,7 @@ 
def execute_batch_submission( sample_texts = get_sample_texts_from_dataset(session, dataset, project_id) if not sample_texts: - logger.error( + logger.warning( f"[execute_batch_submission] No samples found | " f"run_id: {run_id}, dataset_id: {dataset_id}" ) diff --git a/backend/app/services/tts_evaluations/batch_result_processing.py b/backend/app/services/tts_evaluations/batch_result_processing.py index 390945514..d6fac1de4 100644 --- a/backend/app/services/tts_evaluations/batch_result_processing.py +++ b/backend/app/services/tts_evaluations/batch_result_processing.py @@ -185,7 +185,7 @@ def execute_tts_result_processing( processed_count += 1 except Exception as audio_err: - logger.error( + logger.warning( f"[execute_tts_result_processing] Audio processing failed | " f"result_id={result_id}, error={str(audio_err)}" ) diff --git a/backend/app/utils.py b/backend/app/utils.py index ab9407973..36dfe3662 100644 --- a/backend/app/utils.py +++ b/backend/app/utils.py @@ -262,7 +262,7 @@ def get_openai_client(session: Session, org_id: int, project_id: int) -> OpenAI: ) if not credentials or "api_key" not in credentials: - logger.error( + logger.warning( f"[get_openai_client] OpenAI credentials not found. | project_id: {project_id}" ) raise HTTPException( @@ -273,7 +273,7 @@ def get_openai_client(session: Session, org_id: int, project_id: int) -> OpenAI: try: return OpenAI(api_key=credentials["api_key"]) except Exception as e: - logger.error( + logger.warning( f"[get_openai_client] Failed to configure OpenAI client. | project_id: {project_id} | error: {str(e)}", exc_info=True, ) @@ -297,7 +297,7 @@ def get_langfuse_client(session: Session, org_id: int, project_id: int) -> Langf if not credentials or not all( key in credentials for key in ["public_key", "secret_key", "host"] ): - logger.error( + logger.warning( f"[get_langfuse_client] Langfuse credentials not found or incomplete. 
| project_id: {project_id}" ) raise HTTPException( @@ -313,7 +313,7 @@ def get_langfuse_client(session: Session, org_id: int, project_id: int) -> Langf timeout=60, ) except Exception as e: - logger.error( + logger.warning( f"[get_langfuse_client] Failed to configure Langfuse client. | project_id: {project_id} | error: {str(e)}", exc_info=True, ) @@ -493,7 +493,7 @@ def send_callback( try: validate_callback_url(str(callback_url)) except ValueError as ve: - logger.error(f"[send_callback] Invalid callback URL: {ve}", exc_info=True) + logger.warning(f"[send_callback] Invalid callback URL: {ve}", exc_info=True) return False try: raw_body = json.dumps(data, separators=(",", ":")).encode() @@ -656,7 +656,7 @@ def resolve_input( return "", f"Unknown input type: {type(query_input)}" except Exception as e: - logger.error(f"[resolve_input] Failed to resolve input: {e}", exc_info=True) + logger.warning(f"[resolve_input] Failed to resolve input: {e}", exc_info=True) return "", f"Failed to resolve input: {str(e)}" diff --git a/scripts/python/invoke-cron.py b/scripts/python/invoke-cron.py index 45d4119c4..4bf1c9a7e 100644 --- a/scripts/python/invoke-cron.py +++ b/scripts/python/invoke-cron.py @@ -162,7 +162,7 @@ async def run(self): logger.info("Shutting down gracefully...") break except Exception as e: - logger.error(f"Error during invocation: {e}") + logger.warning(f"Error during invocation: {e}") # Wait before retrying on error logger.info(f"Waiting {self.interval_seconds} seconds before retry") await asyncio.sleep(self.interval_seconds)