Skip to content

Commit

Permalink
[Rust] fix panic
Browse files Browse the repository at this point in the history
Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

fix proto

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

fix lakesoul-io

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

fix lakesoul-io-c

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

fix lakesoul-datafusion

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

fix lakesoul-datafusion

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>
  • Loading branch information
mag1c1an1 committed Feb 2, 2024
1 parent 4e4b415 commit 5648c05
Show file tree
Hide file tree
Showing 27 changed files with 284 additions and 347 deletions.
22 changes: 22 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ serde = { version = "1.0", features = ["derive", "std", "rc"] }
rand = "^0.8"
bytes = "1.4.0"
half = "^2.1"
tracing = "0.1.40"
tracing = "0.1.40"
thiserror = "1.0"
1 change: 1 addition & 0 deletions rust/lakesoul-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tokio = { workspace = true }
rand = { workspace = true }
bytes = { workspace = true }
tracing = "0.1.40"
thiserror = { workspace = true }

[dev-dependencies]
ctor = "0.2"
Expand Down
9 changes: 4 additions & 5 deletions rust/lakesoul-datafusion/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, co
.create_table(TableInfo {
table_id: format!("table_{}", uuid::Uuid::new_v4()),
table_name: table_name.to_string(),
table_path: format!("file://{}default/{}", env::temp_dir().to_str().unwrap(), table_name),
table_schema: serde_json::to_string::<ArrowJavaSchema>(&config.schema().into()).unwrap(),
table_path: format!("file://{}default/{}", env::temp_dir().to_str().expect("can not get tmp dir"), table_name),
table_schema: serde_json::to_string::<ArrowJavaSchema>(&config.schema().into())?,
table_namespace: "default".to_string(),
properties: serde_json::to_string(&LakeSoulTableProperty {
hash_bucket_num: Some(4),
Expand Down Expand Up @@ -81,7 +81,7 @@ pub(crate) async fn create_io_config_builder(
}

pub(crate) fn parse_table_info_partitions(partitions: String) -> (Vec<String>, Vec<String>) {
let (range_keys, hash_keys) = partitions.split_at(partitions.find(';').unwrap());
let (range_keys, hash_keys) = partitions.split_at(partitions.find(';').expect("wrong partition layout"));
let hash_keys = &hash_keys[1..];
(
range_keys
Expand Down Expand Up @@ -131,8 +131,7 @@ pub(crate) async fn commit_data(
.collect(),
commit_op: CommitOp::AppendCommit as i32,
timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs() as i64,
commit_id: {
let (high, low) = uuid::Uuid::new_v4().as_u64_pair();
Expand Down
90 changes: 19 additions & 71 deletions rust/lakesoul-datafusion/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0

use std::{error::Error, fmt::Display, result, sync::Arc};
use tokio::task::JoinError;
use std::{result, sync::Arc};

use lakesoul_io::lakesoul_reader::{ArrowError, DataFusionError};
use lakesoul_metadata::error::LakeSoulMetaDataError;
Expand All @@ -15,76 +14,25 @@ pub type Result<T, E = LakeSoulError> = result::Result<T, E>;
pub type SharedResult<T> = result::Result<T, Arc<LakeSoulError>>;

/// Error type for generic operations that could result in LakeSoulMetaDataError::External
pub type GenericError = Box<dyn Error + Send + Sync>;
pub type GenericError = Box<dyn std::error::Error + Send + Sync>;

#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub enum LakeSoulError {
MetaDataError(LakeSoulMetaDataError),
DataFusionError(DataFusionError),
ArrowError(ArrowError),
SerdeJsonError(serde_json::Error),
TokioJoinError(tokio::task::JoinError),
#[error("metadata error: {0}")]
MetaDataError(#[from] LakeSoulMetaDataError),
#[error("Datafusion error: {0}")]
DataFusionError(#[from] DataFusionError),
#[error("arrow error: {0}")]
ArrowError(#[from] ArrowError),
#[error("serde_json error: {0}")]
SerdeJsonError(#[from] serde_json::Error),
#[error("tokio error: {0}")]
TokioJoinError(#[from] tokio::task::JoinError),
#[error("sys time error: {0}")]
SysTimeError(#[from] std::time::SystemTimeError),
#[error(
"Internal error: {0}.\nThis was likely caused by a bug in LakeSoul's \
code and we would welcome that you file an bug report in our issue tracker"
)]
Internal(String),
}

impl From<LakeSoulMetaDataError> for LakeSoulError {
fn from(err: LakeSoulMetaDataError) -> Self {
Self::MetaDataError(err)
}
}

impl From<DataFusionError> for LakeSoulError {
fn from(err: DataFusionError) -> Self {
Self::DataFusionError(err)
}
}

impl From<ArrowError> for LakeSoulError {
fn from(err: ArrowError) -> Self {
Self::ArrowError(err)
}
}

impl From<serde_json::Error> for LakeSoulError {
fn from(err: serde_json::Error) -> Self {
Self::SerdeJsonError(err)
}
}

impl From<tokio::task::JoinError> for LakeSoulError {
fn from(err: JoinError) -> Self {
Self::TokioJoinError(err)
}
}

impl Display for LakeSoulError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
LakeSoulError::MetaDataError(ref desc) => write!(f, "metadata error: {desc}"),
LakeSoulError::DataFusionError(ref desc) => write!(f, "DataFusion error: {desc}"),
LakeSoulError::SerdeJsonError(ref desc) => write!(f, "serde_json error: {desc}"),
LakeSoulError::ArrowError(ref desc) => write!(f, "arrow error: {desc}"),
LakeSoulError::TokioJoinError(ref desc) => write!(f, "tokio error: {desc}"),
LakeSoulError::Internal(ref desc) => {
write!(
f,
"Internal error: {desc}.\nThis was likely caused by a bug in LakeSoul's \
code and we would welcome that you file an bug report in our issue tracker"
)
}
}
}
}

impl Error for LakeSoulError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
LakeSoulError::MetaDataError(e) => Some(e),
LakeSoulError::DataFusionError(e) => Some(e),
LakeSoulError::SerdeJsonError(e) => Some(e),
LakeSoulError::ArrowError(e) => Some(e),
LakeSoulError::TokioJoinError(e) => Some(e),
LakeSoulError::Internal(_) => None,
}
}
}
3 changes: 2 additions & 1 deletion rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use crate::{

pub(crate) fn create_io_config_builder_from_table_info(table_info: Arc<TableInfo>) -> LakeSoulIOConfigBuilder {
let (range_partitions, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone());
let properties = serde_json::from_str::<LakeSoulTableProperty>(&table_info.properties).unwrap();
let properties = serde_json::from_str::<LakeSoulTableProperty>(&table_info.properties)
.expect("deserialize lakesoul table property failed");
LakeSoulIOConfigBuilder::new()
.with_schema(schema_from_metadata_str(&table_info.table_schema))
.with_prefix(table_info.table_path.clone())
Expand Down
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/lakesoul_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl LakeSoulTable {
let table_schema = schema_from_metadata_str(&table_info.table_schema);

let table_name = table_info.table_name.clone();
let properties = serde_json::from_str::<LakeSoulTableProperty>(&table_info.properties).unwrap();
let properties = serde_json::from_str::<LakeSoulTableProperty>(&table_info.properties)?;
let (_, hash_partitions) = parse_table_info_partitions(table_info.partitions.clone());

Ok(Self {
Expand Down
4 changes: 2 additions & 2 deletions rust/lakesoul-datafusion/src/planner/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ impl PhysicalPlanner for LakeSoulPhysicalPlanner {
let schema = table_name.schema();
// let schema = session_state.schema_for_ref(table_name)?;
let lakesoul_table = LakeSoulTable::for_namespace_and_name(schema.unwrap_or("default"), name)
.await
.unwrap();
.await.map_err(|e| DataFusionError::External(Box::new(e)))?;

match lakesoul_table.as_sink_provider(session_state).await {
Ok(provider) => {
let physical_input = self.create_physical_plan(input, session_state).await?;
Expand Down
8 changes: 4 additions & 4 deletions rust/lakesoul-datafusion/src/serialize/arrow_java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,20 +294,20 @@ impl From<&ArrowJavaField> for Field {
field.children.iter().map(|f| f.into()).collect::<Vec<Field>>(),
)),
ArrowJavaType::List => {
assert!(field.children.len() == 1);
assert_eq!(field.children.len(), 1);
DataType::List(Arc::new(field.children.first().unwrap().into()))
}
ArrowJavaType::LargeList => {
assert!(field.children.len() == 1);
assert_eq!(field.children.len(), 1);
DataType::LargeList(Arc::new(field.children.first().unwrap().into()))
}
ArrowJavaType::FixedSizeList { list_size } => {
assert!(field.children.len() == 1);
assert_eq!(field.children.len(), 1);
DataType::FixedSizeList(Arc::new(field.children.first().unwrap().into()), *list_size)
}
ArrowJavaType::Union => todo!("Union type not supported"),
ArrowJavaType::Map { keys_sorted } => {
assert!(field.children.len() == 1);
assert_eq!(field.children.len(), 1);
DataType::Map(Arc::new(field.children.first().unwrap().into()), *keys_sorted)
}
ArrowJavaType::Int { is_signed, bit_width } => {
Expand Down
2 changes: 1 addition & 1 deletion rust/lakesoul-io-c/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ extern crate cbindgen;
use std::env;

fn main() {
let crate_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
let crate_dir = env::var("CARGO_MANIFEST_DIR").expect("can not find CARGO_MANIFEST_DIR");

cbindgen::Builder::new()
.with_crate(crate_dir)
Expand Down
7 changes: 5 additions & 2 deletions rust/lakesoul-io/src/datasource/listing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::{datasource::TableProvider, logical_expr::Expr};

use datafusion::logical_expr::{TableProviderFilterPushDown, TableType};
use datafusion_common::{FileTypeWriterOptions, Result};
use datafusion_common::{DataFusionError, FileTypeWriterOptions, Result};
use tracing::{debug, instrument};

use crate::lakesoul_io_config::LakeSoulIOConfig;
Expand Down Expand Up @@ -58,7 +58,10 @@ impl LakeSoulListingTable {
.map(ListingTableUrl::parse)
.collect::<Result<Vec<_>>>()?;
// Create default parquet options
let object_store_url = table_paths.first().unwrap().object_store();
let object_store_url = table_paths
.first()
.ok_or(DataFusionError::Internal("no table path".to_string()))?
.object_store();
let store = session_state.runtime_env().object_store(object_store_url.clone())?;
let target_schema = uniform_schema(lakesoul_io_config.schema());

Expand Down
Loading

0 comments on commit 5648c05

Please sign in to comment.