From 6d8dccd59edcaa8f9380935e7be8fa73e87f3632 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Thu, 14 May 2026 22:40:15 +0900 Subject: [PATCH 1/3] fix(task_manager): reinitialize consumer threads after os.fork() Signed-off-by: Yeonggyu Park --- langfuse/_task_manager/task_manager.py | 42 ++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/langfuse/_task_manager/task_manager.py b/langfuse/_task_manager/task_manager.py index e94265350..0789c27ef 100644 --- a/langfuse/_task_manager/task_manager.py +++ b/langfuse/_task_manager/task_manager.py @@ -3,6 +3,7 @@ import atexit import logging import queue +import os from queue import Queue from typing import List, Optional @@ -83,6 +84,47 @@ def __init__( # cleans up when the python interpreter closes atexit.register(self.shutdown) + # Register fork handler to reinitialize consumer threads in child process. + # When using Gunicorn with --preload, os.fork() copies memory but not threads. + # Without this, worker processes have no consumer threads and all events are lost. + if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=self._at_fork_reinit) + + def _at_fork_reinit(self): + """Reinitialize consumer threads after fork in child process. + + Called automatically by os.register_at_fork() after fork(). + Necessary for Gunicorn --preload deployments where os.fork() is used: + threads are not copied to child processes (POSIX standard), so without + reinitialization, the child process has no consumer threads and all + ingestion events are silently lost. + """ + self._log.debug( + f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads" + ) + + # Clear existing consumer references (threads are dead in child process) + self._ingestion_consumers = [] + self._media_upload_consumers = [] + + # Recreate queues (old queues may have shared state with parent process) + self._ingestion_queue = queue.Queue(self._max_task_queue_size) + self._media_upload_queue = Queue(self._max_task_queue_size) + + # Recreate MediaManager with new queue + self._media_manager = MediaManager( + api_client=self._api_client, + media_upload_queue=self._media_upload_queue, + max_retries=self._max_retries, + ) + + # Start fresh consumer threads in child process + self.init_resources() + + self._log.debug( + f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" + ) + def init_resources(self): for i in range(self._threads): ingestion_consumer = IngestionConsumer( From 5b70f7dac817aeb493ade8f503273d63d9114acc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Thu, 14 May 2026 23:28:13 +0900 Subject: [PATCH 2/3] fix(task_manager): use weakref and add shutdown guard for fork handler Signed-off-by: Yeonggyu Park --- langfuse/_task_manager/task_manager.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/langfuse/_task_manager/task_manager.py b/langfuse/_task_manager/task_manager.py index 0789c27ef..54c90030a 100644 --- a/langfuse/_task_manager/task_manager.py +++ b/langfuse/_task_manager/task_manager.py @@ -4,6 +4,7 @@ import logging import queue import os +import weakref from queue import Queue from typing import List, Optional @@ -36,6 +37,7 @@ class TaskManager(object): _sdk_integration: str _sample_rate: float _mask: Optional[MaskFunction] + _shutdown: bool def __init__( self, @@ -78,6 +80,7 @@ def __init__( self._enabled = enabled self._sample_rate = sample_rate self._mask = mask + self._shutdown = False self.init_resources() @@ -88,7 +91,10 @@ def __init__( # When using Gunicorn with --preload, os.fork() copies memory but not threads. # Without this, worker processes have no consumer threads and all events are lost. if hasattr(os, "register_at_fork"): - os.register_at_fork(after_in_child=self._at_fork_reinit) + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + after_in_child=lambda: weak_reinit()() if weak_reinit() is not None else None + ) def _at_fork_reinit(self): """Reinitialize consumer threads after fork in child process. @@ -99,6 +105,9 @@ def _at_fork_reinit(self): reinitialization, the child process has no consumer threads and all ingestion events are silently lost. """ + if self._shutdown: + return + self._log.debug( f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads" ) @@ -233,6 +242,8 @@ def shutdown(self): """Flush all messages and cleanly shutdown the client.""" self._log.debug("shutdown initiated") + self._shutdown = True + # Unregister the atexit handler first atexit.unregister(self.shutdown) From ad72414c0702bac99f1b75fe372718a8cc8e156a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Thu, 14 May 2026 23:57:59 +0900 Subject: [PATCH 3/3] style(task_manager): fix stdlib import order per PEP 8 Signed-off-by: Yeonggyu Park --- langfuse/_task_manager/task_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/langfuse/_task_manager/task_manager.py b/langfuse/_task_manager/task_manager.py index 54c90030a..6cfa4f7c5 100644 --- a/langfuse/_task_manager/task_manager.py +++ b/langfuse/_task_manager/task_manager.py @@ -2,8 +2,8 @@ import atexit import logging -import queue import os +import queue import weakref from queue import Queue from typing import List, Optional