Skip to content

Commit

Permalink
feat(csharp/src/Drivers/Apache): add implementation for AdbcStatement…
Browse files Browse the repository at this point in the history
….SetOption on Spark driver (#1849)

Implement AdbcStatement.SetOption on Spark driver
* `"adbc.statement.polltime_milliseconds"` -> sets the poll time to
check for results to execute a statement.
* `"adbc.statement.batch_size"` -> sets the maximum size of a single
batch to receive.
  • Loading branch information
birschick-bq authored May 13, 2024
1 parent 73b8bda commit a7c1cc6
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 2 deletions.
37 changes: 35 additions & 2 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ public override async Task<UpdateResult> ExecuteUpdateAsync()
return new UpdateResult(affectedRows ?? -1);
}

public override void SetOption(string key, string value)
{
switch (key)
{
case Options.PollTimeMilliseconds:
UpdatePollTimeIfValid(key, value);
break;
case Options.BatchSize:
UpdateBatchSizeIfValid(key, value);
break;
default:
throw AdbcException.NotImplemented($"Option '{key}' is not implemented.");
}
}

protected async Task ExecuteStatementAsync()
{
TExecuteStatementReq executeRequest = new TExecuteStatementReq(this.connection.sessionHandle, this.SqlQuery);
Expand Down Expand Up @@ -123,9 +138,27 @@ protected async ValueTask<Schema> GetSchemaAsync()
return SchemaParser.GetArrowSchema(response.Schema);
}

protected internal int PollTimeMilliseconds { get; } = PollTimeMillisecondsDefault;
protected internal int PollTimeMilliseconds { get; private set; } = PollTimeMillisecondsDefault;

protected internal int BatchSize { get; private set; } = BatchSizeDefault;

/// <summary>
/// Provides the constant string key values to the <see cref="AdbcStatement.SetOption(string, string)" /> method.
/// </summary>
public class Options
{
// Options common to all HiveServer2Statement-derived drivers go here
public const string PollTimeMilliseconds = "adbc.statement.polltime_milliseconds";
public const string BatchSize = "adbc.statement.batch_size";
}

private void UpdatePollTimeIfValid(string key, string value) => PollTimeMilliseconds = !string.IsNullOrEmpty(key) && int.TryParse(value, result: out int pollTimeMilliseconds) && pollTimeMilliseconds >= 0
? pollTimeMilliseconds
: throw new ArgumentException($"The value '{value}' for option '{key}' is invalid. Must be a numeric value greater than or equal to zero.", nameof(value));

protected internal int BatchSize { get; } = BatchSizeDefault;
private void UpdateBatchSizeIfValid(string key, string value) => BatchSize = !string.IsNullOrEmpty(value) && int.TryParse(value, out int batchSize) && batchSize > 0
? batchSize
: throw new ArgumentException($"The value '{value}' for option '{key}' is invalid. Must be a numeric value greater than zero.", nameof(value));

public override void Dispose()
{
Expand Down
8 changes: 8 additions & 0 deletions csharp/src/Drivers/Apache/Impala/ImpalaStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public override object GetValue(IArrowArray arrowArray, int index)

protected override IArrowArrayStream NewReader<T>(T statement, Schema schema) => new HiveServer2Reader(statement, schema);

/// <summary>
/// Provides the constant string key values to the <see cref="AdbcStatement.SetOption(string, string)" /> method.
/// </summary>
public new sealed class Options : HiveServer2Statement.Options
{
// options specific to Impala go here
}

class HiveServer2Reader : IArrowArrayStream
{
HiveServer2Statement? statement;
Expand Down
8 changes: 8 additions & 0 deletions csharp/src/Drivers/Apache/Spark/SparkStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ protected override void SetStatementProperties(TExecuteStatementReq statement)

protected override IArrowArrayStream NewReader<T>(T statement, Schema schema) => new SparkReader(statement, schema);

/// <summary>
/// Provides the constant string key values to the <see cref="AdbcStatement.SetOption(string, string)" /> method.
/// </summary>
public new sealed class Options : HiveServer2Statement.Options
{
// options specific to Spark go here
}

sealed class SparkReader : IArrowArrayStream
{
HiveServer2Statement? statement;
Expand Down
106 changes: 106 additions & 0 deletions csharp/test/Drivers/Apache/Spark/StatementTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
using Apache.Arrow.Adbc.Tests.Xunit;
using Xunit;
using Xunit.Abstractions;

namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
{
/// <summary>
/// Class for testing the Snowflake ADBC driver connection tests.
/// </summary>
/// <remarks>
/// Tests are ordered to ensure data is created for the other
/// queries to run.
/// </remarks>
[TestCaseOrderer("Apache.Arrow.Adbc.Tests.Xunit.TestOrderer", "Apache.Arrow.Adbc.Tests")]
public class StatementTests : SparkTestBase
{
private static List<string> DefaultTableTypes => new() { "BASE TABLE", "VIEW" };

public StatementTests(ITestOutputHelper? outputHelper) : base(outputHelper)
{
Skip.IfNot(Utils.CanExecuteTestConfig(TestConfigVariable));
}

/// <summary>
/// Validates if the SetOption handle valid/invalid data correctly for the PollTime option.
/// </summary>
[SkippableTheory]
[InlineData("-1", true)]
[InlineData("zero", true)]
[InlineData("-2147483648", true)]
[InlineData("2147483648", true)]
[InlineData("0")]
[InlineData("1")]
[InlineData("2147483647")]
public void CanSetOptionPollTime(string value, bool throws = false)
{
AdbcStatement statement = NewConnection().CreateStatement();
if (throws)
{
Assert.Throws<ArgumentException>(() => statement.SetOption(SparkStatement.Options.PollTimeMilliseconds, value));
}
else
{
statement.SetOption(SparkStatement.Options.PollTimeMilliseconds, value);
}
}

/// <summary>
/// Validates if the SetOption handle valid/invalid data correctly for the BatchSize option.
/// </summary>
[SkippableTheory]
[InlineData("-1", true)]
[InlineData("one", true)]
[InlineData("-2147483648", true)]
[InlineData("2147483648", true)]
[InlineData("0", true)]
[InlineData("1")]
[InlineData("2147483647")]
public void CanSetOptionBatchSize(string value, bool throws = false)
{
AdbcStatement statement = NewConnection().CreateStatement();
if (throws)
{
Assert.Throws<ArgumentException>(() => statement.SetOption(SparkStatement.Options.BatchSize, value));
}
else
{
statement.SetOption(SparkStatement.Options.BatchSize, value);
}
}

/// <summary>
/// Validates if the driver can execute update statements.
/// </summary>
[SkippableFact, Order(1)]
public async Task CanInteractUsingSetOptions()
{
const string columnName = "INDEX";
Statement.SetOption(SparkStatement.Options.PollTimeMilliseconds, "100");
Statement.SetOption(SparkStatement.Options.BatchSize, "10");
using TemporaryTable temporaryTable = await NewTemporaryTableAsync(Statement, $"{columnName} INT");
await ValidateInsertSelectDeleteSingleValueAsync(temporaryTable.TableName, columnName, 1);
}
}
}

0 comments on commit a7c1cc6

Please sign in to comment.