From 4e33e81fdd051d1446749a35b8fe73e5b6f15e02 Mon Sep 17 00:00:00 2001 From: Edwin Pavlovsky Date: Mon, 4 May 2026 18:56:27 -0400 Subject: [PATCH 1/6] Add support for async dedup --- nucleus/__init__.py | 7 +- nucleus/dataset.py | 46 ++++++++++-- nucleus/deduplication.py | 103 ++++++++++++++++++++++++++- tests/test_deduplication.py | 135 +++++++++++++++++++++++++++++++++++- 4 files changed, 281 insertions(+), 10 deletions(-) diff --git a/nucleus/__init__.py b/nucleus/__init__.py index c38a1215..d7ee51db 100644 --- a/nucleus/__init__.py +++ b/nucleus/__init__.py @@ -4,6 +4,7 @@ "AsyncJob", "EmbeddingsExportJob", "BoxAnnotation", + "DeduplicationJob", "DeduplicationResult", "DeduplicationStats", "BoxPrediction", @@ -131,7 +132,11 @@ from .data_transfer_object.job_status import JobInfoRequestPayload from .dataset import Dataset from .dataset_item import DatasetItem -from .deduplication import DeduplicationResult, DeduplicationStats +from .deduplication import ( + DeduplicationJob, + DeduplicationResult, + DeduplicationStats, +) from .deprecation_warning import deprecated from .errors import ( DatasetItemRetrievalError, diff --git a/nucleus/dataset.py b/nucleus/dataset.py index 26982758..6518bae4 100644 --- a/nucleus/dataset.py +++ b/nucleus/dataset.py @@ -84,7 +84,11 @@ check_items_have_dimensions, ) from .dataset_item_uploader import DatasetItemUploader -from .deduplication import DeduplicationResult, DeduplicationStats +from .deduplication import ( + DeduplicationJob, + DeduplicationResult, + DeduplicationStats, +) from .deprecation_warning import deprecated from .errors import NotFoundError, NucleusAPIError from .job import CustomerJobTypes, jobs_status_overview @@ -1016,7 +1020,8 @@ def deduplicate( self, threshold: int, reference_ids: Optional[List[str]] = None, - ) -> DeduplicationResult: + asynchronous: bool = False, + ) -> Union[DeduplicationResult, DeduplicationJob]: """Deduplicate images or frames using user-defined reference IDs. This method can deduplicate an entire dataset (when reference_ids is omitted) @@ -1029,6 +1034,7 @@ def deduplicate( not the scenes themselves. Frame reference IDs or dataset item IDs should be provided for scene datasets. - For very large datasets, this operation may take significant time. + Pass ``asynchronous=True`` to avoid HTTP timeouts. Parameters: threshold: Hamming distance threshold (0-64). Lower = stricter. @@ -1036,9 +1042,17 @@ def deduplicate( reference_ids: Optional list of user-defined reference IDs to deduplicate. If not provided (or None), deduplicates the entire dataset. Cannot be an empty list - use None for entire dataset. + asynchronous: When False (default), runs dedup synchronously and + returns a :class:`DeduplicationResult` directly. When True, + kicks off a background job and returns a + :class:`DeduplicationJob` you can poll/await — recommended for + large datasets where the sync HTTP request would otherwise + time out. Returns: - DeduplicationResult with unique_reference_ids, unique_item_ids, and stats. + :class:`DeduplicationResult` when ``asynchronous=False``, or a + :class:`DeduplicationJob` when ``asynchronous=True``. Call + ``job.result()`` on the latter to block and unpack the result. Raises: ValueError: If reference_ids is an empty list (use None for entire dataset). 
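
A note on the 0-64 range documented above: it is the number of differing
bits between two 64-bit perceptual hashes. A minimal sketch of that
comparison, assuming the backend computes a plain Hamming distance over
64-bit hashes (the server-side implementation is not part of this patch)::

    def hamming_distance(hash_a: int, hash_b: int) -> int:
        # XOR leaves a 1 bit wherever the two hashes disagree; count them.
        return bin((hash_a ^ hash_b) & 0xFFFFFFFFFFFFFFFF).count("1")

    # threshold=0 keeps only exact hash matches together, which is why the
    # docstring calls lower values stricter.
    assert hamming_distance(0b1011, 0b1001) == 1
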
@@ -1057,6 +1071,12 @@ def deduplicate( if reference_ids is not None: payload[REFERENCE_IDS_KEY] = reference_ids + if asynchronous: + response = self._client.make_request( + payload, f"dataset/{self.id}/deduplicate_async" + ) + return DeduplicationJob.from_json(response, self._client) + response = self._client.make_request( payload, f"dataset/{self.id}/deduplicate" ) @@ -1074,7 +1094,8 @@ def deduplicate_by_ids( self, threshold: int, dataset_item_ids: List[str], - ) -> DeduplicationResult: + asynchronous: bool = False, + ) -> Union[DeduplicationResult, DeduplicationJob]: """Deduplicate images or frames using internal Nucleus dataset item IDs. This method identifies items by internal Nucleus IDs (e.g., "di_abc123...") @@ -1088,9 +1109,17 @@ def deduplicate_by_ids( dataset_item_ids: List of internal Nucleus dataset item IDs to deduplicate. These IDs are generated by Nucleus; they are not user-defined reference IDs. Must be non-empty. + asynchronous: When False (default), runs dedup synchronously and + returns a :class:`DeduplicationResult` directly. When True, + kicks off a background job and returns a + :class:`DeduplicationJob` you can poll/await — recommended for + large id lists where the sync HTTP request would otherwise + time out. Returns: - DeduplicationResult with unique_item_ids, unique_reference_ids, and stats. + :class:`DeduplicationResult` when ``asynchronous=False``, or a + :class:`DeduplicationJob` when ``asynchronous=True``. Call + ``job.result()`` on the latter to block and unpack the result. Raises: ValueError: If dataset_item_ids is empty. @@ -1109,6 +1138,13 @@ def deduplicate_by_ids( DATASET_ITEM_IDS_KEY: dataset_item_ids, THRESHOLD_KEY: threshold, } + + if asynchronous: + response = self._client.make_request( + payload, f"dataset/{self.id}/deduplicate_async" + ) + return DeduplicationJob.from_json(response, self._client) + response = self._client.make_request( payload, f"dataset/{self.id}/deduplicate" ) diff --git a/nucleus/deduplication.py b/nucleus/deduplication.py index f427c004..c7b6b9d2 100644 --- a/nucleus/deduplication.py +++ b/nucleus/deduplication.py @@ -1,9 +1,20 @@ from dataclasses import dataclass -from typing import List +from typing import Any, Dict, List, cast + +from nucleus.async_job import AsyncJob, JobError @dataclass class DeduplicationStats: + """Summary statistics for a deduplication run. + + Attributes: + threshold: The Hamming distance threshold the run was executed at. + Lower values are stricter; ``0`` means exact matches only. + original_count: How many items were considered before deduplication. + deduplicated_count: How many unique items remained afterwards. + """ + threshold: int original_count: int deduplicated_count: int @@ -11,6 +22,92 @@ class DeduplicationStats: @dataclass class DeduplicationResult: - unique_item_ids: List[str] # Internal dataset item IDs - unique_reference_ids: List[str] # User-defined reference IDs + """Output of a deduplication run. + + Attributes: + unique_item_ids: Nucleus-internal dataset item IDs (e.g. + ``"di_abc123..."``) that survived deduplication. One entry per + kept item. + unique_reference_ids: The user-defined reference IDs you supplied at + upload time, in the same order as ``unique_item_ids``. + stats: Summary statistics for the run. See :class:`DeduplicationStats`. + """ + + unique_item_ids: List[str] + unique_reference_ids: List[str] stats: DeduplicationStats + + +class DeduplicationJob(AsyncJob): + """Handle to a long-running deduplication job. 
+ + Returned from :meth:`Dataset.deduplicate` and + :meth:`Dataset.deduplicate_by_ids` when called with ``asynchronous=True``. + Use this when your dataset (or item list) is large enough that the + synchronous request would risk timing out — the job runs in the background + and you collect the result with :meth:`result`. + + Inherits all the standard :class:`AsyncJob` controls + (:meth:`status`, :meth:`errors`, :meth:`sleep_until_complete`). + + :: + + import nucleus + + client = nucleus.NucleusClient(YOUR_API_KEY) + dataset = client.get_dataset("ds_xxx") + + # Sync — fine for small datasets, returns the result inline. + result = dataset.deduplicate(threshold=10) + + # Async — recommended for large datasets. + job = dataset.deduplicate(threshold=10, asynchronous=True) + result = job.result() # blocks until done + print(result.stats.deduplicated_count) + print(result.unique_reference_ids) + + # Or split the wait and fetch yourself. + job.sleep_until_complete() + result = job.result(wait_for_completion=False) + """ + + def result( + self, wait_for_completion: bool = True + ) -> "DeduplicationResult": + """Return the deduplication result, optionally waiting for the job. + + Parameters: + wait_for_completion: When ``True`` (default), block until the job + reaches a terminal state. When ``False``, the caller is + expected to have already waited (e.g. via + :meth:`sleep_until_complete`). + + Returns: + A :class:`DeduplicationResult` containing the kept item IDs, + reference IDs, and run statistics. + + Raises: + JobError: If the job did not finish successfully (e.g. it was + cancelled or hit a server error). + """ + if wait_for_completion: + self.sleep_until_complete(verbose_std_out=False) + + status = self.status() + if status["status"] != "Completed": + raise JobError(status, self) + + # AsyncJob.status() is typed as Dict[str, str] in the base class, but + # the `message` slot is a JSON dict in practice. Cast locally so + # static checkers don't flag the dict accesses below. 
+ msg = cast(Dict[str, Any], status["message"] or {}) + stats = cast(Dict[str, Any], msg.get("stats") or {}) + return DeduplicationResult( + unique_item_ids=msg["unique_item_ids"], + unique_reference_ids=msg["unique_reference_ids"], + stats=DeduplicationStats( + threshold=stats.get("threshold", 0), + original_count=stats["original_count"], + deduplicated_count=stats["deduplicated_count"], + ), + ) diff --git a/tests/test_deduplication.py b/tests/test_deduplication.py index 5e4d45d8..d538c926 100644 --- a/tests/test_deduplication.py +++ b/tests/test_deduplication.py @@ -1,7 +1,7 @@ import pytest from nucleus import Dataset, DatasetItem, NucleusClient, VideoScene -from nucleus.deduplication import DeduplicationResult +from nucleus.deduplication import DeduplicationJob, DeduplicationResult from nucleus.errors import NucleusAPIError from .helpers import ( @@ -31,6 +31,28 @@ def test_deduplicate_by_ids_empty_list_raises_error(): ) +def test_deduplicate_async_empty_reference_ids_raises_error(): + """Async mode performs the same client-side validation as sync mode and + must raise before issuing any HTTP request.""" + fake_dataset = Dataset("fake", NucleusClient("fake")) + with pytest.raises(ValueError, match="reference_ids cannot be empty"): + fake_dataset.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + reference_ids=[], + asynchronous=True, + ) + + +def test_deduplicate_by_ids_async_empty_list_raises_error(): + fake_dataset = Dataset("fake", NucleusClient("fake")) + with pytest.raises(ValueError, match="dataset_item_ids must be non-empty"): + fake_dataset.deduplicate_by_ids( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + dataset_item_ids=[], + asynchronous=True, + ) + + @pytest.fixture(scope="module") def dataset_image_sync(CLIENT): """Image dataset uploaded synchronously.""" @@ -449,3 +471,114 @@ def test_deduplicate_distinct_images_all_unique(dataset_image_sync): # With threshold=0 (exact match only), all distinct images should be unique assert result.stats.deduplicated_count == result.stats.original_count + + +# --------------------------------------------------------------------------- +# Async-mode tests +# +# These exercise the `asynchronous=True` code path: the SDK kicks off a +# Temporal-backed dedup job on the server, returns a `DeduplicationJob`, and +# the caller polls/awaits via `job.result()`. The result payload should match +# the sync flow for the same inputs. 
+# --------------------------------------------------------------------------- + + +@pytest.mark.integration +def test_deduplicate_async_returns_job_and_result(dataset_image_sync): + """`asynchronous=True` returns a DeduplicationJob whose `.result()` blocks + until the workflow completes and yields a DeduplicationResult equivalent + to the sync output for the same inputs.""" + sync_result = dataset_image_sync.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD + ) + + job = dataset_image_sync.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, asynchronous=True + ) + assert isinstance(job, DeduplicationJob) + assert ( + job.job_type + ) # populated by AsyncJob.from_json from the server response + + async_result = job.result() + assert isinstance(async_result, DeduplicationResult) + assert async_result.stats.threshold == DEDUP_DEFAULT_TEST_THRESHOLD + assert ( + async_result.stats.original_count == sync_result.stats.original_count + ) + assert ( + async_result.stats.deduplicated_count + == sync_result.stats.deduplicated_count + ) + assert set(async_result.unique_reference_ids) == set( + sync_result.unique_reference_ids + ) + + +@pytest.mark.integration +def test_deduplicate_async_with_reference_ids(dataset_image_sync): + """Async mode respects an explicit `reference_ids` scope just like sync.""" + reference_ids = [item.reference_id for item in TEST_DATASET_ITEMS[:2]] + + job = dataset_image_sync.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + reference_ids=reference_ids, + asynchronous=True, + ) + assert isinstance(job, DeduplicationJob) + + result = job.result() + assert isinstance(result, DeduplicationResult) + assert result.stats.original_count == len(reference_ids) + assert len(result.unique_reference_ids) <= len(reference_ids) + + +@pytest.mark.integration +def test_deduplicate_by_ids_async(dataset_image_sync): + """Async `deduplicate_by_ids` parallels the sync entrypoint.""" + initial_result = dataset_image_sync.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD + ) + item_ids = initial_result.unique_item_ids + assert len(item_ids) > 0 + + job = dataset_image_sync.deduplicate_by_ids( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + dataset_item_ids=item_ids, + asynchronous=True, + ) + assert isinstance(job, DeduplicationJob) + + result = job.result() + assert isinstance(result, DeduplicationResult) + assert result.stats.original_count == len(item_ids) + assert result.unique_item_ids == initial_result.unique_item_ids + + +@pytest.mark.integration +def test_deduplicate_async_identifies_duplicates(dataset_with_duplicates): + """Sanity check: async mode produces the same dedup outcome as sync for a + dataset with known duplicates.""" + job = dataset_with_duplicates.deduplicate(threshold=0, asynchronous=True) + assert isinstance(job, DeduplicationJob) + + result = job.result() + assert result.stats.original_count == 3 + # Same duplicate-detection logic regardless of transport. 
+ assert result.stats.deduplicated_count == 2 + assert len(result.unique_reference_ids) == 2 + + +@pytest.mark.integration +def test_deduplicate_async_split_wait_and_fetch(dataset_image_sync): + """`result(wait_for_completion=False)` is a usable mode when the caller + has already blocked via `sleep_until_complete()`.""" + job = dataset_image_sync.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, asynchronous=True + ) + assert isinstance(job, DeduplicationJob) + + job.sleep_until_complete(verbose_std_out=False) + result = job.result(wait_for_completion=False) + assert isinstance(result, DeduplicationResult) + assert result.stats.threshold == DEDUP_DEFAULT_TEST_THRESHOLD From 552797fa00b0db0d73cca3e3e112a7b62f4098c2 Mon Sep 17 00:00:00 2001 From: Edwin Pavlovsky Date: Mon, 4 May 2026 18:57:00 -0400 Subject: [PATCH 2/6] Update sphinx package versions --- nucleus/dataset_item.py | 2 +- nucleus/scene.py | 4 +- poetry.lock | 184 +++++++++++++++++++++++++++------------- pyproject.toml | 4 +- 4 files changed, 132 insertions(+), 62 deletions(-) diff --git a/nucleus/dataset_item.py b/nucleus/dataset_item.py index 0ecd79c6..6b90f35a 100644 --- a/nucleus/dataset_item.py +++ b/nucleus/dataset_item.py @@ -100,7 +100,7 @@ class DatasetItem: # pylint: disable=R0902 camera intrinsics the metadata of your camera image items. Nucleus requires these intrinsics to create visualizations such as cuboid projections. Refer to our `guide to uploading 3D data - `_ for more + `__ for more info. Coordinate metadata may be provided to enable the Map Chart in the Nucleus Dataset charts page. diff --git a/nucleus/scene.py b/nucleus/scene.py index 310f775b..aedfe5df 100644 --- a/nucleus/scene.py +++ b/nucleus/scene.py @@ -43,7 +43,7 @@ class Frame: pointcloud and any number of images (e.g. from different angles). Refer to our `guide to uploading 3D data - `_ for more info! + `__ for more info! """ def __init__(self, **kwargs: DatasetItem) -> None: @@ -419,7 +419,7 @@ class LidarScene(Scene): `{ "context_attachments": [ { "attachment": 'https://example.com/1' }, { "attachment": 'https://example.com/2' }, ... ] }`. Refer to our `guide to uploading 3D data - `_ for more info! + `__ for more info! 
""" def __repr__(self) -> str: diff --git a/poetry.lock b/poetry.lock index f284a782..0af6a1fb 100644 --- a/poetry.lock +++ b/poetry.lock @@ -12,6 +12,25 @@ files = [ {file = "absl_py-2.4.0.tar.gz", hash = "sha256:8c6af82722b35cf71e0f4d1d47dcaebfff286e27110a99fc359349b247dfb5d4"}, ] +[[package]] +name = "accessible-pygments" +version = "0.0.5" +description = "A collection of accessible pygments styles" +optional = false +python-versions = ">=3.9" +groups = ["dev"] +files = [ + {file = "accessible_pygments-0.0.5-py3-none-any.whl", hash = "sha256:88ae3211e68a1d0b011504b2ffc1691feafce124b845bd072ab6f9f66f34d4b7"}, + {file = "accessible_pygments-0.0.5.tar.gz", hash = "sha256:40918d3e6a2b619ad424cb91e556bd3bd8865443d9f22f1dcdf79e33c8046872"}, +] + +[package.dependencies] +pygments = ">=1.5" + +[package.extras] +dev = ["pillow", "pkginfo (>=1.10)", "playwright", "pre-commit", "setuptools", "twine (>=5.0)"] +tests = ["hypothesis", "pytest"] + [[package]] name = "affine" version = "2.4.0" @@ -1144,14 +1163,14 @@ files = [ [[package]] name = "docutils" -version = "0.17.1" +version = "0.21.2" description = "Docutils -- Python Documentation Utilities" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +python-versions = ">=3.9" groups = ["dev"] files = [ - {file = "docutils-0.17.1-py2.py3-none-any.whl", hash = "sha256:cf316c8370a737a022b72b56874f6602acf974a37a9fba42ec2876387549fc61"}, - {file = "docutils-0.17.1.tar.gz", hash = "sha256:686577d2e4c32380bb50cbb22f575ed742d58168cee37e99117a854bcd88f125"}, + {file = "docutils-0.21.2-py3-none-any.whl", hash = "sha256:dafca5b9e384f0e419294eb4d2ff9fa826435bf15f15b7bd45723e8ad76811b2"}, + {file = "docutils-0.21.2.tar.gz", hash = "sha256:3a6b18732edf182daa3cd12775bbb338cf5691468f91eeeb109deff6ebfa986f"}, ] [[package]] @@ -1396,21 +1415,22 @@ files = [ [[package]] name = "furo" -version = "2022.9.29" +version = "2025.12.19" description = "A clean customisable Sphinx documentation theme." optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" groups = ["dev"] files = [ - {file = "furo-2022.9.29-py3-none-any.whl", hash = "sha256:559ee17999c0f52728481dcf6b1b0cf8c9743e68c5e3a18cb45a7992747869a9"}, - {file = "furo-2022.9.29.tar.gz", hash = "sha256:d4238145629c623609c2deb5384f8d036e2a1ee2a101d64b67b4348112470dbd"}, + {file = "furo-2025.12.19-py3-none-any.whl", hash = "sha256:bb0ead5309f9500130665a26bee87693c41ce4dbdff864dbfb6b0dae4673d24f"}, + {file = "furo-2025.12.19.tar.gz", hash = "sha256:188d1f942037d8b37cd3985b955839fea62baa1730087dc29d157677c857e2a7"}, ] [package.dependencies] +accessible-pygments = ">=0.0.5" beautifulsoup4 = "*" pygments = ">=2.7" -sphinx = ">=4.0,<6.0" -sphinx-basic-ng = "*" +sphinx = ">=7.0,<10.0" +sphinx-basic-ng = ">=1.0.0b2" [[package]] name = "identify" @@ -4212,6 +4232,35 @@ pygments = ">=2.6.0,<3.0.0" [package.extras] jupyter = ["ipywidgets (>=7.5.1,<8.0.0)"] +[[package]] +name = "roman-numerals" +version = "4.1.0" +description = "Manipulate well-formed Roman numerals" +optional = false +python-versions = ">=3.10" +groups = ["dev"] +markers = "python_version >= \"3.11\"" +files = [ + {file = "roman_numerals-4.1.0-py3-none-any.whl", hash = "sha256:647ba99caddc2cc1e55a51e4360689115551bf4476d90e8162cf8c345fe233c7"}, + {file = "roman_numerals-4.1.0.tar.gz", hash = "sha256:1af8b147eb1405d5839e78aeb93131690495fe9da5c91856cb33ad55a7f1e5b2"}, +] + +[[package]] +name = "roman-numerals-py" +version = "4.1.0" +description = "This package is deprecated, switch to roman-numerals." 
+optional = false +python-versions = ">=3.10" +groups = ["dev"] +markers = "python_version >= \"3.11\"" +files = [ + {file = "roman_numerals_py-4.1.0-py3-none-any.whl", hash = "sha256:553114c1167141c1283a51743759723ecd05604a1b6b507225e91dc1a6df0780"}, + {file = "roman_numerals_py-4.1.0.tar.gz", hash = "sha256:f5d7b2b4ca52dd855ef7ab8eb3590f428c0b1ea480736ce32b01fef2a5f8daf9"}, +] + +[package.dependencies] +roman-numerals = "4.1.0" + [[package]] name = "rpds-py" version = "0.30.0" @@ -4810,62 +4859,95 @@ files = [ [[package]] name = "sphinx" -version = "4.5.0" +version = "8.1.3" description = "Python documentation generator" optional = false -python-versions = ">=3.6" +python-versions = ">=3.10" groups = ["dev"] +markers = "python_version == \"3.10\"" files = [ - {file = "Sphinx-4.5.0-py3-none-any.whl", hash = "sha256:ebf612653238bcc8f4359627a9b7ce44ede6fdd75d9d30f68255c7383d3a6226"}, - {file = "Sphinx-4.5.0.tar.gz", hash = "sha256:7bf8ca9637a4ee15af412d1a1d9689fec70523a68ca9bb9127c2f3eeb344e2e6"}, + {file = "sphinx-8.1.3-py3-none-any.whl", hash = "sha256:09719015511837b76bf6e03e42eb7595ac8c2e41eeb9c29c5b755c6b677992a2"}, + {file = "sphinx-8.1.3.tar.gz", hash = "sha256:43c1911eecb0d3e161ad78611bc905d1ad0e523e4ddc202a58a821773dc4c927"}, ] [package.dependencies] -alabaster = ">=0.7,<0.8" -babel = ">=1.3" -colorama = {version = ">=0.3.5", markers = "sys_platform == \"win32\""} -docutils = ">=0.14,<0.18" -imagesize = "*" -Jinja2 = ">=2.3" -packaging = "*" -Pygments = ">=2.0" -requests = ">=2.5.0" -snowballstemmer = ">=1.1" -sphinxcontrib-applehelp = "*" -sphinxcontrib-devhelp = "*" -sphinxcontrib-htmlhelp = ">=2.0.0" -sphinxcontrib-jsmath = "*" -sphinxcontrib-qthelp = "*" -sphinxcontrib-serializinghtml = ">=1.1.5" +alabaster = ">=0.7.14" +babel = ">=2.13" +colorama = {version = ">=0.4.6", markers = "sys_platform == \"win32\""} +docutils = ">=0.20,<0.22" +imagesize = ">=1.3" +Jinja2 = ">=3.1" +packaging = ">=23.0" +Pygments = ">=2.17" +requests = ">=2.30.0" +snowballstemmer = ">=2.2" +sphinxcontrib-applehelp = ">=1.0.7" +sphinxcontrib-devhelp = ">=1.0.6" +sphinxcontrib-htmlhelp = ">=2.0.6" +sphinxcontrib-jsmath = ">=1.0.1" +sphinxcontrib-qthelp = ">=1.0.6" +sphinxcontrib-serializinghtml = ">=1.1.9" +tomli = {version = ">=2", markers = "python_version < \"3.11\""} + +[package.extras] +docs = ["sphinxcontrib-websupport"] +lint = ["flake8 (>=6.0)", "mypy (==1.11.1)", "pyright (==1.1.384)", "pytest (>=6.0)", "ruff (==0.6.9)", "sphinx-lint (>=0.9)", "tomli (>=2)", "types-Pillow (==10.2.0.20240822)", "types-Pygments (==2.18.0.20240506)", "types-colorama (==0.4.15.20240311)", "types-defusedxml (==0.7.0.20240218)", "types-docutils (==0.21.0.20241005)", "types-requests (==2.32.0.20240914)", "types-urllib3 (==1.26.25.14)"] +test = ["cython (>=3.0)", "defusedxml (>=0.7.1)", "pytest (>=8.0)", "setuptools (>=70.0)", "typing_extensions (>=4.9)"] + +[[package]] +name = "sphinx" +version = "8.2.3" +description = "Python documentation generator" +optional = false +python-versions = ">=3.11" +groups = ["dev"] +markers = "python_version >= \"3.11\"" +files = [ + {file = "sphinx-8.2.3-py3-none-any.whl", hash = "sha256:4405915165f13521d875a8c29c8970800a0141c14cc5416a38feca4ea5d9b9c3"}, + {file = "sphinx-8.2.3.tar.gz", hash = "sha256:398ad29dee7f63a75888314e9424d40f52ce5a6a87ae88e7071e80af296ec348"}, +] + +[package.dependencies] +alabaster = ">=0.7.14" +babel = ">=2.13" +colorama = {version = ">=0.4.6", markers = "sys_platform == \"win32\""} +docutils = ">=0.20,<0.22" +imagesize = ">=1.3" +Jinja2 = ">=3.1" +packaging 
= ">=23.0" +Pygments = ">=2.17" +requests = ">=2.30.0" +roman-numerals-py = ">=1.0.0" +snowballstemmer = ">=2.2" +sphinxcontrib-applehelp = ">=1.0.7" +sphinxcontrib-devhelp = ">=1.0.6" +sphinxcontrib-htmlhelp = ">=2.0.6" +sphinxcontrib-jsmath = ">=1.0.1" +sphinxcontrib-qthelp = ">=1.0.6" +sphinxcontrib-serializinghtml = ">=1.1.9" [package.extras] docs = ["sphinxcontrib-websupport"] -lint = ["docutils-stubs", "flake8 (>=3.5.0)", "isort", "mypy (>=0.931)", "types-requests", "types-typed-ast"] -test = ["cython", "html5lib", "pytest", "pytest-cov", "typed-ast ; python_version < \"3.8\""] +lint = ["betterproto (==2.0.0b6)", "mypy (==1.15.0)", "pypi-attestations (==0.0.21)", "pyright (==1.1.395)", "pytest (>=8.0)", "ruff (==0.9.9)", "sphinx-lint (>=0.9)", "types-Pillow (==10.2.0.20240822)", "types-Pygments (==2.19.0.20250219)", "types-colorama (==0.4.15.20240311)", "types-defusedxml (==0.7.0.20240218)", "types-docutils (==0.21.0.20241128)", "types-requests (==2.32.0.20241016)", "types-urllib3 (==1.26.25.14)"] +test = ["cython (>=3.0)", "defusedxml (>=0.7.1)", "pytest (>=8.0)", "pytest-xdist[psutil] (>=3.4)", "setuptools (>=70.0)", "typing_extensions (>=4.9)"] [[package]] name = "sphinx-autoapi" -version = "1.9.0" +version = "3.8.0" description = "Sphinx API documentation generator" optional = false -python-versions = ">=3.7" +python-versions = ">=3.10" groups = ["dev"] files = [ - {file = "sphinx-autoapi-1.9.0.tar.gz", hash = "sha256:c897ea337df16ad0cde307cbdfe2bece207788dde1587fa4fc8b857d1fc5dcba"}, - {file = "sphinx_autoapi-1.9.0-py2.py3-none-any.whl", hash = "sha256:d217953273b359b699d8cb81a5a72985a3e6e15cfe3f703d9a3c201ffc30849b"}, + {file = "sphinx_autoapi-3.8.0-py3-none-any.whl", hash = "sha256:245aefdeab85609ae4aa3576b0d99f69676aa6333dda438761bd125755b3c42d"}, + {file = "sphinx_autoapi-3.8.0.tar.gz", hash = "sha256:9f8ac7d43baf28a0831ac0e392fab6a095b875af07e52d135a5f716cc3cf1142"}, ] [package.dependencies] -astroid = ">=2.7" +astroid = ">=3.0" Jinja2 = "*" PyYAML = "*" -sphinx = ">=3.0" -unidecode = "*" - -[package.extras] -docs = ["sphinx", "sphinx-rtd-theme"] -dotnet = ["sphinxcontrib-dotnetdomain"] -go = ["sphinxcontrib-golangdomain"] +sphinx = ">=7.4.0" [[package]] name = "sphinx-autobuild" @@ -5302,18 +5384,6 @@ files = [ {file = "tzdata-2026.2.tar.gz", hash = "sha256:9173fde7d80d9018e02a662e168e5a2d04f87c41ea174b139fbef642eda62d10"}, ] -[[package]] -name = "unidecode" -version = "1.4.0" -description = "ASCII transliterations of Unicode text" -optional = false -python-versions = ">=3.7" -groups = ["dev"] -files = [ - {file = "Unidecode-1.4.0-py3-none-any.whl", hash = "sha256:c3c7606c27503ad8d501270406e345ddb480a7b5f38827eafe4fa82a137f0021"}, - {file = "Unidecode-1.4.0.tar.gz", hash = "sha256:ce35985008338b676573023acc382d62c264f307c8f7963733405add37ea2b23"}, -] - [[package]] name = "uri-template" version = "1.3.0" @@ -5693,4 +5763,4 @@ metrics = ["Shapely", "rasterio", "scikit-learn", "scipy"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<4.0" -content-hash = "bf04ddba562b921563949f9a71310eca65c5c2f99de1eec11dc78beb6311631b" +content-hash = "a5ddb6d8b83d37b2723d803bc76130fa276aabb40a912300320b17def73788d2" diff --git a/pyproject.toml b/pyproject.toml index 628ffa3a..5690ea2a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,10 +69,10 @@ pre-commit = ">=2.12.1" jupyterlab = ">=3.1.10,<4.0" isort = ">=5.10.1" absl-py = ">=0.13.0" -Sphinx = ">=4.2.0,<5" +Sphinx = ">=7,<9" sphinx-autobuild = "^2021.3.14" furo = ">=2021.10.9" -sphinx-autoapi = "^1.8.4" +sphinx-autoapi = 
">=3,<4" python-dateutil = "^2.8.2" ruff = "^0.0.290" types-setuptools = "^68.2.0.0" From 612e3f3df3091dcd14a88597dca5eb82867db5c0 Mon Sep 17 00:00:00 2001 From: Edwin Pavlovsky Date: Wed, 6 May 2026 11:09:51 -0400 Subject: [PATCH 3/6] Update sdk version to 0.18.1 --- CHANGELOG.md | 5 +++++ pyproject.toml | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7dbedd92..08689ce6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to the [Nucleus Python Client](https://github.com/scaleapi/n The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.18.1](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.18.1) - 2026-05-05 + +### Tooling / CI +- Added local Trivy scanning targets and usage docs for reproducing SG2.0 filesystem/image vulnerability scans, SARIF CI output, SBOM generation, and cleanup workflows. + ## [0.18.0](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.18.0) - 2026-04-29 ### Removed diff --git a/pyproject.toml b/pyproject.toml index 5690ea2a..e33e4b25 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ ignore = ["E501", "E741", "E731", "F401"] # Easy ignore for getting it running [tool.poetry] name = "scale-nucleus" -version = "0.18.0" +version = "0.18.1" description = "The official Python client library for Nucleus, the Data Platform for AI" license = "MIT" authors = ["Scale AI Nucleus Team "] From 938d311d6686e2c67f65952c1d82ee406547903e Mon Sep 17 00:00:00 2001 From: Edwin Pavlovsky Date: Wed, 6 May 2026 13:56:23 -0400 Subject: [PATCH 4/6] Remove support for sync dedup in sdk --- nucleus/dataset.py | 74 ++---- nucleus/deduplication.py | 19 +- tests/test_deduplication.py | 433 +++++++++++++++++++----------------- 3 files changed, 256 insertions(+), 270 deletions(-) diff --git a/nucleus/dataset.py b/nucleus/dataset.py index 6518bae4..fc30c2fd 100644 --- a/nucleus/dataset.py +++ b/nucleus/dataset.py @@ -84,11 +84,7 @@ check_items_have_dimensions, ) from .dataset_item_uploader import DatasetItemUploader -from .deduplication import ( - DeduplicationJob, - DeduplicationResult, - DeduplicationStats, -) +from .deduplication import DeduplicationJob from .deprecation_warning import deprecated from .errors import NotFoundError, NucleusAPIError from .job import CustomerJobTypes, jobs_status_overview @@ -1020,8 +1016,7 @@ def deduplicate( self, threshold: int, reference_ids: Optional[List[str]] = None, - asynchronous: bool = False, - ) -> Union[DeduplicationResult, DeduplicationJob]: + ) -> DeduplicationJob: """Deduplicate images or frames using user-defined reference IDs. This method can deduplicate an entire dataset (when reference_ids is omitted) @@ -1034,7 +1029,7 @@ def deduplicate( not the scenes themselves. Frame reference IDs or dataset item IDs should be provided for scene datasets. - For very large datasets, this operation may take significant time. - Pass ``asynchronous=True`` to avoid HTTP timeouts. + This operation runs asynchronously to avoid HTTP timeouts. Parameters: threshold: Hamming distance threshold (0-64). Lower = stricter. @@ -1042,17 +1037,11 @@ def deduplicate( reference_ids: Optional list of user-defined reference IDs to deduplicate. If not provided (or None), deduplicates the entire dataset. Cannot be an empty list - use None for entire dataset. 
- asynchronous: When False (default), runs dedup synchronously and - returns a :class:`DeduplicationResult` directly. When True, - kicks off a background job and returns a - :class:`DeduplicationJob` you can poll/await — recommended for - large datasets where the sync HTTP request would otherwise - time out. Returns: - :class:`DeduplicationResult` when ``asynchronous=False``, or a - :class:`DeduplicationJob` when ``asynchronous=True``. Call - ``job.result()`` on the latter to block and unpack the result. + :class:`DeduplicationJob`: A background job. Call + ``job.result()`` to block and unpack the + :class:`DeduplicationResult`. Raises: ValueError: If reference_ids is an empty list (use None for entire dataset). @@ -1071,31 +1060,16 @@ def deduplicate( if reference_ids is not None: payload[REFERENCE_IDS_KEY] = reference_ids - if asynchronous: - response = self._client.make_request( - payload, f"dataset/{self.id}/deduplicate_async" - ) - return DeduplicationJob.from_json(response, self._client) - response = self._client.make_request( - payload, f"dataset/{self.id}/deduplicate" - ) - return DeduplicationResult( - unique_item_ids=response["unique_item_ids"], - unique_reference_ids=response["unique_reference_ids"], - stats=DeduplicationStats( - threshold=threshold, - original_count=response["stats"]["original_count"], - deduplicated_count=response["stats"]["deduplicated_count"], - ), + payload, f"dataset/{self.id}/deduplicate_async" ) + return DeduplicationJob.from_json(response, self._client) def deduplicate_by_ids( self, threshold: int, dataset_item_ids: List[str], - asynchronous: bool = False, - ) -> Union[DeduplicationResult, DeduplicationJob]: + ) -> DeduplicationJob: """Deduplicate images or frames using internal Nucleus dataset item IDs. This method identifies items by internal Nucleus IDs (e.g., "di_abc123...") @@ -1109,17 +1083,11 @@ def deduplicate_by_ids( dataset_item_ids: List of internal Nucleus dataset item IDs to deduplicate. These IDs are generated by Nucleus; they are not user-defined reference IDs. Must be non-empty. - asynchronous: When False (default), runs dedup synchronously and - returns a :class:`DeduplicationResult` directly. When True, - kicks off a background job and returns a - :class:`DeduplicationJob` you can poll/await — recommended for - large id lists where the sync HTTP request would otherwise - time out. Returns: - :class:`DeduplicationResult` when ``asynchronous=False``, or a - :class:`DeduplicationJob` when ``asynchronous=True``. Call - ``job.result()`` on the latter to block and unpack the result. + :class:`DeduplicationJob`: A background job. Call + ``job.result()`` to block and unpack the + :class:`DeduplicationResult`. Raises: ValueError: If dataset_item_ids is empty. 
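
With the synchronous path removed, both entry points share the same call
shape. A condensed usage sketch for the by-ids variant (the dataset handle
and the "di_xxx"/"di_yyy" item IDs are placeholders, matching the docstring
example below)::

    job = dataset.deduplicate_by_ids(
        threshold=10,
        dataset_item_ids=["di_xxx", "di_yyy"],
    )
    print(job.status())    # inherited AsyncJob polling
    result = job.result()  # blocks until the workflow completes
    print(result.stats.deduplicated_count)
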
@@ -1139,24 +1107,10 @@ def deduplicate_by_ids( THRESHOLD_KEY: threshold, } - if asynchronous: - response = self._client.make_request( - payload, f"dataset/{self.id}/deduplicate_async" - ) - return DeduplicationJob.from_json(response, self._client) - response = self._client.make_request( - payload, f"dataset/{self.id}/deduplicate" - ) - return DeduplicationResult( - unique_item_ids=response["unique_item_ids"], - unique_reference_ids=response["unique_reference_ids"], - stats=DeduplicationStats( - threshold=threshold, - original_count=response["stats"]["original_count"], - deduplicated_count=response["stats"]["deduplicated_count"], - ), + payload, f"dataset/{self.id}/deduplicate_async" ) + return DeduplicationJob.from_json(response, self._client) def build_slice( self, diff --git a/nucleus/deduplication.py b/nucleus/deduplication.py index c7b6b9d2..badad5c8 100644 --- a/nucleus/deduplication.py +++ b/nucleus/deduplication.py @@ -42,10 +42,8 @@ class DeduplicationJob(AsyncJob): """Handle to a long-running deduplication job. Returned from :meth:`Dataset.deduplicate` and - :meth:`Dataset.deduplicate_by_ids` when called with ``asynchronous=True``. - Use this when your dataset (or item list) is large enough that the - synchronous request would risk timing out — the job runs in the background - and you collect the result with :meth:`result`. + :meth:`Dataset.deduplicate_by_ids`. Deduplication always runs in the + background; collect the completed output with :meth:`result`. Inherits all the standard :class:`AsyncJob` controls (:meth:`status`, :meth:`errors`, :meth:`sleep_until_complete`). @@ -57,15 +55,18 @@ class DeduplicationJob(AsyncJob): client = nucleus.NucleusClient(YOUR_API_KEY) dataset = client.get_dataset("ds_xxx") - # Sync — fine for small datasets, returns the result inline. - result = dataset.deduplicate(threshold=10) - - # Async — recommended for large datasets. - job = dataset.deduplicate(threshold=10, asynchronous=True) + job = dataset.deduplicate(threshold=10) result = job.result() # blocks until done print(result.stats.deduplicated_count) print(result.unique_reference_ids) + # You can also deduplicate a known set of internal dataset item IDs. + job = dataset.deduplicate_by_ids( + threshold=10, + dataset_item_ids=["di_xxx", "di_yyy"], + ) + result = job.result() + # Or split the wait and fetch yourself. 
job.sleep_until_complete() result = job.result(wait_for_completion=False) diff --git a/tests/test_deduplication.py b/tests/test_deduplication.py index d538c926..138daef3 100644 --- a/tests/test_deduplication.py +++ b/tests/test_deduplication.py @@ -1,6 +1,13 @@ import pytest -from nucleus import Dataset, DatasetItem, NucleusClient, VideoScene +from nucleus import Dataset, DatasetItem, VideoScene +from nucleus.async_job import JobError +from nucleus.constants import ( + JOB_CREATION_TIME_KEY, + JOB_ID_KEY, + JOB_LAST_KNOWN_STATUS_KEY, + JOB_TYPE_KEY, +) from nucleus.deduplication import DeduplicationJob, DeduplicationResult from nucleus.errors import NucleusAPIError @@ -15,55 +22,95 @@ ) +class FakeDeduplicationClient: + def __init__(self): + self.requests = [] + + def make_request(self, payload, route, requests_command=None): + self.requests.append( + { + "payload": payload, + "route": route, + "requests_command": requests_command, + } + ) + return { + JOB_ID_KEY: "job_fake", + JOB_LAST_KNOWN_STATUS_KEY: "Queued", + JOB_TYPE_KEY: "deduplicate", + JOB_CREATION_TIME_KEY: "2026-05-06T00:00:00Z", + } + + +def _deduplication_result(job: DeduplicationJob) -> DeduplicationResult: + assert isinstance(job, DeduplicationJob) + result = job.result() + assert isinstance(result, DeduplicationResult) + return result + + def test_deduplicate_empty_reference_ids_raises_error(): - fake_dataset = Dataset("fake", NucleusClient("fake")) + fake_client = FakeDeduplicationClient() + fake_dataset = Dataset("fake", fake_client) with pytest.raises(ValueError, match="reference_ids cannot be empty"): fake_dataset.deduplicate( threshold=DEDUP_DEFAULT_TEST_THRESHOLD, reference_ids=[] ) + assert fake_client.requests == [] def test_deduplicate_by_ids_empty_list_raises_error(): - fake_dataset = Dataset("fake", NucleusClient("fake")) + fake_client = FakeDeduplicationClient() + fake_dataset = Dataset("fake", fake_client) with pytest.raises(ValueError, match="dataset_item_ids must be non-empty"): fake_dataset.deduplicate_by_ids( threshold=DEDUP_DEFAULT_TEST_THRESHOLD, dataset_item_ids=[] ) + assert fake_client.requests == [] -def test_deduplicate_async_empty_reference_ids_raises_error(): - """Async mode performs the same client-side validation as sync mode and - must raise before issuing any HTTP request.""" - fake_dataset = Dataset("fake", NucleusClient("fake")) - with pytest.raises(ValueError, match="reference_ids cannot be empty"): - fake_dataset.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, - reference_ids=[], - asynchronous=True, - ) +def test_deduplicate_defaults_to_async_route_and_returns_job(): + fake_client = FakeDeduplicationClient() + fake_dataset = Dataset("fake", fake_client) + job = fake_dataset.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + reference_ids=["ref_1", "ref_2"], + ) -def test_deduplicate_by_ids_async_empty_list_raises_error(): - fake_dataset = Dataset("fake", NucleusClient("fake")) - with pytest.raises(ValueError, match="dataset_item_ids must be non-empty"): - fake_dataset.deduplicate_by_ids( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, - dataset_item_ids=[], - asynchronous=True, - ) + assert isinstance(job, DeduplicationJob) + assert fake_client.requests == [ + { + "payload": { + "threshold": DEDUP_DEFAULT_TEST_THRESHOLD, + "reference_ids": ["ref_1", "ref_2"], + }, + "route": "dataset/fake/deduplicate_async", + "requests_command": None, + } + ] -@pytest.fixture(scope="module") -def dataset_image_sync(CLIENT): - """Image dataset uploaded synchronously.""" - ds = 
CLIENT.create_dataset( - TEST_DATASET_NAME + " dedup sync", is_scene=False +def test_deduplicate_by_ids_defaults_to_async_route_and_returns_job(): + fake_client = FakeDeduplicationClient() + fake_dataset = Dataset("fake", fake_client) + + job = fake_dataset.deduplicate_by_ids( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + dataset_item_ids=["di_1", "di_2"], ) - try: - ds.append(TEST_DATASET_ITEMS) - yield ds - finally: - CLIENT.delete_dataset(ds.id) + + assert isinstance(job, DeduplicationJob) + assert fake_client.requests == [ + { + "payload": { + "dataset_item_ids": ["di_1", "di_2"], + "threshold": DEDUP_DEFAULT_TEST_THRESHOLD, + }, + "route": "dataset/fake/deduplicate_async", + "requests_command": None, + } + ] @pytest.fixture(scope="module") @@ -80,55 +127,12 @@ def dataset_image_async(CLIENT): CLIENT.delete_dataset(ds.id) -@pytest.mark.integration -def test_deduplicate_image_sync_entire_dataset(dataset_image_sync): - """Test deduplication on image dataset uploaded synchronously.""" - result = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD - ) - assert isinstance(result, DeduplicationResult) - assert len(result.unique_reference_ids) > 0 - assert len(result.unique_item_ids) > 0 - assert result.stats.original_count == len(TEST_DATASET_ITEMS) - - -@pytest.mark.integration -def test_deduplicate_image_sync_with_reference_ids(dataset_image_sync): - """Test deduplication with reference IDs on image dataset uploaded synchronously.""" - reference_ids = [item.reference_id for item in TEST_DATASET_ITEMS[:2]] - result = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, reference_ids=reference_ids - ) - assert isinstance(result, DeduplicationResult) - assert result.stats.original_count == len(reference_ids) - assert len(result.unique_reference_ids) <= len(reference_ids) - assert len(result.unique_item_ids) <= len(reference_ids) - - -@pytest.mark.integration -def test_deduplicate_image_sync_by_ids(dataset_image_sync): - """Test deduplicate_by_ids on image dataset uploaded synchronously.""" - initial_result = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD - ) - item_ids = initial_result.unique_item_ids - assert len(item_ids) > 0 - - result = dataset_image_sync.deduplicate_by_ids( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, dataset_item_ids=item_ids - ) - assert isinstance(result, DeduplicationResult) - assert result.stats.original_count == len(item_ids) - assert result.unique_item_ids == initial_result.unique_item_ids - - @pytest.mark.integration def test_deduplicate_image_async_entire_dataset(dataset_image_async): """Test deduplication on image dataset uploaded asynchronously.""" - result = dataset_image_async.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD + result = _deduplication_result( + dataset_image_async.deduplicate(threshold=DEDUP_DEFAULT_TEST_THRESHOLD) ) - assert isinstance(result, DeduplicationResult) assert len(result.unique_reference_ids) > 0 assert len(result.unique_item_ids) > 0 assert result.stats.original_count == len(TEST_DATASET_ITEMS) @@ -138,10 +142,12 @@ def test_deduplicate_image_async_entire_dataset(dataset_image_async): def test_deduplicate_image_async_with_reference_ids(dataset_image_async): """Test deduplication with reference IDs on image dataset uploaded asynchronously.""" reference_ids = [item.reference_id for item in TEST_DATASET_ITEMS[:2]] - result = dataset_image_async.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, reference_ids=reference_ids + result = _deduplication_result( + 
dataset_image_async.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + reference_ids=reference_ids, + ) ) - assert isinstance(result, DeduplicationResult) assert result.stats.original_count == len(reference_ids) assert len(result.unique_reference_ids) <= len(reference_ids) assert len(result.unique_item_ids) <= len(reference_ids) @@ -150,16 +156,18 @@ def test_deduplicate_image_async_with_reference_ids(dataset_image_async): @pytest.mark.integration def test_deduplicate_image_async_by_ids(dataset_image_async): """Test deduplicate_by_ids on image dataset uploaded asynchronously.""" - initial_result = dataset_image_async.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD + initial_result = _deduplication_result( + dataset_image_async.deduplicate(threshold=DEDUP_DEFAULT_TEST_THRESHOLD) ) item_ids = initial_result.unique_item_ids assert len(item_ids) > 0 - result = dataset_image_async.deduplicate_by_ids( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, dataset_item_ids=item_ids + result = _deduplication_result( + dataset_image_async.deduplicate_by_ids( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + dataset_item_ids=item_ids, + ) ) - assert isinstance(result, DeduplicationResult) assert result.stats.original_count == len(item_ids) assert result.unique_item_ids == initial_result.unique_item_ids @@ -193,10 +201,11 @@ def test_deduplicate_video_scene_async_entire_dataset( dataset_video_scene_async, ): """Test deduplication on video scene dataset uploaded asynchronously.""" - result = dataset_video_scene_async.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD + result = _deduplication_result( + dataset_video_scene_async.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD + ) ) - assert isinstance(result, DeduplicationResult) assert len(result.unique_reference_ids) > 0 assert len(result.unique_item_ids) > 0 assert result.stats.original_count == len(_get_scene_frame_ref_ids()) @@ -208,10 +217,12 @@ def test_deduplicate_video_scene_async_with_frame_reference_ids( ): """Test deduplication with frame reference IDs on video scene dataset uploaded asynchronously.""" frame_ref_ids = _get_scene_frame_ref_ids() - result = dataset_video_scene_async.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, reference_ids=frame_ref_ids + result = _deduplication_result( + dataset_video_scene_async.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + reference_ids=frame_ref_ids, + ) ) - assert isinstance(result, DeduplicationResult) assert result.stats.original_count == len(frame_ref_ids) assert len(result.unique_reference_ids) <= len(frame_ref_ids) assert len(result.unique_item_ids) <= len(frame_ref_ids) @@ -220,16 +231,20 @@ def test_deduplicate_video_scene_async_with_frame_reference_ids( @pytest.mark.integration def test_deduplicate_video_scene_async_by_ids(dataset_video_scene_async): """Test deduplicate_by_ids on video scene dataset uploaded asynchronously.""" - initial_result = dataset_video_scene_async.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD + initial_result = _deduplication_result( + dataset_video_scene_async.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD + ) ) item_ids = initial_result.unique_item_ids assert len(item_ids) > 0 - result = dataset_video_scene_async.deduplicate_by_ids( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, dataset_item_ids=item_ids + result = _deduplication_result( + dataset_video_scene_async.deduplicate_by_ids( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + dataset_item_ids=item_ids, + ) ) - assert isinstance(result, DeduplicationResult) assert 
result.stats.original_count == len(item_ids) assert result.unique_item_ids == initial_result.unique_item_ids @@ -258,10 +273,11 @@ def dataset_video_url_async(CLIENT): @pytest.mark.integration def test_deduplicate_video_url_async_entire_dataset(dataset_video_url_async): """Test deduplication on video URL dataset uploaded asynchronously.""" - result = dataset_video_url_async.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD + result = _deduplication_result( + dataset_video_url_async.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD + ) ) - assert isinstance(result, DeduplicationResult) assert len(result.unique_reference_ids) > 0 assert len(result.unique_item_ids) > 0 assert result.stats.original_count > 0 @@ -270,16 +286,20 @@ def test_deduplicate_video_url_async_entire_dataset(dataset_video_url_async): @pytest.mark.integration def test_deduplicate_video_url_async_by_ids(dataset_video_url_async): """Test deduplicate_by_ids on video URL dataset uploaded asynchronously.""" - initial_result = dataset_video_url_async.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD + initial_result = _deduplication_result( + dataset_video_url_async.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD + ) ) item_ids = initial_result.unique_item_ids assert len(item_ids) > 0 - result = dataset_video_url_async.deduplicate_by_ids( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, dataset_item_ids=item_ids + result = _deduplication_result( + dataset_video_url_async.deduplicate_by_ids( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + dataset_item_ids=item_ids, + ) ) - assert isinstance(result, DeduplicationResult) assert result.stats.original_count == len(item_ids) assert result.unique_item_ids == initial_result.unique_item_ids @@ -288,70 +308,76 @@ def test_deduplicate_video_url_async_by_ids(dataset_video_url_async): @pytest.mark.integration -def test_deduplicate_threshold_zero(dataset_image_sync): +def test_deduplicate_threshold_zero(dataset_image_async): """Verify threshold=0 (exact match only) succeeds and returns correct stats.""" - result = dataset_image_sync.deduplicate(threshold=0) - assert isinstance(result, DeduplicationResult) + result = _deduplication_result( + dataset_image_async.deduplicate(threshold=0) + ) assert result.stats.threshold == 0 @pytest.mark.integration -def test_deduplicate_threshold_max(dataset_image_sync): +def test_deduplicate_threshold_max(dataset_image_async): """Verify threshold=64 (maximum allowed) succeeds and returns correct stats.""" - result = dataset_image_sync.deduplicate(threshold=64) - assert isinstance(result, DeduplicationResult) + result = _deduplication_result( + dataset_image_async.deduplicate(threshold=64) + ) assert result.stats.threshold == 64 @pytest.mark.integration -def test_deduplicate_threshold_negative(dataset_image_sync): - """Verify negative threshold raises NucleusAPIError (must be >= 0).""" - with pytest.raises(NucleusAPIError): - dataset_image_sync.deduplicate(threshold=-1) +def test_deduplicate_threshold_negative(dataset_image_async): + """Verify negative threshold raises an error (must be >= 0).""" + with pytest.raises((NucleusAPIError, JobError)): + _deduplication_result(dataset_image_async.deduplicate(threshold=-1)) @pytest.mark.integration -def test_deduplicate_threshold_too_high(dataset_image_sync): - """Verify threshold > 64 raises NucleusAPIError (must be <= 64).""" - with pytest.raises(NucleusAPIError): - dataset_image_sync.deduplicate(threshold=65) +def test_deduplicate_threshold_too_high(dataset_image_async): + """Verify threshold > 64 raises an 
error (must be <= 64).""" + with pytest.raises((NucleusAPIError, JobError)): + _deduplication_result(dataset_image_async.deduplicate(threshold=65)) @pytest.mark.integration -def test_deduplicate_threshold_non_integer(dataset_image_sync): - """Verify non-integer threshold raises NucleusAPIError.""" - with pytest.raises(NucleusAPIError): - dataset_image_sync.deduplicate(threshold=10.5) +def test_deduplicate_threshold_non_integer(dataset_image_async): + """Verify non-integer threshold raises an error.""" + with pytest.raises((NucleusAPIError, JobError)): + _deduplication_result(dataset_image_async.deduplicate(threshold=10.5)) @pytest.mark.integration -def test_deduplicate_nonexistent_reference_id(dataset_image_sync): - """Verify nonexistent reference_id raises NucleusAPIError.""" - with pytest.raises(NucleusAPIError): - dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, - reference_ids=["nonexistent_ref_id"], +def test_deduplicate_nonexistent_reference_id(dataset_image_async): + """Verify nonexistent reference_id raises an error.""" + with pytest.raises((NucleusAPIError, JobError)): + _deduplication_result( + dataset_image_async.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + reference_ids=["nonexistent_ref_id"], + ) ) @pytest.mark.integration -def test_deduplicate_by_ids_nonexistent_id(dataset_image_sync): - """Verify nonexistent dataset_item_id raises NucleusAPIError.""" - with pytest.raises(NucleusAPIError): - dataset_image_sync.deduplicate_by_ids( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, - dataset_item_ids=["di_nonexistent"], +def test_deduplicate_by_ids_nonexistent_id(dataset_image_async): + """Verify nonexistent dataset_item_id raises an error.""" + with pytest.raises((NucleusAPIError, JobError)): + _deduplication_result( + dataset_image_async.deduplicate_by_ids( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + dataset_item_ids=["di_nonexistent"], + ) ) @pytest.mark.integration -def test_deduplicate_idempotency(dataset_image_sync): +def test_deduplicate_idempotency(dataset_image_async): """Verify repeated deduplication calls return consistent results.""" - result1 = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD + result1 = _deduplication_result( + dataset_image_async.deduplicate(threshold=DEDUP_DEFAULT_TEST_THRESHOLD) ) - result2 = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD + result2 = _deduplication_result( + dataset_image_async.deduplicate(threshold=DEDUP_DEFAULT_TEST_THRESHOLD) ) assert result1.unique_item_ids == result2.unique_item_ids @@ -361,10 +387,10 @@ def test_deduplicate_idempotency(dataset_image_sync): @pytest.mark.integration -def test_deduplicate_response_invariants(dataset_image_sync): +def test_deduplicate_response_invariants(dataset_image_async): """Verify response maintains expected invariants between fields.""" - result = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD + result = _deduplication_result( + dataset_image_async.deduplicate(threshold=DEDUP_DEFAULT_TEST_THRESHOLD) ) assert len(result.unique_item_ids) == len(result.unique_reference_ids) @@ -374,39 +400,46 @@ def test_deduplicate_response_invariants(dataset_image_sync): @pytest.mark.integration -def test_deduplicate_by_ids_threshold_negative(dataset_image_sync): +def test_deduplicate_by_ids_threshold_negative(dataset_image_async): """Verify deduplicate_by_ids rejects negative threshold.""" - initial_result = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD + initial_result = 
_deduplication_result( + dataset_image_async.deduplicate(threshold=DEDUP_DEFAULT_TEST_THRESHOLD) ) item_ids = initial_result.unique_item_ids - with pytest.raises(NucleusAPIError): - dataset_image_sync.deduplicate_by_ids( - threshold=-1, dataset_item_ids=item_ids + with pytest.raises((NucleusAPIError, JobError)): + _deduplication_result( + dataset_image_async.deduplicate_by_ids( + threshold=-1, dataset_item_ids=item_ids + ) ) @pytest.mark.integration -def test_deduplicate_by_ids_threshold_too_high(dataset_image_sync): +def test_deduplicate_by_ids_threshold_too_high(dataset_image_async): """Verify deduplicate_by_ids rejects threshold > 64.""" - initial_result = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD + initial_result = _deduplication_result( + dataset_image_async.deduplicate(threshold=DEDUP_DEFAULT_TEST_THRESHOLD) ) item_ids = initial_result.unique_item_ids - with pytest.raises(NucleusAPIError): - dataset_image_sync.deduplicate_by_ids( - threshold=65, dataset_item_ids=item_ids + with pytest.raises((NucleusAPIError, JobError)): + _deduplication_result( + dataset_image_async.deduplicate_by_ids( + threshold=65, dataset_item_ids=item_ids + ) ) @pytest.mark.integration -def test_deduplicate_single_item(dataset_image_sync): +def test_deduplicate_single_item(dataset_image_async): """Verify single item deduplication returns that item as unique.""" reference_ids = [TEST_DATASET_ITEMS[0].reference_id] - result = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, reference_ids=reference_ids + result = _deduplication_result( + dataset_image_async.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD, + reference_ids=reference_ids, + ) ) assert result.stats.original_count == 1 @@ -427,7 +460,9 @@ def dataset_empty(CLIENT): @pytest.mark.integration def test_deduplicate_empty_dataset(dataset_empty): """Verify deduplication on empty dataset returns zero counts.""" - result = dataset_empty.deduplicate(threshold=DEDUP_DEFAULT_TEST_THRESHOLD) + result = _deduplication_result( + dataset_empty.deduplicate(threshold=DEDUP_DEFAULT_TEST_THRESHOLD) + ) assert result.stats.original_count == 0 assert result.stats.deduplicated_count == 0 @@ -456,7 +491,9 @@ def dataset_with_duplicates(CLIENT): @pytest.mark.integration def test_deduplicate_identifies_duplicates(dataset_with_duplicates): """Verify deduplication actually identifies duplicate images.""" - result = dataset_with_duplicates.deduplicate(threshold=0) + result = _deduplication_result( + dataset_with_duplicates.deduplicate(threshold=0) + ) assert result.stats.original_count == 3 # With threshold=0, the two identical images should be deduplicated to one @@ -465,65 +502,62 @@ def test_deduplicate_identifies_duplicates(dataset_with_duplicates): @pytest.mark.integration -def test_deduplicate_distinct_images_all_unique(dataset_image_sync): +def test_deduplicate_distinct_images_all_unique(dataset_image_async): """Distinct images should all remain after deduplication.""" - result = dataset_image_sync.deduplicate(threshold=0) + result = _deduplication_result( + dataset_image_async.deduplicate(threshold=0) + ) # With threshold=0 (exact match only), all distinct images should be unique assert result.stats.deduplicated_count == result.stats.original_count # --------------------------------------------------------------------------- -# Async-mode tests +# Default async job tests # -# These exercise the `asynchronous=True` code path: the SDK kicks off a -# Temporal-backed dedup job on the server, returns a 
`DeduplicationJob`, and -# the caller polls/awaits via `job.result()`. The result payload should match -# the sync flow for the same inputs. +# The SDK kicks off a Temporal-backed dedup job on the server, returns a +# `DeduplicationJob`, and the caller polls/awaits via `job.result()`. The +# result payload should match the direct result contract for the same inputs. # --------------------------------------------------------------------------- @pytest.mark.integration -def test_deduplicate_async_returns_job_and_result(dataset_image_sync): - """`asynchronous=True` returns a DeduplicationJob whose `.result()` blocks - until the workflow completes and yields a DeduplicationResult equivalent - to the sync output for the same inputs.""" - sync_result = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD +def test_deduplicate_returns_job_and_result(dataset_image_async): + """`deduplicate` returns a DeduplicationJob whose `.result()` blocks until + the workflow completes and yields a DeduplicationResult.""" + first_result = _deduplication_result( + dataset_image_async.deduplicate(threshold=DEDUP_DEFAULT_TEST_THRESHOLD) ) - job = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD, asynchronous=True + job = dataset_image_async.deduplicate( + threshold=DEDUP_DEFAULT_TEST_THRESHOLD ) assert isinstance(job, DeduplicationJob) assert ( job.job_type ) # populated by AsyncJob.from_json from the server response - async_result = job.result() - assert isinstance(async_result, DeduplicationResult) - assert async_result.stats.threshold == DEDUP_DEFAULT_TEST_THRESHOLD - assert ( - async_result.stats.original_count == sync_result.stats.original_count - ) + result = job.result() + assert isinstance(result, DeduplicationResult) + assert result.stats.threshold == DEDUP_DEFAULT_TEST_THRESHOLD + assert result.stats.original_count == first_result.stats.original_count assert ( - async_result.stats.deduplicated_count - == sync_result.stats.deduplicated_count + result.stats.deduplicated_count + == first_result.stats.deduplicated_count ) - assert set(async_result.unique_reference_ids) == set( - sync_result.unique_reference_ids + assert set(result.unique_reference_ids) == set( + first_result.unique_reference_ids ) @pytest.mark.integration -def test_deduplicate_async_with_reference_ids(dataset_image_sync): - """Async mode respects an explicit `reference_ids` scope just like sync.""" +def test_deduplicate_with_reference_ids_returns_job(dataset_image_async): + """Default async mode respects an explicit `reference_ids` scope.""" reference_ids = [item.reference_id for item in TEST_DATASET_ITEMS[:2]] - job = dataset_image_sync.deduplicate( + job = dataset_image_async.deduplicate( threshold=DEDUP_DEFAULT_TEST_THRESHOLD, reference_ids=reference_ids, - asynchronous=True, ) assert isinstance(job, DeduplicationJob) @@ -534,18 +568,17 @@ def test_deduplicate_async_with_reference_ids(dataset_image_sync): @pytest.mark.integration -def test_deduplicate_by_ids_async(dataset_image_sync): - """Async `deduplicate_by_ids` parallels the sync entrypoint.""" - initial_result = dataset_image_sync.deduplicate( - threshold=DEDUP_DEFAULT_TEST_THRESHOLD +def test_deduplicate_by_ids_returns_job(dataset_image_async): + """Default async `deduplicate_by_ids` returns a job and result.""" + initial_result = _deduplication_result( + dataset_image_async.deduplicate(threshold=DEDUP_DEFAULT_TEST_THRESHOLD) ) item_ids = initial_result.unique_item_ids assert len(item_ids) > 0 - job = 
dataset_image_sync.deduplicate_by_ids(
+    job = dataset_image_async.deduplicate_by_ids(
         threshold=DEDUP_DEFAULT_TEST_THRESHOLD,
         dataset_item_ids=item_ids,
-        asynchronous=True,
     )
 
     assert isinstance(job, DeduplicationJob)
@@ -556,25 +589,23 @@
 
 
 @pytest.mark.integration
-def test_deduplicate_async_identifies_duplicates(dataset_with_duplicates):
-    """Sanity check: async mode produces the same dedup outcome as sync for a
-    dataset with known duplicates."""
-    job = dataset_with_duplicates.deduplicate(threshold=0, asynchronous=True)
+def test_deduplicate_identifies_duplicates_via_job(dataset_with_duplicates):
+    """Sanity check: default async mode produces the expected dedup outcome."""
+    job = dataset_with_duplicates.deduplicate(threshold=0)
     assert isinstance(job, DeduplicationJob)
 
     result = job.result()
     assert result.stats.original_count == 3
-    # Same duplicate-detection logic regardless of transport.
     assert result.stats.deduplicated_count == 2
     assert len(result.unique_reference_ids) == 2
 
 
 @pytest.mark.integration
-def test_deduplicate_async_split_wait_and_fetch(dataset_image_sync):
+def test_deduplicate_split_wait_and_fetch(dataset_image_async):
     """`result(wait_for_completion=False)` is a usable mode when the caller
     has already blocked via `sleep_until_complete()`."""
-    job = dataset_image_sync.deduplicate(
-        threshold=DEDUP_DEFAULT_TEST_THRESHOLD, asynchronous=True
+    job = dataset_image_async.deduplicate(
+        threshold=DEDUP_DEFAULT_TEST_THRESHOLD
     )
     assert isinstance(job, DeduplicationJob)


From b3f783db0214669300aea0c800b081cbf7c8defb Mon Sep 17 00:00:00 2001
From: Edwin Pavlovsky
Date: Wed, 6 May 2026 13:56:54 -0400
Subject: [PATCH 5/6] Update CHANGELOG

---
 CHANGELOG.md | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 08689ce6..c00b814d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,8 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [0.18.1](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.18.1) - 2026-05-05
 
-### Tooling / CI
-- Added local Trivy scanning targets and usage docs for reproducing SG2.0 filesystem/image vulnerability scans, SARIF CI output, SBOM generation, and cleanup workflows.
+### Changed
+- `Dataset.deduplicate()` and `Dataset.deduplicate_by_ids()` now run asynchronously and return a `DeduplicationJob` instead of returning a `DeduplicationResult` directly. Call `job.result()` to wait for completion and retrieve the result; see the migration sketch below.
+
+### Removed
+- Synchronous deduplication support for `Dataset.deduplicate()` and `Dataset.deduplicate_by_ids()`.
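+
+Minimal migration sketch. This is illustrative only: the API key, dataset ID, and threshold are placeholders, and it assumes the usual `NucleusClient`/`get_dataset` entrypoints:
+
+```python
+from nucleus import NucleusClient
+
+client = NucleusClient("YOUR_API_KEY")
+dataset = client.get_dataset("YOUR_DATASET_ID")
+
+# deduplicate() no longer blocks; it returns a DeduplicationJob handle.
+job = dataset.deduplicate(threshold=10)
+
+# result() waits for the server-side workflow to finish, then unpacks
+# the payload into a DeduplicationResult with ids and stats.
+result = job.result()
+print(result.stats.original_count, "items ->",
+      result.stats.deduplicated_count, "unique")
+```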
## [0.18.0](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.18.0) - 2026-04-29 From e48300a81a1ed5406908c5371f672e1c5299d570 Mon Sep 17 00:00:00 2001 From: Edwin Pavlovsky Date: Wed, 6 May 2026 14:29:44 -0400 Subject: [PATCH 6/6] Address greptile --- nucleus/deduplication.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/nucleus/deduplication.py b/nucleus/deduplication.py index badad5c8..0119fcdc 100644 --- a/nucleus/deduplication.py +++ b/nucleus/deduplication.py @@ -1,8 +1,24 @@ from dataclasses import dataclass -from typing import Any, Dict, List, cast +from typing import Any, Dict, List, Sequence, cast from nucleus.async_job import AsyncJob, JobError +REQUIRED_RESULT_FIELDS = ("unique_item_ids", "unique_reference_ids", "stats") +REQUIRED_STATS_FIELDS = ("threshold", "original_count", "deduplicated_count") + + +def _require_fields( + payload: Dict[str, Any], required_fields: Sequence[str], context: str +) -> None: + missing_fields = [ + field for field in required_fields if field not in payload + ] + if missing_fields: + missing_fields_message = ", ".join(missing_fields) + raise RuntimeError( + f"Deduplication job result missing {context} field(s): {missing_fields_message}" + ) + @dataclass class DeduplicationStats: @@ -90,6 +106,8 @@ def result( Raises: JobError: If the job did not finish successfully (e.g. it was cancelled or hit a server error). + RuntimeError: If the completed job response is missing expected + result fields. """ if wait_for_completion: self.sleep_until_complete(verbose_std_out=False) @@ -102,12 +120,14 @@ def result( # the `message` slot is a JSON dict in practice. Cast locally so # static checkers don't flag the dict accesses below. msg = cast(Dict[str, Any], status["message"] or {}) + _require_fields(msg, REQUIRED_RESULT_FIELDS, "result") stats = cast(Dict[str, Any], msg.get("stats") or {}) + _require_fields(stats, REQUIRED_STATS_FIELDS, "stats") return DeduplicationResult( unique_item_ids=msg["unique_item_ids"], unique_reference_ids=msg["unique_reference_ids"], stats=DeduplicationStats( - threshold=stats.get("threshold", 0), + threshold=stats["threshold"], original_count=stats["original_count"], deduplicated_count=stats["deduplicated_count"], ),