Skip to content

Commit

Permalink
add comments about listener lifetimes
Browse files Browse the repository at this point in the history
  • Loading branch information
birschick-bq committed Nov 8, 2024
1 parent 4af814a commit 9bf476c
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 12 deletions.
27 changes: 24 additions & 3 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Reflection;
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
using Apache.Arrow.Adbc.Drivers.Apache.Thrift;
using Apache.Arrow.Adbc.Extensions;
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift.Protocol;
using Thrift.Transport;
Expand All @@ -34,13 +44,15 @@ internal abstract class HiveServer2Connection : AdbcConnection
{
internal const long BatchSizeDefault = 50000;
internal const int PollTimeMillisecondsDefault = 500;
internal const string ProductVersionDefault = "1.0.0";

private TTransport? _transport;
private TCLIService.Client? _client;
private readonly Lazy<string> _vendorVersion;
private readonly Lazy<string> _vendorName;
private static readonly string s_activitySourceName = typeof(HiveServer2Connection).Assembly.GetName().Name!;
private static readonly string s_assemblyVersion = FileVersionInfo.GetVersionInfo(Assembly.GetExecutingAssembly().Location).ProductVersion ?? "1.0.0";

private static readonly string s_activitySourceName = Assembly.GetExecutingAssembly().GetName().Name!;
private static readonly string s_assemblyVersion = GetProductVersion();
private static readonly ConcurrentDictionary<string, ActivityListener> s_listeners = new();
private readonly ActivityListener? _listener;
private readonly ConcurrentQueue<Activity> _activityQueue = new();
Expand All @@ -52,6 +64,7 @@ internal HiveServer2Connection(IReadOnlyDictionary<string, string> properties)
// Key of listeners collection should be ouput file location
if (true)
{
// TODO: Determine the best handling of listener lifetimes.
_listener = s_listeners.GetOrAdd(s_activitySourceName, (_) => new()
{
ShouldListenTo = (source) => source.Name == s_activitySourceName,
Expand Down Expand Up @@ -202,7 +215,8 @@ public override void Dispose()
_transport = null;
_client = null;
}
_listener?.Dispose();
// TODO: Determine best approach to lifetime of ActivityListener
//_listener?.Dispose();
ActivitySource?.Dispose();
}

Expand Down Expand Up @@ -253,5 +267,12 @@ private Task DequeueAndWrite(string state)
private Activity? StartActivity(string methodName) => StartActivity(ActivitySource, methodName);

private static Activity? StartActivity(ActivitySource? activitySource, string methodName) => activitySource?.StartActivity(typeof(HiveServer2Connection).FullName + "." + methodName);


protected internal static string GetProductVersion()
{
FileVersionInfo fileVersionInfo = FileVersionInfo.GetVersionInfo(Assembly.GetExecutingAssembly().Location);
return fileVersionInfo.ProductVersion ?? ProductVersionDefault;
}
}
}
9 changes: 0 additions & 9 deletions csharp/src/Drivers/Apache/Spark/SparkConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ internal abstract class SparkConnection : HiveServer2Connection
AdbcInfoCode.VendorSql,
AdbcInfoCode.VendorVersion,
};

const string ProductVersionDefault = "1.0.0";
const string InfoDriverName = "ADBC Spark Driver";
const string InfoDriverArrowVersion = "1.0.0";
const bool InfoVendorSql = true;
Expand Down Expand Up @@ -957,13 +955,6 @@ private static string PatternToRegEx(string? pattern)
return builder.ToString();
}


private static string GetProductVersion()
{
FileVersionInfo fileVersionInfo = FileVersionInfo.GetVersionInfo(Assembly.GetExecutingAssembly().Location);
return fileVersionInfo.ProductVersion ?? ProductVersionDefault;
}

protected static Uri GetBaseAddress(string? uri, string? hostName, string? path, string? port)
{
// Uri property takes precedent.
Expand Down
26 changes: 26 additions & 0 deletions go/dice/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module dice

go 1.23.2

require (
go.opentelemetry.io/contrib/bridges/otelslog v0.6.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0
go.opentelemetry.io/otel v1.31.0
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.7.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.31.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.31.0
go.opentelemetry.io/otel/log v0.7.0
go.opentelemetry.io/otel/metric v1.31.0
go.opentelemetry.io/otel/sdk v1.31.0
go.opentelemetry.io/otel/sdk/log v0.7.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
)

require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
golang.org/x/sys v0.26.0 // indirect
)
45 changes: 45 additions & 0 deletions go/dice/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/contrib/bridges/otelslog v0.6.0 h1:V/XtFJ8mMisAO2E0tXcgwi40wJUxbiz8I2/RtgaZ8AU=
go.opentelemetry.io/contrib/bridges/otelslog v0.6.0/go.mod h1:g7kkoEznNXb0li+YvlwPWoqxTbpC3BtmZtZutB39G4M=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM=
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY=
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE=
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.7.0 h1:TwmL3O3fRR80m8EshBrd8YydEZMcUCsZXzOUlnFohwM=
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.7.0/go.mod h1:tH98dDv5KPmPThswbXA0fr0Lwfs+OhK8HgaCo7PjRrk=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.31.0 h1:HZgBIps9wH0RDrwjrmNa3DVbNRW60HEhdzqZFyAp3fI=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.31.0/go.mod h1:RDRhvt6TDG0eIXmonAx5bd9IcwpqCkziwkOClzWKwAQ=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.31.0 h1:UGZ1QwZWY67Z6BmckTU+9Rxn04m2bD3gD6Mk0OIOCPk=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.31.0/go.mod h1:fcwWuDuaObkkChiDlhEpSq9+X1C0omv+s5mBtToAQ64=
go.opentelemetry.io/otel/log v0.7.0 h1:d1abJc0b1QQZADKvfe9JqqrfmPYQCz2tUSO+0XZmuV4=
go.opentelemetry.io/otel/log v0.7.0/go.mod h1:2jf2z7uVfnzDNknKTO9G+ahcOAyWcp1fJmk/wJjULRo=
go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE=
go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY=
go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk=
go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0=
go.opentelemetry.io/otel/sdk/log v0.7.0 h1:dXkeI2S0MLc5g0/AwxTZv6EUEjctiH8aG14Am56NTmQ=
go.opentelemetry.io/otel/sdk/log v0.7.0/go.mod h1:oIRXpW+WD6M8BuGj5rtS0aRu/86cbDV/dAfNaZBIjYM=
go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc=
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys=
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
84 changes: 84 additions & 0 deletions go/dice/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package main

import (
"context"
"errors"
"log"
"net"
"net/http"
"os"
"os/signal"
"time"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

func main() {
if err := run(); err != nil {
log.Fatalln(err)
}
}

func run() (err error) {
// Handle SIGINT (CTRL+C) gracefully.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

// Set up OpenTelemetry.
otelShutdown, err := setupOTelSDK(ctx)
if err != nil {
return
}
// Handle shutdown properly so nothing leaks.
defer func() {
err = errors.Join(err, otelShutdown(context.Background()))
}()

// Start HTTP server.
srv := &http.Server{
Addr: ":8080",
BaseContext: func(_ net.Listener) context.Context { return ctx },
ReadTimeout: time.Second,
WriteTimeout: 10 * time.Second,
Handler: newHTTPHandler(),
}
srvErr := make(chan error, 1)
go func() {
srvErr <- srv.ListenAndServe()
}()

// Wait for interruption.
select {
case err = <-srvErr:
// Error when starting HTTP server.
return
case <-ctx.Done():
// Wait for first CTRL+C.
// Stop receiving signal notifications as soon as possible.
stop()
}

// When Shutdown is called, ListenAndServe immediately returns ErrServerClosed.
err = srv.Shutdown(context.Background())
return
}

func newHTTPHandler() http.Handler {
mux := http.NewServeMux()

// handleFunc is a replacement for mux.HandleFunc
// which enriches the handler's HTTP instrumentation with the pattern as the http.route.
handleFunc := func(pattern string, handlerFunc func(http.ResponseWriter, *http.Request)) {
// Configure the "http.route" for the HTTP instrumentation.
handler := otelhttp.WithRouteTag(pattern, http.HandlerFunc(handlerFunc))
mux.Handle(pattern, handler)
}

// Register handlers.
handleFunc("/rolldice/", rolldice)
handleFunc("/rolldice/{player}", rolldice)

// Add HTTP instrumentation for the whole server.
handler := otelhttp.NewHandler(mux, "/")
return handler
}
122 changes: 122 additions & 0 deletions go/dice/otel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"context"
"errors"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/exporters/"
)

// setupOTelSDK bootstraps the OpenTelemetry pipeline.
// If it does not return an error, make sure to call shutdown for proper cleanup.
func setupOTelSDK(ctx context.Context) (shutdown func(context.Context) error, err error) {
var shutdownFuncs []func(context.Context) error

// shutdown calls cleanup functions registered via shutdownFuncs.
// The errors from the calls are joined.
// Each registered cleanup will be invoked once.
shutdown = func(ctx context.Context) error {
var err error
for _, fn := range shutdownFuncs {
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
return err
}

// handleErr calls shutdown for cleanup and makes sure that all errors are returned.
handleErr := func(inErr error) {
err = errors.Join(inErr, shutdown(ctx))
}

// Set up propagator.
prop := newPropagator()
otel.SetTextMapPropagator(prop)

// Set up trace provider.
tracerProvider, err := newTraceProvider()
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
otel.SetTracerProvider(tracerProvider)

// Set up meter provider.
meterProvider, err := newMeterProvider()
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, meterProvider.Shutdown)
otel.SetMeterProvider(meterProvider)

// Set up logger provider.
loggerProvider, err := newLoggerProvider()
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, loggerProvider.Shutdown)
global.SetLoggerProvider(loggerProvider)

return
}

func newPropagator() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
}

func newTraceProvider() (*trace.TracerProvider, error) {
traceExporter, err := stdouttrace.New(
stdouttrace.WithPrettyPrint())
if err != nil {
return nil, err
}

traceProvider := trace.NewTracerProvider(
trace.WithBatcher(traceExporter,
// Default is 5s. Set to 1s for demonstrative purposes.
trace.WithBatchTimeout(time.Second)),
)
return traceProvider, nil
}

func newMeterProvider() (*metric.MeterProvider, error) {
metricExporter, err := stdoutmetric.New()
if err != nil {
return nil, err
}

meterProvider := metric.NewMeterProvider(
metric.WithReader(metric.NewPeriodicReader(metricExporter,
// Default is 1m. Set to 3s for demonstrative purposes.
metric.WithInterval(3*time.Second))),
)
return meterProvider, nil
}

func newLoggerProvider() (*log.LoggerProvider, error) {
logExporter, err := stdoutlog.New()
if err != nil {
return nil, err
}

loggerProvider := log.NewLoggerProvider(
log.WithProcessor(log.NewBatchProcessor(logExporter)),
)
return loggerProvider, nil
}
Loading

0 comments on commit 9bf476c

Please sign in to comment.