Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions csharp/src/Drivers/Apache/ApacheParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class ApacheParameters
{
public const string PollTimeMilliseconds = "adbc.apache.statement.polltime_ms";
public const string BatchSize = "adbc.apache.statement.batch_size";
public const string BatchSizeStopCondition = "adbc.apache.statement.batch_size_stop_condition";
public const string QueryTimeoutSeconds = "adbc.apache.statement.query_timeout_s";

/// <summary>
Expand Down
20 changes: 20 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;

Expand All @@ -44,6 +45,7 @@ internal abstract class HiveServer2Connection : TracingConnection
{
internal const bool InfoVendorSql = true;
internal const long BatchSizeDefault = 50000;
internal const bool EnableBatchSizeStopConditionDefault = false;
internal const int PollTimeMillisecondsDefault = 500;
internal static readonly string s_assemblyName = ApacheUtility.GetAssemblyName(typeof(HiveServer2Connection));
internal static readonly string s_assemblyVersion = ApacheUtility.GetAssemblyVersion(typeof(HiveServer2Connection));
Expand Down Expand Up @@ -1717,5 +1719,23 @@ private static void ThrowErrorResponse(TStatus status, AdbcStatusCode adbcStatus
throw new HiveServer2Exception(status.ErrorMessage, adbcStatusCode)
.SetSqlState(status.SqlState)
.SetNativeError(status.ErrorCode);

protected TConfiguration GetTconfiguration()
{
var thriftConfig = new TConfiguration();

Properties.TryGetValue(ThriftTransportSizeConstants.MaxMessageSize, out string? maxMessageSize);
if (int.TryParse(maxMessageSize, out int maxMessageSizeValue) && maxMessageSizeValue > 0)
{
thriftConfig.MaxMessageSize = maxMessageSizeValue;
}

Properties.TryGetValue(ThriftTransportSizeConstants.MaxFrameSize, out string? maxFrameSize);
if (int.TryParse(maxFrameSize, out int maxFrameSizeValue) && maxFrameSizeValue > 0)
{
thriftConfig.MaxFrameSize = maxFrameSizeValue;
}
return thriftConfig;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, IRe
statement,
schema,
response,
dataTypeConversion: statement.Connection.DataTypeConversion,
enableBatchSizeStopCondition: false);
dataTypeConversion: statement.Connection.DataTypeConversion);

internal override void SetPrecisionScaleAndTypeName(
short colType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ protected override TTransport CreateTransport()
httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
httpClient.DefaultRequestHeaders.ExpectContinue = false;

TConfiguration config = new();
TConfiguration config = GetTconfiguration();
THttpTransport transport = new(httpClient, config)
{
// This value can only be set before the first call/request. So if a new value for query timeout
Expand Down
6 changes: 6 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Parameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public static class StandardTlsOptions
public const string DisableServerCertificateValidation = "adbc.standard_options.tls.disable_server_certificate_validation";
}

public static class ThriftTransportSizeConstants
{
public const string MaxMessageSize = "adbc.apache.thrift.client.max.message.size";
public const string MaxFrameSize = "adbc.apache.thrift.client.max.frame.size";
}

public static class HttpProxyOptions
{
/// <summary>
Expand Down
8 changes: 2 additions & 6 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ internal class HiveServer2Reader : TracingReader
private bool _disposed;
private bool _hasNoMoreData = false;
private readonly DataTypeConversion _dataTypeConversion;
// Flag to enable/disable stopping reading based on batch size condition
private readonly bool _enableBatchSizeStopCondition;
private static readonly IReadOnlyDictionary<ArrowTypeId, Func<StringArray, IArrowType, IArrowArray>> s_arrowStringConverters =
new Dictionary<ArrowTypeId, Func<StringArray, IArrowType, IArrowArray>>()
{
Expand All @@ -79,14 +77,12 @@ public HiveServer2Reader(
IHiveServer2Statement statement,
Schema schema,
IResponse response,
DataTypeConversion dataTypeConversion,
bool enableBatchSizeStopCondition = true) : base(statement)
DataTypeConversion dataTypeConversion) : base(statement)
{
_statement = statement;
Schema = schema;
_response = response;
_dataTypeConversion = dataTypeConversion;
_enableBatchSizeStopCondition = enableBatchSizeStopCondition;
}

public override Schema Schema { get; }
Expand Down Expand Up @@ -114,7 +110,7 @@ public HiveServer2Reader(
int rowCount = GetRowCount(response.Results, columnCount);
activity?.AddEvent(SemanticConventions.Messaging.Batch.Response, [new(SemanticConventions.Db.Response.ReturnedRows, rowCount)]);

if ((_enableBatchSizeStopCondition && _statement.BatchSize > 0 && rowCount < _statement.BatchSize) || rowCount == 0)
if ((_statement.EnableBatchSizeStopCondition && _statement.BatchSize > 0 && rowCount < _statement.BatchSize) || rowCount == 0)
{
// This is the last batch
_hasNoMoreData = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using System.Threading;
using System.Threading.Tasks;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using Thrift.Transport.Client;
Expand Down Expand Up @@ -108,6 +109,8 @@ protected override TTransport CreateTransport()
bool connectClient = false;
int portValue = int.Parse(port!);

TConfiguration thriftConfig = GetTconfiguration();

// TLS setup
TTransport baseTransport;
if (TlsOptions.IsTlsEnabled)
Expand All @@ -120,16 +123,16 @@ protected override TTransport CreateTransport()

if (IPAddress.TryParse(hostName!, out var ipAddress))
{
baseTransport = new TTlsSocketTransport(ipAddress, portValue, config: new(), 0, trustedCert, certValidator);
baseTransport = new TTlsSocketTransport(ipAddress, portValue, config: thriftConfig, 0, trustedCert, certValidator);
}
else
{
baseTransport = new TTlsSocketTransport(hostName!, portValue, config: new(), 0, trustedCert, certValidator);
baseTransport = new TTlsSocketTransport(hostName!, portValue, config: thriftConfig, 0, trustedCert, certValidator);
}
}
else
{
baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: new());
baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: thriftConfig);
}

TBufferedTransport bufferedTransport = new TBufferedTransport(baseTransport);
Expand All @@ -148,7 +151,7 @@ protected override TTransport CreateTransport()
}

PlainSaslMechanism saslMechanism = new(username, password);
TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: new());
TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig);
return new TFramedTransport(saslTransport);

default:
Expand Down
8 changes: 8 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ public override void SetOption(string key, string value)
case ApacheParameters.BatchSize:
UpdateBatchSizeIfValid(key, value);
break;
case ApacheParameters.BatchSizeStopCondition:
if (ApacheUtility.BooleanIsValid(key, value, out bool enableBatchSizeStopCondition))
{
EnableBatchSizeStopCondition = enableBatchSizeStopCondition;
}
break;
case ApacheParameters.QueryTimeoutSeconds:
if (ApacheUtility.QueryTimeoutIsValid(key, value, out int queryTimeoutSeconds))
{
Expand Down Expand Up @@ -352,6 +358,8 @@ protected async Task<IResponse> ExecuteStatementAsync(CancellationToken cancella

public virtual long BatchSize { get; protected set; } = HiveServer2Connection.BatchSizeDefault;

public bool EnableBatchSizeStopCondition { get; protected set; } = HiveServer2Connection.EnableBatchSizeStopConditionDefault;

public int QueryTimeoutSeconds
{
// Coordinate updates with the connection
Expand Down
5 changes: 5 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/IHiveServer2Statement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ internal interface IHiveServer2Statement : ITracingStatement
/// </summary>
long BatchSize { get; }

/// <summary>
/// Flag to enable/disable stopping reading based on batch size condition
/// </summary>
bool EnableBatchSizeStopCondition { get; }

/// <summary>
/// Gets the connection associated with this statement.
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions csharp/src/Drivers/Apache/Hive2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ but can also be passed in the call to `AdbcDatabase.Connect`.
| `adbc.proxy_options.proxy_uid` | Username for proxy authentication. Only feature-complete in Spark driver. Required when proxy_auth is True | |
| `adbc.proxy_options.proxy_pwd` | Password for proxy authentication. Only feature-complete in Spark driver. Required when proxy_auth is True | |
| `adbc.telemetry.trace_parent` | The [trace parent](https://www.w3.org/TR/trace-context/#traceparent-header) identifier for an existing [trace context](https://www.w3.org/TR/trace-context/) \(span/activity\) in a tracing system. This option is most likely to be set using `Statement.SetOption` to set the trace parent for driver interaction with a specific `Statement`. However, it can also be set using `Driver.Open`, `Database.Connect` or `Connection.SetOption` to set the trace parent for all interactions with the driver on that specific `Connection`. | |
| `adbc.apache.statement.batch_size_stop_condition` | Flag to enable/disable stopping reading based on batch size condition | `False` |

## Timeout Configuration

Expand Down
2 changes: 1 addition & 1 deletion csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected override TTransport CreateTransport()
httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
httpClient.DefaultRequestHeaders.ExpectContinue = false;

TConfiguration config = new();
TConfiguration config = GetTconfiguration();
THttpTransport transport = new(httpClient, config)
{
// This value can only be set before the first call/request. So if a new value for query timeout
Expand Down
10 changes: 6 additions & 4 deletions csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using Thrift.Transport.Client;
Expand Down Expand Up @@ -113,22 +114,23 @@ protected override TTransport CreateTransport()

// Delay the open connection until later.
bool connectClient = false;
TConfiguration thriftConfig = GetTconfiguration();
TTransport transport;
if (TlsOptions.IsTlsEnabled)
{
RemoteCertificateValidationCallback certValidator = (sender, cert, chain, errors) => HiveServer2TlsImpl.ValidateCertificate(cert, errors, TlsOptions);
if (IPAddress.TryParse(hostName!, out var address))
{
transport = new TTlsSocketTransport(address!, int.Parse(port!), config: new(), 0, null, certValidator: certValidator);
transport = new TTlsSocketTransport(address!, int.Parse(port!), config: thriftConfig, 0, null, certValidator: certValidator);
}
else
{
transport = new TTlsSocketTransport(hostName!, int.Parse(port!), config: new(), 0, null, certValidator: certValidator);
transport = new TTlsSocketTransport(hostName!, int.Parse(port!), config: thriftConfig, 0, null, certValidator: certValidator);
}
}
else
{
transport = new TSocketTransport(hostName!, int.Parse(port!), connectClient, config: new());
transport = new TSocketTransport(hostName!, int.Parse(port!), connectClient, config: thriftConfig);
}

TBufferedTransport bufferedTransport = new(transport);
Expand All @@ -146,7 +148,7 @@ protected override TTransport CreateTransport()
}

PlainSaslMechanism saslMechanism = new(username, password);
TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: new());
TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig);
return new TFramedTransport(saslTransport);

default:
Expand Down
1 change: 1 addition & 0 deletions csharp/src/Drivers/Apache/Impala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ but can also be passed in the call to `AdbcDatabase.Connect`. Options beginning
| `adbc.standard_options.tls.allow_hostname_mismatch` | If hostname mismatch is allowed for ssl. One of `True`, `False` | `False` |
| `adbc.standard_options.tls.trusted_certificate_path` | The full path of the tls/ssl certificate .pem file containing custom CA certificates for verifying the server when connecting over TLS | |
| `adbc.telemetry.trace_parent` | The [trace parent](https://www.w3.org/TR/trace-context/#traceparent-header) identifier for an existing [trace context](https://www.w3.org/TR/trace-context/) \(span/activity\) in a tracing system. This option is most likely to be set using `Statement.SetOption` to set the trace parent for driver interaction with a specific `Statement`. However, it can also be set using `Driver.Open`, `Database.Connect` or `Connection.SetOption` to set the trace parent for all interactions with the driver on that specific `Connection`. | |
| `adbc.apache.statement.batch_size_stop_condition` | Flag to enable/disable stopping reading based on batch size condition | `False` |

## Timeout Configuration

Expand Down
2 changes: 1 addition & 1 deletion csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ protected override TTransport CreateTransport()
httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
httpClient.DefaultRequestHeaders.ExpectContinue = false;

TConfiguration config = new();
TConfiguration config = GetTconfiguration();
THttpTransport transport = new(httpClient, config)
{
// This value can only be set before the first call/request. So if a new value for query timeout
Expand Down
10 changes: 6 additions & 4 deletions csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using Thrift.Transport.Client;
Expand Down Expand Up @@ -116,6 +117,7 @@ protected override TTransport CreateTransport()
// Delay the open connection until later.
bool connectClient = false;
int portValue = int.Parse(port!);
TConfiguration thriftConfig = GetTconfiguration();

// TLS setup
TTransport baseTransport;
Expand All @@ -129,16 +131,16 @@ protected override TTransport CreateTransport()

if (IPAddress.TryParse(hostName!, out var ipAddress))
{
baseTransport = new TTlsSocketTransport(ipAddress, portValue, config: new(), 0, trustedCert, certValidator);
baseTransport = new TTlsSocketTransport(ipAddress, portValue, config: thriftConfig, 0, trustedCert, certValidator);
}
else
{
baseTransport = new TTlsSocketTransport(hostName!, portValue, config: new(), 0, trustedCert, certValidator);
baseTransport = new TTlsSocketTransport(hostName!, portValue, config: thriftConfig, 0, trustedCert, certValidator);
}
}
else
{
baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: new());
baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: thriftConfig);
}
TBufferedTransport bufferedTransport = new TBufferedTransport(baseTransport);
switch (authTypeValue)
Expand All @@ -156,7 +158,7 @@ protected override TTransport CreateTransport()
}

PlainSaslMechanism saslMechanism = new(username, password);
TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: new());
TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig);
return new TFramedTransport(saslTransport);

default:
Expand Down
Loading