diff --git a/LICENSES.txt b/LICENSES.txt index f79ff1a87d..32ccf8590d 100644 --- a/LICENSES.txt +++ b/LICENSES.txt @@ -10,12 +10,19 @@ Apache-2.0 WMI4Java-1.6.3.jar accessors-smart-2.4.9.jar annotations-17.0.0.jar +<<<<<<< HEAD apiguardian-api-1.1.0.jar +======= +>>>>>>> a312e4495 ([TFMonhHx] Forces new version of netty (#450)) arrow-format-12.0.1.jar arrow-memory-core-12.0.1.jar arrow-memory-netty-12.0.1.jar arrow-vector-12.0.1.jar +<<<<<<< HEAD assertj-core-3.18.1.jar +======= + assertj-core-3.24.2.jar +>>>>>>> a312e4495 ([TFMonhHx] Forces new version of netty (#450)) audience-annotations-0.5.0.jar avro-1.7.7.jar awaitility-4.1.0.jar @@ -101,6 +108,7 @@ Apache-2.0 jackson-core-2.13.4.jar jackson-core-2.15.1.jar jackson-core-asl-1.9.13.jar +<<<<<<< HEAD jackson-databind-2.13.4.2.jar jackson-databind-2.15.1.jar jackson-dataformat-cbor-2.15.1.jar @@ -110,6 +118,15 @@ Apache-2.0 jackson-jaxrs-base-2.15.1.jar jackson-jaxrs-json-provider-2.13.4.jar jackson-jaxrs-json-provider-2.15.1.jar +======= + jackson-databind-2.15.2.jar + jackson-dataformat-cbor-2.15.2.jar + jackson-dataformat-csv-2.15.2.jar + jackson-datatype-jsr310-2.15.1.jar + jackson-datatype-jsr310-2.15.2.jar + jackson-jaxrs-base-2.15.2.jar + jackson-jaxrs-json-provider-2.15.2.jar +>>>>>>> a312e4495 ([TFMonhHx] Forces new version of netty (#450)) jackson-mapper-asl-1.9.13.jar jackson-module-jaxb-annotations-2.13.4.jar jackson-module-jaxb-annotations-2.15.1.jar @@ -173,10 +190,16 @@ Apache-2.0 metrics-core-3.2.4.jar netty-3.10.6.Final.jar netty-all-4.1.89.Final.jar +<<<<<<< HEAD netty-buffer-4.1.86.Final.jar netty-buffer-4.1.89.Final.jar netty-codec-4.1.86.Final.jar netty-codec-4.1.89.Final.jar +======= + netty-buffer-4.1.93.Final.jar + netty-buffer-4.1.94.Final.jar + netty-codec-4.1.93.Final.jar +>>>>>>> a312e4495 ([TFMonhHx] Forces new version of netty (#450)) netty-codec-dns-4.1.89.Final.jar netty-codec-haproxy-4.1.89.Final.jar netty-codec-http-4.1.86.Final.jar @@ -189,10 +212,16 @@ Apache-2.0 netty-codec-socks-4.1.89.Final.jar netty-codec-stomp-4.1.89.Final.jar netty-codec-xml-4.1.89.Final.jar +<<<<<<< HEAD netty-common-4.1.86.Final.jar netty-common-4.1.89.Final.jar netty-handler-4.1.86.Final.jar netty-handler-4.1.89.Final.jar +======= + netty-common-4.1.93.Final.jar + netty-common-4.1.94.Final.jar + netty-handler-4.1.93.Final.jar +>>>>>>> a312e4495 ([TFMonhHx] Forces new version of netty (#450)) netty-handler-proxy-4.1.89.Final.jar netty-handler-ssl-ocsp-4.1.89.Final.jar netty-resolver-4.1.86.Final.jar diff --git a/NOTICE.txt b/NOTICE.txt index e704170d0f..5f309646cb 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -40,12 +40,19 @@ Apache-2.0 WMI4Java-1.6.3.jar accessors-smart-2.4.9.jar annotations-17.0.0.jar +<<<<<<< HEAD apiguardian-api-1.1.0.jar +======= +>>>>>>> a312e4495 ([TFMonhHx] Forces new version of netty (#450)) arrow-format-12.0.1.jar arrow-memory-core-12.0.1.jar arrow-memory-netty-12.0.1.jar arrow-vector-12.0.1.jar +<<<<<<< HEAD assertj-core-3.18.1.jar +======= + assertj-core-3.24.2.jar +>>>>>>> a312e4495 ([TFMonhHx] Forces new version of netty (#450)) audience-annotations-0.5.0.jar avro-1.7.7.jar awaitility-4.1.0.jar @@ -131,6 +138,7 @@ Apache-2.0 jackson-core-2.13.4.jar jackson-core-2.15.1.jar jackson-core-asl-1.9.13.jar +<<<<<<< HEAD jackson-databind-2.13.4.2.jar jackson-databind-2.15.1.jar jackson-dataformat-cbor-2.15.1.jar @@ -140,6 +148,15 @@ Apache-2.0 jackson-jaxrs-base-2.15.1.jar jackson-jaxrs-json-provider-2.13.4.jar jackson-jaxrs-json-provider-2.15.1.jar +======= + jackson-databind-2.15.2.jar + jackson-dataformat-cbor-2.15.2.jar + jackson-dataformat-csv-2.15.2.jar + jackson-datatype-jsr310-2.15.1.jar + jackson-datatype-jsr310-2.15.2.jar + jackson-jaxrs-base-2.15.2.jar + jackson-jaxrs-json-provider-2.15.2.jar +>>>>>>> a312e4495 ([TFMonhHx] Forces new version of netty (#450)) jackson-mapper-asl-1.9.13.jar jackson-module-jaxb-annotations-2.13.4.jar jackson-module-jaxb-annotations-2.15.1.jar @@ -203,10 +220,16 @@ Apache-2.0 metrics-core-3.2.4.jar netty-3.10.6.Final.jar netty-all-4.1.89.Final.jar +<<<<<<< HEAD netty-buffer-4.1.86.Final.jar netty-buffer-4.1.89.Final.jar netty-codec-4.1.86.Final.jar netty-codec-4.1.89.Final.jar +======= + netty-buffer-4.1.93.Final.jar + netty-buffer-4.1.94.Final.jar + netty-codec-4.1.93.Final.jar +>>>>>>> a312e4495 ([TFMonhHx] Forces new version of netty (#450)) netty-codec-dns-4.1.89.Final.jar netty-codec-haproxy-4.1.89.Final.jar netty-codec-http-4.1.86.Final.jar @@ -219,10 +242,16 @@ Apache-2.0 netty-codec-socks-4.1.89.Final.jar netty-codec-stomp-4.1.89.Final.jar netty-codec-xml-4.1.89.Final.jar +<<<<<<< HEAD netty-common-4.1.86.Final.jar netty-common-4.1.89.Final.jar netty-handler-4.1.86.Final.jar netty-handler-4.1.89.Final.jar +======= + netty-common-4.1.93.Final.jar + netty-common-4.1.94.Final.jar + netty-handler-4.1.93.Final.jar +>>>>>>> a312e4495 ([TFMonhHx] Forces new version of netty (#450)) netty-handler-proxy-4.1.89.Final.jar netty-handler-ssl-ocsp-4.1.89.Final.jar netty-resolver-4.1.86.Final.jar diff --git a/core/build.gradle b/core/build.gradle index b9154c4a25..ca4627a112 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -144,16 +144,19 @@ dependencies { compile group: 'xerces', name: 'xercesImpl', version: '2.12.2' - compile group: 'org.apache.arrow', name: 'arrow-vector', version: '12.0.1', { + def arrowExclusions = { exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' exclude group: 'io.netty', module: 'netty-common' } - compile group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '12.0.1', { - exclude group: 'com.google.guava', module: 'guava' - exclude group: 'io.netty', module: 'netty-common' - exclude group: 'io.netty', module: 'netty-buffer' + + compile group: 'org.apache.arrow', name: 'arrow-vector', version: '12.0.1', arrowExclusions + compile group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '12.0.1', arrowExclusions + compile group: 'io.netty', name: 'netty-buffer', { + version { + strictly '4.1.94.Final' + } } testCompile group: 'org.apache.arrow', name: 'arrow-vector', version: '12.0.1' testCompile group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '12.0.1' diff --git a/core/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/core/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java new file mode 100644 index 0000000000..0fcf5b3e3e --- /dev/null +++ b/core/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java @@ -0,0 +1,278 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.arrow.memory.OutOfMemoryException; +import org.apache.arrow.memory.util.LargeMemoryUtil; + +import io.netty.util.internal.OutOfDirectMemoryError; +import io.netty.util.internal.StringUtil; + +/** + * The base allocator that we use for all of Arrow's memory management. Returns + * UnsafeDirectLittleEndian buffers. + */ +public class PooledByteBufAllocatorL { + + private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator"); + + private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60; + public final UnsafeDirectLittleEndian empty; + private final AtomicLong hugeBufferSize = new AtomicLong(0); + private final AtomicLong hugeBufferCount = new AtomicLong(0); + private final AtomicLong normalBufferSize = new AtomicLong(0); + private final AtomicLong normalBufferCount = new AtomicLong(0); + private final InnerAllocator allocator; + + public PooledByteBufAllocatorL() { + allocator = new InnerAllocator(); + empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER)); + } + + /** + * Returns a {@linkplain io.netty.buffer.UnsafeDirectLittleEndian} of the given size. + */ + public UnsafeDirectLittleEndian allocate(long size) { + try { + return allocator.directBuffer(LargeMemoryUtil.checkedCastToInt(size), Integer.MAX_VALUE); + } catch (OutOfMemoryError e) { + /* + * OutOfDirectMemoryError is thrown by Netty when we exceed the direct memory limit defined by + * -XX:MaxDirectMemorySize. OutOfMemoryError with "Direct buffer memory" message is thrown by + * java.nio.Bits when we exceed the direct memory limit. This should never be hit in practice + * as Netty is expected to throw an OutOfDirectMemoryError first. + */ + if (e instanceof OutOfDirectMemoryError || "Direct buffer memory".equals(e.getMessage())) { + throw new OutOfMemoryException("Failure allocating buffer.", e); + } + throw e; + } + } + + public int getChunkSize() { + return allocator.chunkSize; + } + + public long getHugeBufferSize() { + return hugeBufferSize.get(); + } + + public long getHugeBufferCount() { + return hugeBufferCount.get(); + } + + public long getNormalBufferSize() { + return normalBufferSize.get(); + } + + public long getNormalBufferCount() { + return normalBufferSize.get(); + } + + private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian { + + private final long initialCapacity; + private final AtomicLong count; + private final AtomicLong size; + + private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) { + super(buf); + this.initialCapacity = buf.capacity(); + this.count = count; + this.size = size; + } + + private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, + AtomicLong size) { + super(buf); + this.initialCapacity = buf.capacity(); + this.count = count; + this.size = size; + } + + @Override + public ByteBuf copy() { + throw new UnsupportedOperationException("copy method is not supported"); + } + + @Override + public ByteBuf copy(int index, int length) { + throw new UnsupportedOperationException("copy method is not supported"); + } + + @Override + public boolean release(int decrement) { + boolean released = super.release(decrement); + if (released) { + count.decrementAndGet(); + size.addAndGet(-initialCapacity); + } + return released; + } + + } + + private class InnerAllocator extends PooledByteBufAllocator { + + private final PoolArena[] directArenas; + private final MemoryStatusThread statusThread; + private final int chunkSize; + + public InnerAllocator() { + super(true); + + try { + Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas"); + f.setAccessible(true); + this.directArenas = (PoolArena[]) f.get(this); + } catch (Exception e) { + throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e); + } + + this.chunkSize = directArenas[0].chunkSize; + + if (memoryLogger.isTraceEnabled()) { + statusThread = new MemoryStatusThread(); + statusThread.start(); + } else { + statusThread = null; + } + } + + private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) { + PoolArenasCache cache = threadCache(); + PoolArena directArena = cache.directArena; + + if (directArena != null) { + + if (initialCapacity > directArena.chunkSize) { + // This is beyond chunk size so we'll allocate separately. + ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity); + + hugeBufferSize.addAndGet(buf.capacity()); + hugeBufferCount.incrementAndGet(); + + // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception()); + return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, + hugeBufferSize); + } else { + // within chunk, use arena. + ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity); + if (!(buf instanceof PooledUnsafeDirectByteBuf)) { + fail(); + } + + if (!ASSERT_ENABLED) { + return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf); + } + + normalBufferSize.addAndGet(buf.capacity()); + normalBufferCount.incrementAndGet(); + + return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, + normalBufferCount, normalBufferSize); + } + + } else { + throw fail(); + } + } + + private UnsupportedOperationException fail() { + return new UnsupportedOperationException( + "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform " + + "didn't provide that functionality."); + } + + @Override + public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) { + if (initialCapacity == 0 && maxCapacity == 0) { + newDirectBuffer(initialCapacity, maxCapacity); + } + validate(initialCapacity, maxCapacity); + return newDirectBufferL(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + throw new UnsupportedOperationException("Arrow doesn't support using heap buffers."); + } + + + private void validate(int initialCapacity, int maxCapacity) { + if (initialCapacity < 0) { + throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: 0+)"); + } + if (initialCapacity > maxCapacity) { + throw new IllegalArgumentException(String.format( + "initialCapacity: %d (expected: not greater than maxCapacity(%d)", + initialCapacity, maxCapacity)); + } + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append(directArenas.length); + buf.append(" direct arena(s):"); + buf.append(StringUtil.NEWLINE); + for (PoolArena a : directArenas) { + buf.append(a); + } + + buf.append("Large buffers outstanding: "); + buf.append(hugeBufferCount.get()); + buf.append(" totaling "); + buf.append(hugeBufferSize.get()); + buf.append(" bytes."); + buf.append('\n'); + buf.append("Normal buffers outstanding: "); + buf.append(normalBufferCount.get()); + buf.append(" totaling "); + buf.append(normalBufferSize.get()); + buf.append(" bytes."); + return buf.toString(); + } + + private class MemoryStatusThread extends Thread { + + public MemoryStatusThread() { + super("allocation.logger"); + this.setDaemon(true); + } + + @Override + public void run() { + while (true) { + memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString()); + try { + Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000); + } catch (InterruptedException e) { + return; + } + } + } + } + + + } +}