Skip to content

Commit 1cf700e

Browse files
committed
Revert wrong changes.
1 parent 623fc9e commit 1cf700e

File tree

8 files changed

+54
-74
lines changed

8 files changed

+54
-74
lines changed

src/tarantool.client/Box.cs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public async Task Connect()
3434
await _physicalConnection.Connect(_clientOptions);
3535

3636
var greetingsResponseBytes = new byte[128];
37-
var readCount = await _physicalConnection.Read(greetingsResponseBytes, 0, greetingsResponseBytes.Length);
37+
var readCount = await _physicalConnection.ReadAsync(greetingsResponseBytes, 0, greetingsResponseBytes.Length);
3838
if (readCount != greetingsResponseBytes.Length)
3939
{
4040
throw ExceptionHelper.UnexpectedGreetingBytesCount(readCount);
@@ -82,15 +82,15 @@ public ISchema GetSchema()
8282
return new Schema(_logicalConnection);
8383
}
8484

85-
public Task Call_1_6(string functionName)
85+
public async Task Call_1_6(string functionName)
8686
{
87-
return Call_1_6<TarantoolTuple, TarantoolTuple>(functionName, TarantoolTuple.Empty);
87+
await Call_1_6<TarantoolTuple, TarantoolTuple>(functionName, TarantoolTuple.Empty);
8888
}
8989

90-
public Task Call_1_6<TTuple>(string functionName, TTuple parameters)
90+
public async Task Call_1_6<TTuple>(string functionName, TTuple parameters)
9191
where TTuple : ITarantoolTuple
9292
{
93-
return _logicalConnection.SendRequestWithEmptyResponse(new CallRequest<TTuple>(functionName, parameters, false));
93+
await Call_1_6<TTuple, TarantoolTuple>(functionName, parameters);
9494
}
9595

9696
public Task<DataResponse<TResponse[]>> Call_1_6<TResponse>(string functionName)
@@ -99,42 +99,42 @@ public Task<DataResponse<TResponse[]>> Call_1_6<TResponse>(string functionName)
9999
return Call_1_6<TarantoolTuple, TResponse>(functionName, TarantoolTuple.Empty);
100100
}
101101

102-
public Task<DataResponse<TResponse[]>> Call_1_6<TTuple, TResponse>(string functionName, TTuple parameters)
102+
public async Task<DataResponse<TResponse[]>> Call_1_6<TTuple, TResponse>(string functionName, TTuple parameters)
103103
where TTuple : ITarantoolTuple
104104
where TResponse : ITarantoolTuple
105105
{
106106
var callRequest = new CallRequest<TTuple>(functionName, parameters, false);
107-
return _logicalConnection.SendRequest<CallRequest<TTuple>, TResponse>(callRequest);
107+
return await _logicalConnection.SendRequest<CallRequest<TTuple>, TResponse>(callRequest);
108108
}
109109

110-
public Task Call(string functionName)
110+
public async Task Call(string functionName)
111111
{
112-
return Call(functionName, TarantoolTuple.Empty);
112+
await Call<TarantoolTuple, TarantoolTuple>(functionName, TarantoolTuple.Empty);
113113
}
114114

115-
public Task Call<TTuple>(string functionName, TTuple parameters)
115+
public async Task Call<TTuple>(string functionName, TTuple parameters)
116116
where TTuple : ITarantoolTuple
117117
{
118-
return _logicalConnection.SendRequestWithEmptyResponse(new CallRequest<TTuple>(functionName, parameters));
118+
await Call<TTuple, TarantoolTuple>(functionName, parameters);
119119
}
120120

121121
public Task<DataResponse<TResponse[]>> Call<TResponse>(string functionName)
122122
{
123123
return Call<TarantoolTuple, TResponse>(functionName, TarantoolTuple.Empty);
124124
}
125125

126-
public Task<DataResponse<TResponse[]>> Call<TTuple, TResponse>(string functionName, TTuple parameters)
126+
public async Task<DataResponse<TResponse[]>> Call<TTuple, TResponse>(string functionName, TTuple parameters)
127127
where TTuple : ITarantoolTuple
128128
{
129129
var callRequest = new CallRequest<TTuple>(functionName, parameters);
130-
return _logicalConnection.SendRequest<CallRequest<TTuple>, TResponse>(callRequest);
130+
return await _logicalConnection.SendRequest<CallRequest<TTuple>, TResponse>(callRequest);
131131
}
132132

133-
public Task<DataResponse<TResponse[]>> Eval<TTuple, TResponse>(string expression, TTuple parameters)
133+
public async Task<DataResponse<TResponse[]>> Eval<TTuple, TResponse>(string expression, TTuple parameters)
134134
where TTuple : ITarantoolTuple
135135
{
136136
var evalRequest = new EvalRequest<TTuple>(expression, parameters);
137-
return _logicalConnection.SendRequest<EvalRequest<TTuple>, TResponse>(evalRequest);
137+
return await _logicalConnection.SendRequest<EvalRequest<TTuple>, TResponse>(evalRequest);
138138
}
139139

140140
public Task<DataResponse<TResponse[]>> Eval<TResponse>(string expression)

src/tarantool.client/ILogicalConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@ Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TRequest reques
1818

1919
TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream);
2020

21-
IReadOnlyList<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources();
21+
IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources();
2222
}
2323
}

src/tarantool.client/INetworkStreamPhysicalConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public interface INetworkStreamPhysicalConnection : IDisposable
88
{
99
Task Connect(ClientOptions options);
1010
Task Flush();
11-
Task<int> Read(byte[] buffer, int offset, int count);
12-
Task Write(byte[] buffer, int offset, int count);
11+
Task<int> ReadAsync(byte[] buffer, int offset, int count);
12+
void Write(byte[] buffer, int offset, int count);
1313
}
1414
}

src/tarantool.client/LogicalConnection.cs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,16 @@ public LogicalConnection(ClientOptions options, INetworkStreamPhysicalConnection
3636
_physicalConnection = physicalConnection;
3737
}
3838

39-
public Task SendRequestWithEmptyResponse<TRequest>(TRequest request)
39+
public async Task SendRequestWithEmptyResponse<TRequest>(TRequest request)
4040
where TRequest : IRequest
4141
{
42-
return SendRequestImpl<TRequest, EmptyResponse>(request);
42+
await SendRequestImpl<TRequest, EmptyResponse>(request);
4343
}
4444

45-
public Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TRequest request)
45+
public async Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TRequest request)
4646
where TRequest : IRequest
4747
{
48-
return SendRequestImpl<TRequest, DataResponse<TResponse[]>>(request);
48+
return await SendRequestImpl<TRequest, DataResponse<TResponse[]>>(request);
4949
}
5050

5151
public TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream)
@@ -57,7 +57,22 @@ public TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId
5757
: null;
5858
}
5959

60-
public IReadOnlyList<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources()
60+
public static byte[] ReadFully(Stream input)
61+
{
62+
input.Position = 0;
63+
var buffer = new byte[16*1024];
64+
using (var ms = new MemoryStream())
65+
{
66+
int read;
67+
while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
68+
{
69+
ms.Write(buffer, 0, read);
70+
}
71+
return ms.ToArray();
72+
}
73+
}
74+
75+
public IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources()
6176
{
6277
var result = _pendingRequests.Values.ToArray();
6378
_pendingRequests.Clear();
@@ -78,10 +93,10 @@ private async Task<TResponse> SendRequestImpl<TRequest, TResponse>(TRequest requ
7893
lock (_physicalConnection)
7994
{
8095
_logWriter?.WriteLine($"Begin sending request header buffer, requestId: {requestId}, code: {request.Code}, length: {headerBuffer.Length}");
81-
_physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int) headerLength).GetAwaiter().GetResult();
96+
_physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int) headerLength);
8297

8398
_logWriter?.WriteLine($"Begin sending request body buffer, length: {bodyBuffer.Length}");
84-
_physicalConnection.Write(bodyBuffer, 0, bodyBuffer.Length).GetAwaiter().GetResult();
99+
_physicalConnection.Write(bodyBuffer, 0, bodyBuffer.Length);
85100
}
86101

87102
try

src/tarantool.client/Model/ConnectionOptions.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,9 @@ private void Parse(string replicationSource, ILog log)
3030
}
3131
}
3232

33-
public int ReceiveBufferSize { get; set; } = 8192;
34-
public int SendBufferSize { get; set; } = 8192;
35-
public int SendTimeout { get; set; } = -1;
36-
public int ReceiveTimeout { get; set; } = -1;
33+
public int ReadStreamBufferSize { get; set; } = 4096;
34+
public int WriteNetworkTimeout { get; set; } = -1;
35+
public int ReadNetworkTimeout { get; set; } = -1;
3736
public List<TarantoolNode> Nodes { get; set; } = new List<TarantoolNode>();
3837
}
3938
}

src/tarantool.client/NetworkStreamPhysicalConnection.cs

Lines changed: 8 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
using System.IO;
33
using System.Linq;
44
using System.Net.Sockets;
5-
using System.Threading;
65
using System.Threading.Tasks;
76

87
using ProGaudi.Tarantool.Client.Model;
@@ -33,18 +32,16 @@ public async Task Connect(ClientOptions options)
3332
options.LogWriter?.WriteLine("Starting socket connection...");
3433
var singleNode = options.ConnectionOptions.Nodes.Single();
3534

36-
_socket = CreateSocket(options);
37-
35+
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
3836
await ConnectAsync(_socket, singleNode.Uri.Host, singleNode.Uri.Port);
3937
_stream = new NetworkStream(_socket, true);
4038
options.LogWriter?.WriteLine("Socket connection established.");
4139
}
4240

43-
public async Task Write(byte[] buffer, int offset, int count)
41+
public void Write(byte[] buffer, int offset, int count)
4442
{
4543
CheckConnectionStatus();
46-
47-
await _stream.WriteAsync(buffer, offset, count);
44+
_stream.Write(buffer, offset, count);
4845
}
4946

5047
public async Task Flush()
@@ -54,18 +51,10 @@ public async Task Flush()
5451
await _stream.FlushAsync();
5552
}
5653

57-
public async Task<int> Read(byte[] buffer, int offset, int count)
54+
public async Task<int> ReadAsync(byte[] buffer, int offset, int count)
5855
{
5956
CheckConnectionStatus();
60-
61-
var result = await _stream.ReadAsync(buffer, offset, count);
62-
63-
if (result == 0)
64-
{
65-
Interlocked.Exchange(ref _stream, null)?.Dispose();
66-
}
67-
68-
return result;
57+
return await _stream.ReadAsync(buffer, offset, count);
6958
}
7059

7160
#if PROGAUDI_NETCORE
@@ -109,38 +98,15 @@ private static Task ConnectAsync(Socket socket, string host, int port)
10998

11099
private void CheckConnectionStatus()
111100
{
112-
if (_disposed)
113-
{
114-
throw new ObjectDisposedException(nameof(NetworkStreamPhysicalConnection));
115-
}
116-
117101
if (_stream == null)
118102
{
119103
throw ExceptionHelper.NotConnected();
120104
}
121-
}
122105

123-
private static Socket CreateSocket(ClientOptions options)
124-
{
125-
var socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
106+
if (_disposed)
126107
{
127-
ReceiveTimeout = options.ConnectionOptions.ReceiveTimeout,
128-
SendTimeout = options.ConnectionOptions.SendTimeout,
129-
ReceiveBufferSize = options.ConnectionOptions.ReceiveBufferSize,
130-
SendBufferSize = options.ConnectionOptions.SendBufferSize
131-
};
132-
133-
var size = sizeof(uint);
134-
uint on = 1;
135-
uint keepAliveInterval = 5000; //Send a packet once every 10 seconds.
136-
uint retryInterval = 500; //If no response, resend every second.
137-
var inArray = new byte[size * 3];
138-
Array.Copy(BitConverter.GetBytes(on), 0, inArray, 0, size);
139-
Array.Copy(BitConverter.GetBytes(keepAliveInterval), 0, inArray, size, size);
140-
Array.Copy(BitConverter.GetBytes(retryInterval), 0, inArray, size * 2, size);
141-
socket.IOControl(IOControlCode.KeepAliveValues, inArray, null);
142-
143-
return socket;
108+
throw new ObjectDisposedException(nameof(NetworkStreamPhysicalConnection));
109+
}
144110
}
145111
}
146112
}

src/tarantool.client/ResponseReader.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public ResponseReader(ILogicalConnection logicalConnection, ClientOptions client
3434
_physicalConnection = physicalConnection;
3535
_logicalConnection = logicalConnection;
3636
_clientOptions = clientOptions;
37-
_buffer = new byte[clientOptions.ConnectionOptions.ReceiveBufferSize];
37+
_buffer = new byte[clientOptions.ConnectionOptions.ReadStreamBufferSize];
3838
}
3939

4040
public void BeginReading()
@@ -43,7 +43,7 @@ public void BeginReading()
4343

4444
_clientOptions.LogWriter?.WriteLine($"Begin reading from connection to buffer, bytes count: {freeBufferSpace}");
4545

46-
var readingTask = _physicalConnection.Read(_buffer, _readingOffset, freeBufferSpace);
46+
var readingTask = _physicalConnection.ReadAsync(_buffer, _readingOffset, freeBufferSpace);
4747
readingTask.ContinueWith(EndReading);
4848
}
4949

src/tarantool.client/project.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"version": "0.4.2",
2+
"version": "0.4.3",
33
"name": "Tarantool.CSharp",
44
"description": "tarantool-client low-level client Library",
55
"authors": [

0 commit comments

Comments
 (0)