diff --git a/Cargo.lock b/Cargo.lock index a787471d..6750ce14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -280,6 +280,7 @@ dependencies = [ "datafusion", "datafusion-functions-json", "datafusion-table-providers", + "deltalake", "flume", "futures", "futures-util", @@ -761,7 +762,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61b1d86e7705efe1be1b569bab41d4fa1e14e220b60a160f78de2db687add079" dependencies = [ - "bindgen 0.69.5", + "bindgen", "cc", "cmake", "dunce", @@ -810,6 +811,28 @@ dependencies = [ "uuid", ] +[[package]] +name = "aws-sdk-dynamodb" +version = "1.82.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fe8ed25686f117ab3a34dec9cf4d0b25f3555d16537858ef530b209967deecf" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sso" version = "1.73.0" @@ -1139,6 +1162,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7" dependencies = [ "fastrand", + "tokio", ] [[package]] @@ -1374,24 +1398,6 @@ dependencies = [ "which", ] -[[package]] -name = "bindgen" -version = "0.71.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" -dependencies = [ - "bitflags", - "cexpr", - "clang-sys", - "itertools 0.13.0", - "proc-macro2", - "quote", - "regex", - "rustc-hash 2.1.1", - "shlex", - "syn 2.0.101", -] - [[package]] name = "bit-set" version = "0.8.0" @@ -1560,6 +1566,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytemuck" +version = "1.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c76a5792e44e4abe34d3abf15636779261d45a7450612059293d1d2cfc63422" + [[package]] name = "byteorder" version = "1.5.0" @@ -2007,6 +2019,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "convert_case" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baaaa0ecca5b51987b9423ccdc971514dd8b0bb7b4060b983d3664dad3f1f89f" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -2310,7 +2331,7 @@ dependencies = [ "parquet", "rand 0.8.5", "regex", - "sqlparser", + "sqlparser 0.55.0", "tempfile", "tokio", "url", @@ -2389,7 +2410,7 @@ dependencies = [ "paste", "pyo3", "recursive", - "sqlparser", + "sqlparser 0.55.0", "tokio", "web-time", ] @@ -2590,7 +2611,7 @@ dependencies = [ "paste", "recursive", "serde_json", - "sqlparser", + "sqlparser 0.55.0", ] [[package]] @@ -2925,7 +2946,7 @@ dependencies = [ "log", "recursive", "regex", - "sqlparser", + "sqlparser 0.55.0", ] [[package]] @@ -2987,6 +3008,197 @@ dependencies = [ "generic-array", ] +[[package]] +name = "delta_kernel" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c96f51383ba327a1403e6e3458f8fc979d09d7200af56fa32681619f6c760dee" +dependencies = [ + "arrow", + "bytes", + "chrono", + "delta_kernel_derive", + "futures", + "indexmap 2.9.0", + "itertools 0.14.0", + "object_store", + "parquet", + "reqwest", + "roaring", + "rustc_version", + "serde", + "serde_json", + "strum 0.27.1", + "thiserror 2.0.12", + "tokio", + "tracing", + "url", + "uuid", + "z85", +] + +[[package]] +name = "delta_kernel_derive" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7b49a2e67ebafbe644e36f251ee985f237bfb39e4ef1e312eb5876535bc449e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "deltalake" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5c9558d4d4f64d006196dd05e01bef3ac25e4250164f04e89f6461b8d8130f8" +dependencies = [ + "deltalake-aws", + "deltalake-azure", + "deltalake-core", + "deltalake-gcp", +] + +[[package]] +name = "deltalake-aws" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e80ccc8edaad2ffd8eaa04732ae9b573cbf88a2ce58f087479427bec718c7e2" +dependencies = [ + "async-trait", + "aws-config", + "aws-credential-types", + "aws-sdk-dynamodb", + "aws-sdk-sts", + "aws-smithy-runtime-api", + "backon", + "bytes", + "chrono", + "deltalake-core", + "futures", + "maplit", + "object_store", + "regex", + "thiserror 2.0.12", + "tokio", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "deltalake-azure" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d79b37806a7e6bb0dfa2a156ddd62e935d4c0cba6f96a2982da5dfe109b0918" +dependencies = [ + "async-trait", + "bytes", + "deltalake-core", + "futures", + "object_store", + "regex", + "thiserror 2.0.12", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "deltalake-core" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96e6d67a03d7ee6ff55607f58e041d67c9e9cce8b2e43f00f4ca0729f912a0b" +dependencies = [ + "arrow", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "async-trait", + "bytes", + "cfg-if", + "chrono", + "dashmap", + "datafusion", + "datafusion-common", + "datafusion-expr", + "datafusion-functions", + "datafusion-functions-aggregate", + "datafusion-physical-expr", + "datafusion-physical-plan", + "datafusion-proto", + "datafusion-sql", + "delta_kernel", + "deltalake-derive", + "either", + "futures", + "humantime", + "indexmap 2.9.0", + "itertools 0.14.0", + "maplit", + "num-bigint", + "num-traits", + "num_cpus", + "object_store", + "parking_lot", + "parquet", + "percent-encoding", + "pin-project-lite", + "rand 0.8.5", + "regex", + "roaring", + "serde", + "serde_json", + "sqlparser 0.56.0", + "strum 0.27.1", + "thiserror 2.0.12", + "tokio", + "tracing", + "url", + "urlencoding", + "uuid", + "z85", +] + +[[package]] +name = "deltalake-derive" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdd39efa077173455fa69c17437141d14ec6273a371d7d3d25ea7f30f61d4c9" +dependencies = [ + "convert_case 0.8.0", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "deltalake-gcp" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68535c5eb131ceeb713bfc7664d12b270ade5631257ad1d3e13663e4143e8d99" +dependencies = [ + "async-trait", + "bytes", + "deltalake-core", + "futures", + "object_store", + "regex", + "thiserror 2.0.12", + "tokio", + "tracing", + "url", +] + [[package]] name = "der" version = "0.7.10" @@ -3201,7 +3413,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.60.2", ] [[package]] @@ -4536,7 +4748,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.53.2", ] [[package]] @@ -4703,6 +4915,12 @@ dependencies = [ "roff", ] +[[package]] +name = "maplit" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" + [[package]] name = "matchers" version = "0.1.0" @@ -6098,7 +6316,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6468,6 +6686,16 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" +[[package]] +name = "roaring" +version = "0.10.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e8d2cfa184d94d0726d650a9f4a1be7f9b76ac9fdb954219878dc00c1c1e7b" +dependencies = [ + "bytemuck", + "byteorder", +] + [[package]] name = "roff" version = "0.1.0" @@ -6585,7 +6813,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6598,7 +6826,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -7287,6 +7515,16 @@ dependencies = [ "sqlparser_derive", ] +[[package]] +name = "sqlparser" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e68feb51ffa54fc841e086f58da543facfe3d7ae2a60d69b0a8cbbd30d16ae8d" +dependencies = [ + "log", + "recursive", +] + [[package]] name = "sqlparser_derive" version = "0.3.0" @@ -7504,7 +7742,7 @@ dependencies = [ "cfg-if", "libc", "psm", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -7566,6 +7804,15 @@ version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +[[package]] +name = "strum" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32" +dependencies = [ + "strum_macros 0.27.1", +] + [[package]] name = "strum_macros" version = "0.25.3" @@ -7592,6 +7839,19 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "strum_macros" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.101", +] + [[package]] name = "subtle" version = "2.6.1" @@ -7710,7 +7970,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.0.7", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -8558,6 +8818,7 @@ checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d" dependencies = [ "getrandom 0.3.3", "js-sys", + "rand 0.9.1", "serde", "wasm-bindgen", ] @@ -8610,7 +8871,7 @@ dependencies = [ "clap", "codespan-reporting", "community-id", - "convert_case", + "convert_case 0.7.1", "crc", "crypto_secretbox", "csv", @@ -9371,6 +9632,12 @@ dependencies = [ "synstructure", ] +[[package]] +name = "z85" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b3a41ce106832b4da1c065baa4c31cf640cf965fa1483816402b7f6b96f0a64" + [[package]] name = "zerocopy" version = "0.8.26" @@ -9468,20 +9735,19 @@ dependencies = [ [[package]] name = "zstd-safe" -version = "7.2.4" +version = "7.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +checksum = "54a3ab4db68cea366acc5c897c7b4d4d1b8994a9cd6e6f841f8964566a419059" dependencies = [ "zstd-sys", ] [[package]] name = "zstd-sys" -version = "2.0.15+zstd.1.5.7" +version = "2.0.12+zstd.1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" +checksum = "0a4e40c320c3cb459d9a9ff6de98cff88f4751ee9275d140e2be94a2b74e4c13" dependencies = [ - "bindgen 0.71.1", "cc", "pkg-config", ] diff --git a/crates/arkflow-plugin/Cargo.toml b/crates/arkflow-plugin/Cargo.toml index 987d4a2c..acd4d180 100644 --- a/crates/arkflow-plugin/Cargo.toml +++ b/crates/arkflow-plugin/Cargo.toml @@ -83,6 +83,9 @@ tokio-modbus = { version = "0.16", default-features = false, features = ["tcp"] object_store = { version = "0.12", features = ["aws", "azure", "gcp"] } hdfs-native-object-store = "0.14" +# Delta Lake +deltalake = { version = "0.26", features = ["s3", "azure", "gcs", "datafusion"] } + # python pyo3 = { version = "0.24", features = ["auto-initialize", "serde"] } diff --git a/crates/arkflow-plugin/src/output/deltalake.rs b/crates/arkflow-plugin/src/output/deltalake.rs new file mode 100644 index 00000000..4f407c1c --- /dev/null +++ b/crates/arkflow-plugin/src/output/deltalake.rs @@ -0,0 +1,423 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Delta Lake output component +//! +//! This component writes data to Delta Lake tables stored in object storage. + +use std::collections::HashMap; +use std::sync::Arc; + +use arkflow_core::output::{register_output_builder, Output, OutputBuilder}; +use arkflow_core::{Error, MessageBatch, Resource}; +use async_trait::async_trait; +use deltalake::kernel::Schema; +use deltalake::{ + arrow::record_batch::RecordBatch, operations::create::CreateBuilder, DeltaOps, DeltaTable, + DeltaTableBuilder, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use tracing::{debug, info, warn}; + +/// Delta Lake output configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +struct DeltaLakeOutputConfig { + /// Delta table URI (e.g., s3://bucket/path/to/table, azure://container/path, gs://bucket/path) + table_uri: String, + /// Write mode: append, overwrite, error, ignore + #[serde(default = "default_write_mode")] + write_mode: WriteMode, + /// Storage options for authentication and configuration + #[serde(default)] + storage_options: HashMap, + /// Whether to create table if it doesn't exist + #[serde(default = "default_create_if_not_exists")] + create_if_not_exists: bool, + /// Schema for table creation (optional, inferred from data if not provided) + schema: Option, + /// Partition columns (optional) + #[serde(default)] + partition_columns: Vec, + /// Maximum number of retries for write operations + #[serde(default = "default_max_retries")] + max_retries: u32, + /// Retry delay in milliseconds + #[serde(default = "default_retry_delay_ms")] + retry_delay_ms: u64, +} + +/// Write mode enumeration for better type safety +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +enum WriteMode { + Append, + Overwrite, + Error, + Ignore, +} + +fn default_write_mode() -> WriteMode { + WriteMode::Append +} + +fn default_create_if_not_exists() -> bool { + true +} + +fn default_max_retries() -> u32 { + 3 +} + +fn default_retry_delay_ms() -> u64 { + 1000 +} + +/// Delta Lake output component +struct DeltaLakeOutput { + config: DeltaLakeOutputConfig, + table: Arc>>, +} + +impl DeltaLakeOutput { + /// Create a new Delta Lake output component + pub fn new(config: DeltaLakeOutputConfig) -> Result { + // Validate configuration at creation time + Self::validate_config(&config)?; + + Ok(Self { + config, + table: Arc::new(RwLock::new(None)), + }) + } + + /// Validate configuration + fn validate_config(config: &DeltaLakeOutputConfig) -> Result<(), Error> { + if config.table_uri.trim().is_empty() { + return Err(Error::Config("table_uri cannot be empty".to_string())); + } + + // Validate URI format + if !config.table_uri.starts_with("s3://") + && !config.table_uri.starts_with("azure://") + && !config.table_uri.starts_with("gs://") + && !config.table_uri.starts_with("file://") + && !config.table_uri.starts_with("/") { + warn!("Table URI '{}' may not be a valid storage path", config.table_uri); + } + + Ok(()) + } + + /// Get or create the Delta table + async fn get_or_create_table(&self) -> Result { + // First try read lock for better concurrency + { + let table_guard = self.table.read().await; + if let Some(table) = table_guard.as_ref() { + return Ok(table.clone()); + } + } + + // Need write lock to initialize table + let mut table_guard = self.table.write().await; + + // Double-check pattern to avoid race condition + if let Some(table) = table_guard.as_ref() { + return Ok(table.clone()); + } + + // Try to load existing table + let mut builder = DeltaTableBuilder::from_uri(&self.config.table_uri); + + // Add storage options + builder = builder.with_storage_options(self.config.storage_options.clone()); + + match builder.load().await { + Ok(table) => { + info!("Loaded existing Delta table from {}", self.config.table_uri); + table_guard.replace(table.clone()); + Ok(table) + } + Err(load_err) if self.config.create_if_not_exists => { + info!("Creating new Delta table at {} (load error: {})", self.config.table_uri, load_err); + self.create_table_internal(&mut table_guard).await + } + Err(e) => Err(Error::Connection(format!( + "Failed to load Delta table and create_if_not_exists is false: {}", + e + ))), + } + } + + /// Create a new Delta table (internal method with write lock already held) + async fn create_table_internal( + &self, + table_guard: &mut tokio::sync::RwLockWriteGuard<'_, Option>, + ) -> Result { + let mut create_builder = CreateBuilder::new() + .with_location(&self.config.table_uri) + .with_storage_options(self.config.storage_options.clone()); + + // Add partition columns if specified + if !self.config.partition_columns.is_empty() { + create_builder = + create_builder.with_partition_columns(self.config.partition_columns.clone()); + } + + // If schema is provided, parse and use it + if let Some(schema_str) = &self.config.schema { + let schema = self.parse_schema(schema_str)?; + let fields: Vec<_> = schema.fields().cloned().collect(); + create_builder = create_builder.with_columns(fields); + } + + let table = create_builder + .await + .map_err(|e| Error::Connection(format!("Failed to create Delta table: {}", e)))?; + + table_guard.replace(table.clone()); + Ok(table) + } + + /// Execute write operation with retry logic + async fn write_with_retry(&self, record_batch: RecordBatch) -> Result { + let mut last_error = None; + + for attempt in 0..=self.config.max_retries { + if attempt > 0 { + let delay = std::time::Duration::from_millis(self.config.retry_delay_ms * attempt as u64); + debug!("Retrying write operation after {:?} (attempt {})", delay, attempt); + tokio::time::sleep(delay).await; + } + + match self.execute_write_operation(record_batch.clone()).await { + Ok(table) => return Ok(table), + Err(e) => { + warn!("Write attempt {} failed: {}", attempt + 1, e); + last_error = Some(e); + } + } + } + + Err(last_error.unwrap_or_else(|| Error::Process("All write attempts failed".to_string()))) + } + + /// Execute the actual write operation + async fn execute_write_operation(&self, record_batch: RecordBatch) -> Result { + let table = self.get_or_create_table().await?; + let ops = DeltaOps::from(table); + + let result = match self.config.write_mode { + WriteMode::Append => ops + .write([record_batch]) + .await + .map_err(|e| Error::Process(format!("Failed to append to Delta table: {}", e)))?, + WriteMode::Overwrite => ops + .write([record_batch]) + .with_save_mode(deltalake::protocol::SaveMode::Overwrite) + .await + .map_err(|e| Error::Process(format!("Failed to overwrite Delta table: {}", e)))?, + WriteMode::Error | WriteMode::Ignore => { + return Err(Error::Config(format!( + "Write mode '{:?}' not implemented yet", + self.config.write_mode + ))); + } + }; + + Ok(result) + } + + /// Parse schema from JSON string + fn parse_schema(&self, schema_str: &str) -> Result { + serde_json::from_str(schema_str) + .map_err(|e| Error::Config(format!("Invalid schema JSON: {}", e))) + } +} + +#[async_trait] +impl Output for DeltaLakeOutput { + async fn connect(&self) -> Result<(), Error> { + // Initialize the table connection + self.get_or_create_table().await?; + info!("Connected to Delta table at {}", self.config.table_uri); + Ok(()) + } + + async fn write(&self, msg: MessageBatch) -> Result<(), Error> { + // Convert MessageBatch to RecordBatch + let record_batch: RecordBatch = msg.into(); + + // Early return for empty batches + if record_batch.num_rows() == 0 { + debug!("Skipping write for empty record batch"); + return Ok(()); + } + + debug!("Writing {} rows to Delta table", record_batch.num_rows()); + + // Execute write with retry logic + let result = self.write_with_retry(record_batch).await?; + + debug!( + "Successfully wrote data to Delta table, version: {}", + result.version() + ); + + // Update the table reference with the new version + let mut table_guard = self.table.write().await; + table_guard.replace(result); + + Ok(()) + } + + async fn close(&self) -> Result<(), Error> { + info!("Closing Delta Lake output connection"); + let mut table_guard = self.table.write().await; + table_guard.take(); + Ok(()) + } +} + +/// Delta Lake output builder +pub(crate) struct DeltaLakeOutputBuilder; + +impl OutputBuilder for DeltaLakeOutputBuilder { + fn build( + &self, + _name: Option<&String>, + config: &Option, + _resource: &Resource, + ) -> Result, Error> { + if config.is_none() { + return Err(Error::Config( + "Delta Lake output configuration is missing".to_string(), + )); + } + + // Parse the configuration + let config: DeltaLakeOutputConfig = serde_json::from_value(config.clone().unwrap()) + .map_err(|e| Error::Config(format!("Invalid Delta Lake configuration: {}", e)))?; + + // Validate required fields + if config.table_uri.is_empty() { + return Err(Error::Config("table_uri is required".to_string())); + } + + Ok(Arc::new(DeltaLakeOutput::new(config)?)) + } +} + +/// Initialize the Delta Lake output component +pub fn init() -> Result<(), Error> { + register_output_builder("deltalake", Arc::new(DeltaLakeOutputBuilder)) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_config_parsing() { + let config_json = json!({ + "table_uri": "s3://my-bucket/my-table", + "write_mode": "append", + "storage_options": { + "AWS_ACCESS_KEY_ID": "test", + "AWS_SECRET_ACCESS_KEY": "test" + }, + "create_if_not_exists": true, + "partition_columns": ["year", "month"], + "max_retries": 5, + "retry_delay_ms": 2000 + }); + + let config: DeltaLakeOutputConfig = serde_json::from_value(config_json).unwrap(); + assert_eq!(config.table_uri, "s3://my-bucket/my-table"); + assert_eq!(config.write_mode, WriteMode::Append); + assert_eq!(config.storage_options.len(), 2); + assert!(config.create_if_not_exists); + assert_eq!(config.partition_columns, vec!["year", "month"]); + assert_eq!(config.max_retries, 5); + assert_eq!(config.retry_delay_ms, 2000); + } + + #[test] + fn test_config_defaults() { + let minimal_config_json = json!({ + "table_uri": "s3://my-bucket/my-table" + }); + + let config: DeltaLakeOutputConfig = serde_json::from_value(minimal_config_json).unwrap(); + assert_eq!(config.table_uri, "s3://my-bucket/my-table"); + assert_eq!(config.write_mode, WriteMode::Append); + assert!(config.storage_options.is_empty()); + assert!(config.create_if_not_exists); + assert!(config.partition_columns.is_empty()); + assert_eq!(config.max_retries, 3); + assert_eq!(config.retry_delay_ms, 1000); + } + + #[test] + fn test_write_mode_serialization() { + assert_eq!(serde_json::to_string(&WriteMode::Append).unwrap(), "\"append\""); + assert_eq!(serde_json::to_string(&WriteMode::Overwrite).unwrap(), "\"overwrite\""); + assert_eq!(serde_json::to_string(&WriteMode::Error).unwrap(), "\"error\""); + assert_eq!(serde_json::to_string(&WriteMode::Ignore).unwrap(), "\"ignore\""); + } + + #[test] + fn test_config_validation() { + // Valid config + let valid_config = DeltaLakeOutputConfig { + table_uri: "s3://my-bucket/my-table".to_string(), + write_mode: WriteMode::Append, + storage_options: HashMap::new(), + create_if_not_exists: true, + schema: None, + partition_columns: vec![], + max_retries: 3, + retry_delay_ms: 1000, + }; + assert!(DeltaLakeOutput::new(valid_config).is_ok()); + + // Invalid config - empty table_uri + let invalid_config = DeltaLakeOutputConfig { + table_uri: "".to_string(), + write_mode: WriteMode::Append, + storage_options: HashMap::new(), + create_if_not_exists: true, + schema: None, + partition_columns: vec![], + max_retries: 3, + retry_delay_ms: 1000, + }; + assert!(DeltaLakeOutput::new(invalid_config).is_err()); + + // Invalid config - whitespace only table_uri + let whitespace_config = DeltaLakeOutputConfig { + table_uri: " ".to_string(), + write_mode: WriteMode::Append, + storage_options: HashMap::new(), + create_if_not_exists: true, + schema: None, + partition_columns: vec![], + max_retries: 3, + retry_delay_ms: 1000, + }; + assert!(DeltaLakeOutput::new(whitespace_config).is_err()); + } +} diff --git a/crates/arkflow-plugin/src/output/mod.rs b/crates/arkflow-plugin/src/output/mod.rs index eb144a76..989dbf02 100644 --- a/crates/arkflow-plugin/src/output/mod.rs +++ b/crates/arkflow-plugin/src/output/mod.rs @@ -18,16 +18,18 @@ use arkflow_core::Error; +pub mod deltalake; pub mod drop; pub mod http; pub mod kafka; pub mod mqtt; -pub mod sql; pub mod nats; pub mod redis; +pub mod sql; pub mod stdout; pub fn init() -> Result<(), Error> { + deltalake::init()?; drop::init()?; http::init()?; kafka::init()?;