Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions backend/app/api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from app.core.config import settings
from app.core.db import engine
from app.core.security import api_key_manager
from app.core.telemetry import set_request_log_context
from app.crud.organization import validate_organization
from app.crud.project import validate_project
from app.models import (
Expand Down Expand Up @@ -44,16 +45,25 @@ def get_db() -> Generator[Session, None, None]:


def _set_tenant_span_attributes(auth_context: AuthContext) -> None:
"""Tag the active OTel span with tenant context so traces in Sentry can be
filtered by user / org / project IDs."""
"""Tag the active OTel span and log context with tenant info after auth.

Sets org/project on:
- OTel span → Sentry traces filterable by tenant
- log context → every log record in this request carries org_id/project_id
- Sentry scope → tags on all events for this request
"""
span = trace.get_current_span()
if not span.is_recording():
return
span.set_attribute("user.id", str(auth_context.user.id))
if auth_context.organization:
span.set_attribute("tenant.org_id", auth_context.organization.id)
if auth_context.project:
span.set_attribute("tenant.project_id", auth_context.project.id)
if span.is_recording():
span.set_attribute("user.id", str(auth_context.user.id))
if auth_context.organization:
span.set_attribute("tenant.org_id", auth_context.organization.id)
if auth_context.project:
span.set_attribute("tenant.project_id", auth_context.project.id)

set_request_log_context(
org_id=auth_context.organization.id if auth_context.organization else None,
project_id=auth_context.project.id if auth_context.project else None,
)


def _authenticate_with_jwt(session: Session, token: str) -> AuthContext:
Expand Down
4 changes: 2 additions & 2 deletions backend/app/api/routes/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def update_credential(
_current_user: AuthContextDep,
):
if not creds_in or not creds_in.provider or not creds_in.credential:
logger.error(
logger.warning(
f"[update_credential] Invalid input | organization_id: {_current_user.organization_.id}, project_id: {_current_user.project_.id}"
)
raise HTTPException(
Expand Down Expand Up @@ -256,7 +256,7 @@ def update_credential_by_org_project(
_current_user: AuthContextDep,
):
if not creds_in or not creds_in.provider or not creds_in.credential:
logger.error(
logger.warning(
f"[update_credential_by_org_project] Invalid input | organization_id: {org_id}, project_id: {project_id}"
)
raise HTTPException(
Expand Down
2 changes: 1 addition & 1 deletion backend/app/api/routes/model_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def evaluate_models(
# even though the client will be initialized separately inside the background task

if not request.fine_tuning_ids:
logger.error(
logger.warning(
f"[evaluate_model] No fine tuning IDs provided | project_id:{current_user.project_.id}"
)
raise HTTPException(status_code=400, detail="No fine-tuned job IDs provided")
Expand Down
7 changes: 0 additions & 7 deletions backend/app/api/routes/organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ def read_organization(
"""
org = get_organization_by_id(session=session, org_id=org_id)
if org is None:
logger.error(f"[read_organization] Organization not found | org_id={org_id}")
raise HTTPException(status_code=404, detail="Organization not found")
return APIResponse.success_response(org)

Expand All @@ -87,9 +86,6 @@ def update_organization(
) -> APIResponse[OrganizationPublic]:
org = get_organization_by_id(session=session, org_id=org_id)
if org is None:
logger.error(
f"[update_organization] Organization not found | 'org_id': {org_id}"
)
raise HTTPException(status_code=404, detail="Organization not found")

org_data = org_in.model_dump(exclude_unset=True)
Expand All @@ -115,9 +111,6 @@ def update_organization(
def delete_organization(session: SessionDep, org_id: int) -> APIResponse[None]:
org = get_organization_by_id(session=session, org_id=org_id)
if org is None:
logger.error(
f"[delete_organization] Organization not found | 'org_id': {org_id}"
)
raise HTTPException(status_code=404, detail="Organization not found")

session.delete(org)
Expand Down
3 changes: 0 additions & 3 deletions backend/app/api/routes/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def read_project(*, session: SessionDep, project_id: int):
"""
project = get_project_by_id(session=session, project_id=project_id)
if project is None:
logger.error(f"[read_project] Project not found | project_id={project_id}")
raise HTTPException(status_code=404, detail="Project not found")
return APIResponse.success_response(project)

Expand All @@ -81,7 +80,6 @@ def read_project(*, session: SessionDep, project_id: int):
def update_project(*, session: SessionDep, project_id: int, project_in: ProjectUpdate):
project = get_project_by_id(session=session, project_id=project_id)
if project is None:
logger.error(f"[update_project] Project not found | project_id={project_id}")
raise HTTPException(status_code=404, detail="Project not found")

project_data = project_in.model_dump(exclude_unset=True)
Expand All @@ -106,7 +104,6 @@ def update_project(*, session: SessionDep, project_id: int, project_in: ProjectU
def delete_project(session: SessionDep, project_id: int):
project = get_project_by_id(session=session, project_id=project_id)
if project is None:
logger.error(f"[delete_project] Project not found | project_id={project_id}")
raise HTTPException(status_code=404, detail="Project not found")

session.delete(project)
Expand Down
11 changes: 4 additions & 7 deletions backend/app/api/routes/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def validate_thread(client: OpenAI, thread_id: str) -> tuple[bool, str]:
if runs.data and len(runs.data) > 0:
latest_run = runs.data[0]
if latest_run.status in ["queued", "in_progress", "requires_action"]:
logger.error(
logger.warning(
f"[validate_thread] Thread ID {mask_string(thread_id)} is currently {latest_run.status}."
)
return (
Expand Down Expand Up @@ -305,7 +305,7 @@ async def threads(
)
client, success = configure_openai(credentials)
if not success:
logger.error(
logger.warning(
f"[threads] OpenAI API key not configured for this organization. | organization_id: {_current_user.organization_.id}, project_id: {request.get('project_id')}"
)
return APIResponse.failure_response(
Expand Down Expand Up @@ -379,7 +379,7 @@ async def threads_sync(
# Configure OpenAI client
client, success = configure_openai(credentials)
if not success:
logger.error(
logger.warning(
f"[threads_sync] OpenAI API key not configured for this organization. | organization_id: {_current_user.organization_.id}, project_id: {request.get('project_id')}"
)
return APIResponse.failure_response(
Expand Down Expand Up @@ -447,7 +447,7 @@ async def start_thread(
# Configure OpenAI client
client, success = configure_openai(credentials)
if not success:
logger.error(
logger.warning(
f"[start_thread] OpenAI API key not configured for this organization. | project_id: {_current_user.project_.id}"
)
return APIResponse.failure_response(
Expand Down Expand Up @@ -501,9 +501,6 @@ async def get_thread(
result = get_thread_result(db, thread_id)

if not result:
logger.error(
f"[get_thread] Thread result not found for ID: {mask_string(thread_id)} | org_id: {_current_user.organization_.id}"
)
raise HTTPException(404, "thread not found")

status = result.status or ("success" if result.response else "processing")
Expand Down
14 changes: 6 additions & 8 deletions backend/app/api/routes/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any:
)
def create_user_endpoint(*, session: SessionDep, user_in: UserCreate) -> Any:
if get_user_by_email(session=session, email=user_in.email):
logger.error(
logger.warning(
f"[create_user_endpoint] Attempting to create user with existing email | email: {user_in.email}"
)
Comment on lines +53 to 55
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential PII exposure: email addresses logged without masking.

Email addresses are PII, and this log call writes them out unmasked. Consider wrapping the value in `mask_string` to protect user privacy. The same pattern repeats at lines 84, 151, and 202.

Suggested fix for this and similar occurrences
+from app.utils import mask_string
+
 ...
         logger.warning(
-            f"[create_user_endpoint] Attempting to create user with existing email | email: {user_in.email}"
+            f"[create_user_endpoint] Attempting to create user with existing email | email: {mask_string(user_in.email)}"
         )

Apply the same masking to the email logged in:

  • Line 84: `update_user_me`
  • Line 151: `register_user`
  • Line 202: `update_user_endpoint`

As per the coding guidelines: prefix every log message with the function name in square brackets and mask sensitive values, e.g. `logger.info(f"[function_name] Message {mask_string(sensitive_value)}")`.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- logger.warning(
-     f"[create_user_endpoint] Attempting to create user with existing email | email: {user_in.email}"
- )
+ from app.utils import mask_string
+ ...
+ logger.warning(
+     f"[create_user_endpoint] Attempting to create user with existing email | email: {mask_string(user_in.email)}"
+ )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/routes/users.py` around lines 53 - 55, the `logger.warning`
call inside `create_user_endpoint` logs the raw email address, which is PII.
Replace the direct email usage with the `mask_string` utility, e.g.
`mask_string(user_in.email)`. Apply the same change to the email-logging calls
in `update_user_me`, `register_user`, and `update_user_endpoint`, so that every
logged email goes through `mask_string(sensitive_value)`. Also ensure each log
message keeps the "[function_name] Message" prefix convention, with the function
name in square brackets.

raise HTTPException(
Expand Down Expand Up @@ -80,7 +80,7 @@ def update_user_me(
if user_in.email:
existing_user = get_user_by_email(session=session, email=user_in.email)
if existing_user and existing_user.id != current_user.id:
logger.error(
logger.warning(
f"[update_user_me] Attempting to update user with existing email | email: {user_in.email}, user_id: {current_user.id}"
)
raise HTTPException(
Expand Down Expand Up @@ -125,7 +125,7 @@ def read_user_me(current_user_dep: AuthContextDep) -> Any:
def delete_user_me(session: SessionDep, current_user_dep: AuthContextDep) -> Any:
current_user = current_user_dep.user
if current_user.is_superuser:
logger.error(
logger.warning(
f"[delete_user_me] Attempting to delete superuser account by itself | user_id: {current_user.id}"
)
raise HTTPException(
Expand All @@ -147,7 +147,7 @@ def register_user(session: SessionDep, user_in: UserRegister) -> Any:
This endpoint allows the registration of a new user and is accessible only by a superuser.
"""
if get_user_by_email(session=session, email=user_in.email):
logger.error(
logger.warning(
f"[register_user] Attempting to create user with existing email | email: {user_in.email}"
)
raise HTTPException(
Expand Down Expand Up @@ -190,7 +190,6 @@ def update_user_endpoint(
) -> Any:
db_user = session.get(User, user_id)
if not db_user:
logger.error(f"[update_user_endpoint] User not found | user_id: {user_id}")
raise HTTPException(
status_code=404,
detail="The user with this id does not exist in the system",
Expand All @@ -199,7 +198,7 @@ def update_user_endpoint(
if user_in.email:
existing_user = get_user_by_email(session=session, email=user_in.email)
if existing_user and existing_user.id != user_id:
logger.error(
logger.warning(
f"[update_user_endpoint] Attempting to update user with existing email | email: {user_in.email}, user_id: {user_id}"
)
raise HTTPException(
Expand All @@ -219,11 +218,10 @@ def delete_user(
) -> Message:
user = session.get(User, user_id)
if not user:
logger.error(f"[delete_user] User not found | user_id: {user_id}")
raise HTTPException(status_code=404, detail="User not found")

if user == current_user.user:
logger.error(
logger.warning(
f"[delete_user] Attempting to delete self by superuser | user_id: {current_user.user.id}"
)
raise HTTPException(
Expand Down
10 changes: 8 additions & 2 deletions backend/app/celery/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
task_failure,
task_postrun,
task_prerun,
setup_logging,
worker_process_init,
)
from kombu import Exchange, Queue
Expand All @@ -14,17 +15,22 @@
from app.core.logger import configure_logging
from app.core.sentry_filters import before_send_transaction_filter

configure_logging(service_name="kaapi-celery")

logger = logging.getLogger(__name__)
_telemetry_initialized = False
_sentry_initialized = False
_flush_hook_registered = False


@setup_logging.connect
def configure_celery_logging(**_: object) -> None:
configure_logging(service_name="kaapi-celery")


def _initialize_worker_observability() -> None:
global _telemetry_initialized, _sentry_initialized, _flush_hook_registered

configure_logging(service_name="kaapi-celery")

if settings.SENTRY_DSN and not _sentry_initialized:
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration
Expand Down
4 changes: 2 additions & 2 deletions backend/app/core/batch/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def from_credentials(
)

if not credentials:
logger.error(
logger.warning(
f"[from_credentials] Gemini credentials not found | "
f"org_id: {org_id}, project_id: {project_id}"
)
Expand All @@ -80,7 +80,7 @@ def from_credentials(

api_key = credentials.get("api_key")
if not api_key:
logger.error(
logger.warning(
f"[from_credentials] Invalid Gemini credentials (missing api_key) | "
f"org_id: {org_id}, project_id: {project_id}"
)
Expand Down
2 changes: 2 additions & 0 deletions backend/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ def AWS_S3_BUCKET(self) -> str:
LOG_DIR: str = os.path.join(os.path.dirname(os.path.dirname(__file__)), "logs")
OTEL_ENABLED: bool = False
OTEL_SERVICE_NAME: str = "kaapi-backend"
BACKEND_SERVICE_NAME: str = "kaapi-backend"
CRON_SERVICE_NAME: str = "kaapi-cron"

# Celery Configuration
CELERY_WORKER_CONCURRENCY: int | None = None
Expand Down
2 changes: 1 addition & 1 deletion backend/app/core/finetune/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def load_labels_and_prompts(self) -> None:
label_col = "label" if "label" in df.columns else None

if not query_col or not label_col:
logger.error(
logger.warning(
"[ModelEvaluator.load_labels_and_prompts] CSV must "
"contain a 'label' column and one of: "
f"{possible_query_columns}"
Expand Down
2 changes: 1 addition & 1 deletion backend/app/core/finetune/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def _load_dataframe(self):
self.label_col = "label" if "label" in df.columns else None

if not self.query_col or not self.label_col:
logger.error(
logger.warning(
f"[DataPreprocessor] Dataset does not contain a 'label' column and one of: {possible_query_columns}"
)
raise ValueError(
Expand Down
Loading