Skip to content
This repository has been archived by the owner on Nov 30, 2021. It is now read-only.

Commit

Permalink
kong/plugins/zipkin/opentracing.lua: Port to stream subsystem
Browse files Browse the repository at this point in the history
  • Loading branch information
james-callahan committed Jan 15, 2019
1 parent d26520b commit 00368ed
Showing 1 changed file with 133 additions and 86 deletions.
219 changes: 133 additions & 86 deletions kong/plugins/zipkin/opentracing.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ A plugin that derives this should:
]]

local BasePlugin = require "kong.plugins.base_plugin"
local subsystem = ngx.config.subsystem

local OpenTracingHandler = BasePlugin:extend()
OpenTracingHandler.VERSION = "scm"
Expand All @@ -33,6 +34,15 @@ function OpenTracingHandler:get_tracer(conf)
return tracer
end

function OpenTracingHandler:get_context(conf, ctx)
local opentracing = ctx.opentracing
if not opentracing then
self:initialise_request(conf, ctx)
opentracing = ctx.opentracing
end
return opentracing
end

-- Utility function to set either ipv4 or ipv6 tags
-- nginx apis don't have a flag to indicate whether an address is v4 or v6
local function ip_tag(addr)
Expand All @@ -44,112 +54,145 @@ local function ip_tag(addr)
end
end

function OpenTracingHandler:initialise_request(conf, ctx)
local tracer = self:get_tracer(conf)
local req = kong.request
local wire_context = tracer:extract("http_headers", req.get_headers()) -- could be nil
local method, url
local path_with_query = req.get_path_with_query()
if path_with_query ~= "" then
method = req.get_method()
url = req.get_scheme() .. "://" .. req.get_host() .. ":"
.. req.get_port() .. path_with_query
end
local forwarded_ip = kong.client.get_forwarded_ip()
local request_span = tracer:start_span("kong.request", {
child_of = wire_context;
start_timestamp = ngx.req.start_time(),
tags = {
component = "kong";
["span.kind"] = "server";
["http.method"] = method;
["http.url"] = url;
[ip_tag(forwarded_ip)] = forwarded_ip;
["peer.port"] = kong.client.get_forwarded_port();
if subsystem == "http" then
function OpenTracingHandler:initialise_request(conf, ctx)
local tracer = self:get_tracer(conf)
local req = kong.request
local wire_context = tracer:extract("http_headers", req.get_headers()) -- could be nil
local method, url
local path_with_query = req.get_path_with_query()
if path_with_query ~= "" then
method = req.get_method()
url = req.get_scheme() .. "://" .. req.get_host() .. ":"
.. req.get_port() .. path_with_query
end
local forwarded_ip = kong.client.get_forwarded_ip()
local request_span = tracer:start_span("kong.request", {
child_of = wire_context;
start_timestamp = ngx.req.start_time(),
tags = {
component = "kong";
["span.kind"] = "server";
["http.method"] = method;
["http.url"] = url;
[ip_tag(forwarded_ip)] = forwarded_ip;
["peer.port"] = kong.client.get_forwarded_port();
}
})
ctx.opentracing = {
tracer = tracer;
wire_context = wire_context;
request_span = request_span;
rewrite_span = nil;
access_span = nil;
proxy_span = nil;
header_filter_span = nil;
header_filter_finished = false;
body_filter_span = nil;
}
})
ctx.opentracing = {
tracer = tracer;
wire_context = wire_context;
request_span = request_span;
rewrite_span = nil;
access_span = nil;
proxy_span = nil;
header_filter_span = nil;
header_filter_finished = false;
body_filter_span = nil;
}
end

function OpenTracingHandler:get_context(conf, ctx)
local opentracing = ctx.opentracing
if not opentracing then
self:initialise_request(conf, ctx)
opentracing = ctx.opentracing
end
return opentracing
end

function OpenTracingHandler:access(conf)
OpenTracingHandler.super.access(self, conf)
function OpenTracingHandler:access(conf)
OpenTracingHandler.super.access(self, conf)

local ctx = ngx.ctx
local opentracing = self:get_context(conf, ctx)
local ctx = ngx.ctx
local opentracing = self:get_context(conf, ctx)

opentracing.proxy_span = opentracing.request_span:start_child_span(
"kong.proxy",
ctx.KONG_ACCESS_START / 1000
)
opentracing.proxy_span = opentracing.request_span:start_child_span(
"kong.proxy",
ctx.KONG_ACCESS_START / 1000
)

opentracing.access_span = opentracing.proxy_span:start_child_span(
"kong.access",
ctx.KONG_ACCESS_START / 1000
)
opentracing.access_span = opentracing.proxy_span:start_child_span(
"kong.access",
ctx.KONG_ACCESS_START / 1000
)

-- Want to send headers to upstream
local outgoing_headers = {}
opentracing.tracer:inject(opentracing.proxy_span, "http_headers", outgoing_headers)
local set_header = kong.service.request.set_header
for k, v in pairs(outgoing_headers) do
set_header(k, v)
-- Want to send headers to upstream
local outgoing_headers = {}
opentracing.tracer:inject(opentracing.proxy_span, "http_headers", outgoing_headers)
local set_header = kong.service.request.set_header
for k, v in pairs(outgoing_headers) do
set_header(k, v)
end
end
end

function OpenTracingHandler:header_filter(conf)
OpenTracingHandler.super.header_filter(self, conf)
function OpenTracingHandler:header_filter(conf)
OpenTracingHandler.super.header_filter(self, conf)

local ctx = ngx.ctx
local opentracing = self:get_context(conf, ctx)
local ctx = ngx.ctx
local opentracing = self:get_context(conf, ctx)

local header_started = ctx.KONG_HEADER_FILTER_STARTED_AT and ctx.KONG_HEADER_FILTER_STARTED_AT / 1000 or ngx.now()
local header_started = ctx.KONG_HEADER_FILTER_STARTED_AT and ctx.KONG_HEADER_FILTER_STARTED_AT / 1000 or ngx.now()

if not opentracing.proxy_span then
opentracing.proxy_span = opentracing.request_span:start_child_span(
"kong.proxy",
if not opentracing.proxy_span then
opentracing.proxy_span = opentracing.request_span:start_child_span(
"kong.proxy",
header_started
)
end

opentracing.header_filter_span = opentracing.proxy_span:start_child_span(
"kong.header_filter",
header_started
)
end

opentracing.header_filter_span = opentracing.proxy_span:start_child_span(
"kong.header_filter",
header_started
)
end
function OpenTracingHandler:body_filter(conf)
OpenTracingHandler.super.body_filter(self, conf)

function OpenTracingHandler:body_filter(conf)
OpenTracingHandler.super.body_filter(self, conf)
local ctx = ngx.ctx
local opentracing = self:get_context(conf, ctx)

local ctx = ngx.ctx
local opentracing = self:get_context(conf, ctx)
-- Finish header filter when body filter starts
if not opentracing.header_filter_finished then
local now = ngx.now()

-- Finish header filter when body filter starts
if not opentracing.header_filter_finished then
local now = ngx.now()
opentracing.header_filter_span:finish(now)
opentracing.header_filter_finished = true

opentracing.header_filter_span:finish(now)
opentracing.header_filter_finished = true
opentracing.body_filter_span = opentracing.proxy_span:start_child_span("kong.body_filter", now)
end
end
elseif subsystem == "stream" then
function OpenTracingHandler:initialise_request(conf, ctx)
local tracer = self:get_tracer(conf)
local wire_context = nil
local forwarded_ip = kong.client.get_forwarded_ip()
local request_span = tracer:start_span("kong.stream", {
child_of = wire_context;
start_timestamp = ngx.req.start_time(),
tags = {
component = "kong";
["span.kind"] = "server";
[ip_tag(forwarded_ip)] = forwarded_ip;
["peer.port"] = kong.client.get_forwarded_port();
}
})
ctx.opentracing = {
tracer = tracer;
wire_context = wire_context;
request_span = request_span;
preread_span = nil;
proxy_span = nil;
}
end

function OpenTracingHandler:preread(conf)
OpenTracingHandler.super.preread(self, conf)

local ctx = ngx.ctx
local opentracing = self:get_context(conf, ctx)

opentracing.body_filter_span = opentracing.proxy_span:start_child_span("kong.body_filter", now)
opentracing.proxy_span = opentracing.request_span:start_child_span(
"kong.proxy",
ctx.KONG_PREREAD_START / 1000
)

opentracing.preread_span = opentracing.proxy_span:start_child_span(
"kong.preread",
ctx.KONG_PREREAD_START / 1000
)
end
end

Expand Down Expand Up @@ -180,6 +223,8 @@ function OpenTracingHandler:log(conf)

if opentracing.access_span then
opentracing.access_span:finish(ctx.KONG_ACCESS_ENDED_AT and ctx.KONG_ACCESS_ENDED_AT/1000 or proxy_end)
elseif opentracing.preread_span then
opentracing.preread_span:finish(ctx.KONG_PREREAD_ENDED_AT and ctx.KONG_PREREAD_ENDED_AT/1000 or proxy_end)
end

local balancer_data = ctx.balancer_data
Expand Down Expand Up @@ -214,7 +259,9 @@ function OpenTracingHandler:log(conf)
opentracing.body_filter_span:finish(proxy_end)
end

request_span:set_tag("http.status_code", kong.response.get_status())
if subsystem == "http" then
request_span:set_tag("http.status_code", kong.response.get_status())
end
if ctx.authenticated_consumer then
request_span:set_tag("kong.consumer", ctx.authenticated_consumer.id)
end
Expand Down

0 comments on commit 00368ed

Please sign in to comment.