diff --git a/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java b/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java index fbb3200..93fce02 100644 --- a/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java +++ b/service/src/main/java/crawlercommons/urlfrontier/service/AbstractFrontierService.java @@ -11,6 +11,7 @@ import crawlercommons.urlfrontier.Urlfrontier.AckMessage.Status; import crawlercommons.urlfrontier.Urlfrontier.BlockQueueParams; import crawlercommons.urlfrontier.Urlfrontier.Boolean; +import crawlercommons.urlfrontier.Urlfrontier.CrawlLimitParams; import crawlercommons.urlfrontier.Urlfrontier.Empty; import crawlercommons.urlfrontier.Urlfrontier.GetParams; import crawlercommons.urlfrontier.Urlfrontier.KnownURLItem; @@ -870,6 +871,26 @@ public void close() throws IOException { } } + public void setCrawlLimit(CrawlLimitParams params, StreamObserver responseObserver) { + QueueWithinCrawl searchKey = new QueueWithinCrawl(params.getKey(), params.getCrawlID()); + synchronized (getQueues()) { + QueueInterface qi = getQueues().get(searchKey); + if (qi != null) { + qi.setCrawlLimit(params.getLimit()); + } else { + LOG.error( + "Queue with key: {} and CrawlId: {} was not found.", + searchKey.getQueue(), + searchKey.getCrawlid()); + responseObserver.onError( + new RuntimeException("CrawlId and Queue combination is not found.")); + return; + } + } + + responseObserver.onCompleted(); + } + public abstract void getURLStatus( crawlercommons.urlfrontier.Urlfrontier.URLStatusRequest request, io.grpc.stub.StreamObserver responseObserver);