Skip to content

Commit

Permalink
Refactor code to improve maintainability and add new tests
Browse files Browse the repository at this point in the history
Commented out unused code, removed redundant implementations, and added new tests for Base64 encoding, JetStream consumers, and status handling. Enhanced validation for names and improved coverage tooling with a streamlined "coverage" task.

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Dec 11, 2024
1 parent 6b1ccb5 commit b18ae2d
Show file tree
Hide file tree
Showing 17 changed files with 366 additions and 129 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deno_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
CI: true
run: |
deno task test-${{ matrix.module }}
deno coverage --include=${{ matrix.module}}/src ./coverage --lcov > ./coverage/out.lcov
deno task coverage
- name: Upload coverage
uses: coverallsapp/github-action@v2
Expand Down
3 changes: 2 additions & 1 deletion deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"test-obj": "deno test -A --parallel --reload --quiet --coverage=coverage obj/tests",
"test-services": "deno test -A --parallel --reload --quiet --coverage=coverage services/tests",
"test-transport-deno": "deno test -A --parallel --reload --quiet --coverage=coverage transport-deno/tests",
"cover": "deno coverage ./coverage --lcov > ./coverage/out.lcov && genhtml -o ./coverage/html ./coverage/out.lcov && open ./coverage/html/index.html",
"coverage": "deno coverage ./coverage --exclude=test_helpers --exclude=tests --exclude=examples --exclude=sha256 --lcov > ./coverage/out.lcov",
"cover": "deno task coverage && genhtml -o ./coverage/html ./coverage/out.lcov && open ./coverage/html/index.html",
"lint": "deno task lint-core && deno task lint-jetstream && deno task lint-kv && deno task lint-obj && deno task lint-services && deno task lint-test_helpers",
"lint-core": "cd core && deno lint",
"lint-jetstream": "cd jetstream && deno lint",
Expand Down
14 changes: 0 additions & 14 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/

import type { Nanos } from "@nats-io/nats-core";
import { nanos } from "@nats-io/nats-core";
import type { StoredMsg } from "./types.ts";

export type ApiPaged = {
Expand Down Expand Up @@ -1053,19 +1052,6 @@ export enum PriorityPolicy {
Overflow = "overflow",
}

export function defaultConsumer(
name: string,
opts: Partial<ConsumerConfig> = {},
): ConsumerConfig {
return Object.assign({
name: name,
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(30 * 1000),
replay_policy: ReplayPolicy.Instant,
}, opts);
}

export type OverflowMinPending = {
/**
* The name of the priority_group
Expand Down
4 changes: 2 additions & 2 deletions jetstream/src/jserrors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ export function isMessageNotFound(err: Error): boolean {
}

export class InvalidNameError extends Error {
constructor(name: string, message: string = "", opts?: ErrorOptions) {
super(`'${name} ${message}`, opts);
constructor(message: string = "", opts?: ErrorOptions) {
super(message, opts);
this.name = "InvalidNameError";
}
}
Expand Down
58 changes: 28 additions & 30 deletions jetstream/src/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@ import type {
MsgHdrs,
MsgImpl,
ProtocolHandler,
RequestOptions,
} from "@nats-io/nats-core/internal";
import {
DataBuffer,
deferred,
millis,
nanos,
RequestOne,
} from "@nats-io/nats-core/internal";
import type { DeliveryInfo, PullOptions } from "./jsapi_types.ts";
import type { DeliveryInfo } from "./jsapi_types.ts";

export const ACK = Uint8Array.of(43, 65, 67, 75);
const NAK = Uint8Array.of(45, 78, 65, 75);
const WPI = Uint8Array.of(43, 87, 80, 73);
const NXT = Uint8Array.of(43, 78, 88, 84);
const _NXT = Uint8Array.of(43, 78, 88, 84);
const TERM = Uint8Array.of(43, 84, 69, 82, 77);
const SPACE = Uint8Array.of(32);
const _SPACE = Uint8Array.of(32);

/**
* Represents a message stored in JetStream
Expand Down Expand Up @@ -100,19 +98,19 @@ export type JsMsg = {
*/
working(): void;

/**
* !! this is an experimental feature - and could be removed
*
* next() combines ack() and pull(), requires the subject for a
* subscription processing to process a message is provided
* (can be the same) however, because the ability to specify
* how long to keep the request open can be specified, this
* functionality doesn't work well with iterators, as an error
* (408s) are expected and needed to re-trigger a pull in case
* there was a timeout. In an iterator, the error will close
* the iterator, requiring a subscription to be reset.
*/
next(subj: string, ro?: Partial<PullOptions>): void;
// /**
// * !! this is an experimental feature - and could be removed
// *
// * next() combines ack() and pull(), requires the subject for a
// * subscription processing to process a message is provided
// * (can be the same) however, because the ability to specify
// * how long to keep the request open can be specified, this
// * functionality doesn't work well with iterators, as an error
// * (408s) are expected and needed to re-trigger a pull in case
// * there was a timeout. In an iterator, the error will close
// * the iterator, requiring a subscription to be reset.
// */
// next(subj: string, ro?: Partial<PullOptions>): void;

/**
* Indicate to the JetStream server that processing of the message
Expand Down Expand Up @@ -310,18 +308,18 @@ export class JsMsgImpl implements JsMsg {
this.doAck(WPI);
}

next(subj: string, opts: Partial<PullOptions> = { batch: 1 }) {
const args: Partial<PullOptions> = {};
args.batch = opts.batch || 1;
args.no_wait = opts.no_wait || false;
if (opts.expires && opts.expires > 0) {
args.expires = nanos(opts.expires);
}
const data = new TextEncoder().encode(JSON.stringify(args));
const payload = DataBuffer.concat(NXT, SPACE, data);
const reqOpts = subj ? { reply: subj } as RequestOptions : undefined;
this.msg.respond(payload, reqOpts);
}
// next(subj: string, opts: Partial<PullOptions> = { batch: 1 }) {
// const args: Partial<PullOptions> = {};
// args.batch = opts.batch || 1;
// args.no_wait = opts.no_wait || false;
// if (opts.expires && opts.expires > 0) {
// args.expires = nanos(opts.expires);
// }
// const data = new TextEncoder().encode(JSON.stringify(args));
// const payload = DataBuffer.concat(NXT, SPACE, data);
// const reqOpts = subj ? { reply: subj } as RequestOptions : undefined;
// this.msg.respond(payload, reqOpts);
// }

term(reason = "") {
let term = TERM;
Expand Down
6 changes: 4 additions & 2 deletions jetstream/src/jsutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* limitations under the License.
*/

import { InvalidNameError } from "./jserrors.ts";

export function validateDurableName(name?: string) {
return minValidation("durable", name);
}
Expand Down Expand Up @@ -43,8 +45,8 @@ export function minValidation(context: string, name = "") {
default:
// nothing
}
throw Error(
`invalid ${context} name - ${context} name cannot contain '${v}'`,
throw new InvalidNameError(
`${context} name ('${name}') cannot contain '${v}'`,
);
}
});
Expand Down
6 changes: 6 additions & 0 deletions jetstream/tests/consume_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
assert,
assertEquals,
assertExists,
assertFalse,
assertRejects,
} from "jsr:@std/assert";
import { initStream } from "./jstest_util.ts";
Expand All @@ -35,6 +36,8 @@ import type { PullConsumerMessagesImpl } from "../src/consumer.ts";
import {
AckPolicy,
DeliverPolicy,
isPullConsumer,
isPushConsumer,
jetstream,
jetstreamManager,
} from "../src/mod.ts";
Expand All @@ -49,6 +52,9 @@ Deno.test("consumers - consume", async () => {

const js = jetstream(nc, { timeout: 30_000 });
const c = await js.consumers.get(stream, consumer);
assert(isPullConsumer(c));
assertFalse(isPushConsumer(c));

const ci = await c.info();
assertEquals(ci.num_pending, count);
const start = Date.now();
Expand Down
2 changes: 1 addition & 1 deletion jetstream/tests/consumers_ordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ Deno.test("ordered consumers - name prefix", async () => {
return js.consumers.get("A", { name_prefix: "one.two" });
},
Error,
"invalid name_prefix name - name_prefix name cannot contain '.'",
"name_prefix name ('one.two') cannot contain '.'",
);

await cleanup(ns, nc);
Expand Down
19 changes: 16 additions & 3 deletions jetstream/tests/consumers_push_test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import {
AckPolicy,
DeliverPolicy,
isPullConsumer,
isPushConsumer,
jetstream,
jetstreamManager,
} from "../src/mod.ts";
import { isBoundPushConsumerOptions } from "../src/types.ts";
import { cleanup, jetstreamServerConf, Lock, setup } from "test_helpers";
import { nanos } from "@nats-io/nats-core";
import { assert, assertEquals, assertExists } from "jsr:@std/assert";
import {
assert,
assertEquals,
assertExists,
assertFalse,
} from "jsr:@std/assert";
import type { PushConsumerMessagesImpl } from "../src/pushconsumer.ts";
import { delay } from "@nats-io/nats-core/internal";

Expand All @@ -21,16 +29,21 @@ Deno.test("push consumers - basics", async () => {
js.publish("A.c"),
]);

await jsm.consumers.add("A", {
const opts = {
durable_name: "B",
deliver_subject: "hello",
deliver_policy: DeliverPolicy.All,
idle_heartbeat: nanos(5000),
flow_control: true,
ack_policy: AckPolicy.Explicit,
});
};
assert(isBoundPushConsumerOptions(opts));
await jsm.consumers.add("A", opts);

const c = await js.consumers.getPushConsumer("A", "B");
assert(isPushConsumer(c));
assertFalse(isPullConsumer(c));

let info = await c.info(true);
assertEquals(info.config.deliver_group, undefined);

Expand Down
6 changes: 3 additions & 3 deletions jetstream/tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2125,7 +2125,7 @@ Deno.test("jsm - validate stream name in operations", async () => {
if (v === "\t") v = "\\t";
const m = v === ""
? "stream name required"
: `invalid stream name - stream name cannot contain '${v}'`;
: `stream name ('${names[idx]}') cannot contain '${v}'`;
assertEquals((err as Error).message, m, `${test.name} - ${m}`);
}
}
Expand Down Expand Up @@ -2156,7 +2156,7 @@ Deno.test("jsm - validate consumer name", async () => {
if (v === "\r") v = "\\r";
if (v === "\n") v = "\\n";
if (v === "\t") v = "\\t";
const m = `invalid durable name - durable name cannot contain '${v}'`;
const m = `durable name ('${tests[idx]}') cannot contain '${v}'`;
assertEquals((err as Error).message, m);
}
}
Expand Down Expand Up @@ -2222,7 +2222,7 @@ Deno.test("jsm - validate consumer name in operations", async () => {
if (v === "\t") v = "\\t";
const m = v === ""
? "durable name required"
: `invalid durable name - durable name cannot contain '${v}'`;
: `durable name ('${names[idx]}') cannot contain '${v}'`;
assertEquals((err as Error).message, m, `${test.name} - ${m}`);
}
}
Expand Down
27 changes: 27 additions & 0 deletions jetstream/tests/jsmsg_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
assert,
assertEquals,
assertExists,
assertNotEquals,
assertRejects,
fail,
} from "jsr:@std/assert";
Expand Down Expand Up @@ -320,3 +321,29 @@ Deno.test("jsmsg - time and timestamp", async () => {

await cleanup(ns, nc);
});

Deno.test("jsmsg - reply/sid", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await jetstreamManager(nc) as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = jetstream(nc);
await js.publish("a.a", "hello");

await jsm.consumers.add("A", {
durable_name: "a",
ack_policy: AckPolicy.None,
});
const oc = await js.consumers.get("A");
const m = await oc.next() as JsMsgImpl;
assertNotEquals(m.reply, "");
assert(m.sid > 0);
assertEquals(m.data, new TextEncoder().encode("hello"));

await cleanup(ns, nc);
});
67 changes: 2 additions & 65 deletions jetstream/tests/jstest_util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,11 @@
* limitations under the License.
*/
import { AckPolicy, jetstream, jetstreamManager } from "../src/mod.ts";
import type { JsMsg, PubAck, StreamConfig } from "../src/mod.ts";
import type { PubAck, StreamConfig } from "../src/mod.ts";

import { assert } from "jsr:@std/assert";
import { Empty, nanos, nuid } from "@nats-io/nats-core";

import type { NatsConnection, QueuedIterator } from "@nats-io/nats-core";

export async function consume(iter: QueuedIterator<JsMsg>): Promise<JsMsg[]> {
const buf: JsMsg[] = [];
await (async () => {
for await (const m of iter) {
m.ack();
buf.push(m);
}
})();
return buf;
}
import type { NatsConnection } from "@nats-io/nats-core";

export async function initStream(
nc: NatsConnection,
Expand Down Expand Up @@ -95,54 +83,3 @@ export function fill(

return Promise.all(a);
}

export function time(): Mark {
return new Mark();
}

export class Mark {
measures: [number, number][];
constructor() {
this.measures = [];
this.measures.push([Date.now(), 0]);
}

mark() {
const now = Date.now();
const idx = this.measures.length - 1;
if (this.measures[idx][1] === 0) {
this.measures[idx][1] = now;
} else {
this.measures.push([now, 0]);
}
}

duration(): number {
const idx = this.measures.length - 1;
if (this.measures[idx][1] === 0) {
this.measures.pop();
}
const times = this.measures.map((v) => v[1] - v[0]);
return times.reduce((result, item) => {
return result + item;
});
}

assertLess(target: number) {
const d = this.duration();
assert(
target >= d,
`duration ${d} not in range - ${target}${d}`,
);
}

assertInRange(target: number) {
const min = .50 * target;
const max = 1.50 * target;
const d = this.duration();
assert(
d >= min && max >= d,
`duration ${d} not in range - ${min}${d} && ${max}${d}`,
);
}
}
Loading

0 comments on commit b18ae2d

Please sign in to comment.