Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update out_forward using updated service_discovery plugin helper API #3300

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 13 additions & 30 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -227,31 +227,14 @@ def configure(conf)
socket_cache: socket_cache,
)

configs = []

# rewrite for using server as sd_static
conf.elements(name: 'server').each do |s|
s.name = 'service'
end

unless conf.elements(name: 'service').empty?
# To copy `services` element only
new_elem = Fluent::Config::Element.new('static_service_discovery', {}, {}, conf.elements(name: 'service'))
configs << { type: :static, conf: new_elem }
end

conf.elements(name: 'service_discovery').each_with_index do |c, i|
configs << { type: @service_discovery[i][:@type], conf: c }
end

service_discovery_create_manager(
service_discovery_configure(
:out_forward_service_discovery_watcher,
configurations: configs,
static_default_service_directive: 'server',
load_balancer: LoadBalancer.new(log),
custom_build_method: method(:build_node),
)

discovery_manager.services.each do |server|
service_discovery_services.each do |server|
# it's only for test
@nodes << server
unless @heartbeat_type == :none
Expand All @@ -273,7 +256,7 @@ def configure(conf)
end
end

if discovery_manager.services.empty?
if service_discovery_services.empty?
raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add <server> or <service_discovery>"
end

Expand Down Expand Up @@ -306,7 +289,7 @@ def start

unless @heartbeat_type == :none
if @heartbeat_type == :udp
@usock = socket_create_udp(discovery_manager.services.first.host, discovery_manager.services.first.port, nonblock: true)
@usock = socket_create_udp(service_discovery_services.first.host, service_discovery_services.first.port, nonblock: true)
server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length, &method(:on_udp_heatbeat_response_recv))
end
timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_heartbeat_timer))
Expand All @@ -318,7 +301,7 @@ def start
end

if @verify_connection_at_startup
discovery_manager.services.each do |node|
service_discovery_services.each do |node|
begin
node.verify_connection
rescue StandardError => e
Expand Down Expand Up @@ -374,7 +357,7 @@ def write(chunk)
return if chunk.empty?
tag = chunk.metadata.tag

discovery_manager.select_service { |node| node.send_data(tag, chunk) }
service_discovery_select_service { |node| node.send_data(tag, chunk) }
end

def try_write(chunk)
Expand All @@ -384,7 +367,7 @@ def try_write(chunk)
return
end
tag = chunk.metadata.tag
discovery_manager.select_service { |node| node.send_data(tag, chunk) }
service_discovery_select_service { |node| node.send_data(tag, chunk) }
last_ack if @require_ack_response && @suspend_flush
end

Expand Down Expand Up @@ -434,7 +417,7 @@ def create_transfer_socket(host, port, hostname, &block)

def statistics
stats = super
services = discovery_manager.services
services = service_discovery_services
healthy_nodes_count = 0
registed_nodes_count = services.size
services.each do |s|
Expand Down Expand Up @@ -471,7 +454,7 @@ def build_node(server)

def on_heartbeat_timer
need_rebuild = false
discovery_manager.services.each do |n|
service_discovery_services.each do |n|
begin
log.trace "sending heartbeat", host: n.host, port: n.port, heartbeat_type: @heartbeat_type
n.usock = @usock if @usock
Expand All @@ -486,16 +469,16 @@ def on_heartbeat_timer
end

if need_rebuild
discovery_manager.rebalance
service_discovery_rebalance
end
end

def on_udp_heatbeat_response_recv(data, sock)
sockaddr = Socket.pack_sockaddr_in(sock.remote_port, sock.remote_host)
if node = discovery_manager.services.find { |n| n.sockaddr == sockaddr }
if node = service_discovery_services.find { |n| n.sockaddr == sockaddr }
# log.trace "heartbeat arrived", name: node.name, host: node.host, port: node.port
if node.heartbeat
discovery_manager.rebalance
service_discovery_rebalance
end
else
log.warn("Unknown heartbeat response received from #{sock.remote_host}:#{sock.remote_port}. It may service out")
Expand Down