Skip to content

Commit

Permalink
chore: refactor some of the writebuilder preconditions into the appro…
Browse files Browse the repository at this point in the history
…priate function

This change also refactors the check_preconditions function itself to be
a bit less redundant and even includes some unit tests! Holy smokes!

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
  • Loading branch information
rtyler committed Nov 24, 2024
1 parent 79456f1 commit c623644
Showing 1 changed file with 181 additions and 36 deletions.
217 changes: 181 additions & 36 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,20 +309,45 @@ impl WriteBuilder {
}

async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
if self.schema_mode == Some(SchemaMode::Overwrite) && self.mode != SaveMode::Overwrite {
return Err(DeltaTableError::Generic(
"Schema overwrite not supported for Append".to_string(),
));
}

let batches: &Vec<RecordBatch> = match &self.batches {
Some(batches) => {
if batches.is_empty() {
error!("The WriteBuilder was an empty set of batches!");
return Err(WriteError::MissingData.into());
}
batches
}
None => {
if self.input.is_none() {
error!("The WriteBuilder must have an input plan _or_ batches!");
return Err(WriteError::MissingData.into());
}
// provide an empty array in the case that an input plan exists
&vec![]
}
};

let schema: StructType = match &self.input {
Some(plan) => (plan.schema()).try_into()?,
None => (batches[0].schema()).try_into()?,
};

match &self.snapshot {
Some(snapshot) => {
PROTOCOL.can_write_to(snapshot)?;

let schema: StructType = if let Some(plan) = &self.input {
(plan.schema()).try_into()?
} else if let Some(batches) = &self.batches {
if batches.is_empty() {
return Err(WriteError::MissingData.into());
if self.mode == SaveMode::Overwrite {
PROTOCOL.check_append_only(&snapshot.snapshot)?;
if !snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into()));
}
(batches[0].schema()).try_into()?
} else {
return Err(WriteError::MissingData.into());
};
}

PROTOCOL.can_write_to(snapshot)?;

if self.schema_mode.is_none() {
PROTOCOL.check_can_write_timestamp_ntz(snapshot, &schema)?;
Expand All @@ -335,16 +360,6 @@ impl WriteBuilder {
}
}
None => {
let schema: StructType = if let Some(plan) = &self.input {
Ok(plan.schema().try_into()?)
} else if let Some(batches) = &self.batches {
if batches.is_empty() {
return Err(WriteError::MissingData.into());
}
Ok(batches[0].schema().try_into()?)
} else {
Err(WriteError::MissingData)
}?;
let mut builder = CreateBuilder::new()
.with_log_store(self.log_store.clone())
.with_columns(schema.fields().cloned())
Expand Down Expand Up @@ -786,21 +801,8 @@ impl std::future::IntoFuture for WriteBuilder {
let mut metrics = WriteMetrics::default();
let exec_start = Instant::now();

if this.mode == SaveMode::Overwrite {
if let Some(snapshot) = &this.snapshot {
PROTOCOL.check_append_only(&snapshot.snapshot)?;
if !snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into()));
}
}
}
if this.schema_mode == Some(SchemaMode::Overwrite) && this.mode != SaveMode::Overwrite {
return Err(DeltaTableError::Generic(
"Schema overwrite not supported for Append".to_string(),
));
}

// Create table actions to initialize table in case it does not yet exist and should be created
// Create table actions to initialize table in case it does not yet exist and should be
// created
let mut actions = this.check_preconditions().await?;

let active_partitions = this
Expand Down Expand Up @@ -2320,4 +2322,147 @@ mod tests {
assert!(!cdc_actions.is_empty());
Ok(())
}

/// SMall module to collect test cases which validate the [WriteBuilder]'s
/// check_preconditions() function
mod check_preconditions_test {
use super::*;

#[tokio::test]
async fn test_schema_overwrite_on_append() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;
let writer = DeltaOps(table)
.write(vec![batch])
.with_schema_mode(SchemaMode::Overwrite)
.with_save_mode(SaveMode::Append);

let check = writer.check_preconditions().await;
assert!(check.is_err());
Ok(())
}

#[tokio::test]
async fn test_savemode_overwrite_on_append_table() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory()
.create()
.with_configuration_property(TableProperty::AppendOnly, Some("true".to_string()))
.with_columns(table_schema.fields().cloned())
.await?;
let writer = DeltaOps(table)
.write(vec![batch])
.with_save_mode(SaveMode::Overwrite);

let check = writer.check_preconditions().await;
assert!(check.is_err());
Ok(())
}

#[tokio::test]
async fn test_empty_set_of_batches() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;
let writer = DeltaOps(table).write(vec![]);

match writer.check_preconditions().await {
Ok(_) => panic!("Expected check_preconditions to fail!"),
Err(DeltaTableError::GenericError { .. }) => {}
Err(e) => panic!("Unexpected error returned: {e:#?}"),
}
Ok(())
}

#[tokio::test]
async fn test_errorifexists() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;
let writer = DeltaOps(table)
.write(vec![batch])
.with_save_mode(SaveMode::ErrorIfExists);

match writer.check_preconditions().await {
Ok(_) => panic!("Expected check_preconditions to fail!"),
Err(DeltaTableError::GenericError { .. }) => {}
Err(e) => panic!("Unexpected error returned: {e:#?}"),
}
Ok(())
}

#[tokio::test]
async fn test_allow_empty_batches_with_input_plan() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;

let ctx = SessionContext::new();
let plan = ctx
.sql("SELECT 1 as id")
.await
.unwrap()
.create_physical_plan()
.await
.unwrap();
let writer = WriteBuilder::new(table.log_store.clone(), table.state)
.with_input_execution_plan(plan)
.with_save_mode(SaveMode::Overwrite);

let _ = writer.check_preconditions().await?;
Ok(())
}

#[tokio::test]
async fn test_no_snapshot_create_actions() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;
let batch = get_record_batch(None, false);
let writer =
WriteBuilder::new(table.log_store.clone(), None).with_input_batches(vec![batch]);

let actions = writer.check_preconditions().await?;
assert_eq!(
actions.len(),
2,
"Expecting a Protocol and a Metadata action in {actions:?}"
);

Ok(())
}

#[tokio::test]
async fn test_no_snapshot_err_no_batches_check() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;
let writer =
WriteBuilder::new(table.log_store.clone(), None).with_input_batches(vec![]);

match writer.check_preconditions().await {
Ok(_) => panic!("Expected check_preconditions to fail!"),
Err(DeltaTableError::GenericError { .. }) => {}
Err(e) => panic!("Unexpected error returned: {e:#?}"),
}

Ok(())
}
}
}

0 comments on commit c623644

Please sign in to comment.