Skip to content

Commit 4b8a414

Browse files
authored
Add total timeout + limit redis queue retries (#248)
* time limits: readd total timeount to runTask() in worker, just in case refactor working runTask() to either return true/false if task was timed out if timed out, recreate the page redis: add limit to retried URLs, currently set to 1 * retry: remove URL if not retrying, log removal of URL from queue
1 parent aadd9a0 commit 4b8a414

File tree

4 files changed

+90
-67
lines changed

4 files changed

+90
-67
lines changed

crawler.js

+16-29
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { parseArgs } from "./util/argParser.js";
2222
import { initRedis } from "./util/redis.js";
2323
import { Logger, errJSON, setExternalLogStream, setDebugLogging } from "./util/logger.js";
2424
import { WorkerPool } from "./util/worker.js";
25+
import { sleep, timedRun } from "./util/timing.js";
2526

2627
import { getBrowserExe, loadProfile, chromeArgs, getDefaultUA, evaluateWithCLI } from "./util/browser.js";
2728

@@ -160,7 +161,7 @@ export class Crawler {
160161
} catch (e) {
161162
//this.logger.fatal("Unable to connect to state store Redis: " + redisUrl);
162163
this.logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
163-
await this.sleep(3);
164+
await sleep(3);
164165
}
165166
}
166167

@@ -381,7 +382,7 @@ export class Crawler {
381382

382383
if (!this.isInScope(data, logDetails)) {
383384
this.logger.info("Page no longer in scope", data);
384-
return;
385+
return true;
385386
}
386387

387388
try {
@@ -443,7 +444,7 @@ export class Crawler {
443444
} else {
444445
const behaviorTimeout = this.params.behaviorTimeout / 1000;
445446

446-
const res = await this.timedRun(
447+
const res = await timedRun(
447448
this.runBehaviors(page, logDetails),
448449
behaviorTimeout,
449450
"Behaviors timed out",
@@ -466,7 +467,10 @@ export class Crawler {
466467
} catch (e) {
467468
this.logger.error("Page Errored", {...errJSON(e), ...logDetails}, "pageStatus");
468469
await this.markPageFailed(page);
470+
return false;
469471
}
472+
473+
return true;
470474
}
471475

472476
async runBehaviors(page, logDetails) {
@@ -605,7 +609,7 @@ export class Crawler {
605609
while (initState === "debug") {
606610
this.logger.info("Paused for debugging, will continue after manual resume");
607611

608-
await this.sleep(60);
612+
await sleep(60);
609613

610614
initState = await this.crawlState.getStatus();
611615
}
@@ -668,6 +672,7 @@ export class Crawler {
668672
crawlState: this.crawlState,
669673
screencaster: this.screencaster,
670674
healthChecker: this.healthChecker,
675+
totalTimeout: (this.params.behaviorTimeout + this.params.timeout) / 1000 + 60,
671676
task: (opts) => this.crawlPage(opts)
672677
});
673678

@@ -860,7 +865,7 @@ export class Crawler {
860865

861866
let isHTMLPage = true;
862867

863-
const isHTMLResult = await this.timedRun(
868+
const isHTMLResult = await timedRun(
864869
this.isHTML(url),
865870
FETCH_TIMEOUT_SECS,
866871
"HEAD request to determine if URL is HTML page timed out",
@@ -869,7 +874,7 @@ export class Crawler {
869874
if (isHTMLResult && (isHTMLResult.value == false)) {
870875
isHTMLPage = false;
871876
try {
872-
const captureResult = await this.timedRun(
877+
const captureResult = await timedRun(
873878
this.directFetchCapture(url),
874879
FETCH_TIMEOUT_SECS,
875880
"Direct fetch capture attempt timed out",
@@ -978,7 +983,7 @@ export class Crawler {
978983
}
979984
// in case page starts loading via fetch/xhr immediately after page load,
980985
// we want to ensure we don't exit too early
981-
await this.sleep(0.5);
986+
await sleep(0.5);
982987

983988
try {
984989
await page.waitForNetworkIdle({timeout: this.params.netIdleWait * 1000});
@@ -1005,7 +1010,7 @@ export class Crawler {
10051010
const frames = page.__filteredFrames;
10061011

10071012
const linkResults = await Promise.allSettled(
1008-
frames.map(frame => this.timedRun(
1013+
frames.map(frame => timedRun(
10091014
frame.evaluate(loadFunc, selector, extract),
10101015
PAGE_OP_TIMEOUT_SECS,
10111016
"Link extraction timed out",
@@ -1063,12 +1068,12 @@ export class Crawler {
10631068
try {
10641069
this.logger.debug("Check CF Blocking", logDetails);
10651070

1066-
while (await this.timedRun(
1071+
while (await timedRun(
10671072
page.$("div.cf-browser-verification.cf-im-under-attack"),
10681073
PAGE_OP_TIMEOUT_SECS
10691074
)) {
10701075
this.logger.debug("Cloudflare Check Detected, waiting for reload...", logDetails);
1071-
await this.sleep(5.5);
1076+
await sleep(5.5);
10721077
}
10731078
} catch (e) {
10741079
//this.logger.warn("Check CF failed, ignoring");
@@ -1213,28 +1218,10 @@ export class Crawler {
12131218
break;
12141219
}
12151220

1216-
await this.sleep(1);
1221+
await sleep(1);
12171222
}
12181223
}
12191224

1220-
timedRun(promise, seconds, message="Promise timed out", logDetails={}, context="general") {
1221-
// return Promise return value or log error if timeout is reached first
1222-
const timeout = seconds * 1000;
1223-
1224-
const rejectPromiseOnTimeout = (timeout) => {
1225-
return new Promise((resolve, reject) => {
1226-
setTimeout(() => (reject("timeout reached")), timeout);
1227-
});
1228-
};
1229-
1230-
return Promise.race([promise, rejectPromiseOnTimeout(timeout)])
1231-
.catch(() => this.logger.error(message, {"seconds": seconds, ...logDetails}, context));
1232-
}
1233-
1234-
sleep(seconds) {
1235-
return new Promise(resolve => setTimeout(resolve, seconds * 1000));
1236-
}
1237-
12381225
async parseSitemap(url, seedId) {
12391226
const sitemapper = new Sitemapper({
12401227
url,

util/state.js

+19-3
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ export class RedisCrawlState extends BaseState
194194
super();
195195
this.redis = redis;
196196

197+
this.maxRetryPending = 1;
198+
197199
this._lastSize = 0;
198200

199201
this.uid = uid;
@@ -267,9 +269,16 @@ local res = redis.call('get', KEYS[3]);
267269
if not res then
268270
local json = redis.call('hget', KEYS[1], ARGV[1]);
269271
if json then
270-
redis.call('lpush', KEYS[2], json);
272+
local data = cjson.decode(json);
273+
data['retry'] = (data['retry'] or 0) + 1;
271274
redis.call('hdel', KEYS[1], ARGV[1]);
272-
return 1
275+
if tonumber(data['retry']) <= tonumber(ARGV[2]) then
276+
json = cjson.encode(data);
277+
redis.call('lpush', KEYS[2], json);
278+
return 1;
279+
else
280+
return 2;
281+
end
273282
end
274283
end
275284
return 0;
@@ -459,8 +468,15 @@ return 0;
459468
const pendingUrls = await this.redis.hkeys(this.pkey);
460469

461470
for (const url of pendingUrls) {
462-
if (await this.redis.requeue(this.pkey, this.qkey, this.pkey + ":" + url, url)) {
471+
const res = await this.redis.requeue(this.pkey, this.qkey, this.pkey + ":" + url, url, this.maxRetryPending);
472+
switch (res) {
473+
case 1:
463474
logger.info(`Requeued: ${url}`);
475+
break;
476+
477+
case 2:
478+
logger.info(`Not requeuing anymore: ${url}`);
479+
break;
464480
}
465481
}
466482
}

util/timing.js

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import { Logger } from "./logger.js";
2+
3+
const logger = new Logger();
4+
5+
export function sleep(seconds) {
6+
return new Promise(resolve => setTimeout(resolve, seconds * 1000));
7+
}
8+
9+
export function timedRun(promise, seconds, message="Promise timed out", logDetails={}, context="general") {
10+
// return Promise return value or log error if timeout is reached first
11+
const timeout = seconds * 1000;
12+
13+
const rejectPromiseOnTimeout = (timeout) => {
14+
return new Promise((resolve, reject) => {
15+
setTimeout(() => (reject("timeout reached")), timeout);
16+
});
17+
};
18+
19+
return Promise.race([promise, rejectPromiseOnTimeout(timeout)])
20+
.catch(() => logger.error(message, {"seconds": seconds, ...logDetails}, context));
21+
}
22+
23+

util/worker.js

+32-35
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import PQueue from "p-queue";
33
import puppeteer from "puppeteer-core";
44

55
import { Logger, errJSON } from "./logger.js";
6+
import { sleep, timedRun } from "./timing.js";
67

78
const logger = new Logger();
89

@@ -15,11 +16,10 @@ const MAX_REUSE = 5;
1516
// ===========================================================================
1617
export class Worker
1718
{
18-
constructor(id, browser, task, puppeteerOptions, screencaster, healthChecker) {
19+
constructor(id, browser, task, screencaster, healthChecker) {
1920
this.id = id;
2021
this.browser = browser;
2122
this.task = task;
22-
this.puppeteerOptions = puppeteerOptions;
2323
this.screencaster = screencaster;
2424
this.healthChecker = healthChecker;
2525

@@ -29,19 +29,24 @@ export class Worker
2929
this.startPage = "about:blank?_browsertrix" + Math.random().toString(36).slice(2);
3030
}
3131

32-
async initPage() {
33-
if (this.page && ++this.reuseCount <= MAX_REUSE) {
34-
logger.debug("Reusing page", {reuseCount: this.reuseCount}, "worker");
35-
return this.page;
36-
} else if (this.page) {
32+
async closePage() {
33+
if (this.page) {
3734
try {
3835
await this.page.close();
3936
} catch (e) {
4037
// ignore
4138
}
4239
this.page = null;
4340
}
41+
}
4442

43+
async initPage() {
44+
if (this.page && ++this.reuseCount <= MAX_REUSE) {
45+
logger.debug("Reusing page", {reuseCount: this.reuseCount}, "worker");
46+
return this.page;
47+
} else {
48+
await this.closePage();
49+
}
4550
//open page in a new tab
4651
this.pendingTargets = new Map();
4752

@@ -71,7 +76,7 @@ export class Worker
7176
break;
7277
} catch (err) {
7378
logger.warn("Error getting new page in window context", {"workerid": this.id, ...errJSON(err)}, "worker");
74-
await sleep(500);
79+
await sleep(0.5);
7580
logger.warn("Retry getting new page");
7681

7782
if (this.healthChecker) {
@@ -94,25 +99,10 @@ export class Worker
9499

95100
logger.info("Starting page", {"workerid": this.id, "page": url}, "worker");
96101

97-
let result;
98-
let errorState;
99-
100-
await this.task({
102+
return await this.task({
101103
page: this.page,
102104
data: urlData,
103105
});
104-
105-
if (errorState) {
106-
return {
107-
type: "error",
108-
error: errorState,
109-
};
110-
}
111-
112-
return {
113-
data: result,
114-
type: "success",
115-
};
116106
}
117107
}
118108

@@ -126,6 +116,7 @@ export class WorkerPool
126116
this.crawlState = options.crawlState;
127117
this.screencaster = options.screencaster;
128118
this.healthChecker = options.healthChecker;
119+
this.totalTimeout = options.totalTimeout || 1e4;
129120

130121
this.task = options.task;
131122

@@ -156,7 +147,6 @@ export class WorkerPool
156147
id,
157148
this.browser,
158149
this.task,
159-
this.puppeteerOptions,
160150
this.screencaster,
161151
this.healthChecker
162152
);
@@ -173,7 +163,7 @@ export class WorkerPool
173163

174164
// wait half a second and try again
175165
logger.info("Waiting for available worker", {}, "worker");
176-
await sleep(500);
166+
await sleep(0.5);
177167

178168
return await this.getAvailableWorker();
179169

@@ -196,22 +186,32 @@ export class WorkerPool
196186
return;
197187
}
198188

199-
const result = await worker.runTask(job);
189+
const result = await timedRun(
190+
worker.runTask(job),
191+
this.totalTimeout,
192+
"Page Worker Timeout",
193+
{"workerid": worker.id},
194+
"worker"
195+
);
196+
197+
if (!result) {
198+
logger.debug("Resetting failed page", {}, "worker");
199+
200+
await worker.closePage();
200201

201-
if (result.type === "error") {
202202
if (job.callbacks) {
203-
job.callbacks.reject(result.error);
203+
job.callbacks.reject("timed out");
204204
}
205205
if (this.healthChecker) {
206206
this.healthChecker.incError();
207207
}
208-
} else if (result.type === "success") {
208+
} else {
209209
if (this.healthChecker) {
210210
this.healthChecker.resetErrors();
211211
}
212212

213213
if (job.callbacks) {
214-
job.callbacks.resolve(result.data);
214+
job.callbacks.resolve(result);
215215
}
216216
}
217217

@@ -236,7 +236,7 @@ export class WorkerPool
236236
}
237237

238238
// wait half a second
239-
await sleep(500);
239+
await sleep(0.5);
240240
}
241241

242242
logger.debug("Awaiting queue onIdle()", {}, "worker");
@@ -259,7 +259,4 @@ export class WorkerPool
259259
}
260260
}
261261

262-
function sleep(millis) {
263-
return new Promise(resolve => setTimeout(resolve, millis));
264-
}
265262

0 commit comments

Comments
 (0)