diff --git a/csharp/src/Drivers/Apache/ApacheParameters.cs b/csharp/src/Drivers/Apache/ApacheParameters.cs index 750bf29b88..a839290a72 100644 --- a/csharp/src/Drivers/Apache/ApacheParameters.cs +++ b/csharp/src/Drivers/Apache/ApacheParameters.cs @@ -24,6 +24,7 @@ public class ApacheParameters { public const string PollTimeMilliseconds = "adbc.apache.statement.polltime_ms"; public const string BatchSize = "adbc.apache.statement.batch_size"; + public const string BatchSizeStopCondition = "adbc.apache.statement.batch_size_stop_condition"; public const string QueryTimeoutSeconds = "adbc.apache.statement.query_timeout_s"; /// diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs index 08506b3bf3..f448d66835 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs @@ -35,6 +35,7 @@ using Apache.Arrow.Ipc; using Apache.Arrow.Types; using Apache.Hive.Service.Rpc.Thrift; +using Thrift; using Thrift.Protocol; using Thrift.Transport; @@ -44,6 +45,7 @@ internal abstract class HiveServer2Connection : TracingConnection { internal const bool InfoVendorSql = true; internal const long BatchSizeDefault = 50000; + internal const bool EnableBatchSizeStopConditionDefault = false; internal const int PollTimeMillisecondsDefault = 500; internal static readonly string s_assemblyName = ApacheUtility.GetAssemblyName(typeof(HiveServer2Connection)); internal static readonly string s_assemblyVersion = ApacheUtility.GetAssemblyVersion(typeof(HiveServer2Connection)); @@ -1717,5 +1719,23 @@ private static void ThrowErrorResponse(TStatus status, AdbcStatusCode adbcStatus throw new HiveServer2Exception(status.ErrorMessage, adbcStatusCode) .SetSqlState(status.SqlState) .SetNativeError(status.ErrorCode); + + protected TConfiguration GetTconfiguration() + { + var thriftConfig = new TConfiguration(); + + Properties.TryGetValue(ThriftTransportSizeConstants.MaxMessageSize, out string? maxMessageSize); + if (int.TryParse(maxMessageSize, out int maxMessageSizeValue) && maxMessageSizeValue > 0) + { + thriftConfig.MaxMessageSize = maxMessageSizeValue; + } + + Properties.TryGetValue(ThriftTransportSizeConstants.MaxFrameSize, out string? maxFrameSize); + if (int.TryParse(maxFrameSize, out int maxFrameSizeValue) && maxFrameSizeValue > 0) + { + thriftConfig.MaxFrameSize = maxFrameSizeValue; + } + return thriftConfig; + } } } diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2ExtendedConnection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2ExtendedConnection.cs index c45ace9289..2c3bff3232 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2ExtendedConnection.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2ExtendedConnection.cs @@ -66,8 +66,7 @@ internal override IArrowArrayStream NewReader(T statement, Schema schema, IRe statement, schema, response, - dataTypeConversion: statement.Connection.DataTypeConversion, - enableBatchSizeStopCondition: false); + dataTypeConversion: statement.Connection.DataTypeConversion); internal override void SetPrecisionScaleAndTypeName( short colType, diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs index 56eabe479a..6cc80c51b2 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2HttpConnection.cs @@ -150,7 +150,7 @@ protected override TTransport CreateTransport() httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity")); httpClient.DefaultRequestHeaders.ExpectContinue = false; - TConfiguration config = new(); + TConfiguration config = GetTconfiguration(); THttpTransport transport = new(httpClient, config) { // This value can only be set before the first call/request. So if a new value for query timeout diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Parameters.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Parameters.cs index e4ae62069b..67f8136977 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Parameters.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Parameters.cs @@ -65,6 +65,12 @@ public static class StandardTlsOptions public const string DisableServerCertificateValidation = "adbc.standard_options.tls.disable_server_certificate_validation"; } + public static class ThriftTransportSizeConstants + { + public const string MaxMessageSize = "adbc.apache.thrift.client.max.message.size"; + public const string MaxFrameSize = "adbc.apache.thrift.client.max.frame.size"; + } + public static class HttpProxyOptions { /// diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs index 80baadde5d..8adf7ee386 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs @@ -60,8 +60,6 @@ internal class HiveServer2Reader : TracingReader private bool _disposed; private bool _hasNoMoreData = false; private readonly DataTypeConversion _dataTypeConversion; - // Flag to enable/disable stopping reading based on batch size condition - private readonly bool _enableBatchSizeStopCondition; private static readonly IReadOnlyDictionary> s_arrowStringConverters = new Dictionary>() { @@ -79,14 +77,12 @@ public HiveServer2Reader( IHiveServer2Statement statement, Schema schema, IResponse response, - DataTypeConversion dataTypeConversion, - bool enableBatchSizeStopCondition = true) : base(statement) + DataTypeConversion dataTypeConversion) : base(statement) { _statement = statement; Schema = schema; _response = response; _dataTypeConversion = dataTypeConversion; - _enableBatchSizeStopCondition = enableBatchSizeStopCondition; } public override Schema Schema { get; } @@ -114,7 +110,7 @@ public HiveServer2Reader( int rowCount = GetRowCount(response.Results, columnCount); activity?.AddEvent(SemanticConventions.Messaging.Batch.Response, [new(SemanticConventions.Db.Response.ReturnedRows, rowCount)]); - if ((_enableBatchSizeStopCondition && _statement.BatchSize > 0 && rowCount < _statement.BatchSize) || rowCount == 0) + if ((_statement.EnableBatchSizeStopCondition && _statement.BatchSize > 0 && rowCount < _statement.BatchSize) || rowCount == 0) { // This is the last batch _hasNoMoreData = true; diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs index 0387f980b4..974c07a1fd 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2StandardConnection.cs @@ -23,6 +23,7 @@ using System.Threading; using System.Threading.Tasks; using Apache.Hive.Service.Rpc.Thrift; +using Thrift; using Thrift.Protocol; using Thrift.Transport; using Thrift.Transport.Client; @@ -108,6 +109,8 @@ protected override TTransport CreateTransport() bool connectClient = false; int portValue = int.Parse(port!); + TConfiguration thriftConfig = GetTconfiguration(); + // TLS setup TTransport baseTransport; if (TlsOptions.IsTlsEnabled) @@ -120,16 +123,16 @@ protected override TTransport CreateTransport() if (IPAddress.TryParse(hostName!, out var ipAddress)) { - baseTransport = new TTlsSocketTransport(ipAddress, portValue, config: new(), 0, trustedCert, certValidator); + baseTransport = new TTlsSocketTransport(ipAddress, portValue, config: thriftConfig, 0, trustedCert, certValidator); } else { - baseTransport = new TTlsSocketTransport(hostName!, portValue, config: new(), 0, trustedCert, certValidator); + baseTransport = new TTlsSocketTransport(hostName!, portValue, config: thriftConfig, 0, trustedCert, certValidator); } } else { - baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: new()); + baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: thriftConfig); } TBufferedTransport bufferedTransport = new TBufferedTransport(baseTransport); @@ -148,7 +151,7 @@ protected override TTransport CreateTransport() } PlainSaslMechanism saslMechanism = new(username, password); - TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: new()); + TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig); return new TFramedTransport(saslTransport); default: diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs index bbaee0c0bf..25e14f1cc4 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs @@ -271,6 +271,12 @@ public override void SetOption(string key, string value) case ApacheParameters.BatchSize: UpdateBatchSizeIfValid(key, value); break; + case ApacheParameters.BatchSizeStopCondition: + if (ApacheUtility.BooleanIsValid(key, value, out bool enableBatchSizeStopCondition)) + { + EnableBatchSizeStopCondition = enableBatchSizeStopCondition; + } + break; case ApacheParameters.QueryTimeoutSeconds: if (ApacheUtility.QueryTimeoutIsValid(key, value, out int queryTimeoutSeconds)) { @@ -352,6 +358,8 @@ protected async Task ExecuteStatementAsync(CancellationToken cancella public virtual long BatchSize { get; protected set; } = HiveServer2Connection.BatchSizeDefault; + public bool EnableBatchSizeStopCondition { get; protected set; } = HiveServer2Connection.EnableBatchSizeStopConditionDefault; + public int QueryTimeoutSeconds { // Coordinate updates with the connection diff --git a/csharp/src/Drivers/Apache/Hive2/IHiveServer2Statement.cs b/csharp/src/Drivers/Apache/Hive2/IHiveServer2Statement.cs index c405f40b64..19a5609117 100644 --- a/csharp/src/Drivers/Apache/Hive2/IHiveServer2Statement.cs +++ b/csharp/src/Drivers/Apache/Hive2/IHiveServer2Statement.cs @@ -54,6 +54,11 @@ internal interface IHiveServer2Statement : ITracingStatement /// long BatchSize { get; } + /// + /// Flag to enable/disable stopping reading based on batch size condition + /// + bool EnableBatchSizeStopCondition { get; } + /// /// Gets the connection associated with this statement. /// diff --git a/csharp/src/Drivers/Apache/Hive2/README.md b/csharp/src/Drivers/Apache/Hive2/README.md index 9404cdf88b..0bdf127168 100644 --- a/csharp/src/Drivers/Apache/Hive2/README.md +++ b/csharp/src/Drivers/Apache/Hive2/README.md @@ -65,6 +65,7 @@ but can also be passed in the call to `AdbcDatabase.Connect`. | `adbc.proxy_options.proxy_uid` | Username for proxy authentication. Only feature-complete in Spark driver. Required when proxy_auth is True | | | `adbc.proxy_options.proxy_pwd` | Password for proxy authentication. Only feature-complete in Spark driver. Required when proxy_auth is True | | | `adbc.telemetry.trace_parent` | The [trace parent](https://www.w3.org/TR/trace-context/#traceparent-header) identifier for an existing [trace context](https://www.w3.org/TR/trace-context/) \(span/activity\) in a tracing system. This option is most likely to be set using `Statement.SetOption` to set the trace parent for driver interaction with a specific `Statement`. However, it can also be set using `Driver.Open`, `Database.Connect` or `Connection.SetOption` to set the trace parent for all interactions with the driver on that specific `Connection`. | | +| `adbc.apache.statement.batch_size_stop_condition` | Flag to enable/disable stopping reading based on batch size condition | `False` | ## Timeout Configuration diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs b/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs index 91dd3aee93..8458cd50f0 100644 --- a/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs +++ b/csharp/src/Drivers/Apache/Impala/ImpalaHttpConnection.cs @@ -155,7 +155,7 @@ protected override TTransport CreateTransport() httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity")); httpClient.DefaultRequestHeaders.ExpectContinue = false; - TConfiguration config = new(); + TConfiguration config = GetTconfiguration(); THttpTransport transport = new(httpClient, config) { // This value can only be set before the first call/request. So if a new value for query timeout diff --git a/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs b/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs index 0e1bef1113..457915d03a 100644 --- a/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs +++ b/csharp/src/Drivers/Apache/Impala/ImpalaStandardConnection.cs @@ -25,6 +25,7 @@ using Apache.Arrow.Adbc.Drivers.Apache.Hive2; using Apache.Arrow.Ipc; using Apache.Hive.Service.Rpc.Thrift; +using Thrift; using Thrift.Protocol; using Thrift.Transport; using Thrift.Transport.Client; @@ -113,22 +114,23 @@ protected override TTransport CreateTransport() // Delay the open connection until later. bool connectClient = false; + TConfiguration thriftConfig = GetTconfiguration(); TTransport transport; if (TlsOptions.IsTlsEnabled) { RemoteCertificateValidationCallback certValidator = (sender, cert, chain, errors) => HiveServer2TlsImpl.ValidateCertificate(cert, errors, TlsOptions); if (IPAddress.TryParse(hostName!, out var address)) { - transport = new TTlsSocketTransport(address!, int.Parse(port!), config: new(), 0, null, certValidator: certValidator); + transport = new TTlsSocketTransport(address!, int.Parse(port!), config: thriftConfig, 0, null, certValidator: certValidator); } else { - transport = new TTlsSocketTransport(hostName!, int.Parse(port!), config: new(), 0, null, certValidator: certValidator); + transport = new TTlsSocketTransport(hostName!, int.Parse(port!), config: thriftConfig, 0, null, certValidator: certValidator); } } else { - transport = new TSocketTransport(hostName!, int.Parse(port!), connectClient, config: new()); + transport = new TSocketTransport(hostName!, int.Parse(port!), connectClient, config: thriftConfig); } TBufferedTransport bufferedTransport = new(transport); @@ -146,7 +148,7 @@ protected override TTransport CreateTransport() } PlainSaslMechanism saslMechanism = new(username, password); - TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: new()); + TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig); return new TFramedTransport(saslTransport); default: diff --git a/csharp/src/Drivers/Apache/Impala/README.md b/csharp/src/Drivers/Apache/Impala/README.md index f2567f56f1..318c824cf1 100644 --- a/csharp/src/Drivers/Apache/Impala/README.md +++ b/csharp/src/Drivers/Apache/Impala/README.md @@ -63,6 +63,7 @@ but can also be passed in the call to `AdbcDatabase.Connect`. Options beginning | `adbc.standard_options.tls.allow_hostname_mismatch` | If hostname mismatch is allowed for ssl. One of `True`, `False` | `False` | | `adbc.standard_options.tls.trusted_certificate_path` | The full path of the tls/ssl certificate .pem file containing custom CA certificates for verifying the server when connecting over TLS | | | `adbc.telemetry.trace_parent` | The [trace parent](https://www.w3.org/TR/trace-context/#traceparent-header) identifier for an existing [trace context](https://www.w3.org/TR/trace-context/) \(span/activity\) in a tracing system. This option is most likely to be set using `Statement.SetOption` to set the trace parent for driver interaction with a specific `Statement`. However, it can also be set using `Driver.Open`, `Database.Connect` or `Connection.SetOption` to set the trace parent for all interactions with the driver on that specific `Connection`. | | +| `adbc.apache.statement.batch_size_stop_condition` | Flag to enable/disable stopping reading based on batch size condition | `False` | ## Timeout Configuration diff --git a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs index de5492175a..1aff9cc70b 100644 --- a/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs +++ b/csharp/src/Drivers/Apache/Spark/SparkHttpConnection.cs @@ -182,7 +182,7 @@ protected override TTransport CreateTransport() httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity")); httpClient.DefaultRequestHeaders.ExpectContinue = false; - TConfiguration config = new(); + TConfiguration config = GetTconfiguration(); THttpTransport transport = new(httpClient, config) { // This value can only be set before the first call/request. So if a new value for query timeout diff --git a/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs b/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs index 46faab96e0..d7c61ead9d 100644 --- a/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs +++ b/csharp/src/Drivers/Apache/Spark/SparkStandardConnection.cs @@ -25,6 +25,7 @@ using Apache.Arrow.Adbc.Drivers.Apache.Hive2; using Apache.Arrow.Ipc; using Apache.Hive.Service.Rpc.Thrift; +using Thrift; using Thrift.Protocol; using Thrift.Transport; using Thrift.Transport.Client; @@ -116,6 +117,7 @@ protected override TTransport CreateTransport() // Delay the open connection until later. bool connectClient = false; int portValue = int.Parse(port!); + TConfiguration thriftConfig = GetTconfiguration(); // TLS setup TTransport baseTransport; @@ -129,16 +131,16 @@ protected override TTransport CreateTransport() if (IPAddress.TryParse(hostName!, out var ipAddress)) { - baseTransport = new TTlsSocketTransport(ipAddress, portValue, config: new(), 0, trustedCert, certValidator); + baseTransport = new TTlsSocketTransport(ipAddress, portValue, config: thriftConfig, 0, trustedCert, certValidator); } else { - baseTransport = new TTlsSocketTransport(hostName!, portValue, config: new(), 0, trustedCert, certValidator); + baseTransport = new TTlsSocketTransport(hostName!, portValue, config: thriftConfig, 0, trustedCert, certValidator); } } else { - baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: new()); + baseTransport = new TSocketTransport(hostName!, portValue, connectClient, config: thriftConfig); } TBufferedTransport bufferedTransport = new TBufferedTransport(baseTransport); switch (authTypeValue) @@ -156,7 +158,7 @@ protected override TTransport CreateTransport() } PlainSaslMechanism saslMechanism = new(username, password); - TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: new()); + TSaslTransport saslTransport = new(bufferedTransport, saslMechanism, config: thriftConfig); return new TFramedTransport(saslTransport); default: