From 0229077a692e8c12bf570c9398407caee7e0da43 Mon Sep 17 00:00:00 2001 From: chenhang Date: Fri, 4 Mar 2022 09:12:03 +0800 Subject: [PATCH 1/3] skip checkpoint compare and reply --- .../org/apache/bookkeeper/bookie/Journal.java | 8 +- .../bookkeeper/bookie/JournalChannel.java | 137 ++++++++++-------- .../apache/bookkeeper/bookie/SyncThread.java | 3 + .../bookie/storage/ldb/DbLedgerStorage.java | 2 + .../ldb/SingleDirectoryDbLedgerStorage.java | 24 ++- 5 files changed, 112 insertions(+), 62 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 3e3b12392c1..602dcd2d150 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -799,6 +799,7 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner) recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, journalPos, conf, fileChannelProvider); } + recLog.skipHeader(); int journalVersion = recLog.getFormatVersion(); try { ByteBuffer lenBuff = ByteBuffer.allocate(4); @@ -809,11 +810,15 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner) // start reading entry lenBuff.clear(); fullRead(recLog, lenBuff); - if (lenBuff.remaining() != 0) { + LOG.info("[hangc] offset: {}, remaining: {}", offset, lenBuff.remaining()); + if (lenBuff.remaining() == 0) { break; } + lenBuff.flip(); int len = lenBuff.getInt(); + + LOG.info("[hangc]offset: {}, len: {}", offset, len); if (len == 0) { break; } @@ -978,6 +983,7 @@ public void run() { logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize, journalAlignmentSize, removePagesFromCache, journalFormatVersionToWrite, getBufferedChannelBuilder(), conf, fileChannelProvider); + logFile.writeHeader(); journalStats.getJournalCreationStats().registerSuccessfulEvent( journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java index 4e0c1720e8b..8c00f53e268 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java @@ -52,7 +52,8 @@ class JournalChannel implements Closeable { final int fd; final FileChannel fc; final BufferedChannel bc; - final int formatVersion; + int formatVersion; + final long position; long nextPrealloc = 0; final byte[] magicWord = "BKLG".getBytes(UTF_8); @@ -163,6 +164,7 @@ private JournalChannel(File journalDirectory, long logId, this.fRemoveFromPageCache = fRemoveFromPageCache; this.configuration = configuration; this.fileChannelProvider = provider; + this.position = position; File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn"); channel = fileChannelProvider.open(fn, configuration); @@ -182,6 +184,7 @@ private JournalChannel(File journalDirectory, long logId, fc = channel.getFileChannel(); formatVersion = formatVersionToWrite; + /* int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE; ByteBuffer bb = ByteBuffer.allocate(headerSize); ZeroBuffer.put(bb); @@ -191,78 +194,24 @@ private JournalChannel(File journalDirectory, long logId, bb.clear(); fc.write(bb); + */ + bc = bcBuilder.create(fc, writeBufferSize); + /* forceWrite(true); nextPrealloc = this.preAllocSize; fc.write(zeros, nextPrealloc - journalAlignSize); + + */ } else { // open an existing file if (channel instanceof DefaultFileChannel) { fc = channel.getFileChannel(); bc = null; // readonly - - ByteBuffer bb = ByteBuffer.allocate(VERSION_HEADER_SIZE); - int c = fc.read(bb); - bb.flip(); - - if (c == VERSION_HEADER_SIZE) { - byte[] first4 = new byte[4]; - bb.get(first4); - - if (Arrays.equals(first4, magicWord)) { - formatVersion = bb.getInt(); - } else { - // pre magic word journal, reset to 0; - formatVersion = V1; - } - } else { - // no header, must be old version - formatVersion = V1; - } - - if (formatVersion < MIN_COMPAT_JOURNAL_FORMAT_VERSION - || formatVersion > CURRENT_JOURNAL_FORMAT_VERSION) { - String err = String.format("Invalid journal version, unable to read." - + " Expected between (%d) and (%d), got (%d)", - MIN_COMPAT_JOURNAL_FORMAT_VERSION, CURRENT_JOURNAL_FORMAT_VERSION, - formatVersion); - LOG.error(err); - throw new IOException(err); - } - - try { - if (position == START_OF_FILE) { - if (formatVersion >= V5) { - fc.position(HEADER_SIZE); - } else if (formatVersion >= V2) { - fc.position(VERSION_HEADER_SIZE); - } else { - fc.position(0); - } - } else { - fc.position(position); - } - } catch (IOException e) { - LOG.error("Bookie journal file can seek to position :", e); - throw e; - } } else { // rewrite the existing file fc = channel.getFileChannel(); formatVersion = formatVersionToWrite; - - int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE; - ByteBuffer bb = ByteBuffer.allocate(headerSize); - ZeroBuffer.put(bb); - bb.clear(); - bb.put(magicWord); - bb.putInt(formatVersion); - bb.clear(); - fc.write(bb); - bc = bcBuilder.create(fc, writeBufferSize); - forceWrite(true); - nextPrealloc = this.preAllocSize; - fc.write(zeros, nextPrealloc - journalAlignSize); } } if (fRemoveFromPageCache) { @@ -272,6 +221,74 @@ private JournalChannel(File journalDirectory, long logId, } } + public void writeHeader() throws IOException { + try { + int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE; + ByteBuffer bb = ByteBuffer.allocate(headerSize); + ZeroBuffer.put(bb); + bb.clear(); + bb.put(magicWord); + bb.putInt(formatVersion); + bb.clear(); + fc.write(bb); + + forceWrite(true); + nextPrealloc = this.preAllocSize; + fc.write(zeros, nextPrealloc - journalAlignSize); + } catch (IOException e) { + LOG.error("Failed to write journal header. ", e); + throw e; + } + } + + public void skipHeader() throws IOException { + ByteBuffer bb = ByteBuffer.allocate(VERSION_HEADER_SIZE); + int c = fc.read(bb); + bb.flip(); + + if (c == VERSION_HEADER_SIZE) { + byte[] first4 = new byte[4]; + bb.get(first4); + + if (Arrays.equals(first4, magicWord)) { + formatVersion = bb.getInt(); + } else { + // pre magic word journal, reset to 0; + formatVersion = V1; + } + } else { + // no header, must be old version + formatVersion = V1; + } + + if (formatVersion < MIN_COMPAT_JOURNAL_FORMAT_VERSION + || formatVersion > CURRENT_JOURNAL_FORMAT_VERSION) { + String err = String.format("Invalid journal version, unable to read." + + " Expected between (%d) and (%d), got (%d)", + MIN_COMPAT_JOURNAL_FORMAT_VERSION, CURRENT_JOURNAL_FORMAT_VERSION, + formatVersion); + LOG.error(err); + throw new IOException(err); + } + + try { + if (position == START_OF_FILE) { + if (formatVersion >= V5) { + fc.position(HEADER_SIZE); + } else if (formatVersion >= V2) { + fc.position(VERSION_HEADER_SIZE); + } else { + fc.position(0); + } + } else { + fc.position(position); + } + } catch (IOException e) { + LOG.error("Bookie journal file can seek to position :", e); + throw e; + } + } + int getFormatVersion() { return formatVersion; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java index 00288dc27d6..3aa853424cf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java @@ -118,6 +118,7 @@ public Future requestFlush() { private void flush() { Checkpoint checkpoint = checkpointSource.newCheckpoint(); try { + log.info("[hangc] syncThread flush..."); ledgerStorage.flush(); } catch (NoWritableLedgerDirException e) { log.error("No writeable ledger directories", e); @@ -149,7 +150,9 @@ public void checkpoint(Checkpoint checkpoint) { } try { + log.info("[hangc] SyncThread start ..."); ledgerStorage.checkpoint(checkpoint); + log.info("[hangc] SyncThread completed ..."); } catch (NoWritableLedgerDirException e) { log.error("No writeable ledger directories", e); dirsListener.allDisksFull(true); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 9b84db43cae..7361be56821 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -228,7 +228,9 @@ public void flush() throws IOException { @Override public void checkpoint(Checkpoint checkpoint) throws IOException { for (LedgerStorage ls : ledgerStorageList) { + log.info("[hangc] DbLedgerStorage start..."); ls.checkpoint(checkpoint); + log.info("[hangc] DbLedgerStorage completed..."); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 94904befb40..fd2e0f5d692 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -359,6 +359,7 @@ private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) log.info("Write cache is full, triggering flush"); executor.execute(() -> { try { + log.info("[hangc] full write cache..."); flush(); } catch (IOException e) { log.error("Error during flush", e); @@ -366,17 +367,22 @@ private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) }); } + log.info("[hangc] request write lock..."); long stamp = writeCacheRotationLock.readLock(); try { + log.info("[hangc] get write lock..."); if (writeCache.put(ledgerId, entryId, entry)) { // We succeeded in putting the entry in write cache in the + log.info("[hangc] put complete..."); return; } } finally { + log.info("[hangc] release write lock..."); writeCacheRotationLock.unlockRead(stamp); } // Wait some time and try again + log.info("[hangc] go into sleep..."); try { Thread.sleep(1); } catch (InterruptedException e) { @@ -581,10 +587,15 @@ boolean isFlushRequired() { @Override public void checkpoint(Checkpoint checkpoint) throws IOException { + log.info("[hangc] aaa"); Checkpoint thisCheckpoint = checkpointSource.newCheckpoint(); + /* if (lastCheckpoint.compareTo(checkpoint) > 0) { + log.info("[hangc] xxx"); + log.info("[hangc] lastCheckpoint: {}, checkpoint: {}, this checkpoint: {}", + lastCheckpoint, checkpoint, thisCheckpoint); return; - } + }*/ long startTime = MathUtils.nowInNano(); @@ -592,10 +603,12 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { flushMutex.lock(); try { + log.info("[hangc] bbb"); // Swap the write cache so that writes can continue to happen while the flush is // ongoing swapWriteCache(); + log.info("[hangc] ccc"); long sizeToFlush = writeCacheBeingFlushed.size(); if (log.isDebugEnabled()) { log.debug("Flushing entries. count: {} -- size {} Mb", writeCacheBeingFlushed.count(), @@ -614,9 +627,12 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { throw new RuntimeException(e); } }); + log.info("[hangc] ddd"); entryLogger.flush(); + log.info("[hangc] eee"); + long batchFlushStarTime = System.nanoTime(); batch.flush(); batch.close(); @@ -624,9 +640,11 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { log.debug("DB batch flushed time : {} s", MathUtils.elapsedNanos(batchFlushStarTime) / (double) TimeUnit.SECONDS.toNanos(1)); } + log.info("[hangc] fff"); ledgerIndex.flush(); + log.info("[hangc] ggg"); cleanupExecutor.execute(() -> { // There can only be one single cleanup task running because the cleanupExecutor // is single-threaded @@ -656,6 +674,7 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { recordSuccessfulEvent(dbLedgerStorageStats.getFlushStats(), startTime); dbLedgerStorageStats.getFlushSizeStats().registerSuccessfulValue(sizeToFlush); + log.info("[hangc] hhh"); } catch (IOException e) { // Leave IOExecption as it is throw e; @@ -696,9 +715,11 @@ private void swapWriteCache() { @Override public void flush() throws IOException { + log.info("[hangc] flush..."); Checkpoint cp = checkpointSource.newCheckpoint(); checkpoint(cp); checkpointSource.checkpointComplete(cp, true); + log.info("[hangc] flush completed..."); } @Override @@ -737,6 +758,7 @@ public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedge @Override public void updateEntriesLocations(Iterable locations) throws IOException { // Trigger a flush to have all the entries being compacted in the db storage + log.info("[hangc] updateEntriesLocations...."); flush(); entryLocationIndex.updateLocations(locations); From 339b77540ff773f34257fafc2640e9dafc180195 Mon Sep 17 00:00:00 2001 From: chenhang Date: Fri, 4 Mar 2022 15:14:35 +0800 Subject: [PATCH 2/3] fix reuse journal file bug, which lead to checkpoint can't complete --- .../org/apache/bookkeeper/bookie/Bookie.java | 10 ++++--- .../org/apache/bookkeeper/bookie/Journal.java | 23 +++++++++------- .../apache/bookkeeper/bookie/SyncThread.java | 3 --- .../bookie/storage/ldb/DbLedgerStorage.java | 2 -- .../ldb/SingleDirectoryDbLedgerStorage.java | 26 ++----------------- .../bookkeeper/conf/ServerConfiguration.java | 21 ++++++++------- conf/bk_server.conf | 4 +++ 7 files changed, 38 insertions(+), 51 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 28c76f63296..ee856749475 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -973,13 +973,15 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws */ private void replay(Journal journal, JournalScanner scanner) throws IOException { final LogMark markedLog = journal.getLastLogMark().getCurMark(); + long markedLogFileId = conf.getJournalReuseFiles() ? + markedLog.getLogFileId() % journal.maxBackupJournals : markedLog.getLogFileId(); List logs = Journal.listJournalIds(journal.getJournalDirectory(), journalId -> - journalId >= markedLog.getLogFileId()); + journalId >= markedLogFileId); // last log mark may be missed due to no sync up before // validate filtered log ids only when we have markedLogId if (markedLog.getLogFileId() > 0) { - if (logs.size() == 0 || logs.get(0) != markedLog.getLogFileId()) { - throw new IOException("Recovery log " + markedLog.getLogFileId() + " is missing"); + if (logs.size() == 0 || logs.get(0) != markedLogFileId) { + throw new IOException("Recovery log " + markedLogFileId + " is missing"); } } @@ -988,7 +990,7 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException // system calls done. for (Long id : logs) { long logPosition = 0L; - if (id == markedLog.getLogFileId()) { + if (id == markedLogFileId) { logPosition = markedLog.getLogFileOffset(); } LOG.info("Replaying journal {} from position {}", id, logPosition); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 602dcd2d150..47c319c736a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -616,6 +616,9 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour // Should data be fsynced on disk before triggering the callback private final boolean syncData; + // Whether should we reuse journal files. + private final boolean reuseJournalFiles; + private final LastLogMark lastLogMark = new LastLogMark(0, 0); private static final String LAST_MARK_DEFAULT_NAME = "lastMark"; @@ -635,7 +638,6 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour private final LedgerDirsManager ledgerDirsManager; private final ByteBufAllocator allocator; private final MemoryLimitController memoryLimitController; - private final int journalMaxPoolSize; // Expose Stats private final JournalStats journalStats; @@ -678,6 +680,7 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf this.journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite(); this.journalAlignmentSize = conf.getJournalAlignmentSize(); this.journalPageCacheFlushIntervalMSec = conf.getJournalPageCacheFlushIntervalMSec(); + this.reuseJournalFiles = conf.getJournalReuseFiles(); if (conf.getNumJournalCallbackThreads() > 0) { this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(), new DefaultThreadFactory("bookie-journal-callback")); @@ -708,7 +711,6 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf conf.getJournalChannelProvider()); throw new RuntimeException("Failed to initiate fileChannel provider"); } - this.journalMaxPoolSize = conf.getJournalMaxPoolSize(); // Expose Stats this.journalStats = new JournalStats(statsLogger); @@ -793,6 +795,10 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO public long scanJournal(long journalId, long journalPos, JournalScanner scanner) throws IOException { JournalChannel recLog; + if (reuseJournalFiles) { + journalId = journalId % maxBackupJournals; + } + if (journalPos <= 0) { recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, conf, fileChannelProvider); } else { @@ -810,7 +816,6 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner) // start reading entry lenBuff.clear(); fullRead(recLog, lenBuff); - LOG.info("[hangc] offset: {}, remaining: {}", offset, lenBuff.remaining()); if (lenBuff.remaining() == 0) { break; } @@ -818,7 +823,6 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner) lenBuff.flip(); int len = lenBuff.getInt(); - LOG.info("[hangc]offset: {}, len: {}", offset, len); if (len == 0) { break; } @@ -961,6 +965,7 @@ public void run() { // could only be used to measure elapsed time. // http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29 long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1); + long actualLogId; long lastFlushPosition = 0; boolean groupWhenTimeout = false; @@ -971,16 +976,16 @@ public void run() { while (true) { // new journal file to write if (null == logFile) { - - if (fileChannelProvider instanceof DefaultFileChannelProvider) { - logId = logId + 1; + logId = logId + 1; + if (reuseJournalFiles) { + actualLogId = logId % maxBackupJournals; } else { - logId = (logId + 1) % journalMaxPoolSize; + actualLogId = logId; } journalCreationWatcher.reset().start(); LOG.info("Start generate new journal log file."); - logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize, + logFile = new JournalChannel(journalDirectory, actualLogId, journalPreAllocSize, journalWriteBufferSize, journalAlignmentSize, removePagesFromCache, journalFormatVersionToWrite, getBufferedChannelBuilder(), conf, fileChannelProvider); logFile.writeHeader(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java index 3aa853424cf..00288dc27d6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java @@ -118,7 +118,6 @@ public Future requestFlush() { private void flush() { Checkpoint checkpoint = checkpointSource.newCheckpoint(); try { - log.info("[hangc] syncThread flush..."); ledgerStorage.flush(); } catch (NoWritableLedgerDirException e) { log.error("No writeable ledger directories", e); @@ -150,9 +149,7 @@ public void checkpoint(Checkpoint checkpoint) { } try { - log.info("[hangc] SyncThread start ..."); ledgerStorage.checkpoint(checkpoint); - log.info("[hangc] SyncThread completed ..."); } catch (NoWritableLedgerDirException e) { log.error("No writeable ledger directories", e); dirsListener.allDisksFull(true); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 7361be56821..9b84db43cae 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -228,9 +228,7 @@ public void flush() throws IOException { @Override public void checkpoint(Checkpoint checkpoint) throws IOException { for (LedgerStorage ls : ledgerStorageList) { - log.info("[hangc] DbLedgerStorage start..."); ls.checkpoint(checkpoint); - log.info("[hangc] DbLedgerStorage completed..."); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index fd2e0f5d692..530232951a7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -359,7 +359,6 @@ private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) log.info("Write cache is full, triggering flush"); executor.execute(() -> { try { - log.info("[hangc] full write cache..."); flush(); } catch (IOException e) { log.error("Error during flush", e); @@ -367,22 +366,17 @@ private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) }); } - log.info("[hangc] request write lock..."); long stamp = writeCacheRotationLock.readLock(); try { - log.info("[hangc] get write lock..."); if (writeCache.put(ledgerId, entryId, entry)) { // We succeeded in putting the entry in write cache in the - log.info("[hangc] put complete..."); return; } } finally { - log.info("[hangc] release write lock..."); writeCacheRotationLock.unlockRead(stamp); } // Wait some time and try again - log.info("[hangc] go into sleep..."); try { Thread.sleep(1); } catch (InterruptedException e) { @@ -587,15 +581,11 @@ boolean isFlushRequired() { @Override public void checkpoint(Checkpoint checkpoint) throws IOException { - log.info("[hangc] aaa"); Checkpoint thisCheckpoint = checkpointSource.newCheckpoint(); - /* + if (lastCheckpoint.compareTo(checkpoint) > 0) { - log.info("[hangc] xxx"); - log.info("[hangc] lastCheckpoint: {}, checkpoint: {}, this checkpoint: {}", - lastCheckpoint, checkpoint, thisCheckpoint); return; - }*/ + } long startTime = MathUtils.nowInNano(); @@ -603,12 +593,10 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { flushMutex.lock(); try { - log.info("[hangc] bbb"); // Swap the write cache so that writes can continue to happen while the flush is // ongoing swapWriteCache(); - log.info("[hangc] ccc"); long sizeToFlush = writeCacheBeingFlushed.size(); if (log.isDebugEnabled()) { log.debug("Flushing entries. count: {} -- size {} Mb", writeCacheBeingFlushed.count(), @@ -627,12 +615,9 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { throw new RuntimeException(e); } }); - log.info("[hangc] ddd"); entryLogger.flush(); - log.info("[hangc] eee"); - long batchFlushStarTime = System.nanoTime(); batch.flush(); batch.close(); @@ -640,11 +625,8 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { log.debug("DB batch flushed time : {} s", MathUtils.elapsedNanos(batchFlushStarTime) / (double) TimeUnit.SECONDS.toNanos(1)); } - log.info("[hangc] fff"); - ledgerIndex.flush(); - log.info("[hangc] ggg"); cleanupExecutor.execute(() -> { // There can only be one single cleanup task running because the cleanupExecutor // is single-threaded @@ -674,7 +656,6 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { recordSuccessfulEvent(dbLedgerStorageStats.getFlushStats(), startTime); dbLedgerStorageStats.getFlushSizeStats().registerSuccessfulValue(sizeToFlush); - log.info("[hangc] hhh"); } catch (IOException e) { // Leave IOExecption as it is throw e; @@ -715,11 +696,9 @@ private void swapWriteCache() { @Override public void flush() throws IOException { - log.info("[hangc] flush..."); Checkpoint cp = checkpointSource.newCheckpoint(); checkpoint(cp); checkpointSource.checkpointComplete(cp, true); - log.info("[hangc] flush completed..."); } @Override @@ -758,7 +737,6 @@ public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedge @Override public void updateEntriesLocations(Iterable locations) throws IOException { // Trigger a flush to have all the entries being compacted in the db storage - log.info("[hangc] updateEntriesLocations...."); flush(); entryLocationIndex.updateLocations(locations); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index acb086d61a1..8d0a360378a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -148,7 +148,7 @@ public class ServerConfiguration extends AbstractConfiguration Date: Fri, 4 Mar 2022 15:20:36 +0800 Subject: [PATCH 3/3] fix check style --- .../main/java/org/apache/bookkeeper/bookie/Bookie.java | 4 ++-- .../main/java/org/apache/bookkeeper/bookie/Journal.java | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index ee856749475..32c1a4ec602 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -973,8 +973,8 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws */ private void replay(Journal journal, JournalScanner scanner) throws IOException { final LogMark markedLog = journal.getLastLogMark().getCurMark(); - long markedLogFileId = conf.getJournalReuseFiles() ? - markedLog.getLogFileId() % journal.maxBackupJournals : markedLog.getLogFileId(); + long markedLogFileId = conf.getJournalReuseFiles() + ? markedLog.getLogFileId() % journal.maxBackupJournals : markedLog.getLogFileId(); List logs = Journal.listJournalIds(journal.getJournalDirectory(), journalId -> journalId >= markedLogFileId); // last log mark may be missed due to no sync up before diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 47c319c736a..b589c0ca61c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -800,7 +800,8 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner) } if (journalPos <= 0) { - recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, conf, fileChannelProvider); + recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, + conf, fileChannelProvider); } else { recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, journalPos, conf, fileChannelProvider); @@ -985,9 +986,9 @@ public void run() { journalCreationWatcher.reset().start(); LOG.info("Start generate new journal log file."); - logFile = new JournalChannel(journalDirectory, actualLogId, journalPreAllocSize, journalWriteBufferSize, - journalAlignmentSize, removePagesFromCache, - journalFormatVersionToWrite, getBufferedChannelBuilder(), conf, fileChannelProvider); + logFile = new JournalChannel(journalDirectory, actualLogId, journalPreAllocSize, + journalWriteBufferSize, journalAlignmentSize, removePagesFromCache, + journalFormatVersionToWrite, getBufferedChannelBuilder(), conf, fileChannelProvider); logFile.writeHeader(); journalStats.getJournalCreationStats().registerSuccessfulEvent(