Merge pull request #92 from klockla/geturlstatus
Add method to get URL Status (returns an URLItem)
jnioche authored Sep 3, 2024
2 parents 3ec6f44 + 285dcf8 commit 08f09c3
Showing 15 changed files with 20,914 additions and 20,790 deletions.
3,232 changes: 1,384 additions & 1,848 deletions API/src/main/java/crawlercommons/urlfrontier/URLFrontierGrpc.java

Large diff not rendered by default (regenerated gRPC stub code).

37,535 changes: 18,596 additions & 18,939 deletions API/src/main/java/crawlercommons/urlfrontier/Urlfrontier.java

Large diff not rendered by default (regenerated protocol buffer message code).

16 changes: 16 additions & 0 deletions API/urlfrontier.proto
@@ -72,6 +72,12 @@ service URLFrontier {

    /** Sets crawl limit for domain **/
    rpc SetCrawlLimit(CrawlLimitParams) returns (Empty) {}

    /** Gets the status of a particular URL.
        This does not take URL scheduling into account;
        it is used to check the current status of a URL within the frontier. **/
    rpc GetURLStatus(URLStatusRequest) returns (URLItem) {}
}

/**
@@ -290,3 +296,13 @@ message CrawlLimitParams {
    // crawl ID
    string crawlID = 3;
}

message URLStatusRequest {
    /** URL for which we request info */
    string url = 1;
    /** ID for the queue **/
    string key = 2;
    // crawl ID - empty string for default
    string crawlID = 3;
}
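For orientation, a minimal Java caller of the new RPC could look like the sketch below; the host, port, crawl ID value and example URL are assumptions rather than part of this commit, and the GetURLStatus command added to the CLI further down is a fuller version of the same call.

// Minimal sketch: query the status of one URL via the generated blocking stub.
// localhost:7071, "DEFAULT" and the example URL are assumptions.
import crawlercommons.urlfrontier.URLFrontierGrpc;
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class URLStatusSketch {
    public static void main(String[] args) {
        ManagedChannel channel =
                ManagedChannelBuilder.forAddress("localhost", 7071).usePlaintext().build();
        URLStatusRequest request =
                URLStatusRequest.newBuilder()
                        .setUrl("https://example.com/")
                        .setCrawlID("DEFAULT")
                        .build();
        try {
            // The server answers with a single URLItem, or fails the call with
            // NOT_FOUND when the URL is not known to the frontier.
            URLItem item = URLFrontierGrpc.newBlockingStub(channel).getURLStatus(request);
            System.out.println(item);
        } finally {
            channel.shutdownNow();
        }
    }
}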

3 changes: 2 additions & 1 deletion …/crawlercommons/urlfrontier/client/Client.java
@@ -23,7 +23,8 @@
            DeleteQueue.class,
            DeleteCrawl.class,
            SetLogLevel.class,
            SetCrawlLimit.class
            SetCrawlLimit.class,
            GetURLStatus.class,
        },
        description = "Interacts with a URL Frontier from the command line")
public class Client {
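With this registration in place, the new command can be invoked like the existing subcommands; its implementation, shown next, is followed by a sample invocation.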
106 changes: 106 additions & 0 deletions …/crawlercommons/urlfrontier/client/GetURLStatus.java
@@ -0,0 +1,106 @@
// SPDX-FileCopyrightText: 2020 Crawler-commons
// SPDX-License-Identifier: Apache-2.0

package crawlercommons.urlfrontier.client;

import crawlercommons.urlfrontier.CrawlID;
import crawlercommons.urlfrontier.URLFrontierGrpc;
import crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierBlockingStub;
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest.Builder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;

@Command(name = "GetURLStatus", description = "Get the status of a URL", sortOptions = false)
public class GetURLStatus implements Runnable {

    @ParentCommand private Client parent;

    @Option(
            names = {"-c", "--crawlID"},
            required = false,
            defaultValue = CrawlID.DEFAULT,
            paramLabel = "STRING",
            description = "crawl of the URL to be checked")
    private String crawl;

    @Option(
            names = {"-k", "--key"},
            required = false,
            paramLabel = "STRING",
            description = "key to use to target a specific queue")
    private String key;

    @Option(
            names = {"-u", "--url"},
            required = true,
            paramLabel = "STRING",
            description = "URL to check for")
    private String url;

    @Option(
            names = {"-p", "--parsedate"},
            description = {
                "Print the refetch date in the local time zone",
                "By default, time is in UTC seconds since the Unix epoch"
            })
    private boolean parse = false;

    // Use the system default time zone
    private ZoneId zoneId = ZoneId.systemDefault();

    @Override
    public void run() {
        ManagedChannel channel =
                ManagedChannelBuilder.forAddress(parent.hostname, parent.port)
                        .usePlaintext()
                        .build();
        URLFrontierBlockingStub blockingFrontier = URLFrontierGrpc.newBlockingStub(channel);

        Builder builder = URLStatusRequest.newBuilder().setUrl(url).setCrawlID(crawl);

        String s1 = String.format("Checking status of URL %s (crawlId = %s)", url, crawl);
        if (key != null) {
            s1 += String.format(" (key = %s)", key);
            builder.setKey(key);
        }
        System.out.println(s1);

        URLStatusRequest request = builder.build();

        try {
            URLItem item = blockingFrontier.getURLStatus(request);
            String fetchDate;

            if (parse) {
                Instant instant = Instant.ofEpochSecond(item.getKnown().getRefetchableFromDate());
                LocalDateTime localDate = instant.atZone(zoneId).toLocalDateTime();
                fetchDate = localDate.toString();
            } else {
                fetchDate = String.valueOf(item.getKnown().getRefetchableFromDate());
            }
            System.out.println(item.getKnown().getInfo().getUrl() + ";" + fetchDate);

        } catch (StatusRuntimeException sre) {
            if (sre.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) {
                System.out.println("URL is not in frontier: " + url);
            } else {
                // Propagate any other gRPC error to stderr
                System.err.println(sre.getMessage());
            }
        } catch (Exception e) {
            // Non-gRPC failure
            System.err.println(e.getMessage());
        }

        channel.shutdownNow();
    }
}
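Assuming a frontier is running on the client's default host and port, an invocation could look as follows; the jar name, URL and epoch value are illustrative, not taken from this commit:

java -jar urlfrontier-client.jar GetURLStatus -u https://example.com/
Checking status of URL https://example.com/ (crawlId = DEFAULT)
https://example.com/;1725350400

The last line pairs the URL with its refetchable-from date in UTC seconds since the Unix epoch; with -p the date is printed in the local time zone instead.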
25 changes: 23 additions & 2 deletions service/pom.xml
@@ -65,7 +65,7 @@
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.4.12</version>
      <version>1.4.14</version>
    </dependency>

    <dependency>
@@ -109,9 +109,30 @@
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.11.0</version>
      <version>2.16.1</version>
    </dependency>

    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter</artifactId>
      <version>5.10.3</version> <!-- can be omitted when using the BOM -->
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>5.12.0</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-junit-jupiter</artifactId>
      <version>5.12.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

</project>
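The JUnit Jupiter and Mockito additions are test-scoped and presumably back unit tests for the new method in parts of this commit not shown here. A mocked StreamObserver<URLItem> is the natural fixture for such tests: verify that onNext and onCompleted are called for a known URL, and that onError carries NOT_FOUND otherwise.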
4 changes: 4 additions & 0 deletions …/crawlercommons/urlfrontier/service/AbstractFrontierService.java
@@ -862,4 +862,8 @@ public void close() throws IOException {
            readExecutorService.shutdownNow();
        }
    }

    public abstract void getURLStatus(
            crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest request,
            io.grpc.stub.StreamObserver<URLItem> responseObserver);
}
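Every concrete frontier service must now implement this method, either streaming back a single URLItem and completing the call or failing it with an appropriate gRPC status. Of the two implementations shown below, the Apache Ignite backend reports UNIMPLEMENTED for now, while the in-memory service performs the actual lookup.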
@@ -11,6 +11,7 @@
import crawlercommons.urlfrontier.Urlfrontier.KnownURLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLInfo;
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
import crawlercommons.urlfrontier.service.QueueInterface;
import crawlercommons.urlfrontier.service.QueueWithinCrawl;
import crawlercommons.urlfrontier.service.SynchronizedStreamObserver;
@@ -841,4 +842,10 @@ protected Status putURLItem(URLItem value) {

        return AckMessage.Status.OK;
    }

    // TODO Implement getURLStatus for Ignite
    @Override
    public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> responseObserver) {
        responseObserver.onError(io.grpc.Status.UNIMPLEMENTED.asException());
    }
}
@@ -6,12 +6,15 @@
import com.google.protobuf.InvalidProtocolBufferException;
import crawlercommons.urlfrontier.Urlfrontier.AckMessage;
import crawlercommons.urlfrontier.Urlfrontier.AckMessage.Status;
import crawlercommons.urlfrontier.Urlfrontier.KnownURLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLInfo;
import crawlercommons.urlfrontier.Urlfrontier.URLItem;
import crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest;
import crawlercommons.urlfrontier.service.AbstractFrontierService;
import crawlercommons.urlfrontier.service.QueueInterface;
import crawlercommons.urlfrontier.service.QueueWithinCrawl;
import crawlercommons.urlfrontier.service.SynchronizedStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -145,4 +148,80 @@ protected AckMessage.Status putURLItem(URLItem value) {
        }
        return Status.OK;
    }

    @Override
    public void getURLStatus(URLStatusRequest request, StreamObserver<URLItem> responseObserver) {

        String crawlId = request.getCrawlID();
        String url = request.getUrl();
        String key = request.getKey();
        boolean found = false;

        // has a queue key been defined? if not use the hostname
        if (key == null || key.equals("")) {
            LOG.debug("key missing for {}", url);
            key = provideMissingKey(url);
            if (key == null) {
                LOG.error("Malformed URL {}", url);
                responseObserver.onError(io.grpc.Status.INVALID_ARGUMENT.asRuntimeException());
                return;
            }
        }

        LOG.info("getURLStatus crawlId={} key={} url={}", crawlId, key, url);

        QueueWithinCrawl qwc = QueueWithinCrawl.get(key, crawlId);
        URLQueue queue = (URLQueue) getQueues().get(qwc);
        if (queue == null) {
            LOG.error("Could not find queue for Crawl={}, queue={}", crawlId, key);
            responseObserver.onError(io.grpc.Status.NOT_FOUND.asRuntimeException());
            return;
        }

        URLInfo.Builder infoBuilder = URLInfo.newBuilder();
        URLInfo info = infoBuilder.setCrawlID(crawlId).setKey(key).setUrl(url).build();

        URLItem.Builder builder = URLItem.newBuilder();

        KnownURLItem.Builder knownBuilder = KnownURLItem.newBuilder();

        if (queue.isCompleted(url)) {
            // already fetched: report it as known with a refetch date of 0
            knownBuilder.setInfo(info);
            knownBuilder.setRefetchableFromDate(0);
            builder.setKnown(knownBuilder.build());

            found = true;
            responseObserver.onNext(builder.build());
        } else {
            // otherwise scan the queue for a scheduled entry matching the URL
            Iterator<InternalURL> iter = queue.iterator();

            while (iter.hasNext()) {
                InternalURL item = iter.next();

                if (url.equals(item.url)) {

                    try {
                        knownBuilder.setInfo(item.toURLInfo(qwc));
                        knownBuilder.setRefetchableFromDate(item.nextFetchDate);
                    } catch (InvalidProtocolBufferException e) {
                        LOG.error(e.getMessage(), e);
                        responseObserver.onError(
                                io.grpc.Status.fromThrowable(e).asRuntimeException());
                        return;
                    }

                    builder.setKnown(knownBuilder.build());
                    found = true;
                    responseObserver.onNext(builder.build());
                    break;
                }
            }
        }

        if (found) {
            responseObserver.onCompleted();
        } else {
            responseObserver.onError(io.grpc.Status.NOT_FOUND.asRuntimeException());
        }
    }
}
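Note that for a URL that is still scheduled, this lookup walks the queue's entries one by one, so its cost grows linearly with the queue length. That is fine for an occasional diagnostic call, but worth keeping in mind when probing very large queues.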
@@ -93,6 +93,9 @@ public int getDelay() {
    public int countActive() {
        return this.size();
    }

    // true if the URL has already been fetched in this queue
    public boolean isCompleted(String url) {
        return completed.contains(url);
    }

    @Override
    public void setCrawlLimit(int crawlLimit) {