Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csharp/src/Drivers/Apache/Spark): poc - Support for Apache Spark over HTTP (non-Arrow) #2018

Merged
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
7e189d1
Add support for username and password authentication
birschick-bq Jul 15, 2024
95a3fd9
Merge branch 'apache:main' into dev/birschick-bq/user-password-auth
birschick-bq Jul 15, 2024
e297113
work-in-progress - support non-Arrow results version 3.3.1
birschick-bq Jul 16, 2024
e4c5f30
make protocolVersion a parameter
birschick-bq Jul 16, 2024
586edcc
updates for HiveServer2Reader and test
birschick-bq Jul 17, 2024
308a78f
remove commented code
birschick-bq Jul 17, 2024
f83c7a6
small refactor/rename
birschick-bq Jul 17, 2024
e320463
Merge remote-tracking branch 'origin/main' into dev/birschick-bq/spar…
birschick-bq Jul 17, 2024
68ee762
refactor more GetRowSet
birschick-bq Jul 17, 2024
df54095
case-insensitive compare
birschick-bq Jul 17, 2024
137caf7
Merge branch 'apache:main' into dev/birschick-bq/user-password-auth
birschick-bq Jul 19, 2024
3e716b9
improvements from code review comments
birschick-bq Jul 19, 2024
c55abaf
improvements from code review comments
birschick-bq Jul 19, 2024
b23b6de
Merge branch 'dev/birschick-bq/user-password-auth' into dev/birschick…
birschick-bq Jul 22, 2024
d359c78
Fix merge error
birschick-bq Jul 22, 2024
92490c7
Fixed compile and test issues
birschick-bq Jul 22, 2024
ba0a966
Merge branch 'dev/birschick-bq/user-password-auth' into dev/birschick…
birschick-bq Jul 22, 2024
69131fa
Many more tests are passing
birschick-bq Jul 24, 2024
4af848c
Detect and handle null values in array columns.
birschick-bq Jul 25, 2024
8e35f95
Detect and handle null values in array columns.
birschick-bq Jul 25, 2024
ed56eb1
Add license.
birschick-bq Jul 25, 2024
828fd95
Improve clarity of tests with protocol/server versions.
birschick-bq Jul 26, 2024
c02241a
Fix test incompatibility.
birschick-bq Jul 26, 2024
bbe81a3
Add negative connection test for authentication and host.
birschick-bq Jul 26, 2024
33799d7
Merge branch 'dev/birschick-bq/user-password-auth' into dev/birschick…
birschick-bq Jul 26, 2024
344cf2a
Update documentation.
birschick-bq Jul 26, 2024
67c14e2
Refactor
birschick-bq Aug 12, 2024
42365d7
Refactor
birschick-bq Aug 12, 2024
c57ed56
correct project change
birschick-bq Aug 12, 2024
442d1c6
self code review #1
birschick-bq Aug 12, 2024
90c0538
self code review #2
birschick-bq Aug 12, 2024
6873bfa
self code review #3
birschick-bq Aug 12, 2024
942c07d
workaround Unsafe.As bug in .net 4.7.2
birschick-bq Aug 12, 2024
c42e5c0
fix alignment
birschick-bq Aug 12, 2024
068d5aa
fix alignment
birschick-bq Aug 12, 2024
be368d1
remove redundant
birschick-bq Aug 12, 2024
46e4c8d
formatting updates
birschick-bq Aug 12, 2024
e549e4a
remove unnecessary using
birschick-bq Aug 12, 2024
8c51e9c
add variant specific validation
birschick-bq Aug 14, 2024
199e558
invert compiler coditional
birschick-bq Aug 14, 2024
0253b64
update documentation
birschick-bq Aug 15, 2024
0b3b6ab
update for code review comments
birschick-bq Aug 30, 2024
9c6f3ed
update for code review comments #2
birschick-bq Aug 30, 2024
3c5a4db
update for code review comments #3
birschick-bq Sep 3, 2024
8102d4b
update for code review comments #4 - data type conversion option
birschick-bq Sep 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 47 additions & 37 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Ipc;
Expand All @@ -27,21 +26,21 @@

namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
public abstract class HiveServer2Connection : AdbcConnection
internal abstract class HiveServer2Connection : AdbcConnection
{
const string userAgent = "AdbcExperimental/0.0";
internal const long BatchSizeDefault = 50000;
internal const int PollTimeMillisecondsDefault = 500;
private const string userAgent = "AdbcExperimental/0.0";

protected TOperationHandle? operationHandle;
protected readonly IReadOnlyDictionary<string, string> properties;
internal TTransport? transport;
internal TCLIService.Client? client;
internal TSessionHandle? sessionHandle;
internal TTransport? _transport;
private TCLIService.Client? _client;
private readonly Lazy<string> _vendorVersion;
private readonly Lazy<string> _vendorName;

internal HiveServer2Connection(IReadOnlyDictionary<string, string> properties)
public HiveServer2Connection(IReadOnlyDictionary<string, string> properties)
{
this.properties = properties;
Properties = properties;
// Note: "LazyThreadSafetyMode.PublicationOnly" is thread-safe initialization where
// the first successful thread sets the value. If an exception is thrown, initialization
// will retry until it successfully returns a value without an exception.
Expand All @@ -50,29 +49,40 @@ internal HiveServer2Connection(IReadOnlyDictionary<string, string> properties)
_vendorName = new Lazy<string>(() => GetInfoTypeStringValue(TGetInfoType.CLI_DBMS_NAME), LazyThreadSafetyMode.PublicationOnly);
}

internal TCLIService.Client Client
public TCLIService.Client Client
{
get { return this.client ?? throw new InvalidOperationException("connection not open"); }
get { return _client ?? throw new InvalidOperationException("connection not open"); }
}

protected string VendorVersion => _vendorVersion.Value;
public string VendorVersion => _vendorVersion.Value;

protected string VendorName => _vendorName.Value;
public string VendorName => _vendorName.Value;

internal async Task OpenAsync()
{
TProtocol protocol = await CreateProtocolAsync();
this.transport = protocol.Transport;
this.client = new TCLIService.Client(protocol);
public IReadOnlyDictionary<string, string> Properties { get; }

var s0 = await this.client.OpenSession(CreateSessionRequest());
this.sessionHandle = s0.SessionHandle;
public async Task OpenAsync()
{
TTransport transport = await CreateTransportAsync();
TProtocol protocol = await CreateProtocolAsync(transport);
_transport = protocol.Transport;
_client = new TCLIService.Client(protocol);
TOpenSessionReq request = CreateSessionRequest();
TOpenSessionResp? session = await Client.OpenSession(request);
SessionHandle = session.SessionHandle;
}

protected abstract ValueTask<TProtocol> CreateProtocolAsync();
public TSessionHandle? SessionHandle { get; private set; }

protected abstract Task<TTransport> CreateTransportAsync();

protected abstract Task<TProtocol> CreateProtocolAsync(TTransport transport);

protected abstract TOpenSessionReq CreateSessionRequest();

public abstract SchemaParser SchemaParser { get; }

public abstract IArrowArrayStream NewReader<T>(T statement, Schema schema) where T : HiveServer2Statement;

public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? catalogPattern, string? dbSchemaPattern, string? tableNamePattern, IReadOnlyList<string>? tableTypes, string? columnNamePattern)
{
throw new NotImplementedException();
Expand All @@ -83,22 +93,22 @@ public override IArrowArrayStream GetTableTypes()
throw new NotImplementedException();
}

protected void PollForResponse()
internal static async Task PollForResponseAsync(TOperationHandle operationHandle, TCLIService.IAsync client, int pollTimeMilliseconds)
{
TGetOperationStatusResp? statusResponse = null;
do
{
if (statusResponse != null) { Thread.Sleep(500); }
TGetOperationStatusReq request = new TGetOperationStatusReq(this.operationHandle);
statusResponse = this.Client.GetOperationStatus(request).Result;
if (statusResponse != null) { await Task.Delay(pollTimeMilliseconds); }
TGetOperationStatusReq request = new(operationHandle);
statusResponse = await client.GetOperationStatus(request);
} while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE);
}

private string GetInfoTypeStringValue(TGetInfoType infoType)
{
TGetInfoReq req = new()
{
SessionHandle = this.sessionHandle ?? throw new InvalidOperationException("session not created"),
SessionHandle = SessionHandle ?? throw new InvalidOperationException("session not created"),
InfoType = infoType,
};

Expand All @@ -115,23 +125,23 @@ private string GetInfoTypeStringValue(TGetInfoType infoType)

public override void Dispose()
{
if (this.client != null)
if (_client != null)
{
TCloseSessionReq r6 = new TCloseSessionReq(this.sessionHandle);
this.client.CloseSession(r6).Wait();
TCloseSessionReq r6 = new TCloseSessionReq(SessionHandle);
_client.CloseSession(r6).Wait();

this.transport?.Close();
this.client.Dispose();
this.transport = null;
this.client = null;
_transport?.Close();
_client.Dispose();
_transport = null;
_client = null;
}
}

protected Schema GetSchema()
public static async Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TOperationHandle operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken = default)
{
TGetResultSetMetadataReq request = new TGetResultSetMetadataReq(this.operationHandle);
TGetResultSetMetadataResp response = this.Client.GetResultSetMetadata(request).Result;
return SchemaParser.GetArrowSchema(response.Schema);
TGetResultSetMetadataReq request = new(operationHandle);
TGetResultSetMetadataResp response = await client.GetResultSetMetadata(request, cancellationToken);
return response;
}
}
}
87 changes: 87 additions & 0 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift.Protocol;
using Thrift.Transport.Client;
using Thrift;

namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
internal class HiveServer2Reader : IArrowArrayStream
{
HiveServer2Statement? _statement;
private readonly long _batchSize;
//int _counter;

public HiveServer2Reader(HiveServer2Statement statement, Schema schema, long batchSize = HiveServer2Connection.BatchSizeDefault)
{
_statement = statement;
Schema = schema;
_batchSize = batchSize;
}

public Schema Schema { get; }

public async ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
if (_statement == null)
{
return null;
}

var request = new TFetchResultsReq(_statement.OperationHandle, TFetchOrientation.FETCH_NEXT, _batchSize);
TFetchResultsResp response = await _statement.Connection.Client.FetchResults(request, cancellationToken);

int length = response.Results.Columns.Count > 0 ? GetArray(response.Results.Columns[0]).Length : 0;
var result = new RecordBatch(
Schema,
response.Results.Columns.Select(GetArray),
length);

if (!response.HasMoreRows)
{
_statement = null;
}

return result;
}

public void Dispose()
{
}

static IArrowArray GetArray(TColumn column)
{
return
(IArrowArray?)column.BoolVal?.Values ??
(IArrowArray?)column.ByteVal?.Values ??
(IArrowArray?)column.I16Val?.Values ??
(IArrowArray?)column.I32Val?.Values ??
(IArrowArray?)column.I64Val?.Values ??
(IArrowArray?)column.DoubleVal?.Values ??
(IArrowArray?)column.StringVal?.Values ??
(IArrowArray?)column.BinaryVal?.Values ??
throw new InvalidOperationException("unsupported data type");
}
}
}
65 changes: 27 additions & 38 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,42 @@
*/

using System;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;

namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
public abstract class HiveServer2Statement : AdbcStatement
internal abstract class HiveServer2Statement : AdbcStatement
{
private const int PollTimeMillisecondsDefault = 500;
private const int BatchSizeDefault = 50000;
protected internal HiveServer2Connection connection;
protected internal TOperationHandle? operationHandle;

protected HiveServer2Statement(HiveServer2Connection connection)
{
this.connection = connection;
Connection = connection;
}

protected virtual void SetStatementProperties(TExecuteStatementReq statement)
{
}

protected abstract IArrowArrayStream NewReader<T>(T statement, Schema schema) where T : HiveServer2Statement;

public override QueryResult ExecuteQuery() => ExecuteQueryAsync().AsTask().Result;

public override UpdateResult ExecuteUpdate() => ExecuteUpdateAsync().Result;

public override async ValueTask<QueryResult> ExecuteQueryAsync()
{
await ExecuteStatementAsync();
await PollForResponseAsync();
Schema schema = await GetSchemaAsync();
await HiveServer2Connection.PollForResponseAsync(OperationHandle!, Connection.Client, PollTimeMilliseconds);
Schema schema = await GetResultSetSchemaAsync(OperationHandle!, Connection.Client);

// TODO: Ensure this is set dynamically based on server capabilities
return new QueryResult(-1, NewReader(this, schema));
return new QueryResult(-1, Connection.NewReader(this, schema));
}

private async Task<Schema> GetResultSetSchemaAsync(TOperationHandle operationHandle, TCLIService.IAsync client, CancellationToken cancellationToken = default)
{
TGetResultSetMetadataResp response = await HiveServer2Connection.GetResultSetMetadataAsync(operationHandle, client, cancellationToken);
return Connection.SchemaParser.GetArrowSchema(response.Schema);
}

public override async Task<UpdateResult> ExecuteUpdateAsync()
Expand All @@ -73,7 +73,9 @@ public override async Task<UpdateResult> ExecuteUpdateAsync()
throw new AdbcException($"Unexpected data type for column: '{NumberOfAffectedRowsColumnName}'", new ArgumentException(NumberOfAffectedRowsColumnName));
}

// If no altered rows, i.e. DDC statements, then -1 is the default.
// The default is -1.
if (affectedRowsField == null) return new UpdateResult(-1);

long? affectedRows = null;
while (true)
{
Expand All @@ -88,6 +90,7 @@ public override async Task<UpdateResult> ExecuteUpdateAsync()
}
}

// If no altered rows, i.e. DDC statements, then -1 is the default.
return new UpdateResult(affectedRows ?? -1);
}

Expand All @@ -108,39 +111,25 @@ public override void SetOption(string key, string value)

protected async Task ExecuteStatementAsync()
{
TExecuteStatementReq executeRequest = new TExecuteStatementReq(this.connection.sessionHandle, this.SqlQuery);
TExecuteStatementReq executeRequest = new TExecuteStatementReq(Connection.SessionHandle, SqlQuery);
SetStatementProperties(executeRequest);
TExecuteStatementResp executeResponse = await this.connection.Client.ExecuteStatement(executeRequest);
TExecuteStatementResp executeResponse = await Connection.Client.ExecuteStatement(executeRequest);
if (executeResponse.Status.StatusCode == TStatusCode.ERROR_STATUS)
{
throw new HiveServer2Exception(executeResponse.Status.ErrorMessage)
.SetSqlState(executeResponse.Status.SqlState)
.SetNativeError(executeResponse.Status.ErrorCode);
}
this.operationHandle = executeResponse.OperationHandle;
OperationHandle = executeResponse.OperationHandle;
}

protected async Task PollForResponseAsync()
{
TGetOperationStatusResp? statusResponse = null;
do
{
if (statusResponse != null) { await Task.Delay(PollTimeMilliseconds); }
TGetOperationStatusReq request = new TGetOperationStatusReq(this.operationHandle);
statusResponse = await this.connection.Client.GetOperationStatus(request);
} while (statusResponse.OperationState == TOperationState.PENDING_STATE || statusResponse.OperationState == TOperationState.RUNNING_STATE);
}
protected internal int PollTimeMilliseconds { get; private set; } = HiveServer2Connection.PollTimeMillisecondsDefault;

protected async ValueTask<Schema> GetSchemaAsync()
{
TGetResultSetMetadataReq request = new TGetResultSetMetadataReq(this.operationHandle);
TGetResultSetMetadataResp response = await this.connection.Client.GetResultSetMetadata(request);
return SchemaParser.GetArrowSchema(response.Schema);
}
protected internal long BatchSize { get; private set; } = HiveServer2Connection.BatchSizeDefault;

protected internal int PollTimeMilliseconds { get; private set; } = PollTimeMillisecondsDefault;
public HiveServer2Connection Connection { get; private set; }

protected internal int BatchSize { get; private set; } = BatchSizeDefault;
public TOperationHandle? OperationHandle { get; private set; }

/// <summary>
/// Provides the constant string key values to the <see cref="AdbcStatement.SetOption(string, string)" /> method.
Expand All @@ -162,11 +151,11 @@ private void UpdateBatchSizeIfValid(string key, string value) => BatchSize = !st

public override void Dispose()
{
if (this.operationHandle != null)
if (OperationHandle != null)
{
TCloseOperationReq request = new TCloseOperationReq(this.operationHandle);
this.connection.Client.CloseOperation(request).Wait();
this.operationHandle = null;
TCloseOperationReq request = new TCloseOperationReq(OperationHandle);
Connection.Client.CloseOperation(request).Wait();
OperationHandle = null;
}

base.Dispose();
Expand Down
Loading
Loading