
fix(task_manager): reinitialize consumer threads after os.fork() #1654

Open
pyg410 wants to merge 3 commits into langfuse:v2-stable from pyg410:taskmanager-fork-safety

Conversation

@pyg410

@pyg410 pyg410 commented May 14, 2026

What does this PR do?

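With Gunicorn's --preload option, the application is loaded once in the master process and workers are then created with os.fork().
fork() copies the address space but replicates only the calling thread (POSIX), so worker processes inherit the TaskManager object
without its consumer threads. All ingestion events queued in a worker are silently lost, and calling flush() blocks forever on
queue.join() until Gunicorn's worker timeout kills the worker (SIGABRT).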
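
The missing-thread behaviour can be observed outside of Gunicorn. The following is a minimal sketch, not code from this PR: `consumer_alive_in_child` is a hypothetical helper that starts a consumer thread, forks, and reports through a pipe whether that thread is still alive in the child. It assumes a platform where `os.fork()` is available.

```python
import os
import queue
import threading


def consumer_alive_in_child() -> bool:
    """Fork while a consumer thread is running; report its state in the child."""
    q: queue.Queue = queue.Queue()

    def drain() -> None:
        while True:
            q.get()
            q.task_done()

    consumer = threading.Thread(target=drain, daemon=True)
    consumer.start()

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # child process
        try:
            os.close(read_fd)
            # The Thread object was copied, but no OS thread backs it here.
            os.write(write_fd, b"1" if consumer.is_alive() else b"0")
        finally:
            os._exit(0)

    # Parent: read the child's answer and reap it.
    os.close(write_fd)
    answer = os.read(read_fd, 1)
    os.close(read_fd)
    os.waitpid(pid, 0)
    return answer == b"1"
```

With no live consumer, nothing ever calls `task_done()` in the child, which is why a later `queue.join()` never returns there.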

Fix by registering os.register_at_fork(after_in_child=self._at_fork_reinit) in TaskManager.__init__
to reinitialize consumer threads, queues, and MediaManager in the child process after fork.
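
As a rough illustration of the approach (a sketch under stated assumptions, not the actual TaskManager code), the hypothetical `ForkSafeWorker` below registers an `after_in_child` hook that recreates its queue and consumer thread. `child_drains_queue` is likewise a hypothetical helper that forks and checks whether an item queued in the child is actually consumed.

```python
import os
import queue
import threading
import time


class ForkSafeWorker:
    """Hypothetical stand-in for TaskManager: one queue, one consumer thread."""

    def __init__(self) -> None:
        self._start()
        # register_at_fork is Unix-only (Python 3.7+), hence the hasattr guard.
        if hasattr(os, "register_at_fork"):
            os.register_at_fork(after_in_child=self._start)

    def _start(self) -> None:
        # Fresh queue (and therefore fresh locks) plus a fresh consumer thread.
        self.queue: queue.Queue = queue.Queue()
        self.consumer = threading.Thread(
            target=self._drain, args=(self.queue,), daemon=True
        )
        self.consumer.start()

    @staticmethod
    def _drain(q: queue.Queue) -> None:
        while True:
            q.get()
            q.task_done()


def child_drains_queue(worker: ForkSafeWorker, timeout: float = 2.0) -> bool:
    """Fork, enqueue one item in the child, and report whether it was consumed."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # child process: the after_in_child hook has already run
        try:
            os.close(read_fd)
            worker.queue.put("event")
            deadline = time.monotonic() + timeout
            while worker.queue.unfinished_tasks and time.monotonic() < deadline:
                time.sleep(0.01)
            os.write(write_fd, b"0" if worker.queue.unfinished_tasks else b"1")
        finally:
            os._exit(0)

    os.close(write_fd)
    drained = os.read(read_fd, 1) == b"1"
    os.close(read_fd)
    os.waitpid(pid, 0)
    return drained
```

Recreating the queue rather than reusing the inherited one avoids depending on lock state that was copied from the parent at the moment of the fork.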

Reproduction example: https://github.com/pyg410/langfuse-gunicorn-example

A process shall be created with a single thread. If a multi-threaded process calls fork(), the new process shall contain a replica of the calling thread and its entire address space, possibly including the states of mutexes and other resources. Consequently, to avoid errors, the child process may only execute async-signal-safe operations until such time as one of the exec functions is called. (https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html)

Type of change

  • Bug fix

Verification

# Reproduction example: https://github.com/pyg410/langfuse-gunicorn-example

# issue case (patched task_manager.py NOT applied)
gunicorn app:app -w 4 -k uvicorn.workers.UvicornWorker --preload

# normal case (patched task_manager.py applied)
gunicorn app:app -w 4 -k uvicorn.workers.UvicornWorker --preload

Checklist

  • I self-reviewed the diff using code_review.md.
  • I added or updated tests for behavior changes.
  • I updated docs, examples, or .env.template if needed.
  • I did not hand-edit generated files; if generated files changed, I used the upstream regeneration path.
  • I did not commit secrets or credentials.

Greptile Summary

This PR fixes silent data loss and flush() deadlocks in Gunicorn --preload deployments by registering an os.register_at_fork(after_in_child=...) callback that recreates consumer threads, queues, and MediaManager in each forked worker process.

  • The core approach (register_at_fork + full reinitialization of queues and threads) correctly addresses the POSIX fork-thread problem and matches the pattern used by Python's own threading module.
  • Two issues need attention: the fork handler stores a permanent strong reference to the bound method (preventing GC of TaskManager and all its resources when multiple clients are created), and there is no guard to skip reinitialization when the manager was already shut down before the fork occurred.

Confidence Score: 3/5

The fix correctly solves the Gunicorn --preload data-loss bug, but two defects in the implementation need to be addressed before merging.

The fork handler permanently pins every TaskManager instance in memory (along with its threads, queues, and API client) because os.register_at_fork holds an unbreakable strong reference to the bound method. In applications or test suites that create multiple Langfuse clients this becomes a resource leak with no way to recover. Additionally, if shutdown() is called in the master process before Gunicorn forks, _at_fork_reinit will silently restart consumer threads in the child on a manager that was intentionally stopped.

langfuse/_task_manager/task_manager.py — both the fork-handler reference lifetime and the missing shutdown-guard need to be revisited before this ships.

Sequence Diagram

sequenceDiagram
    participant MP as Master Process
    participant OS as os.fork()
    participant CP as Child Process (Worker)

    MP->>MP: TaskManager.__init__()
    MP->>MP: init_resources() — start consumer threads
    MP->>MP: atexit.register(self.shutdown)
    MP->>MP: "os.register_at_fork(after_in_child=_at_fork_reinit)"

    MP->>OS: Gunicorn calls fork()
    OS-->>CP: Child inherits memory (queues, config, atexit handlers)
    Note over CP: Consumer threads NOT inherited (POSIX)

    OS->>CP: Calls _at_fork_reinit() [after_in_child]
    CP->>CP: Clear _ingestion_consumers and _media_upload_consumers
    CP->>CP: Recreate _ingestion_queue and _media_upload_queue
    CP->>CP: Recreate MediaManager (fresh locks)
    CP->>CP: init_resources() — start new consumer threads

    CP->>CP: Process requests → add_task() → queue
    CP->>CP: Consumer threads drain queue → send to Langfuse API
    CP->>CP: On exit: atexit fires shutdown() → flush() + join()
Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
langfuse/_task_manager/task_manager.py:3-7
`import os` was inserted after `import queue`, breaking the alphabetical ordering of the stdlib import block (an isort-style convention; PEP 8 itself only prescribes import grouping, not alphabetical order). It should appear before `import queue`.

```suggestion
import atexit
import logging
import os
import queue
from queue import Queue
```

### Issue 2 of 3
langfuse/_task_manager/task_manager.py:91
**Fork handler accumulates strong references, preventing GC**

`os.register_at_fork` stores the callback permanently in a process-level list. Passing the bound method `self._at_fork_reinit` keeps a strong reference to each `TaskManager` instance alive for the lifetime of the process — including all its associated threads, queues, and API clients. There is no way to un-register a fork handler. If users create multiple `Langfuse()` clients (e.g. across tests or re-initializations), every `TaskManager` will be pinned in memory and its background threads will remain running indefinitely, even after the user drops all references. Using a `weakref` and skipping the callback when the object has already been collected would prevent this leak.

### Issue 3 of 3
langfuse/_task_manager/task_manager.py:93-126
**`_at_fork_reinit` restarts threads on an already-shutdown manager**

`os.register_at_fork` callbacks are never removed, so if `shutdown()` is called in the master process before Gunicorn forks, the child process will still call `_at_fork_reinit`. This restarts consumer threads on a manager that was intentionally torn down, and the child's atexit-registered `shutdown()` will then try to flush and join them — potentially generating spurious outbound API traffic or blocking on exit in a worker that was never meant to send data. Adding a guard like a `_shutdown` flag would prevent reinitialization on already-stopped instances.

Reviews (1): Last reviewed commit: "fix(task_manager): reinitialize consumer..."

Greptile also left 2 inline comments on this PR.

Context used:

  • Rule used - Move imports to the top of the module instead of p... (source)

Learned From
langfuse/langfuse-python#1387

Signed-off-by: Yeonggyu Park <pon05114@naver.com>

@claude claude Bot left a comment


Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

@CLAassistant

CLAassistant commented May 14, 2026

CLA assistant check
All committers have signed the CLA.

Comment thread langfuse/_task_manager/task_manager.py Outdated
# When using Gunicorn with --preload, os.fork() copies memory but not threads.
# Without this, worker processes have no consumer threads and all events are lost.
if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=self._at_fork_reinit)
Contributor


P1 Fork handler accumulates strong references, preventing GC

os.register_at_fork stores the callback permanently in a process-level list. Passing the bound method self._at_fork_reinit keeps a strong reference to each TaskManager instance alive for the lifetime of the process — including all its associated threads, queues, and API clients. There is no way to un-register a fork handler. If users create multiple Langfuse() clients (e.g. across tests or re-initializations), every TaskManager will be pinned in memory and its background threads will remain running indefinitely, even after the user drops all references. Using a weakref and skipping the callback when the object has already been collected would prevent this leak.
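
One possible shape for the weakref suggestion, as a sketch only: `weak_fork_callback` and `Manager` are hypothetical names, and `weakref.WeakMethod` is used so that the registered callback holds no strong reference to the instance.

```python
import os
import weakref


def weak_fork_callback(bound_method):
    """Wrap a bound method so that registering it does not keep its instance alive."""
    weak_method = weakref.WeakMethod(bound_method)

    def callback() -> None:
        method = weak_method()  # None once the instance has been collected
        if method is not None:
            method()

    return callback


class Manager:
    """Hypothetical stand-in for TaskManager."""

    def __init__(self) -> None:
        self.reinit_count = 0
        if hasattr(os, "register_at_fork"):
            os.register_at_fork(
                after_in_child=weak_fork_callback(self._at_fork_reinit)
            )

    def _at_fork_reinit(self) -> None:
        self.reinit_count += 1
```

The small wrapper function still stays in the process-level handler list, but it no longer pins the instance. Note that `atexit.register(self.shutdown)`, shown in the sequence diagram, holds a comparable strong reference, so this change alone may not be enough to make a TaskManager collectable.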


Comment on lines +93 to +126
def _at_fork_reinit(self):
    """Reinitialize consumer threads after fork in child process.

    Called automatically by os.register_at_fork() after fork().
    Necessary for Gunicorn --preload deployments where os.fork() is used:
    threads are not copied to child processes (POSIX standard), so without
    reinitialization, the child process has no consumer threads and all
    ingestion events are silently lost.
    """
    self._log.debug(
        f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads"
    )

    # Clear existing consumer references (threads are dead in child process)
    self._ingestion_consumers = []
    self._media_upload_consumers = []

    # Recreate queues (old queues may have shared state with parent process)
    self._ingestion_queue = queue.Queue(self._max_task_queue_size)
    self._media_upload_queue = Queue(self._max_task_queue_size)

    # Recreate MediaManager with new queue
    self._media_manager = MediaManager(
        api_client=self._api_client,
        media_upload_queue=self._media_upload_queue,
        max_retries=self._max_retries,
    )

    # Start fresh consumer threads in child process
    self.init_resources()

    self._log.debug(
        f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork"
    )
Contributor


P1 _at_fork_reinit restarts threads on an already-shutdown manager

os.register_at_fork callbacks are never removed, so if shutdown() is called in the master process before Gunicorn forks, the child process will still call _at_fork_reinit. This restarts consumer threads on a manager that was intentionally torn down, and the child's atexit-registered shutdown() will then try to flush and join them — potentially generating spurious outbound API traffic or blocking on exit in a worker that was never meant to send data. Adding a guard like a _shutdown flag would prevent reinitialization on already-stopped instances.
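
A guard of the kind described could look roughly like the sketch below. `GuardedManager` is a hypothetical, heavily simplified stand-in for TaskManager, and the `_shutdown` flag and `_stop` event are assumptions made for illustration, not attributes confirmed to exist in the real class.

```python
import threading


class GuardedManager:
    """Hypothetical stand-in for TaskManager with a shutdown guard."""

    def __init__(self) -> None:
        self._shutdown = False
        self._consumers: list[threading.Thread] = []
        self.init_resources()

    def init_resources(self) -> None:
        # Each generation of consumers gets its own stop event.
        self._stop = threading.Event()
        consumer = threading.Thread(target=self._stop.wait, daemon=True)
        consumer.start()
        self._consumers = [consumer]

    def shutdown(self) -> None:
        self._shutdown = True  # remembered across fork via copied memory
        self._stop.set()
        for consumer in self._consumers:
            consumer.join()
        self._consumers = []

    def _at_fork_reinit(self) -> None:
        # Skip reinitialization if the manager was stopped before the fork.
        if self._shutdown:
            return
        self._consumers = []
        self.init_resources()
```

Because the flag lives in ordinary instance state, the child sees the value the parent had at fork time, so a manager that was shut down in the master stays shut down in every worker.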


Signed-off-by: Yeonggyu Park <pon05114@naver.com>
@pyg410
Author

pyg410 commented May 14, 2026

This fix follows the same pattern used in the OpenTelemetry Python SDK
(https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py)

Signed-off-by: Yeonggyu Park <pon05114@naver.com>


2 participants