diff --git a/src/SpacetimeDBClient.cs b/src/SpacetimeDBClient.cs index dc627f2f..dae336b5 100644 --- a/src/SpacetimeDBClient.cs +++ b/src/SpacetimeDBClient.cs @@ -5,6 +5,8 @@ using System.IO.Compression; using System.Linq; using System.Net.WebSockets; +using System.Security.Cryptography.X509Certificates; +using System.Text; using System.Threading; using System.Threading.Tasks; using SpacetimeDB.BSATN; @@ -139,6 +141,7 @@ struct DbOp private readonly Dictionary> waitingOneOffQueries = new(); private bool isClosing; + private bool networkStatsLoggingEnabled; private readonly Thread networkMessageProcessThread; public readonly Stats stats = new(); @@ -165,7 +168,7 @@ protected DbConnectionBase() struct UnprocessedMessage { - public byte[] bytes; + public byte[]? bytes; public DateTime timestamp; } @@ -211,24 +214,63 @@ enum CompressionAlgos : byte Brotli = 1, } - private static ServerMessage DecompressDecodeMessage(byte[] bytes) + private ServerMessage DecompressDecodeMessage(byte[] bytes, + out int decompressedCount, out byte[]? decompressedBytes, out CompressionAlgos compression) { using var stream = new MemoryStream(bytes, 1, bytes.Length - 1); // The stream will never be empty. It will at least contain the compression algo. - var compression = (CompressionAlgos)bytes[0]; + compression = (CompressionAlgos)bytes[0]; // Conditionally decompress and decode. switch (compression) { case CompressionAlgos.None: { using var binaryReader = new BinaryReader(stream); + if (networkStatsLoggingEnabled) + { + decompressedCount = bytes.Length - 1; + decompressedBytes = new byte[bytes.Length - 1]; + Array.Copy(bytes, 1, decompressedBytes, 0, decompressedCount); + } + else + { + decompressedCount = 0; + decompressedBytes = null; + } + return new ServerMessage.BSATN().Read(binaryReader); } case CompressionAlgos.Brotli: { using var decompressedStream = new BrotliStream(stream, CompressionMode.Decompress); using var binaryReader = new BinaryReader(decompressedStream); + + if (networkStatsLoggingEnabled) + { + // This is terribly inefficient, but only runs when network stats logging is enabled. + using var debugMemoryStream = new MemoryStream(bytes, 1, bytes.Length - 1); + using var debugDecompressedStream = new BrotliStream(debugMemoryStream, CompressionMode.Decompress); + decompressedBytes = new byte[500]; + decompressedCount = 0; + int readSize; + while ((readSize = debugDecompressedStream.Read(decompressedBytes, decompressedCount, decompressedBytes.Length - decompressedCount)) != 0) + { + decompressedCount += readSize; + if (decompressedCount == decompressedBytes.Length) + { + var newDecompressedBytes = new byte[decompressedBytes.Length * 2]; + Array.Copy(decompressedBytes, newDecompressedBytes, decompressedBytes.Length); + decompressedBytes = newDecompressedBytes; + } + } + } + else + { + decompressedBytes = null; + decompressedCount = 0; + } + return new ServerMessage.BSATN().Read(binaryReader); } default: @@ -322,10 +364,16 @@ void PreProcessMessages() PreProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed) { var dbOps = new List(); - - var message = DecompressDecodeMessage(unprocessed.bytes); - + + var message = DecompressDecodeMessage(unprocessed.bytes!, out var decompressedByteCount, + out var decompressedBytes, out var compression); + // Network stats tracking goes from tableName => number of rows inserted row count/deleted row count/new row bytes + Dictionary? tableRowOps = null; ReducerEvent? reducerEvent = default; + if (networkStatsLoggingEnabled) + { + tableRowOps = new Dictionary(); + } // This is all of the inserts Dictionary>? subscriptionInserts = null; @@ -362,6 +410,8 @@ HashSet GetInsertHashSet(System.Type tableType, int tableSize) var hashSet = GetInsertHashSet(table.ClientTableType, (int)update.NumRows); + var newRowBytes = 0; + var insertCount = 0; foreach (var cqu in update.Updates) { var qu = DecompressDecodeQueryUpdate(cqu); @@ -372,6 +422,8 @@ HashSet GetInsertHashSet(System.Type tableType, int tableSize) foreach (var bin in BsatnRowListIter(qu.Inserts)) { + newRowBytes += bin.Length; + insertCount++; if (!hashSet.Add(bin)) { // Ignore duplicate inserts in the same subscription update. @@ -388,6 +440,36 @@ HashSet GetInsertHashSet(System.Type tableType, int tableSize) dbOps.Add(op); } } + + if (networkStatsLoggingEnabled) + { + if (tableRowOps!.TryGetValue(tableName, out var tableOp)) + { + tableRowOps[tableName] = (insertCount, 0, tableOp.Item3 + newRowBytes); + } + else + { + tableRowOps.Add(tableName, (insertCount, 0, newRowBytes)); + } + } + } + + if (networkStatsLoggingEnabled) + { + var builder = new StringBuilder(); + builder.AppendLine( + $"Initial Subscription Received: compressed: {SpacetimeDBUtils.FormatBytes(unprocessed.bytes.Length)} " + + $"decompressed: {SpacetimeDBUtils.FormatBytes(decompressedByteCount)} compression type: {compression}"); + foreach(var update in tableRowOps!) + { + builder.AppendLine( + $"\tTable {update.Key} inserts: {update.Value.Item1} deletes: {update.Value.Item2} inserted row bytes: {update.Value.Item3}"); + } + for(var x = 0;x < decompressedByteCount;x++) + { + builder.Append(decompressedBytes![x].ToString("X2") + " "); + } + Log.Info(builder.ToString()); } break; @@ -429,11 +511,16 @@ HashSet GetInsertHashSet(System.Type tableType, int tableSize) continue; } + var rowByteCount = 0; + var insertedRowsCount = 0; + var deletedRowsCount = 0; foreach (var cqu in update.Updates) { var qu = DecompressDecodeQueryUpdate(cqu); foreach (var row in BsatnRowListIter(qu.Inserts)) { + insertedRowsCount++; + rowByteCount = row.Length; var op = new DbOp { table = table, insert = Decode(table, row, out var pk) }; if (pk != null) { @@ -469,6 +556,8 @@ HashSet GetInsertHashSet(System.Type tableType, int tableSize) foreach (var row in BsatnRowListIter(qu.Deletes)) { + deletedRowsCount++; + rowByteCount += row.Length; var op = new DbOp { table = table, delete = Decode(table, row, out var pk) }; if (pk != null) { @@ -501,14 +590,70 @@ HashSet GetInsertHashSet(System.Type tableType, int tableSize) dbOps.Add(op); } } + + if (networkStatsLoggingEnabled) + { + if (tableRowOps!.TryGetValue(tableName, out var tableOp)) + { + tableRowOps[tableName] = (tableOp.Item1 + insertedRowsCount, tableOp.Item2 + deletedRowsCount, tableOp.Item3 + rowByteCount); + } + else + { + tableRowOps.Add(tableName, (insertedRowsCount, deletedRowsCount, rowByteCount)); + } + } } } // Combine primary key updates and non-primary key updates dbOps.AddRange(primaryKeyChanges.Values); + + if (networkStatsLoggingEnabled) + { + var builder = new StringBuilder(); + builder.AppendLine( + $"Transaction Update Received (Committed): reducer: {transactionUpdate.ReducerCall.ReducerName} compressed: " + + $"{SpacetimeDBUtils.FormatBytes(unprocessed.bytes.Length)} " + + $"decompressed: {SpacetimeDBUtils.FormatBytes(decompressedByteCount)} compression type: {compression}"); + foreach(var update in tableRowOps!) + { + builder.AppendLine( + $"\tTable {update.Key} inserts: {update.Value.Item1} deletes: {update.Value.Item2} inserted row bytes: {update.Value.Item3}"); + } + + builder.Append("Decompressed bytes: "); + for(var x = 0;x < decompressedByteCount;x++) + { + builder.Append(decompressedBytes![x].ToString("X2") + " "); + } + Log.Info(builder.ToString()); + } + } + else + { + if (networkStatsLoggingEnabled) + { + Log.Info($"Transaction Update Received (Failed): reducer: {transactionUpdate.ReducerCall.ReducerName} compressed: " + + $"{SpacetimeDBUtils.FormatBytes(unprocessed.bytes.Length)} decompressed: " + + $"{SpacetimeDBUtils.FormatBytes(decompressedByteCount)} compression type: {compression}"); + } } break; case ServerMessage.IdentityToken(var identityToken): + if (networkStatsLoggingEnabled) + { + var builder = new StringBuilder(); + builder.AppendLine( + $"IdentityToken Message Received: compressed: {SpacetimeDBUtils.FormatBytes(unprocessed.bytes.Length)} " + + $"decompressed: {SpacetimeDBUtils.FormatBytes(decompressedByteCount)} identity: {identityToken.Identity}" + + $" compression type: {compression}"); + builder.Append("Decompressed bytes: "); + for(var x = 0;x < decompressedByteCount;x++) + { + builder.Append(decompressedBytes![x].ToString("X2") + " "); + } + Log.Info(builder.ToString()); + } break; case ServerMessage.OneOffQueryResponse(var resp): /// This case does NOT produce a list of DBOps, because it should not modify the client cache state! @@ -521,6 +666,45 @@ HashSet GetInsertHashSet(System.Type tableType, int tableSize) } resultSource.SetResult(resp); + + if (networkStatsLoggingEnabled) + { + var builder = new StringBuilder(); + builder.AppendLine( + $"One-Off Query Response Received: compressed: {SpacetimeDBUtils.FormatBytes(unprocessed.bytes.Length)} " + + $"decompressed: {SpacetimeDBUtils.FormatBytes(decompressedByteCount)} compression type: {compression}"); + foreach (var table in resp.Tables) + { + var insertedByteCount = 0; + var insertedRowCount = 0; + foreach (var update in BsatnRowListIter(table.Rows)) + { + insertedRowCount++; + insertedByteCount += update.Length; + } + + if (tableRowOps!.TryGetValue(table.TableName, out var value)) + { + tableRowOps[table.TableName] = (value.Item1 + insertedRowCount, 0, value.Item3 + insertedByteCount); + } + else + { + tableRowOps.Add(table.TableName, (insertedRowCount, 0, insertedByteCount)); + } + } + + foreach(var update in tableRowOps!) + { + builder.AppendLine( + $"\tTable {update.Key} inserts: {update.Value.Item1}"); + } + builder.Append("Decompressed bytes: "); + for(var x = 0;x < decompressedByteCount;x++) + { + builder.Append(decompressedBytes![x].ToString("X2") + " "); + } + Log.Info(builder.ToString()); + } break; default: throw new InvalidOperationException(); @@ -894,6 +1078,8 @@ T[] LogAndThrow(string error) .ToArray(); } + public void SetNetworkDebugState(bool enabled) => networkStatsLoggingEnabled = enabled; + public bool IsActive => webSocket.IsConnected; public void FrameTick() diff --git a/src/Utils.cs b/src/Utils.cs index 8e25f201..6d7240c6 100644 --- a/src/Utils.cs +++ b/src/Utils.cs @@ -4,3 +4,21 @@ namespace System.Runtime.CompilerServices internal static class IsExternalInit { } // https://stackoverflow.com/a/64749403/1484415 } #endif + +internal class SpacetimeDBUtils +{ + internal static string FormatBytes(long bytes) + { + string[] units = { "B", "KB", "MB", "GB", "TB" }; + double size = bytes; + int unitIndex = 0; + + while (size >= 1024 && unitIndex < units.Length - 1) + { + size /= 1024; + unitIndex++; + } + + return $"{size:0.##} {units[unitIndex]}"; + } +}