Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions langfuse/_task_manager/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import atexit
import logging
import os
import queue
import weakref
from queue import Queue
from typing import List, Optional

Expand Down Expand Up @@ -35,6 +37,7 @@ class TaskManager(object):
_sdk_integration: str
_sample_rate: float
_mask: Optional[MaskFunction]
_shutdown: bool

def __init__(
self,
Expand Down Expand Up @@ -77,12 +80,60 @@ def __init__(
self._enabled = enabled
self._sample_rate = sample_rate
self._mask = mask
self._shutdown = False

self.init_resources()

# cleans up when the python interpreter closes
atexit.register(self.shutdown)

# Register fork handler to reinitialize consumer threads in child process.
# When using Gunicorn with --preload, os.fork() copies memory but not threads.
# Without this, worker processes have no consumer threads and all events are lost.
if hasattr(os, "register_at_fork"):
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(
after_in_child=lambda: weak_reinit()() if weak_reinit() is not None else None
)

def _at_fork_reinit(self):
"""Reinitialize consumer threads after fork in child process.

Called automatically by os.register_at_fork() after fork().
Necessary for Gunicorn --preload deployments where os.fork() is used:
threads are not copied to child processes (POSIX standard), so without
reinitialization, the child process has no consumer threads and all
ingestion events are silently lost.
"""
if self._shutdown:
return

self._log.debug(
f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads"
)

# Clear existing consumer references (threads are dead in child process)
self._ingestion_consumers = []
self._media_upload_consumers = []

# Recreate queues (old queues may have shared state with parent process)
self._ingestion_queue = queue.Queue(self._max_task_queue_size)
self._media_upload_queue = Queue(self._max_task_queue_size)

# Recreate MediaManager with new queue
self._media_manager = MediaManager(
api_client=self._api_client,
media_upload_queue=self._media_upload_queue,
max_retries=self._max_retries,
)

# Start fresh consumer threads in child process
self.init_resources()

self._log.debug(
f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork"
)
Comment on lines +99 to +135
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 _at_fork_reinit restarts threads on an already-shutdown manager

os.register_at_fork callbacks are never removed, so if shutdown() is called in the master process before Gunicorn forks, the child process will still call _at_fork_reinit. This restarts consumer threads on a manager that was intentionally torn down, and the child's atexit-registered shutdown() will then try to flush and join them — potentially generating spurious outbound API traffic or blocking on exit in a worker that was never meant to send data. Adding a guard like a _shutdown flag would prevent reinitialization on already-stopped instances.

Prompt To Fix With AI
This is a comment left during a code review.
Path: langfuse/_task_manager/task_manager.py
Line: 93-126

Comment:
**`_at_fork_reinit` restarts threads on an already-shutdown manager**

`os.register_at_fork` callbacks are never removed, so if `shutdown()` is called in the master process before Gunicorn forks, the child process will still call `_at_fork_reinit`. This restarts consumer threads on a manager that was intentionally torn down, and the child's atexit-registered `shutdown()` will then try to flush and join them — potentially generating spurious outbound API traffic or blocking on exit in a worker that was never meant to send data. Adding a guard like a `_shutdown` flag would prevent reinitialization on already-stopped instances.

How can I resolve this? If you propose a fix, please make it concise.


def init_resources(self):
for i in range(self._threads):
ingestion_consumer = IngestionConsumer(
Expand Down Expand Up @@ -191,6 +242,8 @@ def shutdown(self):
"""Flush all messages and cleanly shutdown the client."""
self._log.debug("shutdown initiated")

self._shutdown = True

# Unregister the atexit handler first
atexit.unregister(self.shutdown)

Expand Down