Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/Speckle.Sdk/Api/Blob/BlobApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public partial interface IBlobApi : IDisposable;
/// Low level access to the blob API
/// </summary>
/// <seealso cref="FileImportResource"/>
/// <seealso cref="ServerApi"/>
[GenerateAutoInterface]
public sealed class BlobApi : IBlobApi
{
Expand Down
171 changes: 0 additions & 171 deletions src/Speckle.Sdk/Api/Operations/Operations.Receive.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using Microsoft.Extensions.Logging;
using Speckle.Sdk.Logging;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation;
using Speckle.Sdk.Transports;

namespace Speckle.Sdk.Api;
Expand Down Expand Up @@ -55,173 +53,4 @@ CancellationToken cancellationToken
await process.DisposeAsync().ConfigureAwait(false);
}
}

/// <summary>
/// Receives an object (and all its sub-children) from the two provided <see cref="ITransport"/>s.
/// <br/>
/// Will first try and find objects using the <paramref name="localTransport"/> (the faster transport)
/// If not found, will attempt to copy the objects from the <paramref name="remoteTransport"/> into the <paramref name="localTransport"/> before deserialization
/// </summary>
/// <remarks>
/// If Transports are properly implemented, there is no hard distinction between what is a local or remote transport; it's still just an <see cref="ITransport"/>.
/// <br/>So, for example, if you want to receive an object without actually writing it first to a local transport, you can just pass a <see cref="ServerTransport"/> as a local transport.
/// <br/>This is not recommended, but shows what you can do. Another tidbit: the local transport does not need to be disk-bound; it can easily be an in <see cref="MemoryTransport"/>. In memory transports are the fastest ones, but they're of limited use for larger datasets
/// </remarks>
/// <param name="objectId">The id of the object to receive</param>
/// <param name="remoteTransport">The remote transport (slower). If <see langword="null"/>, will assume all objects are present in <paramref name="localTransport"/></param>
/// <param name="localTransport">The local transport (faster). If <see langword="null"/>, will use a default <see cref="SQLiteTransport"/> cache</param>
/// <param name="onProgressAction">Action invoked on progress iterations</param>
/// <param name="cancellationToken"></param>
/// <exception cref="TransportException">Failed to retrieve objects from the provided transport(s)</exception>
/// <exception cref="SpeckleDeserializeException">Deserialization of the requested object(s) failed</exception>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> requested cancel</exception>
/// <returns>The requested Speckle Object</returns>
public async Task<Base> Receive(
string objectId,
ITransport? remoteTransport = null,
ITransport? localTransport = null,
IProgress<ProgressArgs>? onProgressAction = null,
CancellationToken cancellationToken = default
)
{
using var receiveActivity = activityFactory.Start("Operations.Receive");
metricsFactory.CreateCounter<long>("Receive").Add(1);

if (remoteTransport != null)
{
receiveActivity?.SetTags("remoteTransportContext", remoteTransport.TransportContext);
}
receiveActivity?.SetTag("objectId", objectId);

try
{
using IDisposable? d1 = UseDefaultTransportIfNull(localTransport, out localTransport);
receiveActivity?.SetTags("localTransportContext", localTransport.TransportContext);

var result = await ReceiveImpl(objectId, remoteTransport, localTransport, onProgressAction, cancellationToken)
.ConfigureAwait(false);

receiveActivity?.SetStatus(SdkActivityStatusCode.Ok);
return result;
}
catch (Exception ex)
{
receiveActivity?.SetStatus(SdkActivityStatusCode.Error);
receiveActivity?.RecordException(ex);
throw;
}
}

/// <inheritdoc cref="Receive(string,ITransport?,ITransport?,IProgress{ProgressArgs}?,CancellationToken)"/>
private async Task<Base> ReceiveImpl(
string objectId,
ITransport? remoteTransport,
ITransport localTransport,
IProgress<ProgressArgs>? internalProgressAction,
CancellationToken cancellationToken
)
{
// Setup Local Transport
localTransport.OnProgressAction = internalProgressAction;
localTransport.CancellationToken = cancellationToken;

// Setup Remote Transport
if (remoteTransport is not null)
{
remoteTransport.OnProgressAction = internalProgressAction;
remoteTransport.CancellationToken = cancellationToken;
}

// Setup Serializer
SpeckleObjectDeserializer serializer = new()
{
ReadTransport = localTransport,
OnProgressAction = internalProgressAction,
CancellationToken = cancellationToken,
BlobStorageFolder = (remoteTransport as IBlobCapableTransport)?.BlobStorageFolder,
};

// Try Local Receive
string? objString = await LocalReceive(objectId, localTransport).ConfigureAwait(false);

if (objString is null)
{
// Fall back to remote
if (remoteTransport is null)
{
throw new TransportException(
$"Could not find specified object using the local transport {localTransport.TransportName}, and you didn't provide a fallback remote from which to pull it."
);
}

logger.LogDebug(
"Cannot find object {objectId} in the local transport, hitting remote {transportName}",
objectId,
remoteTransport.TransportName
);

objString = await RemoteReceive(objectId, remoteTransport, localTransport).ConfigureAwait(false);
}

using var serializerActivity = activityFactory.Start();

// Proceed to deserialize the object, now safely knowing that all its children are present in the local (fast) transport.
return await DeserializeActivity(objString, serializer).ConfigureAwait(false);
}

/// <summary>
/// Try and get the object from the local transport. If it's there, we assume all its children are there
/// This assumption is hard-wired into the <see cref="SpeckleObjectDeserializer"/>
/// </summary>
/// <param name="objectId"></param>
/// <param name="localTransport"></param>
/// <returns></returns>
/// <exception cref="SpeckleDeserializeException"></exception>
internal static async Task<string?> LocalReceive(string objectId, ITransport localTransport)
{
string? objString = await localTransport.GetObject(objectId).ConfigureAwait(false);
if (objString is null)
{
return null;
}
return objString;
}

/// <summary>
/// Copies the requested object and all its children from <paramref name="remoteTransport"/> to <paramref name="localTransport"/>
/// </summary>
/// <seealso cref="ITransport.CopyObjectAndChildren"/>
/// <param name="objectId"></param>
/// <param name="remoteTransport"></param>
/// <param name="localTransport"></param>
/// <returns></returns>
/// <exception cref="TransportException">Remote transport was not specified</exception>
private static async Task<string> RemoteReceive(
string objectId,
ITransport remoteTransport,
ITransport localTransport
)
{
var objString = await remoteTransport.CopyObjectAndChildren(objectId, localTransport).ConfigureAwait(false);

// DON'T THINK THIS IS NEEDED CopyObjectAndChildren should call this
// Wait for the local transport to finish "writing" - in this case, it signifies that the remote transport has done pushing copying objects into it. (TODO: I can see some scenarios where latency can screw things up, and we should rather wait on the remote transport).
await localTransport.WriteComplete().ConfigureAwait(false);

return objString;
}

private static IDisposable? UseDefaultTransportIfNull(ITransport? userTransport, out ITransport actualLocalTransport)
{
if (userTransport is not null)
{
actualLocalTransport = userTransport;
return null;
}

//User did not specify a transport, default to SQLite
SQLiteTransport defaultLocalTransport = new();
actualLocalTransport = defaultLocalTransport;
return defaultLocalTransport;
}
}
186 changes: 0 additions & 186 deletions src/Speckle.Sdk/Api/Operations/Operations.Send.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using Speckle.Newtonsoft.Json.Linq;
using Speckle.Sdk.Logging;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.Transports;

Expand Down Expand Up @@ -63,186 +59,4 @@ CancellationToken cancellationToken
await process.DisposeAsync().ConfigureAwait(false);
}
}

/// <summary>
/// Sends a Speckle Object to the provided <paramref name="transport"/> and (optionally) the default local cache
/// </summary>
/// <remarks/>
/// <inheritdoc cref="Send(Base, IReadOnlyCollection{ITransport}, IProgress{ProgressArgs}?, CancellationToken)"/>
/// <param name="useDefaultCache">When <see langword="true"/>, an additional <see cref="SQLiteTransport"/> will be included</param>
/// <exception cref="ArgumentNullException">The <paramref name="transport"/> or <paramref name="value"/> was <see langword="null"/></exception>
/// <example><code>
/// using ServerTransport destination = new(account, streamId);
/// var (objectId, references) = await Send(mySpeckleObject, destination, true);
/// </code></example>
public async Task<(string rootObjId, IReadOnlyDictionary<string, ObjectReference> convertedReferences)> Send(
Base value,
IServerTransport transport,
bool useDefaultCache,
IProgress<ProgressArgs>? onProgressAction = null,
CancellationToken cancellationToken = default
)
{
if (transport is null)
{
throw new ArgumentNullException(nameof(transport), "Expected a transport to be explicitly specified");
}

List<ITransport> transports = new() { transport };
using SQLiteTransport2? localCache = useDefaultCache
? new SQLiteTransport2(transport.StreamId) { TransportName = "LC2" }
: null;
if (localCache is not null)
{
transports.Add(localCache);
}

return await Send(value, transports, onProgressAction, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Sends a Speckle Object to the provided <paramref name="transport"/> and (optionally) the default local cache
/// </summary>
/// <remarks/>
/// <inheritdoc cref="Send(Base, IReadOnlyCollection{ITransport}, IProgress{ProgressArgs}?, CancellationToken)"/>
/// <param name="useDefaultCache">When <see langword="true"/>, an additional <see cref="SQLiteTransport"/> will be included</param>
/// <exception cref="ArgumentNullException">The <paramref name="transport"/> or <paramref name="value"/> was <see langword="null"/></exception>
/// <example><code>
/// using ServerTransport destination = new(account, streamId);
/// var (objectId, references) = await Send(mySpeckleObject, destination, true);
/// </code></example>
public async Task<(string rootObjId, IReadOnlyDictionary<string, ObjectReference> convertedReferences)> Send(
Base value,
ITransport transport,
bool useDefaultCache,
IProgress<ProgressArgs>? onProgressAction = null,
CancellationToken cancellationToken = default
)
{
if (transport is null)
{
throw new ArgumentNullException(nameof(transport), "Expected a transport to be explicitly specified");
}

List<ITransport> transports = new() { transport };
using SQLiteTransport? localCache = useDefaultCache ? new SQLiteTransport { TransportName = "LC" } : null;
if (localCache is not null)
{
transports.Add(localCache);
}

return await Send(value, transports, onProgressAction, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Sends a Speckle Object to the provided <paramref name="transports"/>
/// </summary>
/// <remarks>Only sends to the specified transports, the default local cache won't be used unless you also pass it in</remarks>
/// <returns>The id (hash) of the object sent</returns>
/// <param name="value">The object you want to send</param>
/// <param name="transports">Where you want to send them</param>
/// <param name="onProgressAction">Action that gets triggered on every progress tick (keeps track of all transports)</param>
/// <param name="cancellationToken"></param>
/// <exception cref="ArgumentException">No transports were specified</exception>
/// <exception cref="ArgumentNullException">The <paramref name="value"/> was <see langword="null"/></exception>
/// <exception cref="SpeckleException">Serialization or Send operation was unsuccessful</exception>
/// <exception cref="TransportException">One or more <paramref name="transports"/> failed to send</exception>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> requested cancellation</exception>
public async Task<(string rootObjId, IReadOnlyDictionary<string, ObjectReference> convertedReferences)> Send(
Base value,
IReadOnlyCollection<ITransport> transports,
IProgress<ProgressArgs>? onProgressAction = null,
CancellationToken cancellationToken = default
)
{
#pragma warning disable CA1510
if (value is null)
{
throw new ArgumentNullException(nameof(value));
}
#pragma warning restore CA1510

if (transports.Count == 0)
{
throw new ArgumentException("Expected at least on transport to be specified", nameof(transports));
}

// make sure all logs in the operation have the proper context
metricsFactory.CreateCounter<long>("Send").Add(1);
using var activity = activityFactory.Start();
activity?.SetTag("correlationId", Guid.NewGuid().ToString());
{
var sendTimer = Stopwatch.StartNew();
logger.LogDebug("Starting send operation");

SpeckleObjectSerializer serializerV2 = new(transports, onProgressAction, true, cancellationToken);

foreach (var t in transports)
{
t.OnProgressAction = onProgressAction;
t.CancellationToken = cancellationToken;
t.BeginWrite();
}

try
{
var rootObjectId = await SerializerSend(value, serializerV2, cancellationToken).ConfigureAwait(false);

sendTimer.Stop();
activity?.SetTags("transportElapsedBreakdown", transports.ToDictionary(t => t.TransportName, t => t.Elapsed));
activity?.SetTag(
"note",
"the elapsed summary doesn't need to add up to the total elapsed... Threading magic..."
);
activity?.SetTag("serializerElapsed", serializerV2.Elapsed);
logger.LogDebug(
"Finished sending objects after {elapsed}, result {objectId}",
sendTimer.Elapsed.TotalSeconds,
rootObjectId
);

return (rootObjectId, serializerV2.ObjectReferences);
}
catch (Exception ex) when (!ex.IsFatal())
{
logger.LogInformation(ex, "Send operation failed after {elapsed} seconds", sendTimer.Elapsed.TotalSeconds);
if (ex is OperationCanceledException or SpeckleException)
{
throw;
}

throw new SpeckleException("Send operation was unsuccessful", ex);
}
finally
{
foreach (var t in transports)
{
t.EndWrite();
}
}
}
}

/// <returns><inheritdoc cref="Send(Base, IReadOnlyCollection{ITransport}, IProgress{ProgressArgs}?, CancellationToken)"/></returns>
internal static async Task<string> SerializerSend(
Base value,
SpeckleObjectSerializer serializer,
CancellationToken cancellationToken = default
)
{
string obj = serializer.Serialize(value);
Task[] transportAwaits = serializer.WriteTransports.Select(t => t.WriteComplete()).ToArray();

cancellationToken.ThrowIfCancellationRequested();

await Task.WhenAll(transportAwaits).ConfigureAwait(false);

JToken? idToken = JObject.Parse(obj).GetValue("id");
if (idToken == null)
{
throw new SpeckleException("Failed to get id of serialized object");
}

return idToken.ToString();
}
}
Loading
Loading