Skip to content

Commit

Permalink
Merge pull request #1498 from mdebbar/improve_query_batcher
Browse files Browse the repository at this point in the history
More efficient QueryBatcher
  • Loading branch information
helfer authored Mar 27, 2017
2 parents c5c924c + 89454de commit b8b3d12
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 41 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Martijn Walraven <[email protected]>
Matt Jeanes <[email protected]>
Maxime Quandalle <[email protected]>
Michiel ter Reehorst <[email protected]>
Mouad Debbar <[email protected]>
Oleksandr Stubailo <[email protected]>
Olivier Ricordeau <[email protected]>
Pavol Fulop <[email protected]>
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Expect active development and potentially significant breaking changes in the `0.x` track. We'll try to be diligent about releasing a `1.0` version in a timely fashion (ideally within 3 to 6 months), to signal the start of a more stable API.

### vNEXT
- Make `QueryBatcher` more efficient and avoid `setInterval` leakage [PR #1498](https://github.com/apollographql/apollo-client/pull/1498).
- Remove dependency on `graphql-tag/printer` per [graphql-tag#54](https://github.com/apollographql/graphql-tag/issues/54)

### 1.0.0-rc.7
Expand Down
11 changes: 4 additions & 7 deletions src/transport/batchedNetworkInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,19 @@ export class HTTPBatchedNetworkInterface extends BaseNetworkInterface {

public _middlewares: BatchMiddlewareInterface[];
public _afterwares: BatchAfterwareInterface[];
private pollInterval: number;
private batcher: QueryBatcher;

constructor(uri: string, pollInterval: number, fetchOpts: RequestInit) {
constructor(uri: string, batchInterval: number, fetchOpts: RequestInit) {
super(uri, fetchOpts);

if (typeof pollInterval !== 'number') {
throw new Error(`pollInterval must be a number, got ${pollInterval}`);
if (typeof batchInterval !== 'number') {
throw new Error(`batchInterval must be a number, got ${batchInterval}`);
}

this.pollInterval = pollInterval;
this.batcher = new QueryBatcher({
batchInterval: batchInterval,
batchFetchFunction: this.batchQuery.bind(this),
});
this.batcher.start(this.pollInterval);
// XXX possible leak: when do we stop polling the queue?
};

public query(request: Request): Promise<ExecutionResult> {
Expand Down
41 changes: 16 additions & 25 deletions src/transport/batching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,27 @@ export interface QueryFetchRequest {
reject?: (error: Error) => void;
};

// QueryBatcher operates on a queue of QueryFetchRequests. It polls and checks this queue
// for new fetch requests. If there are multiple requests in the queue at a time, it will batch
// them together into one query.
// QueryBatcher doesn't fire requests immediately. Requests that were enqueued within
// a certain amount of time (configurable through `batchInterval`) will be batched together
// into one query.
export class QueryBatcher {
// Queue on which the QueryBatcher will operate on a per-tick basis.
public queuedRequests: QueryFetchRequest[] = [];

private pollInterval: Number;
private pollTimer: any;
private batchInterval: Number;

// This function is called to send the queries in the queue to the server.
private batchFetchFunction: (request: Request[]) => Promise<ExecutionResult[]>;

constructor({
batchInterval,
batchFetchFunction,
}: {
batchInterval: number,
batchFetchFunction: (request: Request[]) => Promise<ExecutionResult[]>,
}) {
this.queuedRequests = [];
this.batchInterval = batchInterval;
this.batchFetchFunction = batchFetchFunction;
}

Expand All @@ -49,16 +51,17 @@ export class QueryBatcher {
fetchRequest.reject = reject;
});

// The first enqueued request triggers the queue consumption after `batchInterval` milliseconds.
if (this.queuedRequests.length === 1) {
this.scheduleQueueConsumption();
}

return fetchRequest.promise;
}

// Consumes the queue. Called on a polling interval.
// Consumes the queue.
// Returns a list of promises (one for each query).
public consumeQueue(): (Promise<ExecutionResult> | undefined)[] | undefined {
if (this.queuedRequests.length < 1) {
return undefined;
}

const requests: Request[] = this.queuedRequests.map(
(queuedRequest) => queuedRequest.request,
);
Expand Down Expand Up @@ -87,21 +90,9 @@ export class QueryBatcher {
return promises;
}

// TODO instead of start and stop, just set a timeout when a request comes in,
// and batch up everything in that interval. If no requests come in, don't batch.
public start(pollInterval: Number) {
if (this.pollTimer) {
clearInterval(this.pollTimer);
}
this.pollInterval = pollInterval;
this.pollTimer = setInterval(() => {
private scheduleQueueConsumption(): void {
setTimeout(() => {
this.consumeQueue();
}, this.pollInterval);
}

public stop() {
if (this.pollTimer) {
clearInterval(this.pollTimer);
}
}, this.batchInterval);
}
}
55 changes: 46 additions & 9 deletions test/batching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ describe('QueryBatcher', () => {
it('should construct', () => {
assert.doesNotThrow(() => {
const querySched = new QueryBatcher({
batchInterval: 10,
batchFetchFunction: networkInterface.batchQuery.bind(networkInterface),
});
querySched.consumeQueue();
Expand All @@ -23,6 +24,7 @@ describe('QueryBatcher', () => {

it('should not do anything when faced with an empty queue', () => {
const batcher = new QueryBatcher({
batchInterval: 10,
batchFetchFunction: networkInterface.batchQuery.bind(networkInterface),
});

Expand All @@ -33,6 +35,7 @@ describe('QueryBatcher', () => {

it('should be able to add to the queue', () => {
const batcher = new QueryBatcher({
batchInterval: 10,
batchFetchFunction: networkInterface.batchQuery.bind(networkInterface),
});

Expand Down Expand Up @@ -80,6 +83,7 @@ describe('QueryBatcher', () => {
},
);
const batcher = new QueryBatcher({
batchInterval: 10,
batchFetchFunction: myNetworkInterface.batchQuery.bind(myNetworkInterface),
});
const request: Request = {
Expand All @@ -88,6 +92,7 @@ describe('QueryBatcher', () => {

it('should be able to consume from a queue containing a single query', (done) => {
const myBatcher = new QueryBatcher({
batchInterval: 10,
batchFetchFunction: myNetworkInterface.batchQuery.bind(myNetworkInterface),
});

Expand Down Expand Up @@ -117,6 +122,7 @@ describe('QueryBatcher', () => {
);

const myBatcher = new QueryBatcher({
batchInterval: 10,
batchFetchFunction: NI.batchQuery.bind(NI),
});
myBatcher.enqueueRequest(request);
Expand All @@ -141,6 +147,7 @@ describe('QueryBatcher', () => {
},
);
const myBatcher = new QueryBatcher({
batchInterval: 10,
batchFetchFunction: NI.batchQuery.bind(NI),
});
const promise = myBatcher.enqueueRequest(request);
Expand All @@ -152,8 +159,9 @@ describe('QueryBatcher', () => {
});
});

it('should be able to stop polling', () => {
it('should work when single query', (done) => {
const batcher = new QueryBatcher({
batchInterval: 10,
batchFetchFunction: networkInterface.batchQuery.bind(networkInterface),
});
const query = gql`
Expand All @@ -163,18 +171,46 @@ describe('QueryBatcher', () => {
lastName
}
}`;
const request: Request = {
query,
};
const request: Request = { query };

batcher.enqueueRequest(request);
batcher.enqueueRequest(request);
assert.equal(batcher.queuedRequests.length, 1);

setTimeout(() => {
assert.equal(batcher.queuedRequests.length, 0);
done();
}, 20);
});

it('should correctly batch multiple queries', (done) => {
const batcher = new QueryBatcher({
batchInterval: 10,
batchFetchFunction: networkInterface.batchQuery.bind(networkInterface),
});
const query = gql`
query {
author {
firstName
lastName
}
}`;
const request: Request = { query };

//poll with a big interval so that the queue
//won't actually be consumed by the time we stop.
batcher.start(1000);
batcher.stop();
batcher.enqueueRequest(request);
batcher.enqueueRequest(request);
assert.equal(batcher.queuedRequests.length, 2);

setTimeout(() => {
// The batch shouldn't be fired yet, so we can add one more request.
batcher.enqueueRequest(request);
assert.equal(batcher.queuedRequests.length, 3);
}, 5);

setTimeout(() => {
// The batch should've been fired by now.
assert.equal(batcher.queuedRequests.length, 0);
done();
}, 20);
});

it('should reject the promise if there is a network error', (done) => {
Expand All @@ -196,6 +232,7 @@ describe('QueryBatcher', () => {
},
);
const batcher = new QueryBatcher({
batchInterval: 10,
batchFetchFunction: myNetworkInterface.batchQuery.bind(myNetworkInterface),
});
const promise = batcher.enqueueRequest(request);
Expand Down

0 comments on commit b8b3d12

Please sign in to comment.