Skip to content

Commit

Permalink
Merge pull request #33 from SumoLogic/byi-plugin-impl
Browse files Browse the repository at this point in the history
implement enhance-k8s-metadata plugin with label reading
  • Loading branch information
Bin Yi authored May 31, 2019
2 parents fc79baf + a01b11c commit b0eaba3
Show file tree
Hide file tree
Showing 14 changed files with 1,069 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
lib = File.expand_path("../lib", __FILE__)
lib = File.expand_path('../lib', __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

Gem::Specification.new do |spec|
spec.name = "fluent-plugin-enhance-k8s-metadata"
spec.version = "0.0.0"
spec.authors = ["Sumo Logic"]
spec.email = ["[email protected]"]
spec.name = 'fluent-plugin-enhance-k8s-metadata'
spec.version = '0.0.0'
spec.authors = ['Sumo Logic']
spec.email = ['[email protected]']

spec.summary = "Fluentd plugin for appending extra metadata from Kubernetes."
spec.homepage = "https://github.com/SumoLogic/sumologic-kubernetes-collection"
spec.license = "Apache-2.0"
spec.summary = 'Fluentd plugin for appending extra metadata from Kubernetes.'
spec.homepage = 'https://github.com/SumoLogic/sumologic-kubernetes-collection'
spec.license = 'Apache-2.0'

test_files, files = `git ls-files -z`.split("\x0").partition do |f|
f.match(%r{^(test|spec|features)/})
end
spec.files = files
spec.executables = files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.test_files = test_files
spec.require_paths = ["lib"]
spec.require_paths = ['lib']

spec.add_runtime_dependency "fluentd", [">= 0.14.10", "< 2"]
spec.add_runtime_dependency "kubeclient", "~> 4.4.0"
spec.add_runtime_dependency "lru_redux", "~> 1.1.0"
spec.add_runtime_dependency 'fluentd', ['>= 0.14.10', '< 2']
spec.add_runtime_dependency 'kubeclient', '~> 4.4.0'
spec.add_runtime_dependency 'lru_redux', '~> 1.1.0'

spec.add_development_dependency "bundler", "~> 2.0"
spec.add_development_dependency "rake", "~> 12.0"
spec.add_development_dependency "test-unit", "~> 3.0"
spec.add_development_dependency 'bundler', '~> 2.0'
spec.add_development_dependency 'rake', '~> 12.0'
spec.add_development_dependency 'test-unit', '~> 3.0'
spec.add_development_dependency 'webmock', '~> 3.0'
end
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,91 @@ module Plugin
class EnhanceK8sMetadataFilter < Fluent::Plugin::Filter
Fluent::Plugin.register_filter('enhance_k8s_metadata', self)

require_relative '../../sumologic/kubernetes/cache_strategy.rb'

helpers :record_accessor
include SumoLogic::Kubernetes::Connector
include SumoLogic::Kubernetes::Reader
include SumoLogic::Kubernetes::CacheStrategy

# parameters for read/write record
config_param :in_namespace_path, :string, default: '$.namespace'
config_param :in_pod_path, :string, default: '$.pod'
config_param :out_root, :string, default: 'kubernetes'

# parameters for connecting to k8s api server
config_param :kubernetes_url, :string, default: nil
config_param :apiVersion, :string, default: 'v1'
config_param :client_cert, :string, default: nil
config_param :client_key, :string, default: nil
config_param :ca_file, :string, default: nil
config_param :secret_dir, :string, default: '/var/run/secrets/kubernetes.io/serviceaccount'
config_param :bearer_token_file, :string, default: nil
config_param :verify_ssl, :bool, default: true
# if `ca_file` is for an intermediate CA, or otherwise we do not have the
# root CA and want to trust the intermediate CA certs we do have, set this
# to `true` - this corresponds to the openssl s_client -partial_chain flag
# and X509_V_FLAG_PARTIAL_CHAIN
config_param :ssl_partial_chain, :bool, default: false

config_param :cache_size, :integer, default: 1000
config_param :cache_ttl, :integer, default: 60 * 60

def configure(conf)
super
normalize_param
connect_kubernetes
init_cache
@in_namespace_ac = record_accessor_create(@in_namespace_path)
@in_pod_ac = record_accessor_create(@in_pod_path)
end

def filter(tag, time, record)
decorate_record(record)
record
end

private

def decorate_record(record)
namespace_name = @in_namespace_ac.call(record)
pod_name = @in_pod_ac.call(record)
if namespace_name.nil?
log.debug "Record doesn't have [#{@in_namespace_path}] field"
elsif pod_name.nil?
log.debug "Record doesn't have [#{@in_pod_path}] field"
else
labels = get_pod_labels(namespace_name, pod_name)
if labels.empty?
log.debug "Cannot get labels on pod #{namespace_name}::#{pod_name}, skip."
else
record[@out_root] = {} if record[@out_root].nil?
record[@out_root]['pod.labels'] = labels
end
end
end

def normalize_param
# Use Kubernetes default service account if running in a pod.
if @kubernetes_url.nil?
log.debug 'Kubernetes URL is not set - inspecting environment'
env_host = ENV['KUBERNETES_SERVICE_HOST']
env_port = ENV['KUBERNETES_SERVICE_PORT']
@kubernetes_url = "https://#{env_host}:#{env_port}/api" unless env_host.nil? || env_port.nil?
end
log.info "Kubernetes URL: '#{@kubernetes_url}'"

@ca_file = File.join(@secret_dir, K8_POD_CA_CERT) if @ca_file.nil?
log.info "ca_file: '#{@ca_file}', exist: #{File.exist?(@ca_file)}"

@bearer_token_file = File.join(@secret_dir, K8_POD_TOKEN) if @bearer_token_file.nil?
log.info "bearer_token_file: '#{@bearer_token_file}', exist: #{File.exist?(@bearer_token_file)}"

@cache_ttl = :none if @cache_ttl <= 0
log.info "cache_ttl: #{cache_ttl}, cache_size: #{@cache_size}"

@out_root = 'kubernetes' if @out_root.nil? || @out_root.empty?
log.info "out_root: #{@out_root}"
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
module SumoLogic
module Kubernetes
# module for caching strategy
module CacheStrategy
require 'lru_redux'
require_relative 'reader.rb'

CACHE_TYPE_POD_LABELS = 'pod_labels'.freeze

def init_cache
@all_caches = {
CACHE_TYPE_POD_LABELS => LruRedux::TTL::ThreadSafeCache.new(@cache_size, @cache_ttl),
}
end

def get_pod_labels(namespace_name, pod_name)
key = "#{namespace_name}::#{pod_name}"
cache = @all_caches[CACHE_TYPE_POD_LABELS]
labels = cache[key]
if labels.nil?
labels = fetch_pod_labels(namespace_name, pod_name)
cache[key] = labels
end
labels
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
module SumoLogic
module Kubernetes
# module for connecting to Kubernetes cluster
module Connector
require 'kubeclient'

K8_POD_CA_CERT = 'ca.crt'.freeze
K8_POD_TOKEN = 'token'.freeze

def connect_kubernetes
@client = Kubeclient::Client.new(
@kubernetes_url, @apiVersion,
ssl_options: ssl_options,
auth_options: auth_options,
as: :parsed
)
@client.api_valid?
rescue Exception => e
log.error e
end

def ssl_store
require 'openssl'
ssl_store = OpenSSL::X509::Store.new
ssl_store.set_default_paths
# if version of ruby does not define OpenSSL::X509::V_FLAG_PARTIAL_CHAIN
flagval = 0x80000
flagval = OpenSSL::X509::V_FLAG_PARTIAL_CHAIN if defined? OpenSSL::X509::V_FLAG_PARTIAL_CHAIN
ssl_store.flags = OpenSSL::X509::V_FLAG_CRL_CHECK_ALL | flagval
ssl_store
end

def ssl_options
ssl_options = {}
ssl_options[:verify_ssl] = @verify_ssl ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE
if !@ca_file.nil? && File.exist?(@ca_file)
ssl_options[:ca_file] = @ca_file
end
if !@client_cert.nil? && File.exist?(@client_cert)
ssl_options[:client_cert] = OpenSSL::X509::Certificate.new(File.read(@client_cert))
end
if !@client_key.nil? && File.exist?(@client_key)
ssl_options[:client_key] = OpenSSL::PKey::RSA.new(File.read(@client_key))
end
ssl_options[:cert_store] = ssl_store if @ssl_partial_chain
log.info "ssl_options: #{ssl_options}"
ssl_options
end

def auth_options
auth_options = {}
if !@bearer_token_file.nil? && File.exist?(@bearer_token_file)
auth_options[:bearer_token] = File.read(@bearer_token_file)
end
log.info "auth_options: #{ssl_options}"
auth_options
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
module SumoLogic
module Kubernetes
# module for reading from Kubernetes API server
module Reader
require_relative 'connector.rb'

def fetch_pod(namespace_name, pod_name)
log.info "fetching pod metadata: #{namespace_name}::#{pod_name}"
pod = @client.get_pod(pod_name, namespace_name)
log.debug "raw metadata for #{namespace_name}::#{pod_name}: #{pod}"
pod
end

def extract_pod_labels(pod)
if pod.nil?
log.warn 'pod is nil'
elsif pod['metadata'].nil?
log.warn 'metadata is nil'
elsif pod['metadata']['labels'].nil?
log.warn 'labels is nil'
else
pod['metadata']['labels']
end
end

def fetch_pod_labels(namespace_name, pod_name)
extract_pod_labels(fetch_pod(namespace_name, pod_name))
rescue Kubeclient::ResourceNotFoundError => e
log.error e
# TODO: we now cache empty if not found since some namespace/pod not matching
{}
end
end
end
end
41 changes: 36 additions & 5 deletions fluent-plugin-enhance-k8s-metadata/test/helper.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,39 @@
$LOAD_PATH.unshift(File.expand_path("../../", __FILE__))
require "test-unit"
require "fluent/test"
require "fluent/test/driver/filter"
require "fluent/test/helpers"
$LOAD_PATH.unshift(File.expand_path('../../', __FILE__))
require 'test-unit'
require 'fluent/test'
require 'fluent/test/driver/filter'
require 'fluent/test/helpers'
require 'webmock/test_unit'
require 'kubeclient'

Test::Unit::TestCase.include(Fluent::Test::Helpers)
Test::Unit::TestCase.extend(Fluent::Test::Helpers)

def test_resource(name)
File.new("test/resources/#{name}")
end

def stub_apis
init_globals
stub_request(:get, %r{/api/v1$})
.to_return(body: test_resource('api_list.json'), status: 200)
stub_request(:get, %r{/api/v1/namespaces/sumologic/pods})
.to_return(body: test_resource('pod_sumologic.json'), status: 200)
stub_request(:get, %r{/api/v1/namespaces/kube-system/pods})
.to_return(body: test_resource('pod_kube-system.json'), status: 200)
stub_request(:get, %r{/api/v1/namespaces/non-exist/pods})
.to_raise(Kubeclient::ResourceNotFoundError.new(404, nil, nil))
end

def init_globals
@kubernetes_url = 'http://localhost:8080/api/'
@apiVersion = 'v1'
@verify_ssl = false
@ca_file = nil
@client_cert = nil
@client_key = nil
@ssl_partial_chain = false
@bearer_token_file = nil
@cache_size = 1000
@cache_ttl = 60 * 60
end
Loading

0 comments on commit b0eaba3

Please sign in to comment.