18
18
19
19
use crate :: event:: DEFAULT_TIMESTAMP_KEY ;
20
20
use crate :: utils:: arrow:: get_field;
21
- use anyhow:: { anyhow, Error as AnyError } ;
22
21
use serde:: { Deserialize , Serialize } ;
23
22
use std:: str;
24
23
@@ -63,7 +62,7 @@ pub fn convert_static_schema_to_arrow_schema(
63
62
static_schema : StaticSchema ,
64
63
time_partition : & str ,
65
64
custom_partition : Option < & String > ,
66
- ) -> Result < Arc < Schema > , AnyError > {
65
+ ) -> Result < Arc < Schema > , StaticSchemaError > {
67
66
let mut parsed_schema = ParsedSchema {
68
67
fields : Vec :: new ( ) ,
69
68
metadata : HashMap :: new ( ) ,
@@ -86,7 +85,9 @@ pub fn convert_static_schema_to_arrow_schema(
86
85
87
86
for partition in & custom_partition_list {
88
87
if !custom_partition_exists. contains_key ( * partition) {
89
- return Err ( anyhow ! ( "custom partition field {partition} does not exist in the schema for the static schema logstream" ) ) ;
88
+ return Err ( StaticSchemaError :: MissingCustomPartition (
89
+ partition. to_string ( ) ,
90
+ ) ) ;
90
91
}
91
92
}
92
93
}
@@ -134,29 +135,24 @@ pub fn convert_static_schema_to_arrow_schema(
134
135
parsed_schema. fields . push ( parsed_field) ;
135
136
}
136
137
if !time_partition. is_empty ( ) && !time_partition_exists {
137
- return Err ( anyhow ! {
138
- format!(
139
- "time partition field {time_partition} does not exist in the schema for the static schema logstream"
140
- ) ,
141
- } ) ;
138
+ return Err ( StaticSchemaError :: MissingTimePartition (
139
+ time_partition. to_string ( ) ,
140
+ ) ) ;
142
141
}
143
142
add_parseable_fields_to_static_schema ( parsed_schema)
144
143
}
145
144
146
145
fn add_parseable_fields_to_static_schema (
147
146
parsed_schema : ParsedSchema ,
148
- ) -> Result < Arc < Schema > , AnyError > {
147
+ ) -> Result < Arc < Schema > , StaticSchemaError > {
149
148
let mut schema: Vec < Arc < Field > > = Vec :: new ( ) ;
150
149
for field in parsed_schema. fields . iter ( ) {
151
150
let field = Field :: new ( field. name . clone ( ) , field. data_type . clone ( ) , field. nullable ) ;
152
151
schema. push ( Arc :: new ( field) ) ;
153
152
}
154
153
155
154
if get_field ( & schema, DEFAULT_TIMESTAMP_KEY ) . is_some ( ) {
156
- return Err ( anyhow ! (
157
- "field {} is a reserved field" ,
158
- DEFAULT_TIMESTAMP_KEY
159
- ) ) ;
155
+ return Err ( StaticSchemaError :: DefaultTime ) ;
160
156
} ;
161
157
162
158
// add the p_timestamp field to the event schema to the 0th index
@@ -187,18 +183,40 @@ fn default_dict_is_ordered() -> bool {
187
183
fn validate_field_names (
188
184
field_name : & str ,
189
185
existing_fields : & mut HashSet < String > ,
190
- ) -> Result < ( ) , AnyError > {
186
+ ) -> Result < ( ) , StaticSchemaError > {
191
187
if field_name. is_empty ( ) {
192
- return Err ( anyhow ! ( "field names should not be empty" ) ) ;
188
+ return Err ( StaticSchemaError :: EmptyFieldName ) ;
193
189
}
194
190
195
191
if !existing_fields. insert ( field_name. to_string ( ) ) {
196
- return Err ( anyhow ! ( "duplicate field name: {}" , field_name) ) ;
192
+ return Err ( StaticSchemaError :: DuplicateField ( field_name. to_string ( ) ) ) ;
197
193
}
198
194
199
195
Ok ( ( ) )
200
196
}
201
197
198
+ #[ derive( Debug , thiserror:: Error ) ]
199
+ pub enum StaticSchemaError {
200
+ #[ error(
201
+ "custom partition field {0} does not exist in the schema for the static schema logstream"
202
+ ) ]
203
+ MissingCustomPartition ( String ) ,
204
+
205
+ #[ error(
206
+ "time partition field {0} does not exist in the schema for the static schema logstream"
207
+ ) ]
208
+ MissingTimePartition ( String ) ,
209
+
210
+ #[ error( "field {DEFAULT_TIMESTAMP_KEY:?} is a reserved field" ) ]
211
+ DefaultTime ,
212
+
213
+ #[ error( "field name cannot be empty" ) ]
214
+ EmptyFieldName ,
215
+
216
+ #[ error( "duplicate field name: {0}" ) ]
217
+ DuplicateField ( String ) ,
218
+ }
219
+
202
220
#[ cfg( test) ]
203
221
mod tests {
204
222
use super :: * ;
0 commit comments