[Native] skip file size < 8 (#575)
Signed-off-by: chenxu <chenxu@dmetasoul.com>
Co-authored-by: chenxu <chenxu@dmetasoul.com>
xuchen-plus and dmetasoul01 authored Jan 20, 2025
1 parent fe80113 commit 9df97d6
Showing 1 changed file with 49 additions and 33 deletions.
82 changes: 49 additions & 33 deletions rust/lakesoul-io/src/helpers.rs
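
Context for the guard added below: a Parquet file must end with a 4-byte little-endian footer length followed by the 4-byte magic PAR1, so an object smaller than 8 bytes cannot even hold that tail and would fail during schema inference. A minimal standalone sketch of the invariant (the constant and function names are illustrative, not part of this commit):

const PARQUET_TAIL_LEN: usize = 8;

/// Illustrative check: the last 8 bytes of a valid Parquet file are a
/// 4-byte little-endian footer length followed by the magic b"PAR1".
fn could_be_parquet(tail: &[u8]) -> bool {
    // Anything shorter than 8 bytes cannot hold the footer tail at all;
    // this is the same bound as the `obj_meta.size >= 8` guard in the diff.
    tail.len() >= PARQUET_TAIL_LEN && tail.ends_with(b"PAR1")
}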
@@ -2,10 +2,8 @@
 //
 // SPDX-License-Identifier: Apache-2.0
 
-use std::{collections::HashMap, sync::Arc};
-
 use arrow::datatypes::UInt32Type;
-use arrow_array::{RecordBatch, UInt32Array};
+use arrow_array::{Array, RecordBatch, UInt32Array};
 use arrow_buffer::i256;
 use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit};
 use chrono::{DateTime, Duration};
@@ -23,9 +21,13 @@ use datafusion::{
 };
+use datafusion_common::DataFusionError::{External, Internal};
 use datafusion_common::{cast::as_primitive_array, DFSchema, DataFusionError, Result, ScalarValue};
+use std::iter::zip;
+use std::{collections::HashMap, sync::Arc};
 
 use datafusion_substrait::substrait::proto::Plan;
 use futures::{StreamExt, TryStreamExt};
 use object_store::path::Path;
+use object_store::ObjectMeta;
 use parquet::format::FileMetaData;
 use proto::proto::entity::JniWrapper;
 use rand::distributions::DistString;
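
Most of the call-site churn in the hunks below follows from the two imports added above: with the External and Internal variants of DataFusionError in scope, the fully qualified multi-line error constructions collapse into one-liners. A minimal sketch of the pattern, using a made-up helper rather than code from this file:

use datafusion_common::DataFusionError::External;
use datafusion_common::Result;

// Hypothetical helper: with the variant imported, the former
// datafusion::error::DataFusionError::External(...) spelling shortens.
fn require_partition(desc: Option<&str>) -> Result<String> {
    match desc {
        Some(d) => Ok(d.to_string()),
        None => Err(External("invalid partition desc".into())),
    }
}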
@@ -94,7 +96,7 @@ fn range_partition_to_partition_cols(
 pub fn get_columnar_values(
     batch: &RecordBatch,
     range_partitions: Arc<Vec<String>>,
-) -> datafusion::error::Result<Vec<(String, ScalarValue)>> {
+) -> Result<Vec<(String, ScalarValue)>> {
     range_partitions
         .iter()
         .map(|range_col| {
@@ -104,12 +106,10 @@ pub fn get_columnar_values(
                     Err(e) => Err(e),
                 }
             } else {
-                Err(datafusion::error::DataFusionError::External(
-                    format!("Invalid partition desc of {}", range_col).into(),
-                ))
+                Err(External(format!("Invalid partition desc of {}", range_col).into()))
             }
         })
-        .collect::<datafusion::error::Result<Vec<_>>>()
+        .collect::<Result<Vec<_>>>()
 }
 
 pub fn format_scalar_value(v: &ScalarValue) -> String {
@@ -310,11 +310,7 @@ pub fn partition_desc_to_scalar_values(schema: SchemaRef, partition_desc: String
             Some((name, val)) => {
                 part_values.push((name, val));
             }
-            _ => {
-                return Err(datafusion::error::DataFusionError::External(
-                    format!("Invalid partition_desc: {}", partition_desc).into(),
-                ))
-            }
+            _ => return Err(External(format!("Invalid partition_desc: {}", partition_desc).into())),
         }
     }
     let mut scalar_values = Vec::with_capacity(schema.fields().len());
@@ -350,9 +346,7 @@ pub fn partition_desc_from_file_scan_config(conf: &FileScanConfig) -> Result<(St
                         .map(|(idx, col)| (col.name().clone(), file.partition_values[idx].to_string())),
                 ),
             )),
-            None => Err(DataFusionError::External(
-                format!("Invalid file_group {:?}", conf.file_groups).into(),
-            )),
+            None => Err(External(format!("Invalid file_group {:?}", conf.file_groups).into())),
         }
     }
 }
@@ -371,8 +365,19 @@ pub async fn listing_table_from_lakesoul_io_config(
         .iter()
         .map(ListingTableUrl::parse)
         .collect::<Result<Vec<_>>>()?;
+    let object_metas = get_file_object_meta(session_state, &table_paths).await?;
+    let (table_paths, object_metas): (Vec<_>, Vec<_>) = zip(table_paths, object_metas)
+        .filter(|(_, obj_meta)| {
+            let valid = obj_meta.size >= 8;
+            if !valid {
+                println!("File {}, size {}, is invalid", obj_meta.location, obj_meta.size);
+            }
+            valid
+        })
+        .unzip();
     // Resolve the schema
-    let resolved_schema = infer_schema(session_state, &table_paths, Arc::clone(&file_format)).await?;
+    let resolved_schema =
+        infer_schema(session_state, &table_paths, &object_metas, Arc::clone(&file_format)).await?;
 
     let target_schema = if lakesoul_io_config.inferring_schema {
         SchemaRef::new(Schema::empty())
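
The zip/filter/unzip chain above prunes both vectors in lockstep, so table_paths and object_metas stay index-aligned for the infer_schema call that follows. A toy, self-contained illustration of the same pattern (the data is made up):

use std::iter::zip;

fn main() {
    // Stand-ins for the (ListingTableUrl, ObjectMeta) pairs in the diff.
    let paths = vec!["a.parquet", "b.parquet", "c.parquet"];
    let sizes = vec![1024_usize, 4, 8];
    // Pair the vectors, drop entries below the 8-byte minimum, then split
    // back into two vectors whose indices still correspond.
    let (paths, sizes): (Vec<_>, Vec<_>) =
        zip(paths, sizes).filter(|&(_, size)| size >= 8).unzip();
    assert_eq!(paths, vec!["a.parquet", "c.parquet"]);
    assert_eq!(sizes, vec![1024, 8]);
}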
@@ -419,31 +424,42 @@
     Ok((config.file_schema.clone(), Arc::new(ListingTable::try_new(config)?)))
 }
 
+pub async fn get_file_object_meta(sc: &SessionState, table_paths: &[ListingTableUrl]) -> Result<Vec<ObjectMeta>> {
+    let object_store_url = table_paths
+        .first()
+        .ok_or(Internal("no table path".to_string()))?
+        .object_store();
+    let store = sc.runtime_env().object_store(object_store_url.clone())?;
+    futures::stream::iter(table_paths)
+        .map(|path| {
+            let store = store.clone();
+            async move {
+                let path = Path::from_url_path(<ListingTableUrl as AsRef<Url>>::as_ref(path).path())
+                    .map_err(object_store::Error::from)?;
+                store.head(&path).await
+            }
+        })
+        .boxed()
+        .buffered(sc.config_options().execution.meta_fetch_concurrency)
+        .try_collect()
+        .await
+        .map_err(DataFusionError::from)
+}
+
 pub async fn infer_schema(
     sc: &SessionState,
     table_paths: &[ListingTableUrl],
+    object_metas: &[ObjectMeta],
     file_format: Arc<dyn FileFormat>,
 ) -> Result<SchemaRef> {
-    // Create default parquet options
     let object_store_url = table_paths
         .first()
-        .ok_or(DataFusionError::Internal("no table path".to_string()))?
+        .ok_or(Internal("no table path".to_string()))?
         .object_store();
     let store = sc.runtime_env().object_store(object_store_url.clone())?;
-    let mut objects = vec![];
-
-    for url in table_paths {
-        objects.push(
-            store
-                .head(&Path::from_url_path(
-                    <ListingTableUrl as AsRef<Url>>::as_ref(url).path(),
-                )?)
-                .await?,
-        );
-    }
-
     // Resolve the schema
-    file_format.infer_schema(sc, &store, &objects).await
+    file_format.infer_schema(sc, &store, &object_metas).await
 }
 
 pub fn apply_partition_filter(wrapper: JniWrapper, schema: SchemaRef, filter: Plan) -> Result<JniWrapper> {
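
The new get_file_object_meta issues its head requests through a bounded-concurrency stream: map each path to a future, let buffered poll at most meta_fetch_concurrency of them at once, and try_collect stops at the first error. Note that buffered, unlike buffer_unordered, yields results in input order, which the zip pairing upstream depends on. A self-contained sketch of the pattern, where fetch_len is a stand-in for store.head():

use futures::{stream, StreamExt, TryStreamExt};

// Stand-in for store.head(): pretends to fetch the size of a path.
async fn fetch_len(path: &str) -> std::io::Result<usize> {
    Ok(path.len()) // placeholder for a real HEAD request
}

// Fetch all sizes with at most `concurrency` requests in flight.
async fn all_lengths(paths: Vec<String>, concurrency: usize) -> std::io::Result<Vec<usize>> {
    stream::iter(paths)
        .map(|p| async move { fetch_len(&p).await })
        .buffered(concurrency) // bounded concurrency, order-preserving
        .try_collect() // first error aborts the whole collection
        .await
}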
@@ -550,7 +566,7 @@ pub fn get_batch_memory_size(batch: &RecordBatch) -> Result<usize> {
 }
 
 pub fn get_file_size(metadata: &FileMetaData) -> usize {
-    let footer_size= metadata.footer_signing_key_metadata.as_ref().map_or(0, |f| f.len());
+    let footer_size = metadata.footer_signing_key_metadata.as_ref().map_or(0, |f| f.len());
     dbg!(&metadata);
     let rg_size = metadata
         .row_groups
@@ -567,4 +583,4 @@ pub fn get_file_exist_col(metadata: &FileMetaData) -> String {
         .map(|schema_element| schema_element.name.clone())
         .collect::<Vec<_>>()
         .join(",")
-}
\ No newline at end of file
+}
