diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 28c76f63296..32c1a4ec602 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -973,13 +973,15 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws */ private void replay(Journal journal, JournalScanner scanner) throws IOException { final LogMark markedLog = journal.getLastLogMark().getCurMark(); + long markedLogFileId = conf.getJournalReuseFiles() + ? markedLog.getLogFileId() % journal.maxBackupJournals : markedLog.getLogFileId(); List logs = Journal.listJournalIds(journal.getJournalDirectory(), journalId -> - journalId >= markedLog.getLogFileId()); + journalId >= markedLogFileId); // last log mark may be missed due to no sync up before // validate filtered log ids only when we have markedLogId if (markedLog.getLogFileId() > 0) { - if (logs.size() == 0 || logs.get(0) != markedLog.getLogFileId()) { - throw new IOException("Recovery log " + markedLog.getLogFileId() + " is missing"); + if (logs.size() == 0 || logs.get(0) != markedLogFileId) { + throw new IOException("Recovery log " + markedLogFileId + " is missing"); } } @@ -988,7 +990,7 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException // system calls done. 
for (Long id : logs) { long logPosition = 0L; - if (id == markedLog.getLogFileId()) { + if (id == markedLogFileId) { logPosition = markedLog.getLogFileOffset(); } LOG.info("Replaying journal {} from position {}", id, logPosition); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 3e3b12392c1..b589c0ca61c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -616,6 +616,9 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour // Should data be fsynced on disk before triggering the callback private final boolean syncData; + // Whether we should reuse journal files. + private final boolean reuseJournalFiles; + private final LastLogMark lastLogMark = new LastLogMark(0, 0); private static final String LAST_MARK_DEFAULT_NAME = "lastMark"; @@ -635,7 +638,6 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour private final LedgerDirsManager ledgerDirsManager; private final ByteBufAllocator allocator; private final MemoryLimitController memoryLimitController; - private final int journalMaxPoolSize; // Expose Stats private final JournalStats journalStats; @@ -678,6 +680,7 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf this.journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite(); this.journalAlignmentSize = conf.getJournalAlignmentSize(); this.journalPageCacheFlushIntervalMSec = conf.getJournalPageCacheFlushIntervalMSec(); + this.reuseJournalFiles = conf.getJournalReuseFiles(); if (conf.getNumJournalCallbackThreads() > 0) { this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(), new DefaultThreadFactory("bookie-journal-callback")); @@ -708,7 +711,6 @@ public Journal(int journalIndex, File journalDirectory, 
ServerConfiguration conf conf.getJournalChannelProvider()); throw new RuntimeException("Failed to initiate fileChannel provider"); } - this.journalMaxPoolSize = conf.getJournalMaxPoolSize(); // Expose Stats this.journalStats = new JournalStats(statsLogger); @@ -793,12 +795,18 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO public long scanJournal(long journalId, long journalPos, JournalScanner scanner) throws IOException { JournalChannel recLog; + if (reuseJournalFiles) { + journalId = journalId % maxBackupJournals; + } + if (journalPos <= 0) { - recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, conf, fileChannelProvider); + recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, + conf, fileChannelProvider); } else { recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, journalPos, conf, fileChannelProvider); } + recLog.skipHeader(); int journalVersion = recLog.getFormatVersion(); try { ByteBuffer lenBuff = ByteBuffer.allocate(4); @@ -809,11 +817,13 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner) // start reading entry lenBuff.clear(); fullRead(recLog, lenBuff); - if (lenBuff.remaining() != 0) { + if (lenBuff.remaining() == 0) { break; } + lenBuff.flip(); int len = lenBuff.getInt(); + if (len == 0) { break; } @@ -956,6 +966,7 @@ public void run() { // could only be used to measure elapsed time. // http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29 long logId = journalIds.isEmpty() ? 
System.currentTimeMillis() : journalIds.get(journalIds.size() - 1); + long actualLogId; long lastFlushPosition = 0; boolean groupWhenTimeout = false; @@ -966,18 +977,19 @@ public void run() { while (true) { // new journal file to write if (null == logFile) { - - if (fileChannelProvider instanceof DefaultFileChannelProvider) { - logId = logId + 1; + logId = logId + 1; + if (reuseJournalFiles) { + actualLogId = logId % maxBackupJournals; } else { - logId = (logId + 1) % journalMaxPoolSize; + actualLogId = logId; } journalCreationWatcher.reset().start(); LOG.info("Start generate new journal log file."); - logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize, - journalAlignmentSize, removePagesFromCache, - journalFormatVersionToWrite, getBufferedChannelBuilder(), conf, fileChannelProvider); + logFile = new JournalChannel(journalDirectory, actualLogId, journalPreAllocSize, + journalWriteBufferSize, journalAlignmentSize, removePagesFromCache, + journalFormatVersionToWrite, getBufferedChannelBuilder(), conf, fileChannelProvider); + logFile.writeHeader(); journalStats.getJournalCreationStats().registerSuccessfulEvent( journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java index 4e0c1720e8b..8c00f53e268 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java @@ -52,7 +52,8 @@ class JournalChannel implements Closeable { final int fd; final FileChannel fc; final BufferedChannel bc; - final int formatVersion; + int formatVersion; + final long position; long nextPrealloc = 0; final byte[] magicWord = "BKLG".getBytes(UTF_8); @@ -163,6 +164,7 @@ private JournalChannel(File journalDirectory, long logId, 
this.fRemoveFromPageCache = fRemoveFromPageCache; this.configuration = configuration; this.fileChannelProvider = provider; + this.position = position; File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn"); channel = fileChannelProvider.open(fn, configuration); @@ -182,6 +184,7 @@ private JournalChannel(File journalDirectory, long logId, fc = channel.getFileChannel(); formatVersion = formatVersionToWrite; + /* int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE; ByteBuffer bb = ByteBuffer.allocate(headerSize); ZeroBuffer.put(bb); @@ -191,78 +194,24 @@ private JournalChannel(File journalDirectory, long logId, bb.clear(); fc.write(bb); + */ + bc = bcBuilder.create(fc, writeBufferSize); + /* forceWrite(true); nextPrealloc = this.preAllocSize; fc.write(zeros, nextPrealloc - journalAlignSize); + + */ } else { // open an existing file if (channel instanceof DefaultFileChannel) { fc = channel.getFileChannel(); bc = null; // readonly - - ByteBuffer bb = ByteBuffer.allocate(VERSION_HEADER_SIZE); - int c = fc.read(bb); - bb.flip(); - - if (c == VERSION_HEADER_SIZE) { - byte[] first4 = new byte[4]; - bb.get(first4); - - if (Arrays.equals(first4, magicWord)) { - formatVersion = bb.getInt(); - } else { - // pre magic word journal, reset to 0; - formatVersion = V1; - } - } else { - // no header, must be old version - formatVersion = V1; - } - - if (formatVersion < MIN_COMPAT_JOURNAL_FORMAT_VERSION - || formatVersion > CURRENT_JOURNAL_FORMAT_VERSION) { - String err = String.format("Invalid journal version, unable to read." 
- + " Expected between (%d) and (%d), got (%d)", - MIN_COMPAT_JOURNAL_FORMAT_VERSION, CURRENT_JOURNAL_FORMAT_VERSION, - formatVersion); - LOG.error(err); - throw new IOException(err); - } - - try { - if (position == START_OF_FILE) { - if (formatVersion >= V5) { - fc.position(HEADER_SIZE); - } else if (formatVersion >= V2) { - fc.position(VERSION_HEADER_SIZE); - } else { - fc.position(0); - } - } else { - fc.position(position); - } - } catch (IOException e) { - LOG.error("Bookie journal file can seek to position :", e); - throw e; - } } else { // rewrite the existing file fc = channel.getFileChannel(); formatVersion = formatVersionToWrite; - - int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE; - ByteBuffer bb = ByteBuffer.allocate(headerSize); - ZeroBuffer.put(bb); - bb.clear(); - bb.put(magicWord); - bb.putInt(formatVersion); - bb.clear(); - fc.write(bb); - bc = bcBuilder.create(fc, writeBufferSize); - forceWrite(true); - nextPrealloc = this.preAllocSize; - fc.write(zeros, nextPrealloc - journalAlignSize); } } if (fRemoveFromPageCache) { @@ -272,6 +221,74 @@ private JournalChannel(File journalDirectory, long logId, } } + public void writeHeader() throws IOException { + try { + int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE; + ByteBuffer bb = ByteBuffer.allocate(headerSize); + ZeroBuffer.put(bb); + bb.clear(); + bb.put(magicWord); + bb.putInt(formatVersion); + bb.clear(); + fc.write(bb); + + forceWrite(true); + nextPrealloc = this.preAllocSize; + fc.write(zeros, nextPrealloc - journalAlignSize); + } catch (IOException e) { + LOG.error("Failed to write journal header. 
", e); + throw e; + } + } + + public void skipHeader() throws IOException { + ByteBuffer bb = ByteBuffer.allocate(VERSION_HEADER_SIZE); + int c = fc.read(bb); + bb.flip(); + + if (c == VERSION_HEADER_SIZE) { + byte[] first4 = new byte[4]; + bb.get(first4); + + if (Arrays.equals(first4, magicWord)) { + formatVersion = bb.getInt(); + } else { + // pre magic word journal, reset to 0; + formatVersion = V1; + } + } else { + // no header, must be old version + formatVersion = V1; + } + + if (formatVersion < MIN_COMPAT_JOURNAL_FORMAT_VERSION + || formatVersion > CURRENT_JOURNAL_FORMAT_VERSION) { + String err = String.format("Invalid journal version, unable to read." + + " Expected between (%d) and (%d), got (%d)", + MIN_COMPAT_JOURNAL_FORMAT_VERSION, CURRENT_JOURNAL_FORMAT_VERSION, + formatVersion); + LOG.error(err); + throw new IOException(err); + } + + try { + if (position == START_OF_FILE) { + if (formatVersion >= V5) { + fc.position(HEADER_SIZE); + } else if (formatVersion >= V2) { + fc.position(VERSION_HEADER_SIZE); + } else { + fc.position(0); + } + } else { + fc.position(position); + } + } catch (IOException e) { + LOG.error("Bookie journal file can seek to position :", e); + throw e; + } + } + int getFormatVersion() { return formatVersion; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 94904befb40..530232951a7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -582,6 +582,7 @@ boolean isFlushRequired() { @Override public void checkpoint(Checkpoint checkpoint) throws IOException { Checkpoint thisCheckpoint = checkpointSource.newCheckpoint(); + if (lastCheckpoint.compareTo(checkpoint) > 0) 
{ return; } @@ -624,7 +625,6 @@ public void checkpoint(Checkpoint checkpoint) throws IOException { log.debug("DB batch flushed time : {} s", MathUtils.elapsedNanos(batchFlushStarTime) / (double) TimeUnit.SECONDS.toNanos(1)); } - ledgerIndex.flush(); cleanupExecutor.execute(() -> { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index acb086d61a1..8d0a360378a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -148,7 +148,7 @@ public class ServerConfiguration extends AbstractConfiguration