1
1
use alloc:: string:: String ;
2
2
use alloc:: vec:: Vec ;
3
- use serde:: { de, Deserialize , Deserializer , Serialize } ;
4
- use serde_json as json;
3
+ use serde:: { Deserialize , Serialize } ;
5
4
6
- use crate :: util:: { deserialize_string_to_i64, deserialize_optional_string_to_i64} ;
7
- use alloc:: format;
8
- use alloc:: string:: { ToString } ;
9
- use core:: fmt;
10
- use serde:: de:: { MapAccess , Visitor } ;
11
-
12
- use sqlite_nostd as sqlite;
13
- use sqlite_nostd:: { Connection , ResultCode } ;
14
- use uuid:: Uuid ;
15
- use crate :: error:: { SQLiteError , PSResult } ;
16
-
17
- use crate :: ext:: SafeManagedStmt ;
5
+ use crate :: util:: { deserialize_optional_string_to_i64, deserialize_string_to_i64} ;
18
6
19
7
#[ derive( Serialize , Deserialize , Debug ) ]
20
8
pub struct Checkpoint {
@@ -31,154 +19,3 @@ pub struct BucketChecksum {
31
19
pub bucket : String ,
32
20
pub checksum : i32 ,
33
21
}
34
-
35
-
36
- #[ derive( Serialize , Deserialize , Debug ) ]
37
- pub struct CheckpointComplete {
38
- #[ serde( deserialize_with = "deserialize_string_to_i64" ) ]
39
- last_op_id : i64
40
- }
41
-
42
- #[ derive( Serialize , Deserialize , Debug ) ]
43
- pub struct SyncBucketData {
44
- // TODO: complete this
45
- bucket : String
46
- }
47
-
48
- #[ derive( Serialize , Deserialize , Debug ) ]
49
- pub struct Keepalive {
50
- token_expires_in : i32
51
- }
52
-
53
- #[ derive( Serialize , Deserialize , Debug ) ]
54
- pub struct CheckpointDiff {
55
- #[ serde( deserialize_with = "deserialize_string_to_i64" ) ]
56
- last_op_id : i64 ,
57
- updated_buckets : Vec < BucketChecksum > ,
58
- removed_buckets : Vec < String > ,
59
- #[ serde( default ) ]
60
- #[ serde( deserialize_with = "deserialize_optional_string_to_i64" ) ]
61
- write_checkpoint : Option < i64 >
62
- }
63
-
64
-
65
-
66
- #[ derive( Debug ) ]
67
- pub enum StreamingSyncLine {
68
- CheckpointLine ( Checkpoint ) ,
69
- CheckpointDiffLine ( CheckpointDiff ) ,
70
- CheckpointCompleteLine ( CheckpointComplete ) ,
71
- SyncBucketDataLine ( SyncBucketData ) ,
72
- KeepaliveLine ( i32 ) ,
73
- Unknown
74
- }
75
-
76
- // Serde does not supporting ignoring unknown fields in externally-tagged enums, so we use our own
77
- // serializer.
78
-
79
- struct StreamingSyncLineVisitor ;
80
-
81
- impl < ' de > Visitor < ' de > for StreamingSyncLineVisitor {
82
- type Value = StreamingSyncLine ;
83
-
84
- fn expecting ( & self , formatter : & mut fmt:: Formatter ) -> fmt:: Result {
85
- formatter. write_str ( "sync data" )
86
- }
87
-
88
- fn visit_map < A > ( self , mut access : A ) -> Result < Self :: Value , A :: Error >
89
- where
90
- A : MapAccess < ' de > ,
91
- {
92
- let mut r = StreamingSyncLine :: Unknown ;
93
- while let Some ( ( key, value) ) = access. next_entry :: < String , json:: Value > ( ) ? {
94
- if !matches ! ( r, StreamingSyncLine :: Unknown ) {
95
- // Generally, we don't expect to receive multiple in one line.
96
- // But if it does happen, we keep the first one.
97
- continue ;
98
- }
99
- match key. as_str ( ) {
100
- "checkpoint" => {
101
- r = StreamingSyncLine :: CheckpointLine (
102
- serde_json:: from_value ( value) . map_err ( de:: Error :: custom) ?,
103
- ) ;
104
- }
105
- "checkpoint_diff" => {
106
- r = StreamingSyncLine :: CheckpointDiffLine (
107
- serde_json:: from_value ( value) . map_err ( de:: Error :: custom) ?,
108
- ) ;
109
- }
110
- "checkpoint_complete" => {
111
- r = StreamingSyncLine :: CheckpointCompleteLine (
112
- serde_json:: from_value ( value) . map_err ( de:: Error :: custom) ?,
113
- ) ;
114
- }
115
- "data" => {
116
- r = StreamingSyncLine :: SyncBucketDataLine (
117
- serde_json:: from_value ( value) . map_err ( de:: Error :: custom) ?,
118
- ) ;
119
- }
120
- "token_expires_in" => {
121
- r = StreamingSyncLine :: KeepaliveLine (
122
- serde_json:: from_value ( value) . map_err ( de:: Error :: custom) ?,
123
- ) ;
124
- }
125
- _ => { }
126
- }
127
- }
128
-
129
- Ok ( r)
130
- }
131
- }
132
-
133
- impl < ' de > Deserialize < ' de > for StreamingSyncLine {
134
- fn deserialize < D > ( deserializer : D ) -> Result < Self , D :: Error >
135
- where
136
- D : Deserializer < ' de > ,
137
- {
138
- deserializer. deserialize_map ( StreamingSyncLineVisitor )
139
- }
140
- }
141
-
142
-
143
- #[ cfg( test) ]
144
- mod tests {
145
- use core:: assert_matches:: assert_matches;
146
- use super :: * ;
147
-
148
- #[ test]
149
- fn json_parsing_test ( ) {
150
- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"token_expires_in": 42}"# ) . unwrap ( ) ;
151
- assert_matches ! ( line, StreamingSyncLine :: KeepaliveLine ( 42 ) ) ;
152
-
153
- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"checkpoint_complete": {"last_op_id": "123"}}"# ) . unwrap ( ) ;
154
- assert_matches ! ( line, StreamingSyncLine :: CheckpointCompleteLine ( CheckpointComplete { last_op_id: 123 } ) ) ;
155
-
156
- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"checkpoint_complete": {"last_op_id": "123", "other": "foo"}}"# ) . unwrap ( ) ;
157
- assert_matches ! ( line, StreamingSyncLine :: CheckpointCompleteLine ( CheckpointComplete { last_op_id: 123 } ) ) ;
158
-
159
- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"checkpoint": {"last_op_id": "123", "buckets": []}}"# ) . unwrap ( ) ;
160
- assert_matches ! ( line, StreamingSyncLine :: CheckpointLine ( Checkpoint { last_op_id: 123 , .. } ) ) ;
161
-
162
- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"checkpoint": {"last_op_id": "123", "write_checkpoint": "42", "buckets": []}}"# ) . unwrap ( ) ;
163
- assert_matches ! ( line, StreamingSyncLine :: CheckpointLine ( Checkpoint { last_op_id: 123 , write_checkpoint: Some ( 42 ) , .. } ) ) ;
164
-
165
- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"checkpoint_diff": {"last_op_id": "123", "updated_buckets": [], "removed_buckets": []}}"# ) . unwrap ( ) ;
166
- assert_matches ! ( line, StreamingSyncLine :: CheckpointDiffLine ( CheckpointDiff { last_op_id: 123 , .. } ) ) ;
167
-
168
- // Additional/unknown fields
169
- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"token_expires_in": 42, "foo": 1}"# ) . unwrap ( ) ;
170
- assert_matches ! ( line, StreamingSyncLine :: KeepaliveLine ( 42 ) ) ;
171
- let line: StreamingSyncLine = serde_json:: from_str ( r#"{}"# ) . unwrap ( ) ;
172
- assert_matches ! ( line, StreamingSyncLine :: Unknown ) ;
173
- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"other":"test"}"# ) . unwrap ( ) ;
174
- assert_matches ! ( line, StreamingSyncLine :: Unknown ) ;
175
-
176
- // Multiple - keep the first one
177
- let line: StreamingSyncLine = serde_json:: from_str ( r#"{"token_expires_in": 42, "checkpoint_complete": {"last_op_id": "123"}}"# ) . unwrap ( ) ;
178
- assert_matches ! ( line, StreamingSyncLine :: KeepaliveLine ( 42 ) ) ;
179
-
180
- // Test error handling
181
- let line: Result < StreamingSyncLine , _ > = serde_json:: from_str ( r#"{"token_expires_in": "42"}"# ) ;
182
- assert ! ( line. is_err( ) ) ;
183
- }
184
- }
0 commit comments