Skip to content

Commit

Permalink
Merge pull request #2896 from murgatroid99/grpc-js_reduce_channel_leak
Browse files Browse the repository at this point in the history
grpc-js: Allow garbage collection of IDLE channels
  • Loading branch information
murgatroid99 authored Feb 4, 2025
2 parents c6c69df + ca21e4a commit 6bd791d
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 39 deletions.
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.12.5",
"version": "1.12.6",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
96 changes: 58 additions & 38 deletions packages/grpc-js/src/internal-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,13 @@ class ChannelSubchannelWrapper
) => {
channel.throttleKeepalive(keepaliveTime);
};
childSubchannel.addConnectivityStateListener(this.subchannelStateListener);
}

ref(): void {
if (this.refCount === 0) {
this.child.addConnectivityStateListener(this.subchannelStateListener);
this.channel.addWrappedSubchannel(this);
}
this.child.ref();
this.refCount += 1;
}
Expand Down Expand Up @@ -159,6 +162,26 @@ class ShutdownPicker implements Picker {
}
}

class ChannelzInfoTracker {
readonly trace = new ChannelzTrace();
readonly callTracker = new ChannelzCallTracker();
readonly childrenTracker = new ChannelzChildrenTracker();
state: ConnectivityState = ConnectivityState.IDLE;
constructor(private target: string) {}

getChannelzInfoCallback(): () => ChannelInfo {
return () => {
return {
target: this.target,
state: this.state,
trace: this.trace,
callTracker: this.callTracker,
children: this.childrenTracker.getChildLists()
};
};
}
}

export class InternalChannel {
private readonly resolvingLoadBalancer: ResolvingLoadBalancer;
private readonly subchannelPool: SubchannelPool;
Expand All @@ -179,9 +202,10 @@ export class InternalChannel {
* event loop open while there are any pending calls for the channel that
* have not yet been assigned to specific subchannels. In other words,
* the invariant is that callRefTimer is reffed if and only if pickQueue
* is non-empty.
* is non-empty. In addition, the timer is null while the state is IDLE or
* SHUTDOWN and there are no pending calls.
*/
private readonly callRefTimer: NodeJS.Timeout;
private callRefTimer: NodeJS.Timeout | null = null;
private configSelector: ConfigSelector | null = null;
/**
* This is the error from the name resolver if it failed most recently. It
Expand All @@ -203,11 +227,8 @@ export class InternalChannel {

// Channelz info
private readonly channelzEnabled: boolean = true;
private readonly originalTarget: string;
private readonly channelzRef: ChannelRef;
private readonly channelzTrace: ChannelzTrace;
private readonly callTracker = new ChannelzCallTracker();
private readonly childrenTracker = new ChannelzChildrenTracker();
private readonly channelzInfoTracker: ChannelzInfoTracker;

/**
* Randomly generated ID to be passed to the config selector, for use by
Expand Down Expand Up @@ -236,7 +257,7 @@ export class InternalChannel {
throw new TypeError('Channel options must be an object');
}
}
this.originalTarget = target;
this.channelzInfoTracker = new ChannelzInfoTracker(target);
const originalTargetUri = parseUri(target);
if (originalTargetUri === null) {
throw new Error(`Could not parse target name "${target}"`);
Expand All @@ -250,21 +271,17 @@ export class InternalChannel {
);
}

this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
this.callRefTimer.unref?.();

if (this.options['grpc.enable_channelz'] === 0) {
this.channelzEnabled = false;
}

this.channelzTrace = new ChannelzTrace();
this.channelzRef = registerChannelzChannel(
target,
() => this.getChannelzInfo(),
this.channelzInfoTracker.getChannelzInfoCallback(),
this.channelzEnabled
);
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Channel created');
this.channelzInfoTracker.trace.addTrace('CT_INFO', 'Channel created');
}

if (this.options['grpc.default_authority']) {
Expand Down Expand Up @@ -305,7 +322,7 @@ export class InternalChannel {
);
subchannel.throttleKeepalive(this.keepaliveTime);
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
this.channelzInfoTracker.trace.addTrace(
'CT_INFO',
'Created subchannel or used existing subchannel',
subchannel.getChannelzRef()
Expand All @@ -315,7 +332,6 @@ export class InternalChannel {
subchannel,
this
);
this.wrappedSubchannels.add(wrappedSubchannel);
return wrappedSubchannel;
},
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
Expand All @@ -338,12 +354,12 @@ export class InternalChannel {
},
addChannelzChild: (child: ChannelRef | SubchannelRef) => {
if (this.channelzEnabled) {
this.childrenTracker.refChild(child);
this.channelzInfoTracker.childrenTracker.refChild(child);
}
},
removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
if (this.channelzEnabled) {
this.childrenTracker.unrefChild(child);
this.channelzInfoTracker.childrenTracker.unrefChild(child);
}
},
};
Expand All @@ -366,7 +382,7 @@ export class InternalChannel {
RETRY_THROTTLER_MAP.delete(this.getTarget());
}
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
this.channelzInfoTracker.trace.addTrace(
'CT_INFO',
'Address resolution succeeded'
);
Expand All @@ -388,7 +404,7 @@ export class InternalChannel {
},
status => {
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
this.channelzInfoTracker.trace.addTrace(
'CT_WARNING',
'Address resolution failed with code ' +
status.code +
Expand Down Expand Up @@ -440,16 +456,6 @@ export class InternalChannel {
this.lastActivityTimestamp = new Date();
}

private getChannelzInfo(): ChannelInfo {
return {
target: this.originalTarget,
state: this.connectivityState,
trace: this.channelzTrace,
callTracker: this.callTracker,
children: this.childrenTracker.getChildLists(),
};
}

private trace(text: string, verbosityOverride?: LogVerbosity) {
trace(
verbosityOverride ?? LogVerbosity.DEBUG,
Expand All @@ -459,6 +465,9 @@ export class InternalChannel {
}

private callRefTimerRef() {
if (!this.callRefTimer) {
this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME)
}
// If the hasRef function does not exist, always run the code
if (!this.callRefTimer.hasRef?.()) {
this.trace(
Expand All @@ -472,15 +481,15 @@ export class InternalChannel {
}

private callRefTimerUnref() {
// If the hasRef function does not exist, always run the code
if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
// If the timer or the hasRef function does not exist, always run the code
if (!this.callRefTimer?.hasRef || this.callRefTimer.hasRef()) {
this.trace(
'callRefTimer.unref | configSelectionQueue.length=' +
this.configSelectionQueue.length +
' pickQueue.length=' +
this.pickQueue.length
);
this.callRefTimer.unref?.();
this.callRefTimer?.unref?.();
}
}

Expand Down Expand Up @@ -509,12 +518,13 @@ export class InternalChannel {
ConnectivityState[newState]
);
if (this.channelzEnabled) {
this.channelzTrace.addTrace(
this.channelzInfoTracker.trace.addTrace(
'CT_INFO',
'Connectivity state change to ' + ConnectivityState[newState]
);
}
this.connectivityState = newState;
this.channelzInfoTracker.state = newState;
const watchersCopy = this.connectivityStateWatchers.slice();
for (const watcherObject of watchersCopy) {
if (newState !== watcherObject.currentState) {
Expand All @@ -539,6 +549,10 @@ export class InternalChannel {
}
}

addWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
this.wrappedSubchannels.add(wrappedSubchannel);
}

removeWrappedSubchannel(wrappedSubchannel: ChannelSubchannelWrapper) {
this.wrappedSubchannels.delete(wrappedSubchannel);
}
Expand Down Expand Up @@ -591,6 +605,10 @@ export class InternalChannel {
clearTimeout(this.idleTimer);
this.idleTimer = null;
}
if (this.callRefTimer) {
clearInterval(this.callRefTimer);
this.callRefTimer = null;
}
}

private startIdleTimeout(timeoutMs: number) {
Expand Down Expand Up @@ -634,17 +652,17 @@ export class InternalChannel {

private onCallStart() {
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
this.channelzInfoTracker.callTracker.addCallStarted();
}
this.callCount += 1;
}

private onCallEnd(status: StatusObject) {
if (this.channelzEnabled) {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
this.channelzInfoTracker.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
this.channelzInfoTracker.callTracker.addCallFailed();
}
}
this.callCount -= 1;
Expand Down Expand Up @@ -776,7 +794,9 @@ export class InternalChannel {
call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started');
}
this.pickQueue = [];
clearInterval(this.callRefTimer);
if (this.callRefTimer) {
clearInterval(this.callRefTimer);
}
if (this.idleTimer) {
clearTimeout(this.idleTimer);
}
Expand Down

0 comments on commit 6bd791d

Please sign in to comment.