@@ -54,86 +54,108 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
5454
5555 QueueHandler < PullResponse > databaseUpdateQueue = new ( 1 , async pullResponse =>
5656 {
57- if ( pullResponse . Items . Any ( ) )
57+ EntityMetadata ? currentMetadata = null ;
58+
59+ try
5860 {
59- DateTimeOffset lastSynchronization = await DeltaTokenStore . GetDeltaTokenAsync ( pullResponse . QueryId , cancellationToken ) . ConfigureAwait ( false ) ;
60- foreach ( object item in pullResponse . Items )
61+ if ( pullResponse . Items . Any ( ) )
6162 {
62- EntityMetadata metadata = EntityResolver . GetEntityMetadata ( item , pullResponse . EntityType ) ;
63- object ? originalEntity = await context . FindAsync ( pullResponse . EntityType , [ metadata . Id ] , cancellationToken ) . ConfigureAwait ( false ) ;
64-
65- if ( originalEntity is null && ! metadata . Deleted )
66- {
67- _ = context . Add ( item ) ;
68- result . IncrementAdditions ( ) ;
69- }
70- else if ( originalEntity is not null && metadata . Deleted )
63+ DateTimeOffset lastSynchronization = await DeltaTokenStore . GetDeltaTokenAsync ( pullResponse . QueryId , cancellationToken ) . ConfigureAwait ( false ) ;
64+ foreach ( object item in pullResponse . Items )
7165 {
72- _ = context . Remove ( originalEntity ) ;
73- result . IncrementDeletions ( ) ;
74- }
75- else if ( originalEntity is not null && ! metadata . Deleted )
76- {
77- // Gather properties marked with [JsonIgnore]
78- HashSet < string > ignoredProps = pullResponse . EntityType
79- . GetProperties ( BindingFlags . Public | BindingFlags . Instance )
80- . Where ( p => p . IsDefined ( typeof ( JsonIgnoreAttribute ) , inherit : true ) )
81- . Select ( p => p . Name )
82- . ToHashSet ( ) ;
83-
84- EntityEntry originalEntry = context . Entry ( originalEntity ) ;
85- EntityEntry newEntry = context . Entry ( item ) ;
66+ EntityMetadata metadata = EntityResolver . GetEntityMetadata ( item , pullResponse . EntityType ) ;
67+ currentMetadata = metadata ;
68+ object ? originalEntity = await context . FindAsync ( pullResponse . EntityType , [ metadata . Id ] , cancellationToken ) . ConfigureAwait ( false ) ;
8669
87- // Only copy properties that are not marked with [JsonIgnore]
88- foreach ( IProperty property in originalEntry . Metadata . GetProperties ( ) )
70+ if ( originalEntity is null && ! metadata . Deleted )
71+ {
72+ _ = context . Add ( item ) ;
73+ result . IncrementAdditions ( ) ;
74+ }
75+ else if ( originalEntity is not null && metadata . Deleted )
8976 {
90- if ( ! ignoredProps . Contains ( property . Name ) )
77+ _ = context . Remove ( originalEntity ) ;
78+ result . IncrementDeletions ( ) ;
79+ }
80+ else if ( originalEntity is not null && ! metadata . Deleted )
81+ {
82+ // Gather properties marked with [JsonIgnore]
83+ HashSet < string > ignoredProps = pullResponse . EntityType
84+ . GetProperties ( BindingFlags . Public | BindingFlags . Instance )
85+ . Where ( p => p . IsDefined ( typeof ( JsonIgnoreAttribute ) , inherit : true ) )
86+ . Select ( p => p . Name )
87+ . ToHashSet ( ) ;
88+
89+ EntityEntry originalEntry = context . Entry ( originalEntity ) ;
90+ EntityEntry newEntry = context . Entry ( item ) ;
91+
92+ // Only copy properties that are not marked with [JsonIgnore]
93+ foreach ( IProperty property in originalEntry . Metadata . GetProperties ( ) )
9194 {
92- originalEntry . Property ( property . Name ) . CurrentValue = newEntry . Property ( property . Name ) . CurrentValue ;
95+ if ( ! ignoredProps . Contains ( property . Name ) )
96+ {
97+ originalEntry . Property ( property . Name ) . CurrentValue = newEntry . Property ( property . Name ) . CurrentValue ;
98+ }
9399 }
100+
101+ result . IncrementReplacements ( ) ;
94102 }
95103
96- result . IncrementReplacements ( ) ;
104+ if ( metadata . UpdatedAt > lastSynchronization )
105+ {
106+ lastSynchronization = metadata . UpdatedAt . Value ;
107+ bool isAdded = await DeltaTokenStore . SetDeltaTokenAsync ( pullResponse . QueryId , metadata . UpdatedAt . Value , cancellationToken ) . ConfigureAwait ( false ) ;
108+ if ( isAdded )
109+ {
110+ // Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
111+ _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
112+ }
113+ }
114+ currentMetadata = null ;
97115 }
98116
99- if ( metadata . UpdatedAt > lastSynchronization )
117+ if ( pullOptions . SaveAfterEveryServiceRequest )
100118 {
101- lastSynchronization = metadata . UpdatedAt . Value ;
102- bool isAdded = await DeltaTokenStore . SetDeltaTokenAsync ( pullResponse . QueryId , metadata . UpdatedAt . Value , cancellationToken ) . ConfigureAwait ( false ) ;
103- if ( isAdded )
104- {
105- // Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
106- _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
107- }
119+ _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
108120 }
109- }
110121
111- if ( pullOptions . SaveAfterEveryServiceRequest )
112- {
113- _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
122+ context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
123+ {
124+ EventType = SynchronizationEventType . ItemsCommitted ,
125+ EntityType = pullResponse . EntityType ,
126+ ItemsProcessed = pullResponse . TotalItemsProcessed ,
127+ ItemsTotal = pullResponse . TotalRequestItems ,
128+ QueryId = pullResponse . QueryId
129+ } ) ;
114130 }
115131
116- context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
132+ if ( pullResponse . Completed )
117133 {
118- EventType = SynchronizationEventType . ItemsCommitted ,
119- EntityType = pullResponse . EntityType ,
120- ItemsProcessed = pullResponse . TotalItemsProcessed ,
121- ItemsTotal = pullResponse . TotalRequestItems ,
122- QueryId = pullResponse . QueryId
123- } ) ;
134+ context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
135+ {
136+ EventType = SynchronizationEventType . PullEnded ,
137+ EntityType = pullResponse . EntityType ,
138+ ItemsProcessed = pullResponse . TotalItemsProcessed ,
139+ ItemsTotal = pullResponse . TotalRequestItems ,
140+ QueryId = pullResponse . QueryId ,
141+ Exception = pullResponse . Exception ,
142+ ServiceResponse = pullResponse . Exception is DatasyncPullException ex ? ex . ServiceResponse : null
143+ } ) ;
144+ }
124145 }
125-
126- if ( pullResponse . Completed )
146+ catch ( Exception ex )
127147 {
148+ // An exception is thrown in the local processing section of the pull operation. We can't
149+ // handle it properly, so we add it to the result and send a synchronization event to allow
150+ // the developer to capture the exception.
151+ result . AddLocalException ( currentMetadata , ex ) ;
128152 context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
129153 {
130- EventType = SynchronizationEventType . PullEnded ,
154+ EventType = SynchronizationEventType . LocalException ,
131155 EntityType = pullResponse . EntityType ,
132- ItemsProcessed = pullResponse . TotalItemsProcessed ,
133- ItemsTotal = pullResponse . TotalRequestItems ,
134156 QueryId = pullResponse . QueryId ,
135- Exception = pullResponse . Exception ,
136- ServiceResponse = pullResponse . Exception is DatasyncPullException ex ? ex . ServiceResponse : null
157+ Exception = ex ,
158+ EntityMetadata = currentMetadata
137159 } ) ;
138160 }
139161 } ) ;
@@ -189,6 +211,20 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
189211 result . AddFailedRequest ( requestUri , ex . ServiceResponse ) ;
190212 databaseUpdateQueue . Enqueue ( new PullResponse ( pullRequest . EntityType , pullRequest . QueryId , [ ] , totalCount , itemsProcessed , true , ex ) ) ;
191213 }
214+ catch ( Exception localex )
215+ {
216+ // An exception is thrown that is locally generated. We can't handle it properly, so we
217+ // add it to the result and send a synchronization event to allow the developer to capture
218+ // the exception.
219+ result . AddLocalException ( null , localex ) ;
220+ context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
221+ {
222+ EventType = SynchronizationEventType . LocalException ,
223+ EntityType = pullRequest . EntityType ,
224+ QueryId = pullRequest . QueryId ,
225+ Exception = localex
226+ } ) ;
227+ }
192228 } ) ;
193229
194230 // Get requests we need to enqueue. Note : do not enqueue them yet. Context only supports one outstanding query at a time and we don't want a query from a background task being run concurrently with GetDeltaTokenAsync.
0 commit comments