diff --git a/langfuse/_task_manager/task_manager.py b/langfuse/_task_manager/task_manager.py index e94265350..6cfa4f7c5 100644 --- a/langfuse/_task_manager/task_manager.py +++ b/langfuse/_task_manager/task_manager.py @@ -2,7 +2,9 @@ import atexit import logging +import os import queue +import weakref from queue import Queue from typing import List, Optional @@ -35,6 +37,7 @@ class TaskManager(object): _sdk_integration: str _sample_rate: float _mask: Optional[MaskFunction] + _shutdown: bool def __init__( self, @@ -77,12 +80,60 @@ def __init__( self._enabled = enabled self._sample_rate = sample_rate self._mask = mask + self._shutdown = False self.init_resources() # cleans up when the python interpreter closes atexit.register(self.shutdown) + # Register fork handler to reinitialize consumer threads in child process. + # When using Gunicorn with --preload, os.fork() copies memory but not threads. + # Without this, worker processes have no consumer threads and all events are lost. + if hasattr(os, "register_at_fork"): + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + after_in_child=lambda: weak_reinit()() if weak_reinit() is not None else None + ) + + def _at_fork_reinit(self): + """Reinitialize consumer threads after fork in child process. + + Called automatically by os.register_at_fork() after fork(). + Necessary for Gunicorn --preload deployments where os.fork() is used: + threads are not copied to child processes (POSIX standard), so without + reinitialization, the child process has no consumer threads and all + ingestion events are silently lost. + """ + if self._shutdown: + return + + self._log.debug( + f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads" + ) + + # Clear existing consumer references (threads are dead in child process) + self._ingestion_consumers = [] + self._media_upload_consumers = [] + + # Recreate queues (old queues may have shared state with parent process) + self._ingestion_queue = queue.Queue(self._max_task_queue_size) + self._media_upload_queue = Queue(self._max_task_queue_size) + + # Recreate MediaManager with new queue + self._media_manager = MediaManager( + api_client=self._api_client, + media_upload_queue=self._media_upload_queue, + max_retries=self._max_retries, + ) + + # Start fresh consumer threads in child process + self.init_resources() + + self._log.debug( + f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" + ) + def init_resources(self): for i in range(self._threads): ingestion_consumer = IngestionConsumer( @@ -191,6 +242,8 @@ def shutdown(self): """Flush all messages and cleanly shutdown the client.""" self._log.debug("shutdown initiated") + self._shutdown = True + # Unregister the atexit handler first atexit.unregister(self.shutdown)