18
18
19
19
use crate :: event:: DEFAULT_TIMESTAMP_KEY ;
20
20
use crate :: utils:: arrow:: get_field;
21
- use anyhow:: { anyhow, Error as AnyError } ;
22
21
use serde:: { Deserialize , Serialize } ;
23
22
use std:: str;
24
23
@@ -27,6 +26,7 @@ use std::{
27
26
collections:: { HashMap , HashSet } ,
28
27
sync:: Arc ,
29
28
} ;
29
+
30
30
#[ derive( Debug , Clone , PartialEq , Serialize , Deserialize ) ]
31
31
pub struct StaticSchema {
32
32
fields : Vec < SchemaFields > ,
@@ -57,13 +57,12 @@ pub struct Fields {
57
57
}
58
58
59
59
#[ derive( Default , Debug , Clone , PartialEq , Serialize , Deserialize ) ]
60
-
61
60
pub struct Metadata { }
62
61
pub fn convert_static_schema_to_arrow_schema (
63
62
static_schema : StaticSchema ,
64
63
time_partition : & str ,
65
64
custom_partition : Option < & String > ,
66
- ) -> Result < Arc < Schema > , AnyError > {
65
+ ) -> Result < Arc < Schema > , StaticSchemaError > {
67
66
let mut parsed_schema = ParsedSchema {
68
67
fields : Vec :: new ( ) ,
69
68
metadata : HashMap :: new ( ) ,
@@ -86,7 +85,9 @@ pub fn convert_static_schema_to_arrow_schema(
86
85
87
86
for partition in & custom_partition_list {
88
87
if !custom_partition_exists. contains_key ( * partition) {
89
- return Err ( anyhow ! ( "custom partition field {partition} does not exist in the schema for the static schema logstream" ) ) ;
88
+ return Err ( StaticSchemaError :: MissingCustomPartition (
89
+ partition. to_string ( ) ,
90
+ ) ) ;
90
91
}
91
92
}
92
93
}
@@ -134,29 +135,26 @@ pub fn convert_static_schema_to_arrow_schema(
134
135
parsed_schema. fields . push ( parsed_field) ;
135
136
}
136
137
if !time_partition. is_empty ( ) && !time_partition_exists {
137
- return Err ( anyhow ! {
138
- format!(
139
- "time partition field {time_partition} does not exist in the schema for the static schema logstream"
140
- ) ,
141
- } ) ;
138
+ return Err ( StaticSchemaError :: MissingTimePartition (
139
+ time_partition. to_string ( ) ,
140
+ ) ) ;
142
141
}
143
142
add_parseable_fields_to_static_schema ( parsed_schema)
144
143
}
145
144
146
145
fn add_parseable_fields_to_static_schema (
147
146
parsed_schema : ParsedSchema ,
148
- ) -> Result < Arc < Schema > , AnyError > {
147
+ ) -> Result < Arc < Schema > , StaticSchemaError > {
149
148
let mut schema: Vec < Arc < Field > > = Vec :: new ( ) ;
150
149
for field in parsed_schema. fields . iter ( ) {
151
150
let field = Field :: new ( field. name . clone ( ) , field. data_type . clone ( ) , field. nullable ) ;
152
151
schema. push ( Arc :: new ( field) ) ;
153
152
}
154
153
155
154
if get_field ( & schema, DEFAULT_TIMESTAMP_KEY ) . is_some ( ) {
156
- return Err ( anyhow ! (
157
- "field {} is a reserved field" ,
158
- DEFAULT_TIMESTAMP_KEY
159
- ) ) ;
155
+ return Err ( StaticSchemaError :: ReservedKey (
156
+ DEFAULT_TIMESTAMP_KEY
157
+ ) ) ;
160
158
} ;
161
159
162
160
// insert the p_timestamp field at index 0 of the event schema
@@ -187,22 +185,43 @@ fn default_dict_is_ordered() -> bool {
187
185
fn validate_field_names (
188
186
field_name : & str ,
189
187
existing_fields : & mut HashSet < String > ,
190
- ) -> Result < ( ) , AnyError > {
188
+ ) -> Result < ( ) , StaticSchemaError > {
191
189
if field_name. is_empty ( ) {
192
- return Err ( anyhow ! ( "field names should not be empty" ) ) ;
190
+ return Err ( StaticSchemaError :: EmptyFieldName ) ;
193
191
}
194
192
195
193
if !existing_fields. insert ( field_name. to_string ( ) ) {
196
- return Err ( anyhow ! ( "duplicate field name: {}" , field_name) ) ;
194
+ return Err ( StaticSchemaError :: DuplicateField ( field_name. to_string ( ) ) ) ;
197
195
}
198
196
199
197
Ok ( ( ) )
200
198
}
201
199
200
+ #[ derive( Debug , thiserror:: Error ) ]
201
+ pub enum StaticSchemaError {
202
+ #[ error(
203
+ "custom partition field {0} does not exist in the schema for the static schema logstream"
204
+ ) ]
205
+ MissingCustomPartition ( String ) ,
206
+
207
+ #[ error(
208
+ "time partition field {0} does not exist in the schema for the static schema logstream"
209
+ ) ]
210
+ MissingTimePartition ( String ) ,
211
+
212
+ #[ error( "field {DEFAULT_TIMESTAMP_KEY:?} is a reserved field" ) ]
213
+ ReservedKey ( & ' static str ) ,
214
+
215
+ #[ error( "field name cannot be empty" ) ]
216
+ EmptyFieldName ,
217
+
218
+ #[ error( "duplicate field name: {0}" ) ]
219
+ DuplicateField ( String ) ,
220
+ }
221
+
202
222
#[ cfg( test) ]
203
223
mod tests {
204
224
use super :: * ;
205
- use std:: collections:: HashSet ;
206
225
#[ test]
207
226
fn empty_field_names ( ) {
208
227
let mut existing_field_names: HashSet < String > = HashSet :: new ( ) ;
0 commit comments