Added CloseableIterator interface #115

Merged · 2 commits · Nov 27, 2024
@@ -938,7 +938,7 @@ public void listURLs(
continue;
}

-Iterator<URLItem> urliter = urlIterator(e);
+CloseableIterator<URLItem> urliter = urlIterator(e);

while (urliter.hasNext()) {
totalCount++;
@@ -951,17 +951,24 @@ public void listURLs(
break;
}
}

+try {
+    urliter.close();
+} catch (IOException e1) {
+    LOG.warn("Error closing URLIterator", e1);
+}
}
}

responseObserver.onCompleted();
}

-protected Iterator<URLItem> urlIterator(Entry<QueueWithinCrawl, QueueInterface> qentry) {
+protected CloseableIterator<URLItem> urlIterator(
+        Entry<QueueWithinCrawl, QueueInterface> qentry) {
return urlIterator(qentry, 0L, Long.MAX_VALUE);
}

-protected abstract Iterator<URLItem> urlIterator(
+protected abstract CloseableIterator<URLItem> urlIterator(
Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long max);
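Since CloseableIterator extends Closeable (and therefore AutoCloseable), the explicit try/close added in listURLs above could also be written with try-with-resources. A minimal sketch of that alternative, not part of this PR, reusing the diff's own names (urliter, totalCount, LOG) and eliding the filter-and-send logic:

    try (CloseableIterator<URLItem> urliter = urlIterator(e)) {
        while (urliter.hasNext()) {
            totalCount++;
            // ... filter the item and send it to the responseObserver, as in listURLs ...
        }
    } catch (IOException e1) {
        // close() declares IOException, so it must still be handled here
        LOG.warn("Error closing URLIterator", e1);
    }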

@@ -0,0 +1,15 @@
// SPDX-FileCopyrightText: 2020 Crawler-commons
// SPDX-License-Identifier: Apache-2.0

package crawlercommons.urlfrontier.service;

import java.io.Closeable;
import java.util.Iterator;

/**
 * An Iterator that can also be closed. Needed when the iterator holds resources that must be
 * released (e.g. the RocksDB iterator in the RocksDB implementation).
 *
 * @param <T> the type of elements returned by the iterator
 */
public interface CloseableIterator<T> extends Closeable, Iterator<T> {}
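Implementations that hold no native or OS resources can satisfy the contract with a no-op close(), as the in-memory iterator further down does. For illustration only (this helper is not part of the PR), any plain Iterator could be adapted like this:

    // Illustrative adapter: wrap a plain Iterator with a no-op close().
    static <T> CloseableIterator<T> wrap(final Iterator<T> it) {
        return new CloseableIterator<T>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public T next() {
                return it.next();
            }

            @Override
            public void close() {
                // nothing to release for a plain in-memory iterator
            }
        };
    }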
@@ -11,6 +11,7 @@
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
import crawlercommons.urlfrontier.service.AbstractFrontierService;
+import crawlercommons.urlfrontier.service.CloseableIterator;
import crawlercommons.urlfrontier.service.QueueInterface;
import crawlercommons.urlfrontier.service.QueueWithinCrawl;
import crawlercommons.urlfrontier.service.SynchronizedStreamObserver;
@@ -224,12 +225,12 @@ public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> responseObserver)
}
}

-public Iterator<URLItem> urlIterator(
+public CloseableIterator<URLItem> urlIterator(
Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long maxURLs) {
return new MemoryURLItemIterator(qentry, start, maxURLs);
}

-class MemoryURLItemIterator implements Iterator<URLItem> {
+class MemoryURLItemIterator implements CloseableIterator<URLItem> {

private final org.slf4j.Logger LOG = LoggerFactory.getLogger(MemoryURLItemIterator.class);

@@ -298,5 +299,10 @@ public URLItem next() {
}
return null; // shouldn't happen
}

+@Override
+public void close() {
+    // No need to close anything here
+}
}
}
@@ -12,6 +12,7 @@
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
import crawlercommons.urlfrontier.service.AbstractFrontierService;
+import crawlercommons.urlfrontier.service.CloseableIterator;
import crawlercommons.urlfrontier.service.QueueInterface;
import crawlercommons.urlfrontier.service.QueueWithinCrawl;
import crawlercommons.urlfrontier.service.SynchronizedStreamObserver;
@@ -26,7 +27,6 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -300,7 +300,7 @@ protected int sendURLsForQueue(
}

// too early for it?
-long scheduled = Long.parseLong(currentKey.substring(pos2 + 1, pos3));
+long scheduled = Long.parseLong(currentKey, pos2 + 1, pos3, 10);
if (scheduled > now) {
// they are sorted by date no need to go further
return alreadySent;
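The parsing change is independent of the iterator work: Long.parseLong(CharSequence, int, int, int), available since Java 9, reads the digits in place instead of allocating a temporary String via substring. A quick sketch of the equivalence, using a made-up key shaped like the scheduling keys parsed above:

    // Hypothetical key: <queue>_<shard>_<epoch>_<url>
    String currentKey = "queue_0_1732665600_http://example.com/";
    int pos2 = currentKey.indexOf('_', currentKey.indexOf('_') + 1); // 7
    int pos3 = currentKey.indexOf('_', pos2 + 1);                    // 18
    long a = Long.parseLong(currentKey.substring(pos2 + 1, pos3)); // old form, allocates "1732665600"
    long b = Long.parseLong(currentKey, pos2 + 1, pos3, 10);       // new form, parses in place
    assert a == b;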
@@ -823,7 +823,7 @@ public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> responseObserver)
final int pos2 = currentKey.indexOf('_', pos + 1);
final int pos3 = currentKey.indexOf('_', pos2 + 1);

-fromEpoch = Long.parseLong(currentKey.substring(pos2 + 1, pos3));
+fromEpoch = Long.parseLong(currentKey, pos2 + 1, pos3, 10);

try {
info =
@@ -856,12 +856,12 @@ public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> responseObserver)
}
}

-public Iterator<URLItem> urlIterator(
+public CloseableIterator<URLItem> urlIterator(
Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long maxURLs) {
return new RocksDBURLItemIterator(qentry, start, maxURLs);
}

-class RocksDBURLItemIterator implements Iterator<URLItem> {
+class RocksDBURLItemIterator implements CloseableIterator<URLItem> {

private final org.slf4j.Logger LOG = LoggerFactory.getLogger(RocksDBURLItemIterator.class);

@@ -960,7 +960,7 @@ public URLItem next() {
final int pos2 = schedulingKey.indexOf('_', pos1 + 1);
final int pos3 = schedulingKey.indexOf('_', pos2 + 1);

-fromEpoch = Long.parseLong(schedulingKey.substring(pos2 + 1, pos3));
+fromEpoch = Long.parseLong(schedulingKey, pos2 + 1, pos3, 10);

try {
info = URLInfo.parseFrom(scheduled);
@@ -998,5 +998,10 @@ public URLItem next() {

return null; // Shouldn't happen
}

+@Override
+public void close() {
+    this.rocksIterator.close();
+}
}
}
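The RocksDB backend is what motivated the new interface: org.rocksdb.RocksIterator wraps a native handle, so an iterator that is never closed can leak off-heap memory until the database itself is closed. A self-contained sketch of the same pattern against the RocksDB Java API directly (the database path and output are made up for the example):

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.RocksIterator;

    public class RocksIteratorCloseDemo {
        public static void main(String[] args) throws RocksDBException {
            // Options, RocksDB and RocksIterator are all AutoCloseable:
            // try-with-resources releases each native handle in reverse order.
            try (Options options = new Options().setCreateIfMissing(true);
                    RocksDB db = RocksDB.open(options, "/tmp/rocks-demo");
                    RocksIterator it = db.newIterator()) {
                for (it.seekToFirst(); it.isValid(); it.next()) {
                    System.out.println(new String(it.key()) + " -> " + it.value().length + " bytes");
                }
            }
        }
    }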
@@ -8,14 +8,14 @@
import crawlercommons.urlfrontier.Urlfrontier.URLInfo;
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
+import crawlercommons.urlfrontier.service.CloseableIterator;
import crawlercommons.urlfrontier.service.QueueInterface;
import crawlercommons.urlfrontier.service.QueueWithinCrawl;
import crawlercommons.urlfrontier.service.SynchronizedStreamObserver;
import crawlercommons.urlfrontier.service.cluster.DistributedFrontierService;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -101,7 +101,7 @@ public void listURLs(ListUrlParams request, StreamObserver<URLItem> responseObserver)

@Override
// TODO Implementation of urlIterator for ShardedRocksDB
-protected Iterator<URLItem> urlIterator(
+protected CloseableIterator<URLItem> urlIterator(
Entry<QueueWithinCrawl, QueueInterface> qentry, long start, long max) {
throw new UnsupportedOperationException(
"Feature not implemented for ShardedRocksDB backend");