Skip to content

Commit

Permalink
[ISSUE apache#8653] Fix index service upload last file when broker sh…
Browse files Browse the repository at this point in the history
…utdown and fetcher check in tiered storage
  • Loading branch information
lizhimins committed Sep 7, 2024
1 parent ce1988f commit 10e26ca
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,15 @@ public CompletableFuture<List<IndexItem>> queryAsync(
public void forceUpload() {
try {
readWriteLock.writeLock().lock();
if (this.currentWriteFile == null) {
log.warn("IndexStoreService no need force upload current write file");
return;
}
// note: current file has been shutdown before
IndexStoreFile lastFile = new IndexStoreFile(storeConfig, currentWriteFile.getTimestamp());
if (this.doCompactThenUploadFile(lastFile)) {
this.setCompactTimestamp(lastFile.getTimestamp());
} else {
throw new TieredStoreException(
TieredStoreErrorCode.UNKNOWN, "IndexStoreService force compact current file error");
while (true) {
Map.Entry<Long, IndexFile> entry =
this.timeStoreTable.higherEntry(this.compactTimestamp.get());
if (entry == null) {
break;
}
if (this.doCompactThenUploadFile(entry.getValue())) {
this.setCompactTimestamp(entry.getValue().getTimestamp());
}
}
} catch (Exception e) {
log.error("IndexStoreService force upload error", e);
Expand Down Expand Up @@ -393,19 +391,12 @@ protected IndexFile getNextSealedFile() {
@Override
public void shutdown() {
super.shutdown();
readWriteLock.writeLock().lock();
try {
for (Map.Entry<Long /* timestamp */, IndexFile> entry : timeStoreTable.entrySet()) {
entry.getValue().shutdown();
while (!this.timeStoreTable.isEmpty()) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (autoCreateNewFile) {
this.forceUpload();
}
this.timeStoreTable.clear();
} catch (Exception e) {
log.error("IndexStoreService shutdown error", e);
} finally {
readWriteLock.writeLock().unlock();
}
}

Expand All @@ -424,6 +415,18 @@ public void run() {
}
this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
}
readWriteLock.writeLock().lock();
try {
if (autoCreateNewFile) {
this.forceUpload();
}
this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
this.timeStoreTable.clear();
} catch (Exception e) {
log.error("IndexStoreService shutdown error", e);
} finally {
readWriteLock.writeLock().unlock();
}
log.info(this.getServiceName() + " service shutdown");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void restartServiceTest() throws InterruptedException {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue());
Assert.assertEquals(3, indexService.getTimeStoreTable().size());
Assert.assertEquals(4, indexService.getTimeStoreTable().size());
Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD,
indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus());
}
Expand Down

0 comments on commit 10e26ca

Please sign in to comment.