Skip to content

Commit

Permalink
add gzip support (#1506)
Browse files Browse the repository at this point in the history
* add gzip support
  • Loading branch information
jjtyro authored and wu-sheng committed Jul 30, 2018
1 parent 4788ebd commit eed6fa1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,28 @@
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;

import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.zip.GZIPInputStream;

public class SpanProcessor {
private final Logger logger = LoggerFactory.getLogger(SpanProcessor.class);

void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request, RegisterServices registerServices) throws IOException {
int len = request.getContentLength();
ServletInputStream iii = request.getInputStream();
byte[] buffer = new byte[len];

int readCntTotal = 0;
InputStream inputStream = getInputStream(request);
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buffer = new byte[2048];
int readCntOnce;
while (readCntTotal < len) {
readCntOnce = iii.read(buffer, readCntTotal, len - readCntTotal);
if (readCntOnce <= 0) {
logger.error("Receive spans data failed.");
throw new IOException();
}
readCntTotal += readCntOnce;

while ((readCntOnce = inputStream.read(buffer)) >= 0) {
out.write(buffer, 0, readCntOnce);
}

List<Span> spanList = decoder.decodeList(buffer);
List<Span> spanList = decoder.decodeList(out.toByteArray());

spanList.forEach(span -> {
// In Zipkin, the local service name represents the application owner.
Expand All @@ -65,4 +57,17 @@ void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletR
CacheFactory.INSTANCE.get(config).addSpan(span);
});
}

private InputStream getInputStream(HttpServletRequest request) throws IOException {
InputStream requestInStream;

String headEncoding = request.getHeader("accept-encoding");
if (headEncoding != null && (headEncoding.indexOf("gzip") != -1)) {
requestInStream = new GZIPInputStream(request.getInputStream());
} else {
requestInStream = request.getInputStream();
}

return requestInStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,10 @@ private SpanObject.Builder initSpan(SpanObject.Builder parentSegmentSpan, Span p
}
// microseconds in Zipkin -> milliseconds in SkyWalking
long startTime = span.timestamp() / 1000;
long duration = span.duration() / 1000;
// Some implement of zipkin client not include duration field in its report
// package when duration's value be 0ms, Causing a null pointer exception here.
Long durationObj = span.duration();
long duration = (durationObj == null) ? 0 : durationObj.longValue() / 1000;
spanBuilder.setStartTime(startTime);
spanBuilder.setEndTime(startTime + duration);

Expand Down

0 comments on commit eed6fa1

Please sign in to comment.