diff --git a/Cargo.lock b/Cargo.lock index 4c65b7d..8bc4c50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2456,7 +2456,16 @@ checksum = "1eb4e60eb2f8c6e02b1f1e7634ef91738b1104b5bc2fa30458d10cd00917dbbf" dependencies = [ "bumpalo", "rmp", - "shopify_function_wasm_api_core", + "shopify_function_wasm_api_core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "shopify_function_provider" +version = "2.0.0" +dependencies = [ + "bumpalo", + "rmp", + "shopify_function_wasm_api_core 0.1.0", ] [[package]] @@ -2467,11 +2476,30 @@ checksum = "a57a2e64ef7d28cbe26bf591fd084093327d9d359e38355010720d818cd92ba9" dependencies = [ "rmp-serde", "serde_json", - "shopify_function_provider", - "shopify_function_wasm_api_core", + "shopify_function_provider 1.0.1", + "shopify_function_wasm_api_core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "thiserror 2.0.14", +] + +[[package]] +name = "shopify_function_wasm_api" +version = "0.2.0" +dependencies = [ + "rmp-serde", + "seq-macro", + "serde_json", + "shopify_function_provider 2.0.0", + "shopify_function_wasm_api_core 0.1.0", "thiserror 2.0.14", ] +[[package]] +name = "shopify_function_wasm_api_core" +version = "0.1.0" +dependencies = [ + "strum", +] + [[package]] name = "shopify_function_wasm_api_core" version = "0.1.0" @@ -3129,7 +3157,15 @@ name = "wasm_api_v1" version = "0.1.0" dependencies = [ "anyhow", - "shopify_function_wasm_api", + "shopify_function_wasm_api 0.1.0", +] + +[[package]] +name = "wasm_api_v2" +version = "0.1.0" +dependencies = [ + "anyhow", + "shopify_function_wasm_api 0.2.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1cd7213..46daff6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "tests/fixtures/messagepack-valid", "tests/fixtures/messagepack-invalid", "tests/fixtures/wasm_api_v1", + "tests/fixtures/wasm_api_v2", ] [package] diff --git a/src/engine.rs b/src/engine.rs index 064fe9d..a206c5b 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,13 +1,12 @@ use anyhow::{anyhow, Result}; -use rust_embed::RustEmbed; +use std::path::PathBuf; use std::string::String; -use std::{collections::HashSet, path::PathBuf}; use wasmtime::{AsContextMut, Config, Engine, Linker, Module, ResourceLimiter, Store}; -use wasmtime_wasi::pipe::{MemoryInputPipe, MemoryOutputPipe}; use wasmtime_wasi::preview1::WasiP1Ctx; -use wasmtime_wasi::{I32Exit, WasiCtxBuilder}; +use wasmtime_wasi::I32Exit; use crate::function_run_result::FunctionRunResult; +use crate::io::{IOHandler, OutputAndLogs}; use crate::{BytesContainer, BytesContainerType}; #[derive(Clone)] @@ -16,44 +15,12 @@ pub struct ProfileOpts { pub out: PathBuf, } -#[derive(RustEmbed)] -#[folder = "providers/"] -struct StandardProviders; - pub fn uses_msgpack_provider(module: &Module) -> bool { module.imports().map(|i| i.module()).any(|module| { module.starts_with("shopify_function_v") || module == "shopify_functions_javy_v2" }) } -fn import_modules( - module: &Module, - engine: &Engine, - linker: &mut Linker, - mut store: &mut Store, -) { - let imported_modules: HashSet = - module.imports().map(|i| i.module().to_string()).collect(); - - imported_modules.iter().for_each(|module_name| { - let provider_path = format!("{module_name}.wasm"); - let imported_module_bytes = StandardProviders::get(&provider_path); - - if let Some(bytes) = imported_module_bytes { - let imported_module = Module::from_binary(engine, &bytes.data) - .unwrap_or_else(|_| panic!("Failed to load module {module_name}")); - - let imported_module_instance = linker - .instantiate(&mut store, &imported_module) - .expect("Failed to instantiate imported instance"); - - linker - .instance(&mut store, module_name, imported_module_instance) - .expect("Failed to import module"); - } - }); -} - pub struct FunctionRunParams<'a> { pub function_path: PathBuf, pub input: BytesContainer, @@ -68,12 +35,12 @@ const STARTING_FUEL: u64 = u64::MAX; const MAXIMUM_MEMORIES: usize = 2; // 1 for the module, 1 for Javy's provider struct FunctionContext { - wasi: WasiP1Ctx, + wasi: Option, limiter: MemoryLimiter, } impl FunctionContext { - fn new(wasi: WasiP1Ctx) -> Self { + fn new(wasi: Option) -> Self { Self { wasi, limiter: Default::default(), @@ -128,33 +95,29 @@ pub fn run(params: FunctionRunParams) -> Result { module, } = params; - let input_stream = MemoryInputPipe::new(input.raw.clone()); - let output_stream = MemoryOutputPipe::new(usize::MAX); - let error_stream = MemoryOutputPipe::new(usize::MAX); + let mut io_handler = IOHandler::new(module, input.clone()); let mut error_logs: String = String::new(); let mut linker = Linker::new(&engine); - wasmtime_wasi::preview1::add_to_linker_sync(&mut linker, |ctx: &mut FunctionContext| { - &mut ctx.wasi - })?; - deterministic_wasi_ctx::replace_scheduling_functions(&mut linker)?; - let mut wasi_builder = WasiCtxBuilder::new(); - wasi_builder.stdin(input_stream); - wasi_builder.stdout(output_stream.clone()); - wasi_builder.stderr(error_stream.clone()); - deterministic_wasi_ctx::add_determinism_to_wasi_ctx_builder(&mut wasi_builder); - let wasi = wasi_builder.build_p1(); + let wasi = io_handler.wasi(); + if wasi.is_some() { + wasmtime_wasi::preview1::add_to_linker_sync(&mut linker, |ctx: &mut FunctionContext| { + ctx.wasi.as_mut().expect("Should have WASI context") + })?; + deterministic_wasi_ctx::replace_scheduling_functions(&mut linker)?; + } + let function_context = FunctionContext::new(wasi); let mut store = Store::new(&engine, function_context); store.limiter(|s| &mut s.limiter); + + io_handler.initialize(&engine, &mut linker, &mut store)?; + store.set_fuel(STARTING_FUEL)?; store.set_epoch_deadline(1); - import_modules(&module, &engine, &mut linker, &mut store); - - linker.module(&mut store, "Function", &module)?; - let instance = linker.instantiate(&mut store, &module)?; + let instance = linker.instantiate(&mut store, io_handler.module())?; let func = instance.get_typed_func::<(), ()>(store.as_context_mut(), export)?; @@ -163,7 +126,6 @@ pub fn run(params: FunctionRunParams) -> Result { .frequency(profile_opts.interval) .weight_unit(wasmprof::WeightUnit::Fuel) .profile(|store| func.call(store.as_context_mut(), ())); - ( result, Some(profile_data.into_collapsed_stacks().to_string()), @@ -191,18 +153,14 @@ pub fn run(params: FunctionRunParams) -> Result { } } - drop(store); - - let mut logs = error_stream - .try_into_inner() - .expect("Log stream reference still exists"); + let OutputAndLogs { + output: raw_output, + mut logs, + } = io_handler.finalize(store)?; logs.extend_from_slice(error_logs.as_bytes()); let output_codec = input.codec; - let raw_output = output_stream - .try_into_inner() - .expect("Output stream reference still exists"); let output = BytesContainer::new( BytesContainerType::Output, output_codec, diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..35035f6 --- /dev/null +++ b/src/io.rs @@ -0,0 +1,215 @@ +use std::collections::HashSet; + +use anyhow::{anyhow, Result}; +use rust_embed::RustEmbed; +use wasmtime::{AsContext, AsContextMut, Engine, Instance, Linker, Module, Store}; +use wasmtime_wasi::{ + pipe::{MemoryInputPipe, MemoryOutputPipe}, + preview1::WasiP1Ctx, + WasiCtxBuilder, +}; + +use crate::BytesContainer; + +#[derive(RustEmbed)] +#[folder = "providers/"] +struct StandardProviders; + +pub(crate) struct OutputAndLogs { + pub output: Vec, + pub logs: Vec, +} + +struct WasiIO { + output: MemoryOutputPipe, + logs: MemoryOutputPipe, +} + +enum IOStrategy { + Wasi(WasiIO), + Memory(Option), +} + +pub(crate) struct IOHandler { + strategy: IOStrategy, + module: Module, + input: BytesContainer, +} + +impl IOHandler { + pub(crate) fn new(module: Module, input: BytesContainer) -> Self { + Self { + strategy: if uses_mem_io(&module) { + IOStrategy::Memory(None) + } else { + IOStrategy::Wasi(WasiIO { + output: MemoryOutputPipe::new(usize::MAX), + logs: MemoryOutputPipe::new(usize::MAX), + }) + }, + module, + input, + } + } + + pub(crate) fn module(&self) -> &Module { + &self.module + } + + pub(crate) fn wasi(&self) -> Option { + match &self.strategy { + IOStrategy::Wasi(WasiIO { output, logs }) => { + let input_stream = MemoryInputPipe::new(self.input.raw.clone()); + let mut wasi_builder = WasiCtxBuilder::new(); + wasi_builder.stdin(input_stream); + wasi_builder.stdout(output.clone()); + wasi_builder.stderr(logs.clone()); + deterministic_wasi_ctx::add_determinism_to_wasi_ctx_builder(&mut wasi_builder); + Some(wasi_builder.build_p1()) + } + IOStrategy::Memory(_instance) => None, + } + } + + pub(crate) fn initialize( + &mut self, + engine: &Engine, + linker: &mut Linker, + store: &mut Store, + ) -> Result<()> { + store.set_epoch_deadline(1); // Need to make sure we don't timeout during initialization. + store.set_fuel(u64::MAX)?; // Make sure we have fuel for initialization. + let mem_io_instance = instantiate_imports(&self.module, engine, linker, store); + if let IOStrategy::Memory(ref mut instance) = self.strategy { + *instance = mem_io_instance; + } + + if let Some(instance) = mem_io_instance { + let input_offset = instance + .get_typed_func::(store.as_context_mut(), "initialize")? + .call(store.as_context_mut(), self.input.raw.len() as _)?; + instance + .get_memory(store.as_context_mut(), "memory") + .ok_or_else(|| anyhow!("Missing memory export named memory"))? + .write(store.as_context_mut(), input_offset as _, &self.input.raw)?; + } + Ok(()) + } + + pub(crate) fn finalize(self, mut store: Store) -> Result { + match self.strategy { + IOStrategy::Memory(instance) => { + let instance = instance.expect("Should have been defined in initialize"); + store.set_epoch_deadline(1); // Make sure we don't timeout while finalizing. + store.set_fuel(u64::MAX)?; // Make sure we don't run out of fuel finalizing. + let results_offset = instance + .get_typed_func::<(), i32>(store.as_context_mut(), "finalize")? + .call(store.as_context_mut(), ())? + as usize; + + let memory = instance + .get_memory(store.as_context_mut(), "memory") + .ok_or_else(|| anyhow!("Missing memory export named memory"))?; + + let mut buf = [0; 24]; + memory.read(store.as_context(), results_offset, &mut buf)?; + + let output_offset = u32::from_le_bytes(buf[0..4].try_into().unwrap()) as usize; + let output_len = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize; + let log_offset1 = u32::from_le_bytes(buf[8..12].try_into().unwrap()) as usize; + let log_len1 = u32::from_le_bytes(buf[12..16].try_into().unwrap()) as usize; + let log_offset2 = u32::from_le_bytes(buf[16..20].try_into().unwrap()) as usize; + let log_len2 = u32::from_le_bytes(buf[20..24].try_into().unwrap()) as usize; + + let mut output = vec![0; output_len]; + memory.read(store.as_context(), output_offset, &mut output)?; + + let mut logs = vec![0; log_len1]; + memory.read(store.as_context(), log_offset1, &mut logs)?; + + let mut logs2 = vec![0; log_len2]; + memory.read(store.as_context(), log_offset2, &mut logs2)?; + + logs.append(&mut logs2); + + Ok(OutputAndLogs { output, logs }) + } + IOStrategy::Wasi(WasiIO { output, logs }) => { + // Need to drop store to have only one reference to output and error streams. + drop(store); + + let output = output + .try_into_inner() + .expect("Should have only one reference to output stream at this point") + .to_vec(); + let logs = logs + .try_into_inner() + .expect("Should have only one reference to error stream at this point") + .to_vec(); + Ok(OutputAndLogs { output, logs }) + } + } + } +} + +// Whether a module imports a provider that uses in-memory buffers for I/O. +fn uses_mem_io(module: &Module) -> bool { + module.imports().map(|i| i.module()).any(is_mem_io_provider) +} + +// Whether a provider exports functions for working with in-memory buffers for I/O. +fn is_mem_io_provider(module: &str) -> bool { + let javy_plugin_version = module + .strip_prefix("shopify_functions_javy_v") + .map(|s| s.parse::()) + .and_then(|result| result.ok()); + if javy_plugin_version.is_some_and(|version| version >= 3) { + return true; + } + + let functions_provider_version = module + .strip_prefix("shopify_function_v") + .map(|s| s.parse::()) + .and_then(|result| result.ok()); + if functions_provider_version.is_some_and(|version| version >= 2) { + return true; + } + + false +} + +fn instantiate_imports( + module: &Module, + engine: &Engine, + linker: &mut Linker, + mut store: &mut Store, +) -> Option { + let imported_modules: HashSet = + module.imports().map(|i| i.module().to_string()).collect(); + + let mut mem_io_instance = None; + + imported_modules.iter().for_each(|module_name| { + let provider_path = format!("{module_name}.wasm"); + let imported_module_bytes = StandardProviders::get(&provider_path); + + if let Some(bytes) = imported_module_bytes { + let imported_module = Module::from_binary(engine, &bytes.data) + .unwrap_or_else(|_| panic!("Failed to load module {module_name}")); + + let imported_module_instance = linker + .instantiate(&mut store, &imported_module) + .expect("Failed to instantiate imported instance"); + + if is_mem_io_provider(module_name) { + mem_io_instance = Some(imported_module_instance); + } + + linker + .instance(&mut store, module_name, imported_module_instance) + .expect("Failed to import module"); + } + }); + + mem_io_instance +} diff --git a/src/lib.rs b/src/lib.rs index ecde0ba..d28e3cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod bluejay_schema_analyzer; pub mod container; pub mod engine; pub mod function_run_result; +mod io; pub mod scale_limits_analyzer; use clap::ValueEnum; diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index f9dd291..7e04a1c 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -9,7 +9,22 @@ pub fn process_with_v1_trampoline, Q: AsRef>( wasm_path: P, trampolined_path: Q, ) -> Result<()> { - let trampoline_path = TRAMPOLINE_1_0_PATH + process_with_trampoline(&TRAMPOLINE_1_0_PATH, wasm_path, trampolined_path) +} + +pub fn process_with_v2_trampoline, Q: AsRef>( + wasm_path: P, + trampolined_path: Q, +) -> Result<()> { + process_with_trampoline(&TRAMPOLINE_2_0_PATH, wasm_path, trampolined_path) +} + +fn process_with_trampoline, Q: AsRef>( + trampoline_path: &LazyLock>, + wasm_path: P, + trampolined_path: Q, +) -> Result<()> { + let trampoline_path = trampoline_path .as_ref() .map_err(|e| anyhow!("Failed to download trampoline: {e}"))?; let status = Command::new(trampoline_path) @@ -24,15 +39,19 @@ pub fn process_with_v1_trampoline, Q: AsRef>( Ok(()) } -static TRAMPOLINE_1_0_PATH: LazyLock> = LazyLock::new(|| { +static TRAMPOLINE_1_0_PATH: LazyLock> = LazyLock::new(|| trampoline_path("1.0.2")); + +static TRAMPOLINE_2_0_PATH: LazyLock> = LazyLock::new(|| trampoline_path("2.0.0")); + +fn trampoline_path(version: &str) -> Result { let binaries_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp"); - let path = binaries_path.join("trampoline-1.0.2"); + let path = binaries_path.join(format!("trampoline-{version}")); if !path.exists() { std::fs::create_dir_all(binaries_path)?; - download_trampoline(&path, "1.0.2")?; + download_trampoline(&path, version)?; } Ok(path) -}); +} fn download_trampoline(destination: &Path, version: &str) -> Result<()> { let target_os = if cfg!(target_os = "macos") { diff --git a/tests/fixtures/README.md b/tests/fixtures/README.md index 6b17774..25b685f 100644 --- a/tests/fixtures/README.md +++ b/tests/fixtures/README.md @@ -13,6 +13,8 @@ Example Functions used as test fixtures. ``` cargo build --target wasm32-wasip1 --profile=wasm -p exit_code -p exports -p log_truncation_function -p noop -p wasm_api_v1 && find target/wasm32-wasip1/wasm/{exit_code.wasm,exports.wasm,log_truncation_function.wasm,noop.wasm,wasm_api_v1.wasm} | xargs -I {} sh -c 'name=$(basename {}); wasm-opt {} -Oz --enable-bulk-memory --strip-debug -o "tests/fixtures/build/$name"' +cargo build --target wasm32-unknown-unknown --profile=wasm -p wasm_api_v2 && + find target/wasm32-unknown-unknown/wasm/wasm_api_v2.wasm | xargs -I {} sh -c 'name=$(basename {}); wasm-opt {} -Oz --enable-bulk-memory --strip-debug -o "tests/fixtures/build/$name"' ``` **JS examples:** diff --git a/tests/fixtures/build/wasm_api_v2.wasm b/tests/fixtures/build/wasm_api_v2.wasm new file mode 100644 index 0000000..5be3260 Binary files /dev/null and b/tests/fixtures/build/wasm_api_v2.wasm differ diff --git a/tests/fixtures/wasm_api_v2/Cargo.toml b/tests/fixtures/wasm_api_v2/Cargo.toml new file mode 100644 index 0000000..7f7758a --- /dev/null +++ b/tests/fixtures/wasm_api_v2/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "wasm_api_v2" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +anyhow = "1.0" +# shopify_function_wasm_api = "0.1.0" +shopify_function_wasm_api = { path = "/Users/jeffcharles/src/github.com/Shopify/shopify-function-wasm-api/api" } diff --git a/tests/fixtures/wasm_api_v2/src/lib.rs b/tests/fixtures/wasm_api_v2/src/lib.rs new file mode 100644 index 0000000..67b2612 --- /dev/null +++ b/tests/fixtures/wasm_api_v2/src/lib.rs @@ -0,0 +1,26 @@ +use anyhow::{anyhow, Result}; + +#[export_name = "_start"] +fn start() { + main().unwrap() +} + +fn main() -> Result<()> { + let mut ctx = shopify_function_wasm_api::Context::new(); + let input = ctx.input_get()?; + let str = input + .get_obj_prop("hello") + .as_string() + .ok_or_else(|| anyhow!("Should be string"))?; + ctx.write_object( + |ctx| { + ctx.write_utf8_str("bye")?; + ctx.write_utf8_str(&str)?; + Ok(()) + }, + 1, + )?; + ctx.log(&"a".repeat(1025)); + ctx.log(&"b".repeat(10)); + Ok(()) +} diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index a12e95b..e9dba6f 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -403,4 +403,32 @@ mod tests { Ok(()) } + + #[test] + fn run_wasm_api_v2_function() -> Result<()> { + let trampolined_module = assert_fs::NamedTempFile::new("wasm_api_v2.trampolined.wasm")?; + test_utils::process_with_v2_trampoline( + "tests/fixtures/build/wasm_api_v2.wasm", + &trampolined_module, + )?; + + let mut cmd = Command::cargo_bin("function-runner")?; + let input_file = temp_input(json!({"hello": "world"}))?; + + cmd.arg("--function") + .arg(trampolined_module.as_os_str()) + .arg("--json") + .arg("--input") + .arg(input_file.as_os_str()) + .stdout(Stdio::piped()) + .spawn()? + .wait_with_output()?; + + cmd.assert().success(); + cmd.assert().stdout(contains("world")); + cmd.assert() + .stdout(contains(format!("{}{}", "a".repeat(1015), "b".repeat(10)))); + + Ok(()) + } }