From 8d0c94726b7cd30b5853261579d4a5688e05ea77 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 26 Apr 2026 18:23:50 +0800 Subject: [PATCH 01/17] [python] Support value stats with truncate mode by default --- .../pypaimon/common/options/core_options.py | 11 ++- .../tests/py36/rest_ao_read_write_test.py | 33 +++---- .../pypaimon/tests/reader_base_test.py | 79 +++++++++++----- .../pypaimon/write/writer/data_writer.py | 94 ++++++++++++++----- 4 files changed, 147 insertions(+), 70 deletions(-) diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index 8b84e6d38d73..e435a0827029 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -199,8 +199,10 @@ class CoreOptions: METADATA_STATS_MODE: ConfigOption[str] = ( ConfigOptions.key("metadata.stats-mode") .string_type() - .default_value("none") - .with_description("Stats Mode, Python by default is none. Java is truncate(16).") + .default_value("truncate(16)") + .with_description("The mode of metadata stats. Available modes: " + "'none' (no stats), 'counts' (null counts only), " + "'full' (exact min/max), 'truncate(length)' (truncated min/max).") ) BLOB_AS_DESCRIPTOR: ConfigOption[bool] = ( @@ -475,7 +477,10 @@ def file_block_size(self, default=None): return self.options.get(CoreOptions.FILE_BLOCK_SIZE, default) def metadata_stats_enabled(self, default=None): - return self.options.get(CoreOptions.METADATA_STATS_MODE, default) == "full" + return self.options.get(CoreOptions.METADATA_STATS_MODE, default) != "none" + + def metadata_stats_mode(self, default=None): + return self.options.get(CoreOptions.METADATA_STATS_MODE, default) def blob_as_descriptor(self, default=None): return self.options.get(CoreOptions.BLOB_AS_DESCRIPTOR, default) diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py index cfdf33b755d5..e35bad033a59 100644 --- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py @@ -144,9 +144,7 @@ def test_full_data_types(self): ('f10', pa.decimal128(10, 2)), ('f11', pa.date32()), ]) - stats_enabled = random.random() < 0.5 - options = {'metadata.stats-mode': 'full'} if stats_enabled else {} - schema = Schema.from_pyarrow_schema(simple_pa_schema, options=options) + schema = Schema.from_pyarrow_schema(simple_pa_schema) self.rest_catalog.create_table('default.test_full_data_types', schema, False) table = self.rest_catalog.get_table('default.test_full_data_types') @@ -186,25 +184,16 @@ def test_full_data_types(self): manifest_files[0].file_name, lambda row: table_scan.file_scanner._filter_manifest_entry(row), drop_stats=False) - # Python write does not produce value stats - if stats_enabled: - self.assertEqual(manifest_entries[0].file.value_stats_cols, None) - min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data, - table.fields).values - max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data, - table.fields).values - expected_min_values = [col[0].as_py() for col in expect_data] - expected_max_values = [col[1].as_py() for col in expect_data] - self.assertEqual(min_value_stats, expected_min_values) - self.assertEqual(max_value_stats, expected_max_values) - else: - self.assertEqual(manifest_entries[0].file.value_stats_cols, []) - min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data, - []).values - max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data, - []).values - self.assertEqual(min_value_stats, []) - self.assertEqual(max_value_stats, []) + # Both 'full' and default 'truncate(16)' modes produce value stats + self.assertEqual(manifest_entries[0].file.value_stats_cols, None) + min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data, + table.fields).values + max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data, + table.fields).values + expected_min_values = [col[0].as_py() for col in expect_data] + expected_max_values = [col[1].as_py() for col in expect_data] + self.assertEqual(min_value_stats, expected_min_values) + self.assertEqual(max_value_stats, expected_max_values) def test_mixed_add_and_delete_entries_same_partition(self): """Test record_count calculation with mixed ADD/DELETE entries in same partition.""" diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 657678f9eabc..847083845ab0 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -216,10 +216,9 @@ def test_full_data_types(self): table_read = read_builder.new_read() splits = table_scan.plan().splits() - # assert data file without stats + # splits have stats dropped (drop_stats=True by default) first_file = splits[0].files[0] self.assertEqual(first_file.value_stats_cols, []) - self.assertEqual(first_file.value_stats, SimpleStats.empty_stats()) # assert equal actual_data = table_read.to_arrow(splits) @@ -231,25 +230,16 @@ def test_full_data_types(self): manifest_entries = table_scan.file_scanner.manifest_file_manager.read( manifest_files[0].file_name, lambda row: table_scan.file_scanner._filter_manifest_entry(row), False) - # Python write does not produce value stats - if stats_enabled: - self.assertEqual(manifest_entries[0].file.value_stats_cols, None) - min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data, - table.fields).values - max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data, - table.fields).values - expected_min_values = [col[0].as_py() for col in expect_data] - expected_max_values = [col[1].as_py() for col in expect_data] - self.assertEqual(min_value_stats, expected_min_values) - self.assertEqual(max_value_stats, expected_max_values) - else: - self.assertEqual(manifest_entries[0].file.value_stats_cols, []) - min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data, - []).values - max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data, - []).values - self.assertEqual(min_value_stats, []) - self.assertEqual(max_value_stats, []) + # Both 'full' and default 'truncate(16)' modes produce value stats + self.assertEqual(manifest_entries[0].file.value_stats_cols, None) + min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data, + table.fields).values + max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data, + table.fields).values + expected_min_values = [col[0].as_py() for col in expect_data] + expected_max_values = [col[1].as_py() for col in expect_data] + self.assertEqual(min_value_stats, expected_min_values) + self.assertEqual(max_value_stats, expected_max_values) def test_write_wrong_schema(self): self.catalog.create_table('default.test_wrong_schema', @@ -552,6 +542,53 @@ def test_primary_key_value_stats_excludes_system_fields(self): self.assertFalse(is_system_field, f"value_stats_cols should not contain system field: {field_name}") + def test_truncate_stats(self): + from pypaimon.write.writer.data_writer import _truncate_min, _truncate_max + + self.assertEqual(_truncate_min('abcdefghij', 5), 'abcde') + self.assertEqual(_truncate_min('abc', 5), 'abc') + self.assertEqual(_truncate_min(None, 5), None) + self.assertEqual(_truncate_min(42, 5), 42) + + self.assertEqual(_truncate_max('abc', 5), 'abc') + self.assertEqual(_truncate_max('abcdefghij', 5), 'abcdf') + self.assertEqual(_truncate_max(None, 5), None) + self.assertEqual(_truncate_max(42, 5), 42) + + self.assertEqual(_truncate_min(b'\x01\x02\x03\x04\x05\x06', 3), b'\x01\x02\x03\x04\x05\x06') + self.assertEqual(_truncate_max(b'\x01\x02\x03\x04\x05\x06', 3), b'\x01\x02\x03\x04\x05\x06') + + def test_default_truncate_stats_e2e(self): + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db_truncate_e2e", True) + + pa_schema = pa.schema([('id', pa.int64()), ('name', pa.string())]) + schema = Schema.from_pyarrow_schema(pa_schema) + catalog.create_table("test_db_truncate_e2e.t", schema, False) + table = catalog.get_table("test_db_truncate_e2e.t") + + long_str = 'a' * 30 + data = pa.Table.from_pydict({'id': [1], 'name': [long_str]}) + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(data) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + snap = SnapshotManager(table).get_latest_snapshot() + rb = table.new_read_builder() + scan = rb.new_scan() + mf = scan.file_scanner.manifest_list_manager.read_all(snap) + entries = scan.file_scanner.manifest_file_manager.read( + mf[0].file_name, lambda r: True, drop_stats=False) + stats = entries[0].file.value_stats + min_row = GenericRowDeserializer.from_bytes(stats.min_values.data, table.fields) + max_row = GenericRowDeserializer.from_bytes(stats.max_values.data, table.fields) + self.assertEqual(min_row.values[1], 'a' * 16) + self.assertEqual(max_row.values[1], 'a' * 15 + 'b') + def test_value_stats_empty_when_stats_disabled(self): catalog = CatalogFactory.create({ "warehouse": self.warehouse diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 725a1fb230de..ae1604ec3e16 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -31,6 +31,29 @@ from pypaimon.table.row.generic_row import GenericRow +def _truncate_min(value, length): + if value is None: + return None + if isinstance(value, str) and len(value) > length: + return value[:length] + return value + + +def _truncate_max(value, length): + if value is None: + return None + if isinstance(value, str): + if len(value) <= length: + return value + truncated = value[:length] + for i in range(len(truncated) - 1, -1, -1): + next_cp = ord(truncated[i]) + 1 + if next_cp != 0 and next_cp <= 0x10FFFF: + return truncated[:i] + chr(next_cp) + return None + return value + + class DataWriter(ABC): """Base class for data writers that handle PyArrow tables directly.""" @@ -63,6 +86,7 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op self.committed_files: List[DataFileMeta] = [] self.write_cols = write_cols self.blob_as_descriptor = self.options.blob_as_descriptor() + self.stats_mode = self.options.metadata_stats_mode() self.path_factory = self.table.path_factory() self.external_path_provider: Optional[ExternalPathProvider] = self.path_factory.create_external_path_provider( @@ -191,24 +215,20 @@ def _write_data_to_file(self, data: pa.Table): min_key = [col.to_pylist()[0] for col in min_key_row_batch.columns] max_key = [col.to_pylist()[0] for col in max_key_row_batch.columns] - # key stats & value stats - value_stats_enabled = self.options.metadata_stats_enabled() - if value_stats_enabled: - stats_fields = self.table.fields if self.table.is_primary_key_table \ - else PyarrowFieldParser.to_paimon_schema(data.schema) - else: - stats_fields = self.table.trimmed_primary_keys_fields - column_stats = { - field.name: self._get_column_stats(data, field.name) - for field in stats_fields - } + # key stats (always computed with "full" mode, not affected by stats mode) key_fields = self.trimmed_primary_keys_fields - key_stats = self._collect_value_stats(data, key_fields, column_stats) + key_stats = self._collect_value_stats(data, key_fields, mode="full") if not all(count == 0 for count in key_stats.null_counts): raise RuntimeError("Primary key should not be null") - value_fields = stats_fields if value_stats_enabled else [] - value_stats = self._collect_value_stats(data, value_fields, column_stats) + # value stats + value_stats_enabled = self.options.metadata_stats_enabled() + if value_stats_enabled: + value_fields = self.table.fields if self.table.is_primary_key_table \ + else PyarrowFieldParser.to_paimon_schema(data.schema) + else: + value_fields = [] + value_stats = self._collect_value_stats(data, value_fields) min_seq = self.sequence_generator.start max_seq = self.sequence_generator.current @@ -269,13 +289,15 @@ def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int: return best_split def _collect_value_stats(self, data: pa.Table, fields: List, - column_stats: Optional[Dict[str, Dict]] = None) -> SimpleStats: + column_stats: Optional[Dict[str, Dict]] = None, + mode: str = None) -> SimpleStats: if not fields: return SimpleStats.empty_stats() - + if column_stats is None or not column_stats: + m = mode or self.stats_mode column_stats = { - field.name: self._get_column_stats(data, field.name) + field.name: self._get_column_stats(data, field.name, m) for field in fields } @@ -290,32 +312,56 @@ def _collect_value_stats(self, data: pa.Table, fields: List, ) @staticmethod - def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> Dict: + def _get_column_stats(record_batch: pa.RecordBatch, column_name: str, + mode: str = "truncate(16)") -> Dict: + upper_mode = mode.upper() + + if upper_mode == "NONE": + return { + "min_values": None, + "max_values": None, + "null_counts": None, + } + column_array = record_batch.column(column_name) + + if upper_mode == "COUNTS": + return { + "min_values": None, + "max_values": None, + "null_counts": column_array.null_count, + } + if column_array.null_count == len(column_array): return { "min_values": None, "max_values": None, "null_counts": column_array.null_count, } - + column_type = column_array.type - supports_minmax = not (pa.types.is_nested(column_type) or pa.types.is_map(column_type)) - + supports_minmax = not (pa.types.is_nested(column_type) or pa.types.is_map(column_type) + or pa.types.is_large_binary(column_type)) + if not supports_minmax: return { "min_values": None, "max_values": None, "null_counts": column_array.null_count, } - + min_values = pc.min(column_array).as_py() max_values = pc.max(column_array).as_py() - null_counts = column_array.null_count + + if upper_mode.startswith("TRUNCATE(") and upper_mode.endswith(")"): + truncate_length = int(upper_mode[9:-1]) + min_values = _truncate_min(min_values, truncate_length) + max_values = _truncate_max(max_values, truncate_length) + return { "min_values": min_values, "max_values": max_values, - "null_counts": null_counts, + "null_counts": column_array.null_count, } From d33b32316f3c9809def4caaaaf1bef57f9d27220 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 26 Apr 2026 19:19:24 +0800 Subject: [PATCH 02/17] fix code format --- paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py index e35bad033a59..48be66b422d6 100644 --- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py @@ -17,7 +17,6 @@ """ import logging import time -import random from datetime import date from decimal import Decimal from unittest.mock import Mock From 8ce88432720c4e8333528ee1a495b3a9e761fa32 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 26 Apr 2026 19:30:46 +0800 Subject: [PATCH 03/17] [python] Remove dead code in _truncate_max --- paimon-python/pypaimon/write/writer/data_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index ae1604ec3e16..f0c00f616b1d 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -48,7 +48,7 @@ def _truncate_max(value, length): truncated = value[:length] for i in range(len(truncated) - 1, -1, -1): next_cp = ord(truncated[i]) + 1 - if next_cp != 0 and next_cp <= 0x10FFFF: + if next_cp <= 0x10FFFF: return truncated[:i] + chr(next_cp) return None return value From 7bfcb5709f4e3efce81ea196427fa5dae444a1e6 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 26 Apr 2026 19:50:54 +0800 Subject: [PATCH 04/17] [python] Validate stats mode and align truncate failure behavior with Java --- .../pypaimon/common/options/core_options.py | 2 +- .../pypaimon/write/writer/data_writer.py | 23 +++++++++++++++---- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index e435a0827029..b457f5170c45 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -477,7 +477,7 @@ def file_block_size(self, default=None): return self.options.get(CoreOptions.FILE_BLOCK_SIZE, default) def metadata_stats_enabled(self, default=None): - return self.options.get(CoreOptions.METADATA_STATS_MODE, default) != "none" + return self.options.get(CoreOptions.METADATA_STATS_MODE, default).upper() != "NONE" def metadata_stats_mode(self, default=None): return self.options.get(CoreOptions.METADATA_STATS_MODE, default) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index f0c00f616b1d..9d9ba384c298 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -311,12 +311,24 @@ def _collect_value_stats(self, data: pa.Table, fields: List, null_counts ) + @staticmethod + def _parse_truncate_length(mode: str): + upper = mode.upper() + if upper in ("NONE", "COUNTS", "FULL"): + return upper, None + if upper.startswith("TRUNCATE(") and upper.endswith(")"): + length = int(upper[9:-1]) + if length <= 0: + raise ValueError(f"Truncate length must be > 0, got: {mode}") + return "TRUNCATE", length + raise ValueError(f"Unsupported metadata.stats-mode: {mode}") + @staticmethod def _get_column_stats(record_batch: pa.RecordBatch, column_name: str, mode: str = "truncate(16)") -> Dict: - upper_mode = mode.upper() + parsed_mode, truncate_length = DataWriter._parse_truncate_length(mode) - if upper_mode == "NONE": + if parsed_mode == "NONE": return { "min_values": None, "max_values": None, @@ -325,7 +337,7 @@ def _get_column_stats(record_batch: pa.RecordBatch, column_name: str, column_array = record_batch.column(column_name) - if upper_mode == "COUNTS": + if parsed_mode == "COUNTS": return { "min_values": None, "max_values": None, @@ -353,10 +365,11 @@ def _get_column_stats(record_batch: pa.RecordBatch, column_name: str, min_values = pc.min(column_array).as_py() max_values = pc.max(column_array).as_py() - if upper_mode.startswith("TRUNCATE(") and upper_mode.endswith(")"): - truncate_length = int(upper_mode[9:-1]) + if truncate_length is not None: min_values = _truncate_min(min_values, truncate_length) max_values = _truncate_max(max_values, truncate_length) + if max_values is None: + min_values = None return { "min_values": min_values, From 59eb106e282fc8d61940110706320abeada3302f Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 26 Apr 2026 20:19:24 +0800 Subject: [PATCH 05/17] [python] Skip min/max stats for tz-aware timestamp columns --- paimon-python/pypaimon/write/writer/data_writer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 9d9ba384c298..dd916881d5c7 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -353,7 +353,8 @@ def _get_column_stats(record_batch: pa.RecordBatch, column_name: str, column_type = column_array.type supports_minmax = not (pa.types.is_nested(column_type) or pa.types.is_map(column_type) - or pa.types.is_large_binary(column_type)) + or pa.types.is_large_binary(column_type) + or (pa.types.is_timestamp(column_type) and column_type.tz is not None)) if not supports_minmax: return { From d3dce82003c638ec3d5a0ca1d4d4647d3b44e45c Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 7 May 2026 15:34:40 +0800 Subject: [PATCH 06/17] [python] Fix decimal value stats serialization --- .../pypaimon/table/row/generic_row.py | 108 +++++++++++++----- .../pypaimon/tests/reader_base_test.py | 42 ++++++- .../pypaimon/write/writer/data_writer.py | 5 +- 3 files changed, 126 insertions(+), 29 deletions(-) diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index 4aa740de7219..9f631d22476d 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -29,6 +29,57 @@ from pypaimon.table.row.blob import BlobData +MAX_COMPACT_DECIMAL_PRECISION = 18 + + +def _decimal_precision_scale(data_type: DataType): + type_str = str(data_type) + precision = MAX_COMPACT_DECIMAL_PRECISION + scale = 0 + if '(' in type_str and ')' in type_str: + try: + precision_scale = type_str.split('(', 1)[1].split(')', 1)[0] + parts = [p.strip() for p in precision_scale.split(',')] + if parts and parts[0]: + precision = int(parts[0]) + if len(parts) > 1 and parts[1]: + scale = int(parts[1]) + except: + pass + return precision, scale + + +def _decimal_unscaled_value(value: Decimal, scale: int) -> int: + sign, digits, exponent = value.as_tuple() + unscaled_value = 0 + for digit in digits: + unscaled_value = unscaled_value * 10 + digit + + scale_delta = exponent + scale + if scale_delta >= 0: + unscaled_value *= 10 ** scale_delta + else: + unscaled_value //= 10 ** (-scale_delta) + + return -unscaled_value if sign else unscaled_value + + +def _decimal_from_unscaled_value(unscaled_value: int, scale: int) -> Decimal: + if unscaled_value == 0: + return Decimal((0, (0,), -scale)) + sign = 1 if unscaled_value < 0 else 0 + digits = tuple(int(d) for d in str(abs(unscaled_value))) + return Decimal((sign, digits, -scale)) + + +def _int_to_signed_bytes(value: int) -> bytes: + if value == 0: + return b'\x00' + bits = value.bit_length() + 1 if value > 0 else (~value).bit_length() + 1 + length = max(1, (bits + 7) // 8) + return value.to_bytes(length, byteorder='big', signed=True) + + @dataclass class GenericRow(InternalRow): @@ -234,20 +285,21 @@ def _parse_blob(cls, bytes_data: bytes, base_offset: int, field_offset: int) -> @classmethod def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal: - unscaled_long = struct.unpack('> 32) & 0xFFFFFFFF + length = offset_and_len & 0xFFFFFFFF + actual_decimal_offset = base_offset + sub_offset + if actual_decimal_offset + length > len(bytes_data): + raise ValueError( + f"Decimal data out of bounds: actual_offset={actual_decimal_offset}, " + f"length={length}, total_length={len(bytes_data)}") + decimal_data = bytes_data[actual_decimal_offset:actual_decimal_offset + length] + unscaled_value = int.from_bytes(decimal_data, byteorder='big', signed=True) + return _decimal_from_unscaled_value(unscaled_value, scale) @classmethod def _parse_timestamp(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> datetime: @@ -327,6 +379,20 @@ def to_bytes(cls, row: Union[GenericRow, BinaryRow]) -> bytes: absolute_offset = fixed_part_size + offset_in_variable_part offset_and_len = (absolute_offset << 32) | length struct.pack_into(' 16: + raise ValueError(f"Decimal value exceeds 16 bytes: {value}") + variable_part_data.append(unscaled_bytes + b'\x00' * (16 - len(unscaled_bytes))) + absolute_offset = fixed_part_size + current_variable_offset + current_variable_offset += 16 + offset_and_len = (absolute_offset << 32) | len(unscaled_bytes) + struct.pack_into(' bytes: @classmethod def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes: - type_str = str(data_type) - if '(' in type_str and ')' in type_str: - try: - precision_scale = type_str.split('(')[1].split(')')[0] - if ',' in precision_scale: - scale = int(precision_scale.split(',')[1]) - else: - scale = 0 - except: - scale = 0 - else: - scale = 0 - - unscaled_value = int(value * (10 ** scale)) + _, scale = _decimal_precision_scale(data_type) + unscaled_value = _decimal_unscaled_value(value, scale) return struct.pack(' 0, got: {mode}") return "TRUNCATE", length From 1e23def746371ad4498134ff57413f42be0bc4bf Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 7 May 2026 21:08:40 +0800 Subject: [PATCH 07/17] [python] Truncate binary value stats --- .../pypaimon/tests/reader_base_test.py | 79 ++++++++++++++++++- .../pypaimon/write/writer/data_writer.py | 11 ++- 2 files changed, 87 insertions(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index d6c4633a489b..398c9de08583 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -555,8 +555,9 @@ def test_truncate_stats(self): self.assertEqual(_truncate_max(None, 5), None) self.assertEqual(_truncate_max(42, 5), 42) - self.assertEqual(_truncate_min(b'\x01\x02\x03\x04\x05\x06', 3), b'\x01\x02\x03\x04\x05\x06') - self.assertEqual(_truncate_max(b'\x01\x02\x03\x04\x05\x06', 3), b'\x01\x02\x03\x04\x05\x06') + self.assertEqual(_truncate_min(b'\x01\x02\x03\x04\x05\x06', 3), b'\x01\x02\x03') + self.assertEqual(_truncate_max(b'\x01\x02\x03\x04\x05\x06', 3), b'\x01\x02\x04') + self.assertIsNone(_truncate_max(b'\xff\xff\xff\x00', 3)) self.assertEqual(DataWriter._parse_truncate_length('truncate(10)'), ('TRUNCATE', 10)) for invalid_mode in ['truncate(+1)', 'truncate( 1)', 'truncate(10.1)', 'truncate()', 'truncate(0)']: @@ -594,6 +595,37 @@ def test_default_truncate_stats_e2e(self): self.assertEqual(min_row.values[1], 'a' * 16) self.assertEqual(max_row.values[1], 'a' * 15 + 'b') + def test_default_truncate_binary_stats_e2e(self): + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db_truncate_binary_e2e", True) + + pa_schema = pa.schema([('id', pa.int64()), ('payload', pa.binary())]) + schema = Schema.from_pyarrow_schema(pa_schema) + catalog.create_table("test_db_truncate_binary_e2e.t", schema, False) + table = catalog.get_table("test_db_truncate_binary_e2e.t") + + long_bytes = b'a' * 30 + data = pa.Table.from_pydict({'id': [1], 'payload': [long_bytes]}, schema=pa_schema) + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(data) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + snap = SnapshotManager(table).get_latest_snapshot() + rb = table.new_read_builder() + scan = rb.new_scan() + mf = scan.file_scanner.manifest_list_manager.read_all(snap) + entries = scan.file_scanner.manifest_file_manager.read( + mf[0].file_name, lambda r: True, drop_stats=False) + stats = entries[0].file.value_stats + min_row = GenericRowDeserializer.from_bytes(stats.min_values.data, table.fields) + max_row = GenericRowDeserializer.from_bytes(stats.max_values.data, table.fields) + self.assertEqual(min_row.values[1], b'a' * 16) + self.assertEqual(max_row.values[1], b'a' * 15 + b'b') + def test_default_stats_with_high_precision_decimal(self): catalog = CatalogFactory.create({"warehouse": self.warehouse}) catalog.create_database("test_db_decimal_stats", True) @@ -712,6 +744,49 @@ def test_value_stats_empty_when_stats_disabled(self): "value_stats.null_counts should be empty (same as SimpleStats.empty_stats()) when stats are disabled" ) + def test_value_stats_counts_mode_e2e(self): + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db_stats_counts", True) + + pa_schema = pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('price', pa.float64()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={'metadata.stats-mode': 'counts'} + ) + catalog.create_table("test_db_stats_counts.t", schema, False) + table = catalog.get_table("test_db_stats_counts.t") + + data = pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'name': ['Alice', None, 'Charlie'], + 'price': [None, 20.3, None], + }, schema=pa_schema) + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(data) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + snap = SnapshotManager(table).get_latest_snapshot() + rb = table.new_read_builder() + scan = rb.new_scan() + mf = scan.file_scanner.manifest_list_manager.read_all(snap) + entries = scan.file_scanner.manifest_file_manager.read( + mf[0].file_name, lambda r: True, drop_stats=False) + stats = entries[0].file.value_stats + min_row = GenericRowDeserializer.from_bytes(stats.min_values.data, table.fields) + max_row = GenericRowDeserializer.from_bytes(stats.max_values.data, table.fields) + + self.assertEqual(min_row.values, [None, None, None]) + self.assertEqual(max_row.values, [None, None, None]) + self.assertEqual(stats.null_counts, [0, 1, 2]) + def test_types(self): data_fields = [ DataField(0, "f0", AtomicType('TINYINT'), 'desc'), diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index c74632447d7d..0e17d8e3008d 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -34,7 +34,7 @@ def _truncate_min(value, length): if value is None: return None - if isinstance(value, str) and len(value) > length: + if isinstance(value, (bytes, str)) and len(value) > length: return value[:length] return value @@ -42,6 +42,15 @@ def _truncate_min(value, length): def _truncate_max(value, length): if value is None: return None + if isinstance(value, bytes): + if len(value) <= length: + return value + truncated = bytearray(value[:length]) + for i in range(len(truncated) - 1, -1, -1): + if truncated[i] < 0xFF: + truncated[i] += 1 + return bytes(truncated[:i + 1]) + return None if isinstance(value, str): if len(value) <= length: return value From c6ab37f8738c24d7c6065ac0fb39253efa4acc01 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 7 May 2026 21:23:09 +0800 Subject: [PATCH 08/17] [python] Clean up value stats collection --- paimon-python/pypaimon/tests/reader_base_test.py | 9 +++++++++ .../pypaimon/write/writer/data_writer.py | 15 ++++++--------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 398c9de08583..b55232cf5135 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -564,6 +564,15 @@ def test_truncate_stats(self): with self.assertRaises(ValueError): DataWriter._parse_truncate_length(invalid_mode) + def test_metadata_stats_mode_defaults(self): + from pypaimon.common.options.core_options import CoreOptions + from pypaimon.common.options import Options + + core_options = CoreOptions(Options({})) + + self.assertEqual(core_options.metadata_stats_mode(), 'truncate(16)') + self.assertTrue(core_options.metadata_stats_enabled()) + def test_default_truncate_stats_e2e(self): catalog = CatalogFactory.create({"warehouse": self.warehouse}) catalog.create_database("test_db_truncate_e2e", True) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 0e17d8e3008d..e9de660df01b 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -297,18 +297,15 @@ def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int: return best_split - def _collect_value_stats(self, data: pa.Table, fields: List, - column_stats: Optional[Dict[str, Dict]] = None, - mode: str = None) -> SimpleStats: + def _collect_value_stats(self, data: pa.Table, fields: List, mode: str = None) -> SimpleStats: if not fields: return SimpleStats.empty_stats() - if column_stats is None or not column_stats: - m = mode or self.stats_mode - column_stats = { - field.name: self._get_column_stats(data, field.name, m) - for field in fields - } + m = mode or self.stats_mode + column_stats = { + field.name: self._get_column_stats(data, field.name, m) + for field in fields + } min_stats = [column_stats[field.name]['min_values'] for field in fields] max_stats = [column_stats[field.name]['max_values'] for field in fields] From 800e785c95b388228f196356601e5796f39ef3e8 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 7 May 2026 21:44:50 +0800 Subject: [PATCH 09/17] [python] Avoid surrogate max stats --- .../pypaimon/tests/reader_base_test.py | 33 +++++++++++++++++++ .../pypaimon/write/writer/data_writer.py | 2 +- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index b55232cf5135..81b151c0f7fb 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -552,6 +552,8 @@ def test_truncate_stats(self): self.assertEqual(_truncate_max('abc', 5), 'abc') self.assertEqual(_truncate_max('abcdefghij', 5), 'abcdf') + self.assertIsNone(_truncate_max('\ud7ffx', 1)) + self.assertEqual(_truncate_max('a\ud7ffx', 2), 'b') self.assertEqual(_truncate_max(None, 5), None) self.assertEqual(_truncate_max(42, 5), 42) @@ -604,6 +606,37 @@ def test_default_truncate_stats_e2e(self): self.assertEqual(min_row.values[1], 'a' * 16) self.assertEqual(max_row.values[1], 'a' * 15 + 'b') + def test_default_truncate_skips_invalid_surrogate_max_e2e(self): + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db_truncate_surrogate_e2e", True) + + pa_schema = pa.schema([('id', pa.int64()), ('name', pa.string())]) + schema = Schema.from_pyarrow_schema(pa_schema) + catalog.create_table("test_db_truncate_surrogate_e2e.t", schema, False) + table = catalog.get_table("test_db_truncate_surrogate_e2e.t") + + high_boundary_str = 'a' * 15 + '\ud7ff' + 'tail' + data = pa.Table.from_pydict({'id': [1], 'name': [high_boundary_str]}, schema=pa_schema) + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(data) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + snap = SnapshotManager(table).get_latest_snapshot() + rb = table.new_read_builder() + scan = rb.new_scan() + mf = scan.file_scanner.manifest_list_manager.read_all(snap) + entries = scan.file_scanner.manifest_file_manager.read( + mf[0].file_name, lambda r: True, drop_stats=False) + stats = entries[0].file.value_stats + min_row = GenericRowDeserializer.from_bytes(stats.min_values.data, table.fields) + max_row = GenericRowDeserializer.from_bytes(stats.max_values.data, table.fields) + self.assertEqual(min_row.values[1], high_boundary_str[:16]) + self.assertEqual(max_row.values[1], 'a' * 14 + 'b') + def test_default_truncate_binary_stats_e2e(self): catalog = CatalogFactory.create({"warehouse": self.warehouse}) catalog.create_database("test_db_truncate_binary_e2e", True) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index e9de660df01b..bc20c1717ca8 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -57,7 +57,7 @@ def _truncate_max(value, length): truncated = value[:length] for i in range(len(truncated) - 1, -1, -1): next_cp = ord(truncated[i]) + 1 - if next_cp <= 0x10FFFF: + if next_cp <= 0x10FFFF and not 0xD800 <= next_cp <= 0xDFFF: return truncated[:i] + chr(next_cp) return None return value From 47f93fa4fa142e19b3520267cc31c88ce62b36ef Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 7 May 2026 21:47:32 +0800 Subject: [PATCH 10/17] [python] Make stats test deterministic --- paimon-python/pypaimon/tests/reader_base_test.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 81b151c0f7fb..2027b3044b84 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -21,7 +21,6 @@ import shutil import tempfile import unittest -import random from datetime import date, datetime, time from decimal import Decimal from unittest.mock import Mock @@ -180,8 +179,7 @@ def test_full_data_types(self): ('f12', pa.date32()), ('f13', pa.time32('ms')), ]) - stats_enabled = random.random() < 0.5 - options = {'metadata.stats-mode': 'full'} if stats_enabled else {} + options = {'metadata.stats-mode': 'full'} schema = Schema.from_pyarrow_schema(simple_pa_schema, options=options) self.catalog.create_table('default.test_full_data_types', schema, False) table = self.catalog.get_table('default.test_full_data_types') @@ -230,7 +228,6 @@ def test_full_data_types(self): manifest_entries = table_scan.file_scanner.manifest_file_manager.read( manifest_files[0].file_name, lambda row: table_scan.file_scanner._filter_manifest_entry(row), False) - # Both 'full' and default 'truncate(16)' modes produce value stats self.assertEqual(manifest_entries[0].file.value_stats_cols, None) min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data, table.fields).values From eb067ab0d04ad9a45117a08ea4574f769bf1c4dd Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 7 May 2026 22:21:14 +0800 Subject: [PATCH 11/17] [python] Skip high precision timestamp stats --- .../pypaimon/tests/reader_base_test.py | 53 +++++++++++++++++++ .../pypaimon/write/writer/data_writer.py | 3 +- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 2027b3044b84..2f96ad5e5aab 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -572,6 +572,26 @@ def test_metadata_stats_mode_defaults(self): self.assertEqual(core_options.metadata_stats_mode(), 'truncate(16)') self.assertTrue(core_options.metadata_stats_enabled()) + def test_high_precision_timestamp_stats_skip_minmax(self): + from pypaimon.write.writer.data_writer import DataWriter + + data = pa.Table.from_pydict( + { + 'ts_us': [datetime(2024, 1, 1, 0, 0, 0, 999)], + 'ts_ns': [datetime(2024, 1, 1, 0, 0, 0, 999)], + }, + schema=pa.schema([ + ('ts_us', pa.timestamp('us')), + ('ts_ns', pa.timestamp('ns')), + ]) + ) + + for column in ['ts_us', 'ts_ns']: + stats = DataWriter._get_column_stats(data, column, 'full') + self.assertIsNone(stats['min_values']) + self.assertIsNone(stats['max_values']) + self.assertEqual(stats['null_counts'], 0) + def test_default_truncate_stats_e2e(self): catalog = CatalogFactory.create({"warehouse": self.warehouse}) catalog.create_database("test_db_truncate_e2e", True) @@ -700,6 +720,39 @@ def test_default_stats_with_high_precision_decimal(self): self.assertEqual(max_row.values[1], max_amount) self.assertEqual(rb.new_read().to_arrow(scan.plan().splits()), data) + def test_default_stats_with_high_precision_timestamp_e2e(self): + catalog = CatalogFactory.create({"warehouse": self.warehouse}) + catalog.create_database("test_db_timestamp_stats", True) + + pa_schema = pa.schema([('id', pa.int64()), ('ts', pa.timestamp('us'))]) + schema = Schema.from_pyarrow_schema(pa_schema) + catalog.create_table("test_db_timestamp_stats.t", schema, False) + table = catalog.get_table("test_db_timestamp_stats.t") + + value = datetime(2024, 1, 1, 0, 0, 0, 999) + data = pa.Table.from_pydict({'id': [1], 'ts': [value]}, schema=pa_schema) + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(data) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + snap = SnapshotManager(table).get_latest_snapshot() + rb = table.new_read_builder() + scan = rb.new_scan() + mf = scan.file_scanner.manifest_list_manager.read_all(snap) + entries = scan.file_scanner.manifest_file_manager.read( + mf[0].file_name, lambda r: True, drop_stats=False) + stats = entries[0].file.value_stats + min_row = GenericRowDeserializer.from_bytes(stats.min_values.data, table.fields) + max_row = GenericRowDeserializer.from_bytes(stats.max_values.data, table.fields) + self.assertIsNone(min_row.values[1]) + self.assertIsNone(max_row.values[1]) + self.assertEqual(stats.null_counts, [0, 0]) + self.assertEqual(rb.new_read().to_arrow(scan.plan().splits()), data) + def test_value_stats_empty_when_stats_disabled(self): catalog = CatalogFactory.create({ "warehouse": self.warehouse diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index bc20c1717ca8..9a1445e29c01 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -363,7 +363,8 @@ def _get_column_stats(record_batch: pa.RecordBatch, column_name: str, column_type = column_array.type supports_minmax = not (pa.types.is_nested(column_type) or pa.types.is_map(column_type) or pa.types.is_large_binary(column_type) - or (pa.types.is_timestamp(column_type) and column_type.tz is not None)) + or (pa.types.is_timestamp(column_type) + and (column_type.tz is not None or column_type.unit not in ('s', 'ms')))) if not supports_minmax: return { From 193ff28a0326b65794ccf903196933d2702ed681 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 7 May 2026 22:32:28 +0800 Subject: [PATCH 12/17] [python] Validate metadata stats mode early --- .../pypaimon/common/options/core_options.py | 26 +++++++++++- .../pypaimon/table/row/generic_row.py | 3 ++ .../pypaimon/tests/predicates_test.py | 9 ++++ .../pypaimon/tests/reader_base_test.py | 42 +++++++++++++++++++ .../pypaimon/write/writer/data_writer.py | 14 +------ 5 files changed, 80 insertions(+), 14 deletions(-) diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index b457f5170c45..a9a5c6ab42ed 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -477,10 +477,32 @@ def file_block_size(self, default=None): return self.options.get(CoreOptions.FILE_BLOCK_SIZE, default) def metadata_stats_enabled(self, default=None): - return self.options.get(CoreOptions.METADATA_STATS_MODE, default).upper() != "NONE" + mode, _ = CoreOptions.parse_metadata_stats_mode( + self.options.get(CoreOptions.METADATA_STATS_MODE, default)) + return mode != "NONE" def metadata_stats_mode(self, default=None): - return self.options.get(CoreOptions.METADATA_STATS_MODE, default) + mode = self.options.get(CoreOptions.METADATA_STATS_MODE, default) + CoreOptions.parse_metadata_stats_mode(mode) + return mode.strip() + + @staticmethod + def parse_metadata_stats_mode(mode: str): + if mode is None: + mode = CoreOptions.METADATA_STATS_MODE.default_value() + normalized = mode.strip() + upper = normalized.upper() + if upper in ("NONE", "COUNTS", "FULL"): + return upper, None + if upper.startswith("TRUNCATE(") and upper.endswith(")"): + length_text = upper[9:-1] + if not length_text or not all('0' <= c <= '9' for c in length_text): + raise ValueError(f"Unsupported metadata.stats-mode: {mode}") + length = int(length_text) + if length <= 0: + raise ValueError(f"Truncate length must be > 0, got: {mode}") + return "TRUNCATE", length + raise ValueError(f"Unsupported metadata.stats-mode: {mode}") def blob_as_descriptor(self, default=None): return self.options.get(CoreOptions.BLOB_AS_DESCRIPTOR, default) diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index 9f631d22476d..704ae0da8caf 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -471,6 +471,9 @@ def _serialize_double(cls, value: float) -> bytes: @classmethod def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes: + # This helper is only for compact decimals. Wider decimals are written + # through the variable-length path in to_bytes so their length can be + # preserved in the fixed part. _, scale = _decimal_precision_scale(data_type) unscaled_value = _decimal_unscaled_value(value, scale) return struct.pack(' 0, got: {mode}") - return "TRUNCATE", length - raise ValueError(f"Unsupported metadata.stats-mode: {mode}") + return CoreOptions.parse_metadata_stats_mode(mode) @staticmethod def _get_column_stats(record_batch: pa.RecordBatch, column_name: str, From 789261623b415d35dc9d31f59f875d370af8fc8c Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 8 May 2026 14:44:27 +0800 Subject: [PATCH 13/17] [python] Fix PK partial write value stats --- .../pypaimon/tests/reader_base_test.py | 38 +++++++++++++++++++ .../pypaimon/write/file_store_write.py | 3 +- .../pypaimon/write/writer/data_writer.py | 10 ++++- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 6ab6c7860c61..30cd29f43fe2 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -1469,6 +1469,44 @@ def test_primary_key_value_stats(self): actual_ids = sorted(actual_data.column('id').to_pylist()) self.assertEqual(actual_ids, [1, 2, 3, 4, 5], "All IDs should be present") + def test_primary_key_partial_write_value_stats(self): + pa_schema = pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('price', pa.float64()), + ('category', pa.string()) + ]) + partial_schema = pa.schema([ + pa.field('id', pa.int64(), nullable=False), + ('name', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + primary_keys=['id'], + options={'metadata.stats-mode': 'full', 'bucket': '2'} + ) + self.catalog.create_table('default.test_pk_partial_value_stats', schema, False) + table = self.catalog.get_table('default.test_pk_partial_value_stats') + + partial_data = pa.Table.from_pydict({ + 'id': [1, 2], + 'name': ['Alice', 'Bob'], + }, schema=partial_schema) + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write().with_write_type(['id', 'name']) + writer.write_arrow(partial_data) + commit_messages = writer.prepare_commit() + writer.close() + + files = [file for msg in commit_messages for file in msg.new_files] + self.assertGreater(len(files), 0) + for file in files: + self.assertEqual(file.write_cols, ['id', 'name']) + self.assertEqual(file.value_stats_cols, None) + self.assertEqual(len(file.value_stats.min_values), 2) + self.assertEqual(len(file.value_stats.max_values), 2) + self.assertEqual(len(file.value_stats.null_counts), 2) + def test_split_target_size(self): """Test source.split.target-size configuration effect on split generation.""" from pypaimon.common.options.core_options import CoreOptions diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index 75b1d3a7d708..d8bbd98dac54 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -77,7 +77,8 @@ def max_seq_number(): partition=partition, bucket=bucket, max_seq_number=max_seq_number(), - options=options) + options=options, + write_cols=self.write_cols) else: seq_number = 0 if self.table.bucket_mode() == BucketMode.BUCKET_UNAWARE else max_seq_number() return AppendOnlyDataWriter( diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index f56d242145b8..986106bc06a5 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -234,8 +234,14 @@ def _write_data_to_file(self, data: pa.Table): # value stats value_stats_enabled = self.options.metadata_stats_enabled() if value_stats_enabled: - value_fields = self.table.fields if self.table.is_primary_key_table \ - else PyarrowFieldParser.to_paimon_schema(data.schema) + if self.table.is_primary_key_table: + value_fields = [ + field + for field in self.table.fields + if self.write_cols is None or field.name in self.write_cols + ] + else: + value_fields = PyarrowFieldParser.to_paimon_schema(data.schema) else: value_fields = [] value_stats = self._collect_value_stats(data, value_fields) From d9e976db5d572640fd758fcf97d53623e59ebc48 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 8 May 2026 16:39:16 +0800 Subject: [PATCH 14/17] [python] Keep high precision decimal stats out of scope --- .../pypaimon/table/row/generic_row.py | 111 +++++------------- .../pypaimon/tests/reader_base_test.py | 9 +- .../pypaimon/write/writer/data_writer.py | 6 +- 3 files changed, 37 insertions(+), 89 deletions(-) diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index 704ae0da8caf..4aa740de7219 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -29,57 +29,6 @@ from pypaimon.table.row.blob import BlobData -MAX_COMPACT_DECIMAL_PRECISION = 18 - - -def _decimal_precision_scale(data_type: DataType): - type_str = str(data_type) - precision = MAX_COMPACT_DECIMAL_PRECISION - scale = 0 - if '(' in type_str and ')' in type_str: - try: - precision_scale = type_str.split('(', 1)[1].split(')', 1)[0] - parts = [p.strip() for p in precision_scale.split(',')] - if parts and parts[0]: - precision = int(parts[0]) - if len(parts) > 1 and parts[1]: - scale = int(parts[1]) - except: - pass - return precision, scale - - -def _decimal_unscaled_value(value: Decimal, scale: int) -> int: - sign, digits, exponent = value.as_tuple() - unscaled_value = 0 - for digit in digits: - unscaled_value = unscaled_value * 10 + digit - - scale_delta = exponent + scale - if scale_delta >= 0: - unscaled_value *= 10 ** scale_delta - else: - unscaled_value //= 10 ** (-scale_delta) - - return -unscaled_value if sign else unscaled_value - - -def _decimal_from_unscaled_value(unscaled_value: int, scale: int) -> Decimal: - if unscaled_value == 0: - return Decimal((0, (0,), -scale)) - sign = 1 if unscaled_value < 0 else 0 - digits = tuple(int(d) for d in str(abs(unscaled_value))) - return Decimal((sign, digits, -scale)) - - -def _int_to_signed_bytes(value: int) -> bytes: - if value == 0: - return b'\x00' - bits = value.bit_length() + 1 if value > 0 else (~value).bit_length() + 1 - length = max(1, (bits + 7) // 8) - return value.to_bytes(length, byteorder='big', signed=True) - - @dataclass class GenericRow(InternalRow): @@ -285,21 +234,20 @@ def _parse_blob(cls, bytes_data: bytes, base_offset: int, field_offset: int) -> @classmethod def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal: - precision, scale = _decimal_precision_scale(data_type) - if precision <= MAX_COMPACT_DECIMAL_PRECISION: - unscaled_value = struct.unpack('> 32) & 0xFFFFFFFF - length = offset_and_len & 0xFFFFFFFF - actual_decimal_offset = base_offset + sub_offset - if actual_decimal_offset + length > len(bytes_data): - raise ValueError( - f"Decimal data out of bounds: actual_offset={actual_decimal_offset}, " - f"length={length}, total_length={len(bytes_data)}") - decimal_data = bytes_data[actual_decimal_offset:actual_decimal_offset + length] - unscaled_value = int.from_bytes(decimal_data, byteorder='big', signed=True) - return _decimal_from_unscaled_value(unscaled_value, scale) + scale = 0 + return Decimal(unscaled_long) / (10 ** scale) @classmethod def _parse_timestamp(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> datetime: @@ -379,20 +327,6 @@ def to_bytes(cls, row: Union[GenericRow, BinaryRow]) -> bytes: absolute_offset = fixed_part_size + offset_in_variable_part offset_and_len = (absolute_offset << 32) | length struct.pack_into(' 16: - raise ValueError(f"Decimal value exceeds 16 bytes: {value}") - variable_part_data.append(unscaled_bytes + b'\x00' * (16 - len(unscaled_bytes))) - absolute_offset = fixed_part_size + current_variable_offset - current_variable_offset += 16 - offset_and_len = (absolute_offset << 32) | len(unscaled_bytes) - struct.pack_into(' bytes: @classmethod def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes: - # This helper is only for compact decimals. Wider decimals are written - # through the variable-length path in to_bytes so their length can be - # preserved in the fixed part. - _, scale = _decimal_precision_scale(data_type) - unscaled_value = _decimal_unscaled_value(value, scale) + type_str = str(data_type) + if '(' in type_str and ')' in type_str: + try: + precision_scale = type_str.split('(')[1].split(')')[0] + if ',' in precision_scale: + scale = int(precision_scale.split(',')[1]) + else: + scale = 0 + except: + scale = 0 + else: + scale = 0 + + unscaled_value = int(value * (10 ** scale)) return struct.pack(' 18) or (pa.types.is_timestamp(column_type) and (column_type.tz is not None or column_type.unit not in ('s', 'ms')))) From 80ea552167589ab0288c78cae10fdbbbcc678e29 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 8 May 2026 17:15:07 +0800 Subject: [PATCH 15/17] [python] Cover partial write stats order --- .../pypaimon/tests/reader_base_test.py | 33 ++++++++++++++++--- .../pypaimon/write/row_key_extractor.py | 18 +++++++--- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index e6595e3d0df8..917574557626 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -1478,8 +1478,8 @@ def test_primary_key_partial_write_value_stats(self): ('category', pa.string()) ]) partial_schema = pa.schema([ - pa.field('id', pa.int64(), nullable=False), ('name', pa.string()), + pa.field('id', pa.int64(), nullable=False), ]) schema = Schema.from_pyarrow_schema( pa_schema, @@ -1490,24 +1490,49 @@ def test_primary_key_partial_write_value_stats(self): table = self.catalog.get_table('default.test_pk_partial_value_stats') partial_data = pa.Table.from_pydict({ - 'id': [1, 2], 'name': ['Alice', 'Bob'], + 'id': [1, 2], }, schema=partial_schema) write_builder = table.new_batch_write_builder() - writer = write_builder.new_write().with_write_type(['id', 'name']) + writer = write_builder.new_write().with_write_type(['name', 'id']) writer.write_arrow(partial_data) commit_messages = writer.prepare_commit() + commit = write_builder.new_commit() + commit.commit(commit_messages) writer.close() + commit.close() files = [file for msg in commit_messages for file in msg.new_files] self.assertGreater(len(files), 0) for file in files: - self.assertEqual(file.write_cols, ['id', 'name']) + self.assertEqual(file.write_cols, ['name', 'id']) self.assertEqual(file.value_stats_cols, ['id', 'name']) self.assertEqual(len(file.value_stats.min_values), 2) self.assertEqual(len(file.value_stats.max_values), 2) self.assertEqual(len(file.value_stats.null_counts), 2) + read_builder = table.new_read_builder() + scan = read_builder.new_scan() + snap = SnapshotManager(table).get_latest_snapshot() + mf = scan.file_scanner.manifest_list_manager.read_all(snap) + entries = scan.file_scanner.manifest_file_manager.read( + mf[0].file_name, + lambda row: scan.file_scanner._filter_manifest_entry(row), + False + ) + stats_file = entries[0].file + self.assertEqual(stats_file.write_cols, ['name', 'id']) + self.assertEqual(stats_file.value_stats_cols, ['id', 'name']) + stats_fields = [table.field_dict[col] for col in stats_file.value_stats_cols] + min_row = GenericRowDeserializer.from_bytes(stats_file.value_stats.min_values.data, stats_fields) + max_row = GenericRowDeserializer.from_bytes(stats_file.value_stats.max_values.data, stats_fields) + self.assertEqual(min_row.values, [1, 'Alice']) + self.assertEqual(max_row.values, [2, 'Bob']) + + actual = read_builder.new_read().to_arrow(scan.plan().splits()) + self.assertEqual(actual.column('id').to_pylist(), [1, 2]) + self.assertEqual(actual.column('name').to_pylist(), ['Alice', 'Bob']) + def test_split_target_size(self): """Test source.split.target-size configuration effect on split generation.""" from pypaimon.common.options.core_options import CoreOptions diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py index 2f09e6577b20..23c0acfac4ee 100644 --- a/paimon-python/pypaimon/write/row_key_extractor.py +++ b/paimon-python/pypaimon/write/row_key_extractor.py @@ -97,11 +97,19 @@ def _get_field_indices(self, field_names: List[str]) -> List[int]: field_map = {field.name: i for i, field in enumerate(self.table_schema.fields)} return [field_map[name] for name in field_names if name in field_map] + @staticmethod + def _get_data_field_indices(data: pa.RecordBatch, field_names: List[str]) -> List[int]: + if not field_names: + return [] + field_map = {field.name: i for i, field in enumerate(data.schema)} + return [field_map[name] for name in field_names if name in field_map] + def _extract_partitions_batch(self, data: pa.RecordBatch) -> List[Tuple]: - if not self.partition_indices: + partition_indices = self._get_data_field_indices(data, self.table_schema.partition_keys) + if not partition_indices: return [() for _ in range(data.num_rows)] - partition_columns = [data.column(i) for i in self.partition_indices] + partition_columns = [data.column(i) for i in partition_indices] partitions = [] for row_idx in range(data.num_rows): @@ -139,7 +147,8 @@ def __init__(self, table_schema: TableSchema): ] def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]: - columns = [data.column(i) for i in self.bucket_key_indices] + bucket_key_indices = self._get_data_field_indices(data, self.bucket_keys) + columns = [data.column(i) for i in bucket_key_indices] return [ _bucket_from_hash( self._binary_row_hash_code(tuple(col[row_idx].as_py() for col in columns)), @@ -302,7 +311,8 @@ def __init__(self, table_schema: 'TableSchema'): def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]: partitions = self._extract_partitions_batch(data) - columns = [data.column(i) for i in self.bucket_key_indices] + bucket_key_indices = self._get_data_field_indices(data, self.bucket_keys) + columns = [data.column(i) for i in bucket_key_indices] buckets = [] for row_idx in range(data.num_rows): key_hash = _hash_bytes_by_words( From 563b4a231b34dd75e13721493d34bcc0c1076089 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 8 May 2026 17:50:23 +0800 Subject: [PATCH 16/17] [python] Use table snapshot manager in stats tests --- .../pypaimon/tests/reader_base_test.py | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 917574557626..0036d847c21d 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -37,7 +37,6 @@ from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField, MapType, PyarrowFieldParser) from pypaimon.schema.table_schema import TableSchema -from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer from pypaimon.write.file_store_commit import FileStoreCommit @@ -223,7 +222,7 @@ def test_full_data_types(self): self.assertEqual(actual_data, expect_data) # to test GenericRow ability - latest_snapshot = SnapshotManager(table).get_latest_snapshot() + latest_snapshot = table.snapshot_manager().get_latest_snapshot() manifest_files = table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.file_scanner.manifest_file_manager.read( manifest_files[0].file_name, lambda row: table_scan.file_scanner._filter_manifest_entry(row), False) @@ -504,7 +503,7 @@ def test_primary_key_value_stats_excludes_system_fields(self): pk_read_builder = pk_table.new_read_builder() pk_table_scan = pk_read_builder.new_scan() - latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot() + latest_snapshot = pk_table.snapshot_manager().get_latest_snapshot() pk_manifest_files = pk_table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot) pk_manifest_entries = pk_table_scan.file_scanner.manifest_file_manager.read( pk_manifest_files[0].file_name, @@ -653,7 +652,7 @@ def test_default_truncate_stats_e2e(self): tw.close() tc.close() - snap = SnapshotManager(table).get_latest_snapshot() + snap = table.snapshot_manager().get_latest_snapshot() rb = table.new_read_builder() scan = rb.new_scan() mf = scan.file_scanner.manifest_list_manager.read_all(snap) @@ -684,7 +683,7 @@ def test_default_truncate_skips_invalid_surrogate_max_e2e(self): tw.close() tc.close() - snap = SnapshotManager(table).get_latest_snapshot() + snap = table.snapshot_manager().get_latest_snapshot() rb = table.new_read_builder() scan = rb.new_scan() mf = scan.file_scanner.manifest_list_manager.read_all(snap) @@ -715,7 +714,7 @@ def test_default_truncate_binary_stats_e2e(self): tw.close() tc.close() - snap = SnapshotManager(table).get_latest_snapshot() + snap = table.snapshot_manager().get_latest_snapshot() rb = table.new_read_builder() scan = rb.new_scan() mf = scan.file_scanner.manifest_list_manager.read_all(snap) @@ -749,7 +748,7 @@ def test_default_stats_skips_high_precision_decimal_minmax(self): tw.close() tc.close() - snap = SnapshotManager(table).get_latest_snapshot() + snap = table.snapshot_manager().get_latest_snapshot() rb = table.new_read_builder() scan = rb.new_scan() mf = scan.file_scanner.manifest_list_manager.read_all(snap) @@ -782,7 +781,7 @@ def test_default_stats_with_high_precision_timestamp_e2e(self): tw.close() tc.close() - snap = SnapshotManager(table).get_latest_snapshot() + snap = table.snapshot_manager().get_latest_snapshot() rb = table.new_read_builder() scan = rb.new_scan() mf = scan.file_scanner.manifest_list_manager.read_all(snap) @@ -836,7 +835,7 @@ def test_value_stats_empty_when_stats_disabled(self): read_builder = table.new_read_builder() table_scan = read_builder.new_scan() - latest_snapshot = SnapshotManager(table).get_latest_snapshot() + latest_snapshot = table.snapshot_manager().get_latest_snapshot() manifest_files = table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.file_scanner.manifest_file_manager.read( manifest_files[0].file_name, @@ -908,7 +907,7 @@ def test_value_stats_counts_mode_e2e(self): tw.close() tc.close() - snap = SnapshotManager(table).get_latest_snapshot() + snap = table.snapshot_manager().get_latest_snapshot() rb = table.new_read_builder() scan = rb.new_scan() mf = scan.file_scanner.manifest_list_manager.read_all(snap) @@ -1392,7 +1391,7 @@ def test_primary_key_value_stats(self): # Read manifest to verify value_stats_cols is None (all fields included) read_builder = table.new_read_builder() table_scan = read_builder.new_scan() - latest_snapshot = SnapshotManager(table).get_latest_snapshot() + latest_snapshot = table.snapshot_manager().get_latest_snapshot() manifest_files = table_scan.file_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.file_scanner.manifest_file_manager.read( manifest_files[0].file_name, @@ -1513,7 +1512,7 @@ def test_primary_key_partial_write_value_stats(self): read_builder = table.new_read_builder() scan = read_builder.new_scan() - snap = SnapshotManager(table).get_latest_snapshot() + snap = table.snapshot_manager().get_latest_snapshot() mf = scan.file_scanner.manifest_list_manager.read_all(snap) entries = scan.file_scanner.manifest_file_manager.read( mf[0].file_name, From 4ce604bfe46895ab9f0f44769eba2e83934ab184 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 8 May 2026 18:23:12 +0800 Subject: [PATCH 17/17] [python] Cover binary stats in mixed tests --- .../java/org/apache/paimon/JavaPyE2ETest.java | 17 +++++++++-------- .../tests/e2e/java_py_read_write_test.py | 2 ++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java index c09bf3466384..f4fad707f296 100644 --- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java +++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java @@ -462,17 +462,18 @@ public void testReadPkTable() throws Exception { assertThat(table.rowType().getFieldTypes().get(5)) .isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()); assertThat(table.rowType().getFieldTypes().get(6)).isEqualTo(DataTypes.TIME()); - assertThat(table.rowType().getFieldTypes().get(7)).isInstanceOf(RowType.class); - RowType metadataType = (RowType) table.rowType().getFieldTypes().get(7); + assertThat(table.rowType().getFieldTypes().get(7)).isEqualTo(DataTypes.BYTES()); + assertThat(table.rowType().getFieldTypes().get(8)).isInstanceOf(RowType.class); + RowType metadataType = (RowType) table.rowType().getFieldTypes().get(8); assertThat(metadataType.getFieldTypes().get(2)).isInstanceOf(RowType.class); assertThat(res) .containsExactlyInAnyOrder( - "+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, 1000, (store1, 1001, (Beijing, China))]", - "+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, 2000, (store1, 1002, (Shanghai, China))]", - "+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, 3000, (store2, 1003, (Tokyo, Japan))]", - "+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, 4000, (store2, 1004, (Seoul, Korea))]", - "+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, 5000, (store3, 1005, (NewYork, USA))]", - "+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, 6000, (store3, 1006, (London, UK))]"); + "+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, 1000, [97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97, 97], (store1, 1001, (Beijing, China))]", + "+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, 2000, [98, 98, 98, 98, 98], (store1, 1002, (Shanghai, China))]", + "+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, 3000, [-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1], (store2, 1003, (Tokyo, Japan))]", + "+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, 4000, [98, 105, 110, 97, 114, 121, 95, 118, 97, 108, 117, 101, 95, 52], (store2, 1004, (Seoul, Korea))]", + "+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, 5000, [98, 105, 110, 97, 114, 121, 95, 118, 97, 108, 117, 101, 95, 53], (store3, 1005, (NewYork, USA))]", + "+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, 6000, [98, 105, 110, 97, 114, 121, 95, 118, 97, 108, 117, 101, 95, 54], (store3, 1006, (London, UK))]"); PredicateBuilder predicateBuilder = new PredicateBuilder(table.rowType()); int[] ids = {1, 2, 3, 4, 5, 6}; diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index cf34ee27c021..032ebd6a00a6 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -134,6 +134,7 @@ def test_py_write_read_pk_table(self, file_format): ('ts', pa.timestamp('us')), ('ts_ltz', pa.timestamp('us', tz='UTC')), ('t', pa.time32('ms')), + ('blob', pa.binary()), ('metadata', pa.struct([ pa.field('source', pa.string()), pa.field('created_at', pa.int64()), @@ -186,6 +187,7 @@ def test_py_write_read_pk_table(self, file_format): 'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 2000004, 2000005], unit='ms', utc=True), 't': [datetime.time(0, 0, 1), datetime.time(0, 0, 2), datetime.time(0, 0, 3), datetime.time(0, 0, 4), datetime.time(0, 0, 5), datetime.time(0, 0, 6)], + 'blob': [b'a' * 30, b'b' * 5, b'\xff' * 16, b'binary_value_4', b'binary_value_5', b'binary_value_6'], 'metadata': [ {'source': 'store1', 'created_at': 1001, 'location': {'city': 'Beijing', 'country': 'China'}}, {'source': 'store1', 'created_at': 1002, 'location': {'city': 'Shanghai', 'country': 'China'}},