From acbbaddca89b48395064215210d4d3e37e56959b Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Mon, 27 Jul 2015 11:39:28 +0900 Subject: [PATCH] Support compression in forward transfer --- lib/fluent/plugin/in_forward.rb | 9 +++++- lib/fluent/plugin/out_forward.rb | 54 ++++++++++++++++++++++---------- 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 4242975479..ab51afef6b 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -20,6 +20,8 @@ class ForwardInput < Input def initialize super + require 'zlib' + require 'stringio' require 'fluent/plugin/socket_util' end @@ -148,9 +150,14 @@ def on_message(msg, chunk_size, source) if entries.class == String # PackedForward + option = msg[2] + if option && option['compress'] + Zlib::GzipReader.wrap(StringIO.new(entries)) { |gz| + entries = gz.read + } + end es = MessagePackEventStream.new(entries) router.emit_stream(tag, es) - option = msg[2] elsif entries.class == Array # Forward diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 571237e544..bf357c9202 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -29,6 +29,7 @@ class ForwardOutput < ObjectBufferedOutput def initialize super + require 'zlib' require "base64" require 'socket' require 'fileutils' @@ -63,6 +64,7 @@ def initialize config_param :phi_threshold, :integer, :default => 16 desc 'Use the "Phi accrual failure detector" to detect server failure.' config_param :phi_failure_detector, :bool, :default => true + config_param :compress, :bool, :default => false # if any options added that requires extended forward api, fix @extend_internal_protocol @@ -265,7 +267,7 @@ def rebuild_weight_array FORWARD_HEADER = [0x92].pack('C').freeze FORWARD_HEADER_EXT = [0x93].pack('C').freeze def forward_header - if @extend_internal_protocol + if @extend_internal_protocol || @compress FORWARD_HEADER_EXT else FORWARD_HEADER @@ -304,24 +306,43 @@ def send_data(node, tag, chunk) # writeRaw(tag) sock.write tag.to_msgpack # tag - # beginRaw(size) - sz = chunk.size - #if sz < 32 - # # FixRaw - # sock.write [0xa0 | sz].pack('C') - #elsif sz < 65536 - # # raw 16 - # sock.write [0xda, sz].pack('Cn') - #else - # raw 32 - sock.write [0xdb, sz].pack('CN') - #end - # writeRawBody(packed_es) - chunk.write_to(sock) + option = nil + if @compress + tmp = Tempfile.new("forward-#{chunk.key}") + gz = Zlib::GzipWriter.new(tmp) + chunk.write_to(gz) + gz.finish + gz = nil + + sock.write [0xdb, tmp.pos].pack('CN') + + tmp.pos = 0 + FileUtils.copy_stream(tmp, sock) + + option = {'compress' => true} + unless @extend_internal_protocol + sock.write option.to_msgpack + end + else + # beginRaw(size) + sz = chunk.size + #if sz < 32 + # # FixRaw + # sock.write [0xa0 | sz].pack('C') + #elsif sz < 65536 + # # raw 16 + # sock.write [0xda, sz].pack('Cn') + #else + # raw 32 + sock.write [0xdb, sz].pack('CN') + #end + + chunk.write_to(sock) + end if @extend_internal_protocol - option = {} + option ||= {} option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response sock.write option.to_msgpack @@ -363,6 +384,7 @@ def send_data(node, tag, chunk) node.heartbeat(false) return res # for test ensure + tmp.close(true) if tmp sock.close_write sock.close end