Skip to content

Commit

Permalink
feat: setup auth & retry stubs
Browse files Browse the repository at this point in the history
  • Loading branch information
DaniAkash committed Feb 19, 2024
1 parent 62e29ab commit 46ac29c
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 47 deletions.
42 changes: 37 additions & 5 deletions src/client/auth/helper.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import axios from "axios";
import resources_pb2 from "clarifai-nodejs-grpc/proto/clarifai/api/resources_pb";
import { grpc } from "clarifai-nodejs-grpc";
import { V2Client } from "clarifai-nodejs-grpc/proto/clarifai/api/service_grpc_pb";
import process from "process";

// TypeScript interface for the cache
Expand Down Expand Up @@ -93,11 +95,12 @@ export class ClarifaiAuthHelper {
this._base = base;
this._ui = ui;

this.setBase(base);
this.setUi(ui);

if (validate) {
this.validate();
}

this.setBase(base);
}

private validate(): void {
Expand Down Expand Up @@ -218,10 +221,39 @@ export class ClarifaiAuthHelper {
* Get the API gRPC stub using the right channel based on the API endpoint base.
* TODO: This method is currently not implemented due to the lack of a gRPC V2Stub in clarifai-node.js.
*
* @returns The service_pb2_grpc.V2Stub stub for the API.
* @returns V2Client - The gRPC client to use to make API calls.
*/
getStub(): unknown {
throw new Error("Method not implemented.");
getStub(): V2Client {
if (!(this._base in baseHttpsCache)) {
throw new Error(`Cannot determine if base ${this._base} is https`);
}

const https = baseHttpsCache[this._base];

let client: V2Client;

if (https) {
client = new V2Client(this._base, grpc.ChannelCredentials.createSsl());
} else {
let host: string;
let port: number = 80;
if (this._base.includes(":")) {
[host, port] = this._base
.split(":")
.map((item, index) => (index === 1 ? parseInt(item) : item)) as [
string,
number,
];
} else {
host = this._base;
}
client = new V2Client(
`${host}:${port}`,
grpc.ChannelCredentials.createInsecure(),
);
}

return client;
}

/**
Expand Down
120 changes: 78 additions & 42 deletions src/client/auth/stub.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { credentials, Metadata, ServiceError, status } from "@grpc/grpc-js";
import { ServiceError, status } from "@grpc/grpc-js";
import { V2Client } from "clarifai-nodejs-grpc/proto/clarifai/api/service_grpc_pb";
import { ClarifaiAuthHelper } from "./helper";
import { StatusCode } from "clarifai-nodejs-grpc/proto/clarifai/api/status/status_code_pb";
import { V2Stub } from "./register";
import { grpc } from "clarifai-nodejs-grpc";

const throttleStatusCodes = new Set([
StatusCode.CONN_THROTTLED,
Expand All @@ -13,45 +14,78 @@ const retryCodesGrpc = new Set([
status.UNAVAILABLE, // gRPC status code for retries
]);

// Utility type to extract the first parameter type from a function
type FirstParameterType<T> = T extends (
arg1: infer P,
...args: unknown[]
) => unknown
? P
: never;

// Utility type to extract the callback parameter type from a function
type CallbackParameterType<T> = T extends (
arg1: unknown,
callback: infer P,
...args: unknown[]
) => unknown
? P
: never;

// Utility type to infer response type from callback
type CallbackResponseType<T> = T extends (
error: grpc.ServiceError | null,
response: infer R,
) => void
? R
: never;

export class AuthorizedStub {
private authHelper: ClarifaiAuthHelper;
private stub: V2Client;
private metadata: [string, string][];

constructor(authHelper?: ClarifaiAuthHelper) {
if (!authHelper) {
this.authHelper = ClarifaiAuthHelper.fromEnv();
} else {
this.authHelper = authHelper;
}

constructor(authHelper: ClarifaiAuthHelper) {
this.authHelper = authHelper;
this.stub = new V2Client(authHelper.base, credentials.createSsl());
this.stub = this.authHelper.getStub();
this.metadata = this.authHelper.metadata;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
async makeCall<T>(methodName: string, request: any): Promise<T> {
const metadata = new Metadata();
const authMetadata = this.authHelper.metadata;
Object.keys(authMetadata).forEach((key) => {
// @ts-expect-error TODO: type not clearly defined yet
metadata.set(key, authMetadata[key]);
async makeCall<MethodName extends keyof V2Client>(
methodName: MethodName,
request: FirstParameterType<V2Client[MethodName]>,
): Promise<
CallbackResponseType<CallbackParameterType<V2Client[MethodName]>>
> {
const metadata = new grpc.Metadata();
const authMetadata = this.metadata;
authMetadata.forEach((meta) => {
metadata.set(meta?.[0], meta?.[1]);
});

return new Promise<T>((resolve, reject) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if (typeof (this.stub as any)[methodName] !== "function") {
return new Promise((resolve, reject) => {
const methodFunction = this.stub[methodName];

if (typeof methodFunction !== "function") {
reject(new Error(`Method ${methodName} does not exist on stub`));
return;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/ban-types
const method: Function = (this.stub as any)[methodName];
method.call(
this.stub,
request,
metadata,
(err: ServiceError | null, response: T) => {
if (err) {
reject(err);
} else {
resolve(response);
}
},
);
// TODO - Fix the type issue with manually invoking the methodFunction
// @ts-expect-error - TS doesn't know that methodFunction has overloads & only expects 5 arguments
methodFunction.call(this.stub, request, metadata, {}, (err, response) => {
if (err) {
reject(err);
} else {
// TODO - Fix the type issue with the response
// @ts-expect-error - Response type is not fully inferred
resolve(response);
}
});
});
}
}
Expand All @@ -61,7 +95,7 @@ export class RetryStub extends AuthorizedStub {
private backoffTime: number;

constructor(
authHelper: ClarifaiAuthHelper,
authHelper?: ClarifaiAuthHelper,
maxAttempts: number = 10,
backoffTime: number = 5,
) {
Expand All @@ -70,11 +104,15 @@ export class RetryStub extends AuthorizedStub {
this.backoffTime = backoffTime;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
async makeCall<T>(methodName: string, request: any): Promise<T> {
async makeCall<MethodName extends keyof V2Client>(
methodName: MethodName,
request: FirstParameterType<V2Client[MethodName]>,
): Promise<
CallbackResponseType<CallbackParameterType<V2Client[MethodName]>>
> {
for (let attempt = 1; attempt <= this.maxAttempts; attempt++) {
try {
const response = await super.makeCall<T>(methodName, request);
const response = await super.makeCall(methodName, request);
return response;
} catch (err) {
const errorCode = (err as ServiceError).code;
Expand All @@ -98,19 +136,17 @@ export class RetryStub extends AuthorizedStub {
}
}

/**
* Create client stub that handles authorization and basic retries for
* unavailable or throttled connections.
* Args:
* authHelper: ClarifaiAuthHelper to use for auth metadata (default: from env)
* maxRetryAttempts: max attempts to retry RPCs with retryable failures (default: 10)
*/
export function createStub(
authHelper: ClarifaiAuthHelper,
authHelper?: ClarifaiAuthHelper,
maxRetryAttempts: number = 10,
): V2Stub {
/*
Create client stub that handles authorization and basic retries for
unavailable or throttled connections.
Args:
authHelper: ClarifaiAuthHelper to use for auth metadata (default: from env)
maxRetryAttempts: max attempts to retry RPCs with retryable failures
*/

// Assuming AuthorizedStub's constructor can handle a null authHelper by defaulting internally or through another mechanism
const stub: AuthorizedStub = new AuthorizedStub(authHelper);

Expand Down
Empty file added src/client/base.ts
Empty file.
86 changes: 86 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { version as nodeVersion } from "process";
import * as os from "os";
import packageJson from "../package.json";

const CLIENT_VERSION = packageJson.version;
const OS_VER = os.platform() + " " + os.release();
const NODE_VERSION = nodeVersion;

interface Response {
status: {
code: string;
description: string;
details: string;
};
}

export class TokenError extends Error {}

export class ApiError extends Error {
resource: string;
params: object;
method: string;
response: Response | null;
error_code: string | null;
error_desc: string | null;
error_details: string | null;

constructor(
resource: string,
params: object,
method: string,
response: Response | null = null,
) {
super();
this.resource = resource;
this.params = params;
this.method = method;
this.response = response;
this.error_code = "N/A";
this.error_desc = "N/A";
this.error_details = "N/A";

let response_json: string = "N/A";
if (response) {
// TODO: Might need a function to convert response to JSON object
const response_json_dict = response; // Adapt based on actual protobuf usage

this.error_code = response_json_dict?.status?.code ?? "N/A";
this.error_desc = response_json_dict?.status?.description ?? "N/A";
this.error_details = response_json_dict?.status?.details ?? "N/A";
response_json = JSON.stringify(response_json_dict.status, null, 2);
}

const current_ts_str = Date.now().toString();

const msg = `${method} ${resource} FAILED(${current_ts_str}). error_code: ${this.error_code}, error_description: ${this.error_desc}, error_details: ${this.error_details}
>> Node client ${CLIENT_VERSION} with Node ${NODE_VERSION} on ${OS_VER}
>> ${method} ${resource}
>> REQUEST(${current_ts_str}) ${JSON.stringify(params, null, 2)}
>> RESPONSE(${current_ts_str}) ${response_json}`;

this.message = msg;
}
}

export class ApiClientError extends Error {}
export class UserError extends Error {}
export class AuthError extends Error {}

export function baseUrl(url: string): string {
try {
return url.slice(0, nthOccurrence(url, "/", 4) + 1);
} catch (e) {
return "";
}
}

export function nthOccurrence(str: string, char: string, n: number): number {
let firstIndex = str.indexOf(char);
let count = 1;
while (firstIndex >= 0 && count < n) {
firstIndex = str.indexOf(char, firstIndex + 1);
count++;
}
return firstIndex;
}

0 comments on commit 46ac29c

Please sign in to comment.