Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ All notable changes to the [Nucleus Python Client](https://github.com/scaleapi/n
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.18.1](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.18.1) - 2026-05-05

### Changed
- `Dataset.deduplicate()` and `Dataset.deduplicate_by_ids()` now run asynchronously and return a `DeduplicationJob` instead of returning a `DeduplicationResult` directly. Call `job.result()` to wait for completion and retrieve the result.

### Removed
- Sync deduplication support for `Dataset.deduplicate()` and `Dataset.deduplicate_by_ids()`.

## [0.18.0](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.18.0) - 2026-04-29

### Removed
Expand Down
7 changes: 6 additions & 1 deletion nucleus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"AsyncJob",
"EmbeddingsExportJob",
"BoxAnnotation",
"DeduplicationJob",
"DeduplicationResult",
"DeduplicationStats",
"BoxPrediction",
Expand Down Expand Up @@ -131,7 +132,11 @@
from .data_transfer_object.job_status import JobInfoRequestPayload
from .dataset import Dataset
from .dataset_item import DatasetItem
from .deduplication import DeduplicationResult, DeduplicationStats
from .deduplication import (
DeduplicationJob,
DeduplicationResult,
DeduplicationStats,
)
from .deprecation_warning import deprecated
from .errors import (
DatasetItemRetrievalError,
Expand Down
40 changes: 15 additions & 25 deletions nucleus/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
check_items_have_dimensions,
)
from .dataset_item_uploader import DatasetItemUploader
from .deduplication import DeduplicationResult, DeduplicationStats
from .deduplication import DeduplicationJob
from .deprecation_warning import deprecated
from .errors import NotFoundError, NucleusAPIError
from .job import CustomerJobTypes, jobs_status_overview
Expand Down Expand Up @@ -1016,7 +1016,7 @@ def deduplicate(
self,
threshold: int,
reference_ids: Optional[List[str]] = None,
) -> DeduplicationResult:
) -> DeduplicationJob:
"""Deduplicate images or frames using user-defined reference IDs.

This method can deduplicate an entire dataset (when reference_ids is omitted)
Expand All @@ -1029,6 +1029,7 @@ def deduplicate(
not the scenes themselves. Frame reference IDs or dataset item IDs
should be provided for scene datasets.
- For very large datasets, this operation may take significant time.
This operation runs asynchronously to avoid HTTP timeouts.

Parameters:
threshold: Hamming distance threshold (0-64). Lower = stricter.
Expand All @@ -1038,7 +1039,9 @@ def deduplicate(
Cannot be an empty list - use None for entire dataset.

Returns:
DeduplicationResult with unique_reference_ids, unique_item_ids, and stats.
:class:`DeduplicationJob`: A background job. Call
``job.result()`` to block and unpack the
:class:`DeduplicationResult`.

Raises:
ValueError: If reference_ids is an empty list (use None for entire dataset).
Expand All @@ -1058,23 +1061,15 @@ def deduplicate(
payload[REFERENCE_IDS_KEY] = reference_ids

response = self._client.make_request(
payload, f"dataset/{self.id}/deduplicate"
)
return DeduplicationResult(
unique_item_ids=response["unique_item_ids"],
unique_reference_ids=response["unique_reference_ids"],
stats=DeduplicationStats(
threshold=threshold,
original_count=response["stats"]["original_count"],
deduplicated_count=response["stats"]["deduplicated_count"],
),
payload, f"dataset/{self.id}/deduplicate_async"
)
return DeduplicationJob.from_json(response, self._client)

def deduplicate_by_ids(
self,
threshold: int,
dataset_item_ids: List[str],
) -> DeduplicationResult:
) -> DeduplicationJob:
"""Deduplicate images or frames using internal Nucleus dataset item IDs.

This method identifies items by internal Nucleus IDs (e.g., "di_abc123...")
Expand All @@ -1090,7 +1085,9 @@ def deduplicate_by_ids(
user-defined reference IDs. Must be non-empty.

Returns:
DeduplicationResult with unique_item_ids, unique_reference_ids, and stats.
:class:`DeduplicationJob`: A background job. Call
``job.result()`` to block and unpack the
:class:`DeduplicationResult`.

Raises:
ValueError: If dataset_item_ids is empty.
Expand All @@ -1109,18 +1106,11 @@ def deduplicate_by_ids(
DATASET_ITEM_IDS_KEY: dataset_item_ids,
THRESHOLD_KEY: threshold,
}

response = self._client.make_request(
payload, f"dataset/{self.id}/deduplicate"
)
return DeduplicationResult(
unique_item_ids=response["unique_item_ids"],
unique_reference_ids=response["unique_reference_ids"],
stats=DeduplicationStats(
threshold=threshold,
original_count=response["stats"]["original_count"],
deduplicated_count=response["stats"]["deduplicated_count"],
),
payload, f"dataset/{self.id}/deduplicate_async"
)
return DeduplicationJob.from_json(response, self._client)

def build_slice(
self,
Expand Down
2 changes: 1 addition & 1 deletion nucleus/dataset_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class DatasetItem: # pylint: disable=R0902
camera intrinsics the metadata of your camera image items. Nucleus
requires these intrinsics to create visualizations such as cuboid
projections. Refer to our `guide to uploading 3D data
<https://nucleus.scale.com/docs/uploading-3d-data>`_ for more
<https://nucleus.scale.com/docs/uploading-3d-data>`__ for more
info.

Coordinate metadata may be provided to enable the Map Chart in the Nucleus Dataset charts page.
Expand Down
124 changes: 121 additions & 3 deletions nucleus/deduplication.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,134 @@
from dataclasses import dataclass
from typing import List
from typing import Any, Dict, List, Sequence, cast

from nucleus.async_job import AsyncJob, JobError

REQUIRED_RESULT_FIELDS = ("unique_item_ids", "unique_reference_ids", "stats")
REQUIRED_STATS_FIELDS = ("threshold", "original_count", "deduplicated_count")


def _require_fields(
payload: Dict[str, Any], required_fields: Sequence[str], context: str
) -> None:
missing_fields = [
field for field in required_fields if field not in payload
]
if missing_fields:
missing_fields_message = ", ".join(missing_fields)
raise RuntimeError(
f"Deduplication job result missing {context} field(s): {missing_fields_message}"
)


@dataclass
class DeduplicationStats:
    """Counts describing a single deduplication run.

    Attributes:
        threshold: Hamming distance cutoff the run used; smaller values
            are stricter, with ``0`` keeping exact matches only.
        original_count: Number of items examined before deduplication.
        deduplicated_count: Number of unique items remaining afterwards.
    """

    # Hamming distance cutoff supplied by the caller.
    threshold: int
    # Item count going into the run.
    original_count: int
    # Item count surviving the run.
    deduplicated_count: int


@dataclass
class DeduplicationResult:
    """Output of a completed deduplication run.

    Attributes:
        unique_item_ids: Nucleus-internal dataset item IDs (e.g.
            ``"di_abc123..."``) for the items that were kept, one entry
            per surviving item.
        unique_reference_ids: The user-defined reference IDs supplied at
            upload time for the same items, aligned index-for-index with
            ``unique_item_ids``.
        stats: Run-level counts; see :class:`DeduplicationStats`.
    """

    unique_item_ids: List[str]
    unique_reference_ids: List[str]
    stats: DeduplicationStats


class DeduplicationJob(AsyncJob):
    """Handle to a long-running deduplication job.

    Instances are returned by :meth:`Dataset.deduplicate` and
    :meth:`Dataset.deduplicate_by_ids`. Deduplication always executes in
    the background; call :meth:`result` to collect the finished output.

    All standard :class:`AsyncJob` controls (:meth:`status`,
    :meth:`errors`, :meth:`sleep_until_complete`) are inherited.

    ::

        import nucleus

        client = nucleus.NucleusClient(YOUR_API_KEY)
        dataset = client.get_dataset("ds_xxx")

        job = dataset.deduplicate(threshold=10)
        result = job.result()  # blocks until done
        print(result.stats.deduplicated_count)
        print(result.unique_reference_ids)

        # You can also deduplicate a known set of internal dataset item IDs.
        job = dataset.deduplicate_by_ids(
            threshold=10,
            dataset_item_ids=["di_xxx", "di_yyy"],
        )
        result = job.result()

        # Or split the wait and fetch yourself.
        job.sleep_until_complete()
        result = job.result(wait_for_completion=False)
    """

    def result(
        self, wait_for_completion: bool = True
    ) -> "DeduplicationResult":
        """Return the deduplication result, optionally waiting for the job.

        Parameters:
            wait_for_completion: When ``True`` (default), block until the
                job reaches a terminal state. When ``False``, the caller
                is expected to have already waited (e.g. via
                :meth:`sleep_until_complete`).

        Returns:
            A :class:`DeduplicationResult` with the kept item IDs,
            reference IDs, and run statistics.

        Raises:
            JobError: If the job did not finish successfully (e.g. it was
                cancelled or hit a server error).
            RuntimeError: If the completed job response is missing
                expected result fields.
        """
        if wait_for_completion:
            self.sleep_until_complete(verbose_std_out=False)

        job_status = self.status()
        if job_status["status"] != "Completed":
            raise JobError(job_status, self)

        # NOTE: AsyncJob.status() is annotated Dict[str, str] in the base
        # class, but the `message` slot carries a JSON object in practice.
        # Cast locally so static checkers accept the nested lookups.
        message = cast(Dict[str, Any], job_status["message"] or {})
        _require_fields(message, REQUIRED_RESULT_FIELDS, "result")
        raw_stats = cast(Dict[str, Any], message.get("stats") or {})
        _require_fields(raw_stats, REQUIRED_STATS_FIELDS, "stats")

        stats = DeduplicationStats(
            threshold=raw_stats["threshold"],
            original_count=raw_stats["original_count"],
            deduplicated_count=raw_stats["deduplicated_count"],
        )
        return DeduplicationResult(
            unique_item_ids=message["unique_item_ids"],
            unique_reference_ids=message["unique_reference_ids"],
            stats=stats,
        )
4 changes: 2 additions & 2 deletions nucleus/scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Frame:
pointcloud and any number of images (e.g. from different angles).

Refer to our `guide to uploading 3D data
<https://docs.nucleus.scale.com/docs/uploading-3d-data>`_ for more info!
<https://nucleus.scale.com/docs/uploading-3d-data>`__ for more info!
"""

def __init__(self, **kwargs: DatasetItem) -> None:
Expand Down Expand Up @@ -419,7 +419,7 @@ class LidarScene(Scene):
`{ "context_attachments": [ { "attachment": 'https://example.com/1' }, { "attachment": 'https://example.com/2' }, ... ] }`.

Refer to our `guide to uploading 3D data
<https://docs.nucleus.scale.com/docs/uploading-3d-data>`_ for more info!
<https://nucleus.scale.com/docs/uploading-3d-data>`__ for more info!
"""

def __repr__(self) -> str:
Expand Down
Loading