Skip to content

Commit

Permalink
[infra] Update pyspark java iceberg library to 1.6.0 (#1462)
Browse files Browse the repository at this point in the history
* update pyspark java iceberb library to 1.6.0

* fix test

* add reminder

* make link
  • Loading branch information
kevinjqliu authored Dec 22, 2024
1 parent 9f47077 commit b450c1c
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 9 deletions.
1 change: 1 addition & 0 deletions dev/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$
RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events
WORKDIR ${SPARK_HOME}

# Remember to also update `tests/conftest`'s spark setting
ENV SPARK_VERSION=3.5.3
ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
ENV ICEBERG_VERSION=1.6.0
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2240,9 +2240,10 @@ def spark() -> "SparkSession":

from pyspark.sql import SparkSession

# Remember to also update `dev/Dockerfile`
spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2])
scala_version = "2.12"
iceberg_version = "1.4.3"
iceberg_version = "1.6.0"

os.environ["PYSPARK_SUBMIT_ARGS"] = (
f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version},"
Expand Down
10 changes: 2 additions & 8 deletions tests/integration/test_deletes.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,7 @@ def test_delete_partitioned_table_positional_deletes(spark: SparkSession, sessio
# Will rewrite a data file without the positional delete
tbl.delete(EqualTo("number", 40))

# One positional delete has been added, but an OVERWRITE status is set
# https://github.com/apache/iceberg/issues/10122
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "overwrite", "overwrite"]
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "delete", "overwrite"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]}


Expand Down Expand Up @@ -410,8 +408,6 @@ def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestC
# Will rewrite a data file without the positional delete
tbl.overwrite(arrow_tbl, "number_partitioned == 10")

# One positional delete has been added, but an OVERWRITE status is set
# https://github.com/apache/iceberg/issues/10122
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "delete", "append"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 10, 20], "number": [4, 5, 3]}

Expand Down Expand Up @@ -461,13 +457,11 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio
# Will rewrite a data file without a positional delete
tbl.delete(EqualTo("number", 201))

# One positional delete has been added, but an OVERWRITE status is set
# https://github.com/apache/iceberg/issues/10122
snapshots = tbl.snapshots()
assert len(snapshots) == 3

# Snapshots produced by Spark
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "overwrite"]
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "delete"]

# Will rewrite one parquet file
assert snapshots[2].summary == Summary(
Expand Down

0 comments on commit b450c1c

Please sign in to comment.