Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csharp/src/Drivers/Apache): add implementation for AdbcStatement.SetOption on Spark driver #1849

Merged
merged 5 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ public override async Task<UpdateResult> ExecuteUpdateAsync()
return new UpdateResult(affectedRows ?? -1);
}

/// <summary>
/// Applies a statement-level option to this statement.
/// </summary>
/// <param name="key">The option key; one of the constants declared on <see cref="Options"/>.</param>
/// <param name="value">The option value, expressed as a string.</param>
/// <remarks>
/// Recognized keys are forwarded to their validating updater; any other key
/// (including <c>null</c>) is rejected with a "not implemented" error.
/// </remarks>
public override void SetOption(string key, string value)
{
    if (key == Options.PollTimeMilliseconds)
    {
        UpdatePollTimeIfValid(key, value);
        return;
    }

    if (key == Options.BatchSize)
    {
        UpdateBatchSizeIfValid(key, value);
        return;
    }

    // Nothing matched: report the unsupported key back to the caller.
    throw AdbcException.NotImplemented($"Option '{key}' is not implemented.");
}

protected async Task ExecuteStatementAsync()
{
TExecuteStatementReq executeRequest = new TExecuteStatementReq(this.connection.sessionHandle, this.SqlQuery);
Expand Down Expand Up @@ -123,9 +138,27 @@ protected async ValueTask<Schema> GetSchemaAsync()
return SchemaParser.GetArrowSchema(response.Schema);
}

protected internal int PollTimeMilliseconds { get; } = PollTimeMillisecondsDefault;
/// <summary>
/// Gets the poll time, in milliseconds, configured for this statement.
/// Starts at <c>PollTimeMillisecondsDefault</c> and is updated through
/// <see cref="SetOption(string, string)"/> with the <see cref="Options.PollTimeMilliseconds"/> key.
/// </summary>
protected internal int PollTimeMilliseconds { get; private set; } = PollTimeMillisecondsDefault;

/// <summary>
/// Gets the batch size configured for this statement.
/// Starts at <c>BatchSizeDefault</c> and is updated through
/// <see cref="SetOption(string, string)"/> with the <see cref="Options.BatchSize"/> key.
/// </summary>
protected internal int BatchSize { get; private set; } = BatchSizeDefault;

/// <summary>
/// Declares the string keys accepted by <see cref="AdbcStatement.SetOption(string, string)" />
/// for every driver built on HiveServer2Statement.
/// </summary>
public class Options
{
    /// <summary>Key of the statement option that sets the poll time, in milliseconds.</summary>
    public const string PollTimeMilliseconds = "adbc.statement.polltime_milliseconds";

    /// <summary>Key of the statement option that sets the batch size.</summary>
    public const string BatchSize = "adbc.statement.batch_size";
}

/// <summary>
/// Parses <paramref name="value"/> and stores it as the poll time when it is a
/// non-negative 32-bit integer.
/// </summary>
/// <param name="key">The option key; used only to build the error message.</param>
/// <param name="value">The candidate poll time, in milliseconds, as a string.</param>
/// <exception cref="ArgumentException">
/// <paramref name="value"/> is null, empty, not an integer, out of <see cref="int"/> range, or negative.
/// </exception>
private void UpdatePollTimeIfValid(string key, string value)
{
    // Validate the VALUE (not the key) and parse with the invariant culture:
    // option strings are machine-readable and must not depend on the host locale.
    if (string.IsNullOrEmpty(value)
        || !int.TryParse(value, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out int pollTimeMilliseconds)
        || pollTimeMilliseconds < 0)
    {
        throw new ArgumentException($"The value '{value}' for option '{key}' is invalid. Must be a numeric value greater than or equal to zero.", nameof(value));
    }

    PollTimeMilliseconds = pollTimeMilliseconds;
}

protected internal int BatchSize { get; } = BatchSizeDefault;
/// <summary>
/// Parses <paramref name="value"/> and stores it as the batch size when it is a
/// strictly positive 32-bit integer.
/// </summary>
/// <param name="key">The option key; used only to build the error message.</param>
/// <param name="value">The candidate batch size as a string.</param>
/// <exception cref="ArgumentException">
/// <paramref name="value"/> is null, empty, not an integer, out of <see cref="int"/> range, or not greater than zero.
/// </exception>
private void UpdateBatchSizeIfValid(string key, string value)
{
    // Parse with the invariant culture: option strings are machine-readable
    // and must not depend on the host locale.
    if (string.IsNullOrEmpty(value)
        || !int.TryParse(value, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out int batchSize)
        || batchSize <= 0)
    {
        throw new ArgumentException($"The value '{value}' for option '{key}' is invalid. Must be a numeric value greater than zero.", nameof(value));
    }

    BatchSize = batchSize;
}

public override void Dispose()
{
Expand Down
8 changes: 8 additions & 0 deletions csharp/src/Drivers/Apache/Impala/ImpalaStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public override object GetValue(IArrowArray arrowArray, int index)

protected override IArrowArrayStream NewReader<T>(T statement, Schema schema) => new HiveServer2Reader(statement, schema);

/// <summary>
/// Provides the constant string key values to the <see cref="AdbcStatement.SetOption(string, string)" /> method.
/// </summary>
/// <remarks>
/// Derives from <see cref="HiveServer2Statement.Options"/> and hides the base nested type
/// of the same name. No Impala-specific keys are declared here yet.
/// </remarks>
public new sealed class Options : HiveServer2Statement.Options
{
// Options specific to Impala go here.
}

class HiveServer2Reader : IArrowArrayStream
{
HiveServer2Statement? statement;
Expand Down
8 changes: 8 additions & 0 deletions csharp/src/Drivers/Apache/Spark/SparkStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ protected override void SetStatementProperties(TExecuteStatementReq statement)

protected override IArrowArrayStream NewReader<T>(T statement, Schema schema) => new SparkReader(statement, schema);

/// <summary>
/// Provides the constant string key values to the <see cref="AdbcStatement.SetOption(string, string)" /> method.
/// </summary>
/// <remarks>
/// Derives from <see cref="HiveServer2Statement.Options"/> and hides the base nested type
/// of the same name. No Spark-specific keys are declared here yet.
/// </remarks>
public new sealed class Options : HiveServer2Statement.Options
{
// Options specific to Spark go here.
}

sealed class SparkReader : IArrowArrayStream
{
HiveServer2Statement? statement;
Expand Down
106 changes: 106 additions & 0 deletions csharp/test/Drivers/Apache/Spark/StatementTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
using Apache.Arrow.Adbc.Tests.Xunit;
using Xunit;
using Xunit.Abstractions;

namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
{
/// <summary>
/// Class for testing statement options (SetOption) on the Spark ADBC driver.
/// </summary>
/// <remarks>
/// Tests are ordered to ensure data is created for the other
/// queries to run.
/// </remarks>
[TestCaseOrderer("Apache.Arrow.Adbc.Tests.Xunit.TestOrderer", "Apache.Arrow.Adbc.Tests")]
public class StatementTests : SparkTestBase
{
// Table type names; a fresh list is built on every access.
// NOTE(review): not referenced by any test visible in this class — confirm whether it is still needed.
private static List<string> DefaultTableTypes
{
    get { return new List<string> { "BASE TABLE", "VIEW" }; }
}

/// <summary>
/// Creates the test class and skips every test in it when the test
/// configuration cannot be executed.
/// </summary>
/// <param name="outputHelper">The xUnit output helper forwarded to the base class.</param>
public StatementTests(ITestOutputHelper? outputHelper)
    : base(outputHelper)
{
    bool canExecuteConfig = Utils.CanExecuteTestConfig(TestConfigVariable);
    Skip.IfNot(canExecuteConfig);
}

/// <summary>
/// Validates that SetOption accepts valid values and rejects invalid values for the PollTimeMilliseconds option.
/// </summary>
/// <param name="value">The option value passed to SetOption.</param>
/// <param name="throws">
/// <c>true</c> when setting <paramref name="value"/> is expected to throw <see cref="ArgumentException"/>.
/// </param>
[SkippableTheory]
[InlineData("-1", true)]
[InlineData("zero", true)]
[InlineData("-2147483648", true)]
[InlineData("2147483648", true)]
[InlineData("0")]
[InlineData("1")]
[InlineData("2147483647")]
public void CanSetOptionPollTime(string value, bool throws = false)
{
    // Dispose the connection and the statement so each theory case releases what it opened.
    using AdbcConnection connection = NewConnection();
    using AdbcStatement statement = connection.CreateStatement();

    if (throws)
    {
        Assert.Throws<ArgumentException>(() => statement.SetOption(SparkStatement.Options.PollTimeMilliseconds, value));
    }
    else
    {
        statement.SetOption(SparkStatement.Options.PollTimeMilliseconds, value);
    }
}

/// <summary>
/// Validates that SetOption accepts valid values and rejects invalid values for the BatchSize option.
/// </summary>
/// <param name="value">The option value passed to SetOption.</param>
/// <param name="throws">
/// <c>true</c> when setting <paramref name="value"/> is expected to throw <see cref="ArgumentException"/>.
/// </param>
[SkippableTheory]
[InlineData("-1", true)]
[InlineData("one", true)]
[InlineData("-2147483648", true)]
[InlineData("2147483648", true)]
[InlineData("0", true)]
[InlineData("1")]
[InlineData("2147483647")]
public void CanSetOptionBatchSize(string value, bool throws = false)
{
    // Dispose the connection and the statement so each theory case releases what it opened.
    using AdbcConnection connection = NewConnection();
    using AdbcStatement statement = connection.CreateStatement();

    if (throws)
    {
        Assert.Throws<ArgumentException>(() => statement.SetOption(SparkStatement.Options.BatchSize, value));
    }
    else
    {
        statement.SetOption(SparkStatement.Options.BatchSize, value);
        // Review question (from the pull request): should these tests roundtrip
        // and verify the option is set correctly?
    }
}

/// <summary>
/// Sets the poll-time and batch-size options on the shared statement, then
/// creates a temporary table with a single INT column and hands it to
/// ValidateInsertSelectDeleteSingleValueAsync.
/// </summary>
[SkippableFact]
[Order(1)]
public async Task CanInteractUsingSetOptions()
{
    const string indexColumn = "INDEX";
    string columnDefinition = $"{indexColumn} INT";

    Statement.SetOption(SparkStatement.Options.PollTimeMilliseconds, "100");
    Statement.SetOption(SparkStatement.Options.BatchSize, "10");

    using TemporaryTable table = await NewTemporaryTableAsync(Statement, columnDefinition);
    await ValidateInsertSelectDeleteSingleValueAsync(table.TableName, indexColumn, 1);
}
}
}
Loading