135 changes: 135 additions & 0 deletions pyiceberg/table/puffin.py
@@ -14,7 +14,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
import math
import zlib
from collections.abc import Iterable
from typing import TYPE_CHECKING, Literal

from pydantic import Field
@@ -27,6 +30,7 @@

# Short for: Puffin Fratercula arctica, version 1
MAGIC_BYTES=b"PFA1"
DELETION_VECTOR_MAGIC=b"\xd1\xd3\x39\x64"
EMPTY_BITMAP=FrozenBitMap()
MAX_JAVA_SIGNED=int(math.pow(2, 31)) -1
PROPERTY_REFERENCED_DATA_FILE="referenced-data-file"
@@ -62,6 +66,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
    return bitmaps


def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes:
    """
    Serialize a dictionary of bitmaps into a byte array.

    The format is:
    - 8 bytes: number of bitmaps (little-endian)
    - For each bitmap:
        - 4 bytes: key (little-endian)
        - n bytes: serialized bitmap
    """
    with io.BytesIO() as out:
        sorted_keys = sorted(bitmaps.keys())

        # number of bitmaps
        out.write(len(sorted_keys).to_bytes(8, "little"))

        for key in sorted_keys:
            if key < 0:
                raise ValueError(f"Invalid unsigned key: {key}")
            if key > MAX_JAVA_SIGNED:
                raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")

            # key
            out.write(key.to_bytes(4, "little"))
            # bitmap
            out.write(bitmaps[key].serialize())
        return out.getvalue()


class PuffinBlobMetadata(IcebergBaseModel):
    type: Literal["deletion-vector-v1"] = Field()
    fields: list[int] = Field()
@@ -114,3 +147,105 @@ def __init__(self, puffin: bytes) -> None:

    def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
        return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}


class PuffinWriter:
    _blobs: list[PuffinBlobMetadata]
    _blob_payloads: list[bytes]
    _created_by: str | None

    def __init__(self, created_by: str | None = None) -> None:
        self._blobs = []
        self._blob_payloads = []
        self._created_by = created_by

    def set_blob(
        self,
        positions: Iterable[int],
        referenced_data_file: str,
    ) -> None:
        # We only support one blob at the moment
        self._blobs = []
        self._blob_payloads = []

        # 1. Create bitmaps from positions
        bitmaps: dict[int, BitMap] = {}
        for pos in positions:
            key = pos >> 32
            low_bits = pos & 0xFFFFFFFF
            if key not in bitmaps:
                bitmaps[key] = BitMap()
            bitmaps[key].add(low_bits)

        # Calculate the cardinality from the bitmaps
        cardinality = sum(len(bm) for bm in bitmaps.values())

        # 2. Serialize bitmaps for the vector payload
        vector_payload = _serialize_bitmaps(bitmaps)

        # 3. Construct the full blob payload for deletion-vector-v1
        with io.BytesIO() as blob_payload_buffer:
            # Magic bytes for DV
            blob_payload_buffer.write(DELETION_VECTOR_MAGIC)
            # The vector itself
            blob_payload_buffer.write(vector_payload)

            # The content for CRC calculation
            crc_content = blob_payload_buffer.getvalue()
            crc32 = zlib.crc32(crc_content)

        # The full blob to be stored in the Puffin file
        with io.BytesIO() as full_blob_buffer:
            # Combined length of the vector and magic bytes stored as 4 bytes, big-endian
            full_blob_buffer.write(len(crc_content).to_bytes(4, "big"))
            # The content (magic + vector)
            full_blob_buffer.write(crc_content)
            # A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian
            full_blob_buffer.write(crc32.to_bytes(4, "big"))

            self._blob_payloads.append(full_blob_buffer.getvalue())

        # 4. Create blob metadata
        properties = {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)}

        self._blobs.append(
            PuffinBlobMetadata(
                type="deletion-vector-v1",
                fields=[2147483645],  # Java INT_MAX - 2, reserved field id for deletion vectors
                snapshot_id=-1,
                sequence_number=-1,
                offset=0,  # TODO: Use DeleteFileIndex data
                length=0,  # TODO: Use DeleteFileIndex data
                properties=properties,
                compression_codec=None,
            )
        )

    def finish(self) -> bytes:
        with io.BytesIO() as out:
            payload_buffer = io.BytesIO()
            for blob_payload in self._blob_payloads:
                payload_buffer.write(blob_payload)

            updated_blobs_metadata: list[PuffinBlobMetadata] = []
            current_offset = 4  # Start after file magic (4 bytes)
            for i, blob_payload in enumerate(self._blob_payloads):
                original_metadata_dict = self._blobs[i].model_dump(by_alias=True, exclude_none=True)
                original_metadata_dict["offset"] = current_offset
                original_metadata_dict["length"] = len(blob_payload)
                updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict))
                current_offset += len(blob_payload)

            footer = Footer(blobs=updated_blobs_metadata, properties={"created-by": self._created_by} if self._created_by else {})
            footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")

            # Final assembly
            out.write(MAGIC_BYTES)
            out.write(payload_buffer.getvalue())
            out.write(MAGIC_BYTES)
            out.write(footer_payload_bytes)
            out.write(len(footer_payload_bytes).to_bytes(4, "little"))
            out.write((0).to_bytes(4, "little"))  # flags
            out.write(MAGIC_BYTES)

            return out.getvalue()
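For reference, the byte layout produced by `PuffinWriter` above can be walked with nothing but the standard library. The sketch below is not part of this diff; the positions and data-file path are made-up examples, and it assumes the writer code exactly as shown (4-byte `PFA1` magic at both ends, a little-endian footer length and flags word before the trailing magic, and a length-prefixed, CRC-terminated deletion-vector blob).

```python
# A minimal sketch (not part of this diff) that walks the layout produced by PuffinWriter.
# The positions and data-file path below are hypothetical.
import json
import zlib

from pyiceberg.table.puffin import PuffinWriter

writer = PuffinWriter(created_by="layout-demo")
writer.set_blob(positions=[3, 7, (1 << 32) + 1], referenced_data_file="s3://bucket/data.parquet")
data = writer.finish()

# File framing: MAGIC | blob payloads | MAGIC | footer JSON | footer length (4 LE) | flags (4 LE) | MAGIC
assert data[:4] == b"PFA1" and data[-4:] == b"PFA1"
footer_len = int.from_bytes(data[-12:-8], "little")
footer = json.loads(data[-12 - footer_len : -12])
blob_meta = footer["blobs"][0]  # offset/length point into the blob payload region

# Blob framing: content length (4 BE) | DV magic | serialized bitmaps | CRC-32 of (magic + bitmaps) (4 BE)
blob = data[blob_meta["offset"] : blob_meta["offset"] + blob_meta["length"]]
content_len = int.from_bytes(blob[:4], "big")
content = blob[4 : 4 + content_len]
assert content[:4] == b"\xd1\xd3\x39\x64"
assert zlib.crc32(content) == int.from_bytes(blob[4 + content_len :], "big")

# Vector framing: bitmap count (8 LE), then per bitmap a 4-byte key (LE) followed by a
# portable-serialized roaring bitmap (the reader's _deserialize_bitmap walks these in order).
vector = content[4:]
print("bitmap count:", int.from_bytes(vector[:8], "little"))  # 2 for the positions above
```

Because each roaring bitmap serialization is variable-length, the per-key bitmaps have to be consumed sequentially, which is why the reader cannot slice them out by key alone.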
93 changes: 93 additions & 0 deletions tests/integration/test_puffin_spark_interop.py
@@ -0,0 +1,93 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from pyspark.sql import SparkSession

from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.manifest import ManifestContent
from pyiceberg.table.puffin import PuffinFile


def run_spark_commands(spark: SparkSession, sqls: list[str]) -> None:
    for sql in sqls:
        spark.sql(sql)


@pytest.mark.integration
def test_read_spark_written_puffin_dv(spark: SparkSession, session_catalog: RestCatalog) -> None:
    """Verify pyiceberg can read Puffin DVs written by Spark."""
    identifier = "default.spark_puffin_format_test"

    run_spark_commands(spark, [f"DROP TABLE IF EXISTS {identifier}"])
    run_spark_commands(
        spark,
        [
            f"""
            CREATE TABLE {identifier} (id BIGINT)
            USING iceberg
            TBLPROPERTIES (
                'format-version' = '3',
                'write.delete.mode' = 'merge-on-read'
            )
            """,
        ],
    )

    df = spark.range(1, 51)
    df.coalesce(1).writeTo(identifier).append()

    files_before = spark.sql(f"SELECT * FROM {identifier}.files").collect()
    assert len(files_before) == 1, f"Expected 1 file, got {len(files_before)}"

    run_spark_commands(spark, [f"DELETE FROM {identifier} WHERE id IN (10, 20, 30, 40)"])

    table = session_catalog.load_table(identifier)
    current_snapshot = table.current_snapshot()
    assert current_snapshot is not None

    manifests = current_snapshot.manifests(table.io)
    delete_manifests = [m for m in manifests if m.content == ManifestContent.DELETES]
    assert len(delete_manifests) > 0, "Expected delete manifest with DVs"

    delete_manifest = delete_manifests[0]
    entries = list(delete_manifest.fetch_manifest_entry(table.io))
    assert len(entries) > 0, "Expected at least one delete file entry"

    delete_entry = entries[0]
    puffin_path = delete_entry.data_file.file_path
    assert puffin_path.endswith(".puffin"), f"Expected Puffin file, got: {puffin_path}"

    input_file = table.io.new_input(puffin_path)
    with input_file.open() as f:
        puffin_bytes = f.read()

    puffin = PuffinFile(puffin_bytes)

    assert len(puffin.footer.blobs) == 1, "Expected exactly one blob"

    blob = puffin.footer.blobs[0]
    assert blob.type == "deletion-vector-v1"
    assert "referenced-data-file" in blob.properties
    assert blob.properties["cardinality"] == "4"

    dv_dict = puffin.to_vector()
    assert len(dv_dict) == 1, "Expected one data file's deletions"

    for _data_file_path, chunked_array in dv_dict.items():
        positions = chunked_array.to_pylist()
        assert len(positions) == 4, f"Expected 4 deleted positions, got {len(positions)}"
        assert sorted(positions) == [9, 19, 29, 39], f"Unexpected positions: {positions}"
65 changes: 64 additions & 1 deletion tests/table/test_puffin.py
@@ -19,7 +19,7 @@
import pytest
from pyroaring import BitMap

from pyiceberg.table.puffin import _deserialize_bitmap
from pyiceberg.table.puffin import PROPERTY_REFERENCED_DATA_FILE, PuffinFile, PuffinWriter, _deserialize_bitmap


def _open_file(file: str) -> bytes:
@@ -71,3 +71,66 @@ def test_map_high_vals() -> None:

    with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"):
        _ = _deserialize_bitmap(puffin)


def test_puffin_round_trip() -> None:
    # Define some deletion positions for a file
    deletions = [5, (1 << 32) + 1, 5]  # Test with a high-bit position and duplicate

    file_path = "path/to/data.parquet"

    # Write the Puffin file
    writer = PuffinWriter(created_by="my-test-app")
    writer.set_blob(positions=deletions, referenced_data_file=file_path)
    puffin_bytes = writer.finish()

    # Read the Puffin file back
    reader = PuffinFile(puffin_bytes)

    # Assert footer metadata
    assert reader.footer.properties["created-by"] == "my-test-app"
    assert len(reader.footer.blobs) == 1

    blob_meta = reader.footer.blobs[0]
    assert blob_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file_path
    assert blob_meta.properties["cardinality"] == str(len(set(deletions)))

    # Assert the content of deletion vectors
    read_vectors = reader.to_vector()

    assert file_path in read_vectors
    assert read_vectors[file_path].to_pylist() == sorted(set(deletions))


def test_write_and_read_puffin_file() -> None:
    writer = PuffinWriter()
    writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet")
    writer.set_blob(positions=[4, 5, 6], referenced_data_file="file2.parquet")
    puffin_bytes = writer.finish()

    reader = PuffinFile(puffin_bytes)

    assert len(reader.footer.blobs) == 1
    blob = reader.footer.blobs[0]

    assert blob.properties["referenced-data-file"] == "file2.parquet"
    assert blob.properties["cardinality"] == "3"
    assert blob.type == "deletion-vector-v1"
    assert blob.snapshot_id == -1
    assert blob.sequence_number == -1
    assert blob.compression_codec is None

    vectors = reader.to_vector()
    assert len(vectors) == 1
    assert "file1.parquet" not in vectors
    assert vectors["file2.parquet"].to_pylist() == [4, 5, 6]


def test_puffin_file_with_no_blobs() -> None:
    writer = PuffinWriter()
    puffin_bytes = writer.finish()

    reader = PuffinFile(puffin_bytes)
    assert len(reader.footer.blobs) == 0
    assert len(reader.to_vector()) == 0
    assert "created-by" not in reader.footer.properties