Skip to content

Commit

Permalink
Add Elasticsearch Log Exporter with tests (#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkSeufert authored Dec 22, 2020
1 parent ddec444 commit f462e41
Show file tree
Hide file tree
Showing 8 changed files with 667 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ option(WITH_OTLP "Whether to include the OpenTelemetry Protocol in the SDK" OFF)
option(WITH_PROMETHEUS "Whether to include the Prometheus Client in the SDK"
OFF)

option(WITH_ELASTICSEARCH
"Whether to include the Elasticsearch Client in the SDK" OFF)

option(BUILD_TESTING "Whether to enable tests" ON)
option(WITH_EXAMPLES "Whether to build examples" ON)

Expand Down
4 changes: 4 additions & 0 deletions exporters/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ add_subdirectory(memory)
if(WITH_PROMETHEUS)
add_subdirectory(prometheus)
endif()

if(WITH_ELASTICSEARCH)
add_subdirectory(elasticsearch)
endif()
40 changes: 40 additions & 0 deletions exporters/elasticsearch/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package(default_visibility = ["//visibility:public"])

cc_library(
name = "es_log_exporter",
srcs = [
"src/es_log_exporter.cc",
],
hdrs = [
"include/opentelemetry/exporters/elasticsearch/es_log_exporter.h",
"include/opentelemetry/exporters/elasticsearch/es_log_recordable.h",
],
copts = [
"-DCURL_STATICLIB",
],
linkopts = select({
"//bazel:windows": [
"-DEFAULTLIB:advapi32.lib",
"-DEFAULTLIB:crypt32.lib",
"-DEFAULTLIB:Normaliz.lib",
],
"//conditions:default": [],
}),
strip_include_prefix = "include",
deps = [
"//ext:headers",
"//sdk/src/logs",
"@curl",
"@github_nlohmann_json//:json",
],
)

cc_test(
name = "es_log_exporter_test",
srcs = ["test/es_log_exporter_test.cc"],
deps = [
":es_log_exporter",
"@com_google_googletest//:gtest_main",
"@curl",
],
)
17 changes: 17 additions & 0 deletions exporters/elasticsearch/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
include_directories(include)
include_directories(${CMAKE_SOURCE_DIR}/ext/include)

add_library(opentelemetry_exporter_elasticsearch_logs src/es_log_exporter.cc)

if(BUILD_TESTING)
add_executable(es_log_exporter_test test/es_log_exporter_test.cc)

target_link_libraries(
es_log_exporter_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
opentelemetry_exporter_elasticsearch_logs)

gtest_add_tests(
TARGET es_log_exporter_test
TEST_PREFIX exporter.
TEST_LIST es_log_exporter_test)
endif() # BUILD_TESTING
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "nlohmann/json.hpp"
#include "opentelemetry/ext/http/client/curl/http_client_curl.h"
#include "opentelemetry/nostd/type_traits.h"
#include "opentelemetry/sdk/logs/exporter.h"
#include "opentelemetry/sdk/logs/log_record.h"

#include <time.h>
#include <iostream>

namespace nostd = opentelemetry::nostd;
namespace sdklogs = opentelemetry::sdk::logs;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
{
namespace logs
{
/**
* Struct to hold Elasticsearch exporter configuration options.
*/
struct ElasticsearchExporterOptions
{
// Configuration options to establish Elasticsearch connection
std::string host_;
int port_;
std::string index_;

// Maximum time to wait for response after sending request to Elasticsearch
int response_timeout_;

// Whether to print the status of the exporter in the console
bool console_debug_;

/**
* Constructor for the ElasticsearchExporterOptions. By default, the endpoint is
* localhost:9200/logs with a timeout of 30 seconds and disabled console debugging
* @param host The host of the Elasticsearch instance
* @param port The port of the Elasticsearch instance
* @param index The index/shard that the logs will be written to
* @param response_timeout The maximum time in seconds the exporter should wait for a response
* from elasticsearch
* @param console_debug If true, print the status of the exporter methods in the console
*/
ElasticsearchExporterOptions(std::string host = "localhost",
int port = 9200,
std::string index = "logs",
int response_timeout = 30,
bool console_debug = false)
: host_{host},
port_{port},
index_{index},
response_timeout_{response_timeout},
console_debug_{console_debug}
{}
};

/**
* The ElasticsearchLogExporter exports logs to Elasticsearch in JSON format
*/
class ElasticsearchLogExporter final : public sdklogs::LogExporter
{
public:
/**
* Create an ElasticsearchLogExporter with default exporter options.
*/
ElasticsearchLogExporter();

/**
* Create an ElasticsearchLogExporter with user specified options.
* @param options An object containing the user's configuration options.
*/
ElasticsearchLogExporter(const ElasticsearchExporterOptions &options);

/**
* Creates a recordable that stores the data in a JSON object
*/
std::unique_ptr<sdk::logs::Recordable> MakeRecordable() noexcept override;

/**
* Exports a vector of log records to the Elasticsearch instance. Guaranteed to return after a
* timeout specified from the options passed from the constructor.
* @param records A list of log records to send to Elasticsearch.
*/
sdklogs::ExportResult Export(
const nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records) noexcept override;

/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return
*/
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

private:
// Stores if this exporter had its Shutdown() method called
bool is_shutdown_ = false;

// Configuration options for the exporter
ElasticsearchExporterOptions options_;

// Object that stores the HTTP sessions that have been created
std::unique_ptr<ext::http::client::SessionManager> session_manager_;
};
} // namespace logs
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <map>
#include <unordered_map>
#include "nlohmann/json.hpp"
#include "opentelemetry/sdk/common/attribute_utils.h" // same as traces/attribute_utils
#include "opentelemetry/sdk/logs/recordable.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
{
namespace logs
{

/**
* An Elasticsearch Recordable implemenation that stores the 10 fields of the Log Data Model inside
* a JSON object,
*/
class ElasticSearchRecordable final : public sdk::logs::Recordable
{
private:
// Define a JSON object that will be populated with the log data
nlohmann::json json_;

/**
* A helper method that writes a key/value pair under a specified name, the two names used here
* being "attributes" and "resources"
*/
void WriteKeyValue(nostd::string_view key,
const opentelemetry::common::AttributeValue &value,
std::string name)
{
switch (value.index())
{
case common::AttributeType::TYPE_BOOL:
json_[name][key.data()] = opentelemetry::nostd::get<bool>(value) ? true : false;
return;
case common::AttributeType::TYPE_INT:
json_[name][key.data()] = opentelemetry::nostd::get<int>(value);
return;
case common::AttributeType::TYPE_INT64:
json_[name][key.data()] = opentelemetry::nostd::get<int64_t>(value);
return;
case common::AttributeType::TYPE_UINT:
json_[name][key.data()] = opentelemetry::nostd::get<unsigned int>(value);
return;
case common::AttributeType::TYPE_UINT64:
json_[name][key.data()] = opentelemetry::nostd::get<uint64_t>(value);
return;
case common::AttributeType::TYPE_DOUBLE:
json_[name][key.data()] = opentelemetry::nostd::get<double>(value);
return;
case common::AttributeType::TYPE_STRING:
#ifdef HAVE_CSTRING_TYPE
case common::AttributeType::TYPE_CSTRING:
#endif
json_[name][key.data()] =
opentelemetry::nostd::get<opentelemetry::nostd::string_view>(value).data();
return;
default:
return;
}
}

public:
/**
* Set the severity for this log.
* @param severity the severity of the event
*/
void SetSeverity(opentelemetry::logs::Severity severity) noexcept override
{
// Convert the severity enum to a string
json_["severity"] = opentelemetry::logs::SeverityNumToText[static_cast<int>(severity)];
}

/**
* Set name for this log
* @param name the name to set
*/
void SetName(nostd::string_view name) noexcept override { json_["name"] = name.data(); }

/**
* Set body field for this log.
* @param message the body to set
*/
void SetBody(nostd::string_view message) noexcept override { json_["body"] = message.data(); }

/**
* Set a resource for this log.
* @param name the name of the resource
* @param value the resource value
*/
void SetResource(nostd::string_view key,
const opentelemetry::common::AttributeValue &value) noexcept override
{
WriteKeyValue(key, value, "resource");
}

/**
* Set an attribute of a log.
* @param key the key of the attribute
* @param value the attribute value
*/
void SetAttribute(nostd::string_view key,
const opentelemetry::common::AttributeValue &value) noexcept override
{
WriteKeyValue(key, value, "attributes");
}

/**
* Set trace id for this log.
* @param trace_id the trace id to set
*/
void SetTraceId(opentelemetry::trace::TraceId trace_id) noexcept override
{
char trace_buf[32];
trace_id.ToLowerBase16(trace_buf);
json_["traceid"] = std::string(trace_buf, sizeof(trace_buf));
}

/**
* Set span id for this log.
* @param span_id the span id to set
*/
virtual void SetSpanId(opentelemetry::trace::SpanId span_id) noexcept override
{
char span_buf[16];
span_id.ToLowerBase16(span_buf);
json_["spanid"] = std::string(span_buf, sizeof(span_buf));
}

/**
* Inject a trace_flags for this log.
* @param trace_flags the span id to set
*/
void SetTraceFlags(opentelemetry::trace::TraceFlags trace_flags) noexcept override
{
char flag_buf[2];
trace_flags.ToLowerBase16(flag_buf);
json_["traceflags"] = std::string(flag_buf, sizeof(flag_buf));
}

/**
* Set the timestamp for this log.
* @param timestamp the timestamp of the event
*/
void SetTimestamp(core::SystemTimestamp timestamp) noexcept override
{
json_["timestamp"] = timestamp.time_since_epoch().count();
}

/**
* Returns a JSON object contain the log information
*/
nlohmann::json GetJSON() noexcept { return json_; };
};
} // namespace logs
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Loading

0 comments on commit f462e41

Please sign in to comment.