1
1
use anyhow:: { anyhow, Result } ;
2
- use rust_embed :: RustEmbed ;
2
+ use std :: path :: PathBuf ;
3
3
use std:: string:: String ;
4
- use std:: { collections:: HashSet , path:: PathBuf } ;
5
4
use wasmtime:: { AsContextMut , Config , Engine , Linker , Module , ResourceLimiter , Store } ;
6
- use wasmtime_wasi:: pipe:: { MemoryInputPipe , MemoryOutputPipe } ;
7
5
use wasmtime_wasi:: preview1:: WasiP1Ctx ;
8
- use wasmtime_wasi:: { I32Exit , WasiCtxBuilder } ;
6
+ use wasmtime_wasi:: I32Exit ;
9
7
10
8
use crate :: function_run_result:: FunctionRunResult ;
9
+ use crate :: io:: { IOHandler , OutputAndLogs } ;
11
10
use crate :: { BytesContainer , BytesContainerType } ;
12
11
13
12
#[ derive( Clone ) ]
@@ -16,44 +15,12 @@ pub struct ProfileOpts {
16
15
pub out : PathBuf ,
17
16
}
18
17
19
- #[ derive( RustEmbed ) ]
20
- #[ folder = "providers/" ]
21
- struct StandardProviders ;
22
-
23
18
pub fn uses_msgpack_provider ( module : & Module ) -> bool {
24
19
module. imports ( ) . map ( |i| i. module ( ) ) . any ( |module| {
25
20
module. starts_with ( "shopify_function_v" ) || module == "shopify_functions_javy_v2"
26
21
} )
27
22
}
28
23
29
- fn import_modules < T > (
30
- module : & Module ,
31
- engine : & Engine ,
32
- linker : & mut Linker < T > ,
33
- mut store : & mut Store < T > ,
34
- ) {
35
- let imported_modules: HashSet < String > =
36
- module. imports ( ) . map ( |i| i. module ( ) . to_string ( ) ) . collect ( ) ;
37
-
38
- imported_modules. iter ( ) . for_each ( |module_name| {
39
- let provider_path = format ! ( "{module_name}.wasm" ) ;
40
- let imported_module_bytes = StandardProviders :: get ( & provider_path) ;
41
-
42
- if let Some ( bytes) = imported_module_bytes {
43
- let imported_module = Module :: from_binary ( engine, & bytes. data )
44
- . unwrap_or_else ( |_| panic ! ( "Failed to load module {module_name}" ) ) ;
45
-
46
- let imported_module_instance = linker
47
- . instantiate ( & mut store, & imported_module)
48
- . expect ( "Failed to instantiate imported instance" ) ;
49
-
50
- linker
51
- . instance ( & mut store, module_name, imported_module_instance)
52
- . expect ( "Failed to import module" ) ;
53
- }
54
- } ) ;
55
- }
56
-
57
24
pub struct FunctionRunParams < ' a > {
58
25
pub function_path : PathBuf ,
59
26
pub input : BytesContainer ,
@@ -68,12 +35,12 @@ const STARTING_FUEL: u64 = u64::MAX;
68
35
const MAXIMUM_MEMORIES : usize = 2 ; // 1 for the module, 1 for Javy's provider
69
36
70
37
struct FunctionContext {
71
- wasi : WasiP1Ctx ,
38
+ wasi : Option < WasiP1Ctx > ,
72
39
limiter : MemoryLimiter ,
73
40
}
74
41
75
42
impl FunctionContext {
76
- fn new ( wasi : WasiP1Ctx ) -> Self {
43
+ fn new ( wasi : Option < WasiP1Ctx > ) -> Self {
77
44
Self {
78
45
wasi,
79
46
limiter : Default :: default ( ) ,
@@ -128,33 +95,29 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
128
95
module,
129
96
} = params;
130
97
131
- let input_stream = MemoryInputPipe :: new ( input. raw . clone ( ) ) ;
132
- let output_stream = MemoryOutputPipe :: new ( usize:: MAX ) ;
133
- let error_stream = MemoryOutputPipe :: new ( usize:: MAX ) ;
98
+ let mut io_handler = IOHandler :: new ( module, input. clone ( ) ) ;
134
99
135
100
let mut error_logs: String = String :: new ( ) ;
136
101
137
102
let mut linker = Linker :: new ( & engine) ;
138
- wasmtime_wasi:: preview1:: add_to_linker_sync ( & mut linker, |ctx : & mut FunctionContext | {
139
- & mut ctx. wasi
140
- } ) ?;
141
- deterministic_wasi_ctx:: replace_scheduling_functions ( & mut linker) ?;
142
- let mut wasi_builder = WasiCtxBuilder :: new ( ) ;
143
- wasi_builder. stdin ( input_stream) ;
144
- wasi_builder. stdout ( output_stream. clone ( ) ) ;
145
- wasi_builder. stderr ( error_stream. clone ( ) ) ;
146
- deterministic_wasi_ctx:: add_determinism_to_wasi_ctx_builder ( & mut wasi_builder) ;
147
- let wasi = wasi_builder. build_p1 ( ) ;
103
+ let wasi = io_handler. wasi ( ) ;
104
+ if wasi. is_some ( ) {
105
+ wasmtime_wasi:: preview1:: add_to_linker_sync ( & mut linker, |ctx : & mut FunctionContext | {
106
+ ctx. wasi . as_mut ( ) . expect ( "Should have WASI context" )
107
+ } ) ?;
108
+ deterministic_wasi_ctx:: replace_scheduling_functions ( & mut linker) ?;
109
+ }
110
+
148
111
let function_context = FunctionContext :: new ( wasi) ;
149
112
let mut store = Store :: new ( & engine, function_context) ;
150
113
store. limiter ( |s| & mut s. limiter ) ;
114
+
115
+ io_handler. initialize ( & engine, & mut linker, & mut store) ?;
116
+
151
117
store. set_fuel ( STARTING_FUEL ) ?;
152
118
store. set_epoch_deadline ( 1 ) ;
153
119
154
- import_modules ( & module, & engine, & mut linker, & mut store) ;
155
-
156
- linker. module ( & mut store, "Function" , & module) ?;
157
- let instance = linker. instantiate ( & mut store, & module) ?;
120
+ let instance = linker. instantiate ( & mut store, io_handler. module ( ) ) ?;
158
121
159
122
let func = instance. get_typed_func :: < ( ) , ( ) > ( store. as_context_mut ( ) , export) ?;
160
123
@@ -163,7 +126,6 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
163
126
. frequency ( profile_opts. interval )
164
127
. weight_unit ( wasmprof:: WeightUnit :: Fuel )
165
128
. profile ( |store| func. call ( store. as_context_mut ( ) , ( ) ) ) ;
166
-
167
129
(
168
130
result,
169
131
Some ( profile_data. into_collapsed_stacks ( ) . to_string ( ) ) ,
@@ -191,18 +153,14 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
191
153
}
192
154
}
193
155
194
- drop ( store) ;
195
-
196
- let mut logs = error_stream
197
- . try_into_inner ( )
198
- . expect ( "Log stream reference still exists" ) ;
156
+ let OutputAndLogs {
157
+ output : raw_output,
158
+ mut logs,
159
+ } = io_handler. finalize ( store) ?;
199
160
200
161
logs. extend_from_slice ( error_logs. as_bytes ( ) ) ;
201
162
202
163
let output_codec = input. codec ;
203
- let raw_output = output_stream
204
- . try_into_inner ( )
205
- . expect ( "Output stream reference still exists" ) ;
206
164
let output = BytesContainer :: new (
207
165
BytesContainerType :: Output ,
208
166
output_codec,
0 commit comments