fix(task_manager): reinitialize consumer threads after os.fork() #1654
pyg410 wants to merge 3 commits into
Signed-off-by: Yeonggyu Park <pon05114@naver.com>
        # When using Gunicorn with --preload, os.fork() copies memory but not threads.
        # Without this, worker processes have no consumer threads and all events are lost.
        if hasattr(os, "register_at_fork"):
            os.register_at_fork(after_in_child=self._at_fork_reinit)
Fork handler accumulates strong references, preventing GC
os.register_at_fork stores the callback permanently in a process-level list. Passing the bound method self._at_fork_reinit keeps a strong reference to each TaskManager instance alive for the lifetime of the process — including all its associated threads, queues, and API clients. There is no way to un-register a fork handler. If users create multiple Langfuse() clients (e.g. across tests or re-initializations), every TaskManager will be pinned in memory and its background threads will remain running indefinitely, even after the user drops all references. Using a weakref and skipping the callback when the object has already been collected would prevent this leak.
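The weakref suggestion above could look roughly like the following. This is a minimal sketch, not the Langfuse implementation: `WeakForkManager` and `reinit_count` are hypothetical stand-ins for `TaskManager` and its real reinitialization work, and only `os.register_at_fork` and `weakref.ref` are actual standard-library calls.

```python
import os
import weakref


class WeakForkManager:
    """Hypothetical stand-in for TaskManager (not the Langfuse class)."""

    def __init__(self):
        self.reinit_count = 0  # illustrative counter, not a real attribute

        if hasattr(os, "register_at_fork"):
            # Capture only a weak reference. The process-level handler list
            # then holds the closure, but the closure does not keep the
            # instance alive.
            self_ref = weakref.ref(self)

            def _after_fork_in_child():
                manager = self_ref()
                if manager is not None:  # skip instances already collected
                    manager._at_fork_reinit()

            os.register_at_fork(after_in_child=_after_fork_in_child)

    def _at_fork_reinit(self):
        # Placeholder for recreating queues and consumer threads.
        self.reinit_count += 1
```

The closure itself still stays registered for the lifetime of the process, since fork handlers cannot be removed, but it is small and becomes a no-op once the instance is gone.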
    def _at_fork_reinit(self):
        """Reinitialize consumer threads after fork in child process.

        Called automatically by os.register_at_fork() after fork().
        Necessary for Gunicorn --preload deployments where os.fork() is used:
        threads are not copied to child processes (POSIX standard), so without
        reinitialization, the child process has no consumer threads and all
        ingestion events are silently lost.
        """
        self._log.debug(
            f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads"
        )

        # Clear existing consumer references (threads are dead in child process)
        self._ingestion_consumers = []
        self._media_upload_consumers = []

        # Recreate queues (old queues may have shared state with parent process)
        self._ingestion_queue = queue.Queue(self._max_task_queue_size)
        self._media_upload_queue = Queue(self._max_task_queue_size)

        # Recreate MediaManager with new queue
        self._media_manager = MediaManager(
            api_client=self._api_client,
            media_upload_queue=self._media_upload_queue,
            max_retries=self._max_retries,
        )

        # Start fresh consumer threads in child process
        self.init_resources()

        self._log.debug(
            f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork"
        )
_at_fork_reinit restarts threads on an already-shutdown manager
os.register_at_fork callbacks are never removed, so if shutdown() is called in the master process before Gunicorn forks, the child process will still call _at_fork_reinit. This restarts consumer threads on a manager that was intentionally torn down, and the child's atexit-registered shutdown() will then try to flush and join them — potentially generating spurious outbound API traffic or blocking on exit in a worker that was never meant to send data. Adding a guard like a _shutdown flag would prevent reinitialization on already-stopped instances.
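A guard of the kind suggested above might be sketched as follows. The class `GuardedManager` and its single placeholder consumer thread are hypothetical; only the `_shutdown` flag name comes from the review comment, and the real `TaskManager` may track its state differently.

```python
import threading


class GuardedManager:
    """Hypothetical stand-in for TaskManager with a shutdown guard."""

    def __init__(self):
        self._shutdown = False
        self._consumers = []
        self._start_consumers()

    def _start_consumers(self):
        # One placeholder consumer that idles until told to stop.
        self._stop = threading.Event()
        worker = threading.Thread(target=self._stop.wait, daemon=True)
        worker.start()
        self._consumers = [worker]

    def shutdown(self):
        self._shutdown = True
        self._stop.set()
        for worker in self._consumers:
            worker.join()
        self._consumers = []

    def _at_fork_reinit(self):
        # Do not revive a manager that was deliberately stopped before fork.
        if self._shutdown:
            return
        self._start_consumers()
```

With this guard, a child forked after `shutdown()` keeps an empty consumer list, so its exit path has nothing to flush or join.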
Signed-off-by: Yeonggyu Park <pon05114@naver.com>
This fix follows the same pattern used in the OpenTelemetry Python SDK
Signed-off-by: Yeonggyu Park <pon05114@naver.com>
What does this PR do?
When using Gunicorn with `--preload`, `os.fork()` copies memory but not threads (POSIX standard). Worker processes inherit the `TaskManager` object but have no consumer threads, so all ingestion events are silently lost and calling `flush()` blocks forever on `queue.join()` → Gunicorn worker timeout (SIGABRT).
Fix by registering `os.register_at_fork(after_in_child=self._at_fork_reinit)` in `TaskManager.__init__` to reinitialize consumer threads, queues, and `MediaManager` in the child process after fork.
Reproduction example: https://github.com/pyg410/langfuse-gunicorn-example
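The underlying behavior can be observed without Gunicorn. The sketch below is a standalone illustration, assuming a POSIX system where `os.fork()` is available: it starts one background thread, forks, and has the child report how many threads it sees through a pipe. The helper name `count_threads_in_child` is made up for this example.

```python
import os
import threading


def count_threads_in_child() -> int:
    """Fork once and return the thread count observed inside the child."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: report the live thread count, then exit immediately
        # without running the parent's cleanup handlers.
        try:
            os.close(read_fd)
            os.write(write_fd, str(threading.active_count()).encode())
        finally:
            os._exit(0)
    # Parent process: read the child's report and reap it.
    os.close(write_fd)
    data = os.read(read_fd, 16)
    os.close(read_fd)
    os.waitpid(pid, 0)
    return int(data)


# A background thread standing in for a consumer thread.
stop = threading.Event()
consumer = threading.Thread(target=stop.wait, daemon=True)
consumer.start()

parent_count = threading.active_count()
child_count = count_threads_in_child()

stop.set()
consumer.join()
print(f"threads in parent: {parent_count}, threads in child: {child_count}")
```

On CPython the child should report only the forking thread, while the parent also counts the background thread. Recent Python versions may emit a `DeprecationWarning` when forking a multi-threaded process, which is the same hazard this PR deals with.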
Greptile Summary
This PR fixes silent data loss and `flush()` deadlocks in Gunicorn `--preload` deployments by registering an `os.register_at_fork(after_in_child=...)` callback that recreates consumer threads, queues, and `MediaManager` in each forked worker process.
- The approach (`register_at_fork` + full reinitialization of queues and threads) correctly addresses the POSIX fork-thread problem and matches the pattern used by Python's own `threading` module.
- The fork handler keeps a strong reference to the bound method (pinning each `TaskManager` and all its resources when multiple clients are created), and there is no guard to skip reinitialization when the manager was already shut down before the fork occurred.
Confidence Score: 3/5
The fix correctly solves the Gunicorn --preload data-loss bug, but two defects in the implementation need to be addressed before merging.
The fork handler permanently pins every TaskManager instance in memory (along with its threads, queues, and API client) because os.register_at_fork holds an unbreakable strong reference to the bound method. In applications or test suites that create multiple Langfuse clients this becomes a resource leak with no way to recover. Additionally, if shutdown() is called in the master process before Gunicorn forks, _at_fork_reinit will silently restart consumer threads in the child on a manager that was intentionally stopped.
langfuse/_task_manager/task_manager.py — both the fork-handler reference lifetime and the missing shutdown-guard need to be revisited before this ships.
Sequence Diagram
sequenceDiagram
    participant MP as Master Process
    participant OS as os.fork()
    participant CP as Child Process (Worker)
    MP->>MP: TaskManager.__init__()
    MP->>MP: init_resources() (start consumer threads)
    MP->>MP: atexit.register(self.shutdown)
    MP->>MP: "os.register_at_fork(after_in_child=_at_fork_reinit)"
    MP->>OS: Gunicorn calls fork()
    OS-->>CP: Child inherits memory (queues, config, atexit handlers)
    Note over CP: Consumer threads NOT inherited (POSIX)
    OS->>CP: Calls _at_fork_reinit() [after_in_child]
    CP->>CP: Clear _ingestion_consumers and _media_upload_consumers
    CP->>CP: Recreate _ingestion_queue and _media_upload_queue
    CP->>CP: Recreate MediaManager (fresh locks)
    CP->>CP: init_resources() (start new consumer threads)
    CP->>CP: Process requests → add_task() → queue
    CP->>CP: Consumer threads drain queue → send to Langfuse API
    CP->>CP: On exit: atexit fires shutdown() → flush() + join()
Reviews (1): Last reviewed commit: "fix(task_manager): reinitialize consumer..."
Context used:
Learned From
langfuse/langfuse-python#1387