Skip to content

Commit

Permalink
Merge pull request #3 from hangc0276/chenhang/fix_logid_pool
Browse files Browse the repository at this point in the history
Fix #1
Related to #2 , skip reply first, and then use another PR to fix reply issue.
  • Loading branch information
hangc0276 authored Mar 4, 2022
2 parents 7f86bca + 085033b commit 99e2b7b
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -973,13 +973,15 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws
*/
private void replay(Journal journal, JournalScanner scanner) throws IOException {
final LogMark markedLog = journal.getLastLogMark().getCurMark();
long markedLogFileId = conf.getJournalReuseFiles()
? markedLog.getLogFileId() % journal.maxBackupJournals : markedLog.getLogFileId();
List<Long> logs = Journal.listJournalIds(journal.getJournalDirectory(), journalId ->
journalId >= markedLog.getLogFileId());
journalId >= markedLogFileId);
// last log mark may be missed due to no sync up before
// validate filtered log ids only when we have markedLogId
if (markedLog.getLogFileId() > 0) {
if (logs.size() == 0 || logs.get(0) != markedLog.getLogFileId()) {
throw new IOException("Recovery log " + markedLog.getLogFileId() + " is missing");
if (logs.size() == 0 || logs.get(0) != markedLogFileId) {
throw new IOException("Recovery log " + markedLogFileId + " is missing");
}
}

Expand All @@ -988,7 +990,7 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException
// system calls done.
for (Long id : logs) {
long logPosition = 0L;
if (id == markedLog.getLogFileId()) {
if (id == markedLogFileId) {
logPosition = markedLog.getLogFileOffset();
}
LOG.info("Replaying journal {} from position {}", id, logPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,9 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
// Should data be fsynced on disk before triggering the callback
private final boolean syncData;

// Whether should we reuse journal files.
private final boolean reuseJournalFiles;

private final LastLogMark lastLogMark = new LastLogMark(0, 0);

private static final String LAST_MARK_DEFAULT_NAME = "lastMark";
Expand All @@ -635,7 +638,6 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
private final LedgerDirsManager ledgerDirsManager;
private final ByteBufAllocator allocator;
private final MemoryLimitController memoryLimitController;
private final int journalMaxPoolSize;

// Expose Stats
private final JournalStats journalStats;
Expand Down Expand Up @@ -678,6 +680,7 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf
this.journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite();
this.journalAlignmentSize = conf.getJournalAlignmentSize();
this.journalPageCacheFlushIntervalMSec = conf.getJournalPageCacheFlushIntervalMSec();
this.reuseJournalFiles = conf.getJournalReuseFiles();
if (conf.getNumJournalCallbackThreads() > 0) {
this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
new DefaultThreadFactory("bookie-journal-callback"));
Expand Down Expand Up @@ -708,7 +711,6 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf
conf.getJournalChannelProvider());
throw new RuntimeException("Failed to initiate fileChannel provider");
}
this.journalMaxPoolSize = conf.getJournalMaxPoolSize();

// Expose Stats
this.journalStats = new JournalStats(statsLogger);
Expand Down Expand Up @@ -793,12 +795,18 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
throws IOException {
JournalChannel recLog;
if (reuseJournalFiles) {
journalId = journalId % maxBackupJournals;
}

if (journalPos <= 0) {
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, conf, fileChannelProvider);
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize,
conf, fileChannelProvider);
} else {
recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize,
journalPos, conf, fileChannelProvider);
}
recLog.skipHeader();
int journalVersion = recLog.getFormatVersion();
try {
ByteBuffer lenBuff = ByteBuffer.allocate(4);
Expand All @@ -809,11 +817,13 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
// start reading entry
lenBuff.clear();
fullRead(recLog, lenBuff);
if (lenBuff.remaining() != 0) {
if (lenBuff.remaining() == 0) {
break;
}

lenBuff.flip();
int len = lenBuff.getInt();

if (len == 0) {
break;
}
Expand Down Expand Up @@ -956,6 +966,7 @@ public void run() {
// could only be used to measure elapsed time.
// http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1);
long actualLogId;
long lastFlushPosition = 0;
boolean groupWhenTimeout = false;

Expand All @@ -966,18 +977,19 @@ public void run() {
while (true) {
// new journal file to write
if (null == logFile) {

if (fileChannelProvider instanceof DefaultFileChannelProvider) {
logId = logId + 1;
logId = logId + 1;
if (reuseJournalFiles) {
actualLogId = logId % maxBackupJournals;
} else {
logId = (logId + 1) % journalMaxPoolSize;
actualLogId = logId;
}

journalCreationWatcher.reset().start();
LOG.info("Start generate new journal log file.");
logFile = new JournalChannel(journalDirectory, logId, journalPreAllocSize, journalWriteBufferSize,
journalAlignmentSize, removePagesFromCache,
journalFormatVersionToWrite, getBufferedChannelBuilder(), conf, fileChannelProvider);
logFile = new JournalChannel(journalDirectory, actualLogId, journalPreAllocSize,
journalWriteBufferSize, journalAlignmentSize, removePagesFromCache,
journalFormatVersionToWrite, getBufferedChannelBuilder(), conf, fileChannelProvider);
logFile.writeHeader();

journalStats.getJournalCreationStats().registerSuccessfulEvent(
journalCreationWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class JournalChannel implements Closeable {
final int fd;
final FileChannel fc;
final BufferedChannel bc;
final int formatVersion;
int formatVersion;
final long position;
long nextPrealloc = 0;

final byte[] magicWord = "BKLG".getBytes(UTF_8);
Expand Down Expand Up @@ -163,6 +164,7 @@ private JournalChannel(File journalDirectory, long logId,
this.fRemoveFromPageCache = fRemoveFromPageCache;
this.configuration = configuration;
this.fileChannelProvider = provider;
this.position = position;

File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn");
channel = fileChannelProvider.open(fn, configuration);
Expand All @@ -182,6 +184,7 @@ private JournalChannel(File journalDirectory, long logId,
fc = channel.getFileChannel();
formatVersion = formatVersionToWrite;

/*
int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE;
ByteBuffer bb = ByteBuffer.allocate(headerSize);
ZeroBuffer.put(bb);
Expand All @@ -191,78 +194,24 @@ private JournalChannel(File journalDirectory, long logId,
bb.clear();
fc.write(bb);
*/

bc = bcBuilder.create(fc, writeBufferSize);
/*
forceWrite(true);
nextPrealloc = this.preAllocSize;
fc.write(zeros, nextPrealloc - journalAlignSize);
*/
} else { // open an existing file
if (channel instanceof DefaultFileChannel) {
fc = channel.getFileChannel();
bc = null; // readonly

ByteBuffer bb = ByteBuffer.allocate(VERSION_HEADER_SIZE);
int c = fc.read(bb);
bb.flip();

if (c == VERSION_HEADER_SIZE) {
byte[] first4 = new byte[4];
bb.get(first4);

if (Arrays.equals(first4, magicWord)) {
formatVersion = bb.getInt();
} else {
// pre magic word journal, reset to 0;
formatVersion = V1;
}
} else {
// no header, must be old version
formatVersion = V1;
}

if (formatVersion < MIN_COMPAT_JOURNAL_FORMAT_VERSION
|| formatVersion > CURRENT_JOURNAL_FORMAT_VERSION) {
String err = String.format("Invalid journal version, unable to read."
+ " Expected between (%d) and (%d), got (%d)",
MIN_COMPAT_JOURNAL_FORMAT_VERSION, CURRENT_JOURNAL_FORMAT_VERSION,
formatVersion);
LOG.error(err);
throw new IOException(err);
}

try {
if (position == START_OF_FILE) {
if (formatVersion >= V5) {
fc.position(HEADER_SIZE);
} else if (formatVersion >= V2) {
fc.position(VERSION_HEADER_SIZE);
} else {
fc.position(0);
}
} else {
fc.position(position);
}
} catch (IOException e) {
LOG.error("Bookie journal file can seek to position :", e);
throw e;
}
} else {
// rewrite the existing file
fc = channel.getFileChannel();
formatVersion = formatVersionToWrite;

int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE;
ByteBuffer bb = ByteBuffer.allocate(headerSize);
ZeroBuffer.put(bb);
bb.clear();
bb.put(magicWord);
bb.putInt(formatVersion);
bb.clear();
fc.write(bb);

bc = bcBuilder.create(fc, writeBufferSize);
forceWrite(true);
nextPrealloc = this.preAllocSize;
fc.write(zeros, nextPrealloc - journalAlignSize);
}
}
if (fRemoveFromPageCache) {
Expand All @@ -272,6 +221,74 @@ private JournalChannel(File journalDirectory, long logId,
}
}

public void writeHeader() throws IOException {
try {
int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE;
ByteBuffer bb = ByteBuffer.allocate(headerSize);
ZeroBuffer.put(bb);
bb.clear();
bb.put(magicWord);
bb.putInt(formatVersion);
bb.clear();
fc.write(bb);

forceWrite(true);
nextPrealloc = this.preAllocSize;
fc.write(zeros, nextPrealloc - journalAlignSize);
} catch (IOException e) {
LOG.error("Failed to write journal header. ", e);
throw e;
}
}

public void skipHeader() throws IOException {
ByteBuffer bb = ByteBuffer.allocate(VERSION_HEADER_SIZE);
int c = fc.read(bb);
bb.flip();

if (c == VERSION_HEADER_SIZE) {
byte[] first4 = new byte[4];
bb.get(first4);

if (Arrays.equals(first4, magicWord)) {
formatVersion = bb.getInt();
} else {
// pre magic word journal, reset to 0;
formatVersion = V1;
}
} else {
// no header, must be old version
formatVersion = V1;
}

if (formatVersion < MIN_COMPAT_JOURNAL_FORMAT_VERSION
|| formatVersion > CURRENT_JOURNAL_FORMAT_VERSION) {
String err = String.format("Invalid journal version, unable to read."
+ " Expected between (%d) and (%d), got (%d)",
MIN_COMPAT_JOURNAL_FORMAT_VERSION, CURRENT_JOURNAL_FORMAT_VERSION,
formatVersion);
LOG.error(err);
throw new IOException(err);
}

try {
if (position == START_OF_FILE) {
if (formatVersion >= V5) {
fc.position(HEADER_SIZE);
} else if (formatVersion >= V2) {
fc.position(VERSION_HEADER_SIZE);
} else {
fc.position(0);
}
} else {
fc.position(position);
}
} catch (IOException e) {
LOG.error("Bookie journal file can seek to position :", e);
throw e;
}
}

int getFormatVersion() {
return formatVersion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,7 @@ boolean isFlushRequired() {
@Override
public void checkpoint(Checkpoint checkpoint) throws IOException {
Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();

if (lastCheckpoint.compareTo(checkpoint) > 0) {
return;
}
Expand Down Expand Up @@ -624,7 +625,6 @@ public void checkpoint(Checkpoint checkpoint) throws IOException {
log.debug("DB batch flushed time : {} s",
MathUtils.elapsedNanos(batchFlushStarTime) / (double) TimeUnit.SECONDS.toNanos(1));
}

ledgerIndex.flush();

cleanupExecutor.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String JOURNAL_MAX_MEMORY_SIZE_MB = "journalMaxMemorySizeMb";
protected static final String JOURNAL_PAGECACHE_FLUSH_INTERVAL_MSEC = "journalPageCacheFlushIntervalMSec";
protected static final String JOURNAL_CHANNEL_PROVIDER = "journalChannelProvider";
protected static final String JOURNAL_MAX_POOL_SIZE = "journalMaxPoolSize";
protected static final String JOURNAL_REUSE_FILES = "journalReuseFiles";
// backpressure control
protected static final String MAX_ADDS_IN_PROGRESS_LIMIT = "maxAddsInProgressLimit";
protected static final String MAX_READS_IN_PROGRESS_LIMIT = "maxReadsInProgressLimit";
Expand Down Expand Up @@ -718,19 +718,22 @@ public ServerConfiguration setMaxJournalSizeMB(long maxJournalSize) {
}

/**
* Journal max pool size.
*
* @return journal max pool size
* Get reuse journal files.
* @return
*/
public int getJournalMaxPoolSize() {
return this.getInt(JOURNAL_MAX_POOL_SIZE, 50);
public boolean getJournalReuseFiles() {
return this.getBoolean(JOURNAL_REUSE_FILES, false);
}

public ServerConfiguration setJournalMaxPoolSize(int journalMaxPoolSize) {
this.setProperty(JOURNAL_MAX_POOL_SIZE, Integer.toString(journalMaxPoolSize));
/**
* Set reuse journal files.
* @param journalReuseFiles
* @return
*/
public ServerConfiguration setJournalReuseFiles(boolean journalReuseFiles) {
setProperty(JOURNAL_REUSE_FILES, journalReuseFiles);
return this;
}

/**
* How much space should we pre-allocate at a time in the journal.
*
Expand Down
4 changes: 4 additions & 0 deletions conf/bk_server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ journalDirectories=/tmp/bk-txn
# Set the Channel Provider for journal.
# The default value is
# journalChannelProvider=org.apache.bookkeeper.bookie.DefaultFileChannelProvider

# Whether reuse journal file. For fileSystem, it is unnecessarily, but for pmem, it will improve the performance a lot.
# Default is false
# journalReuseFiles = false
#############################################################################
## Ledger storage settings
#############################################################################
Expand Down

0 comments on commit 99e2b7b

Please sign in to comment.