forked from CommunityToolkit/Datasync
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPullOperationManager.cs
213 lines (193 loc) · 10.2 KB
/
PullOperationManager.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using CommunityToolkit.Datasync.Client.Exceptions;
using CommunityToolkit.Datasync.Client.Offline.Models;
using CommunityToolkit.Datasync.Client.Query.Linq;
using CommunityToolkit.Datasync.Client.Query.OData;
using CommunityToolkit.Datasync.Client.Serialization;
using CommunityToolkit.Datasync.Client.Threading;
using System.Diagnostics.CodeAnalysis;
using System.Reflection;
using System.Text.Json;
namespace CommunityToolkit.Datasync.Client.Offline.Operations;
/// <summary>
/// The internals of a single pull operation.
/// </summary>
/// <remarks>
/// The synchronization lock is already assumed to be in place. The internals do not do
/// interim saves of data.
/// </remarks>
/// <param name="context">The <see cref="OfflineDbContext"/> to use for saving data</param>
/// <param name="synchronizableTypes">The list of synchronizable types that the pull operation should support.</param>
internal class PullOperationManager(OfflineDbContext context, IEnumerable<Type> synchronizableTypes) : IPullOperationManager
{
/// <summary>
/// The delta-token store, which stores the date/time of the last synchronization for a query.
/// </summary>
internal IDeltaTokenStore DeltaTokenStore { get => context.DeltaTokenStore; }
/// <summary>
/// The list of synchronizable entity types that this object can process.
/// </summary>
internal IList<Type> SynchronizableTypes { get; } = [.. synchronizableTypes];
/// <summary>
/// Executes a set of pull requests.
/// </summary>
/// <param name="requests">The list of pull requests to execute..</param>
/// <param name="pullOptions">The pull options to use.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe.</param>
/// <returns>The results of the pull operation.</returns>
public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, PullOptions pullOptions, CancellationToken cancellationToken = default)
{
ArgumentValidationException.ThrowIfNotValid(pullOptions, nameof(pullOptions));
PullResult result = new();
QueueHandler<PullResponse> databaseUpdateQueue = new(1, async pullResponse =>
{
DateTimeOffset lastSynchronization = await DeltaTokenStore.GetDeltaTokenAsync(pullResponse.QueryId, cancellationToken).ConfigureAwait(false);
foreach (object item in pullResponse.Items)
{
EntityMetadata metadata = EntityResolver.GetEntityMetadata(item, pullResponse.EntityType);
object? originalEntity = await context.FindAsync(pullResponse.EntityType, [metadata.Id], cancellationToken).ConfigureAwait(false);
if (originalEntity is null && !metadata.Deleted)
{
_ = context.Add(item);
result.IncrementAdditions();
}
else if (originalEntity is not null && metadata.Deleted)
{
_ = context.Remove(originalEntity);
result.IncrementDeletions();
}
else if (originalEntity is not null && !metadata.Deleted)
{
context.Entry(originalEntity).CurrentValues.SetValues(item);
result.IncrementReplacements();
}
if (metadata.UpdatedAt > lastSynchronization)
{
lastSynchronization = metadata.UpdatedAt.Value;
bool isAdded = await DeltaTokenStore.SetDeltaTokenAsync(pullResponse.QueryId, metadata.UpdatedAt.Value, cancellationToken).ConfigureAwait(false);
if (isAdded)
{
// Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
}
}
}
if (pullOptions.SaveAfterEveryServiceRequest)
{
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
}
});
QueueHandler<PullRequest> serviceRequestQueue = new(pullOptions.ParallelOperations, async pullRequest =>
{
Uri endpoint = ExecutableOperation.MakeAbsoluteUri(pullRequest.HttpClient.BaseAddress, pullRequest.Endpoint);
Uri requestUri = new UriBuilder(endpoint) { Query = pullRequest.QueryDescription.ToODataQueryString() }.Uri;
Type pageType = typeof(Page<>).MakeGenericType(pullRequest.EntityType);
try
{
bool completed = false;
do
{
Page<object> page = await GetPageAsync(pullRequest.HttpClient, requestUri, pageType, cancellationToken).ConfigureAwait(false);
databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, page.Items));
if (!string.IsNullOrEmpty(page.NextLink))
{
requestUri = new UriBuilder(endpoint) { Query = page.NextLink }.Uri;
}
else
{
completed = true;
}
}
while (!completed);
}
catch (DatasyncPullException ex)
{
result.AddFailedRequest(requestUri, ex.ServiceResponse);
}
});
foreach (PullRequest request in requests)
{
DateTimeOffset lastSynchronization = await context.DeltaTokenStore.GetDeltaTokenAsync(request.QueryId, cancellationToken).ConfigureAwait(false);
request.QueryDescription = PrepareQueryDescription(request.QueryDescription, lastSynchronization);
serviceRequestQueue.Enqueue(request);
}
await serviceRequestQueue.WhenComplete();
await databaseUpdateQueue.WhenComplete();
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
return result;
}
/// <summary>
/// Retrieves a single page of results from the service. This throws an error if the response is invalid, and decodes the page of results.
/// </summary>
/// <param name="client">The <see cref="HttpClient"/> to use.</param>
/// <param name="requestUri">The request URI.</param>
/// <param name="pageType">The page type (actual return type from the service).</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe.</param>
/// <returns>A page of items.</returns>
/// <exception cref="DatasyncPullException">Thrown on error</exception>
internal async Task<Page<object>> GetPageAsync(HttpClient client, Uri requestUri, Type pageType, CancellationToken cancellationToken = default)
{
PropertyInfo itemsPropInfo = pageType.GetProperty("Items")
?? throw new DatasyncException($"Page type '{pageType.Name}' does not have an 'Items' property");
PropertyInfo nextLinkPropInfo = pageType.GetProperty("NextLink")
?? throw new DatasyncException($"Page type '{pageType.Name}' does not have a 'NextLink' property");
using HttpRequestMessage requestMessage = new(HttpMethod.Get, requestUri);
using HttpResponseMessage responseMessage = await client.SendAsync(requestMessage, cancellationToken).ConfigureAwait(false);
ServiceResponse response = new(responseMessage);
if (!response.IsSuccessful || !response.HasContent)
{
throw new DatasyncPullException() { ServiceResponse = response };
}
try
{
object? result = await JsonSerializer.DeserializeAsync(response.ContentStream, pageType, context.JsonSerializerOptions, cancellationToken).ConfigureAwait(false)
?? throw new DatasyncPullException("JSON result is null") { ServiceResponse = response };
Page<object> page = new Page<object>()
{
Items = (IEnumerable<object>)itemsPropInfo.GetValue(result)!,
NextLink = (string?)nextLinkPropInfo.GetValue(result)
};
return page;
}
catch (JsonException ex)
{
throw new DatasyncPullException(ex.Message, ex) { ServiceResponse = response };
}
}
/// <summary>
/// Prepares the query description for use as a pull request.
/// </summary>
/// <param name="source">The query description to modify.</param>
/// <param name="lastSynchronization">The last synchronization date/time</param>
/// <returns>A modified query description for the actual query.</returns>
internal static QueryDescription PrepareQueryDescription(QueryDescription source, DateTimeOffset lastSynchronization)
{
QueryDescription query = new(source);
if (lastSynchronization.ToUnixTimeMilliseconds() > 0L)
{
BinaryOperatorNode deltaTokenFilter = new(BinaryOperatorKind.GreaterThan)
{
LeftOperand = new MemberAccessNode(null, "updatedAt"),
RightOperand = new ConstantNode(lastSynchronization)
};
query.Filter = query.Filter is null ? deltaTokenFilter : new BinaryOperatorNode(BinaryOperatorKind.And, query.Filter, deltaTokenFilter);
}
query.QueryParameters[ODataQueryParameters.IncludeDeleted] = "true";
query.RequestTotalCount = true;
query.Top = null;
query.Skip = 0;
query.Ordering.Clear();
query.Ordering.Add(new OrderByNode(new MemberAccessNode(null, "updatedAt"), true));
return query;
}
/// <summary>
/// A record type for the database update queue handler.
/// </summary>
/// <param name="EntityType">The type of entity contained within the items.</param>
/// <param name="QueryId">The query ID for the request.</param>
/// <param name="Items">The list of items to process.</param>
[ExcludeFromCodeCoverage]
internal record PullResponse(Type EntityType, string QueryId, IEnumerable<object> Items);
}