Skip to content

Commit

Permalink
results
Browse files Browse the repository at this point in the history
  • Loading branch information
hazae41 committed May 11, 2023
1 parent 65b327d commit fc35938
Show file tree
Hide file tree
Showing 8 changed files with 1,046 additions and 268 deletions.
997 changes: 864 additions & 133 deletions package-lock.json

Large diffs are not rendered by default.

23 changes: 13 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,26 @@
"prepare": "npm run build"
},
"dependencies": {
"@hazae41/binary": "^1.2.10",
"@hazae41/bytes": "^1.0.4",
"@hazae41/cascade": "^1.0.0",
"@hazae41/binary": "^1.2.26",
"@hazae41/bytes": "^1.1.7",
"@hazae41/cascade": "^1.1.0",
"@hazae41/cursor": "^1.1.11",
"@hazae41/future": "^1.0.3",
"@hazae41/plume": "^1.0.0"
"@hazae41/option": "^1.0.6",
"@hazae41/plume": "^2.0.1",
"@hazae41/result": "^1.0.20"
},
"devDependencies": {
"@hazae41/deimos": "^1.0.6",
"@hazae41/phobos": "^1.0.10",
"@rollup/plugin-inject": "^5.0.3",
"@rollup/plugin-typescript": "^11.0.0",
"@types/node": "^18.15.7",
"rimraf": "^4.4.1",
"rollup": "^3.20.2",
"@rollup/plugin-typescript": "^11.1.0",
"@types/node": "^20.1.2",
"rimraf": "^5.0.0",
"rollup": "^3.21.6",
"rollup-plugin-dts": "^5.3.0",
"rollup-plugin-node-externals": "^5.1.2",
"typescript": "^5.0.2"
"rollup-plugin-node-externals": "^6.0.0",
"typescript": "^5.0.4"
},
"exports": {
".": {
Expand Down
45 changes: 35 additions & 10 deletions src/libs/transports/websocket.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { Opaque, Writable } from "@hazae41/binary"
import { ResultableUnderlyingDefaultSource, ResultableUnderlyingSink, SuperReadableStream, SuperWritableStream } from "@hazae41/cascade"
import { Ok, Result } from "@hazae41/result"

export async function createWebSocketStream(url: string) {
const websocket = new WebSocket(url)
Expand Down Expand Up @@ -31,6 +33,9 @@ export type WebSocketStreamParams =
& WebSocketSinkParams

export class WebSocketStream {
readonly reader: SuperReadableStream<Opaque>
readonly writer: SuperWritableStream<Writable>

readonly readable: ReadableStream<Opaque>
readonly writable: WritableStream<Writable>

Expand All @@ -47,8 +52,11 @@ export class WebSocketStream {
if (websocket.binaryType !== "arraybuffer")
throw new Error(`WebSocket binaryType is not arraybuffer`)

this.readable = new ReadableStream(new WebSocketSource(websocket, params))
this.writable = new WritableStream(new WebSocketSink(websocket, params))
this.reader = new SuperReadableStream(new WebSocketSource(websocket, params))
this.writer = new SuperWritableStream(new WebSocketSink(websocket, params))

this.readable = this.reader.start()
this.writable = this.writer.start()
}
}

Expand All @@ -60,7 +68,7 @@ export interface WebSocketSourceParams {
shouldCloseOnCancel?: boolean
}

export class WebSocketSource implements UnderlyingDefaultSource<Opaque> {
export class WebSocketSource implements ResultableUnderlyingDefaultSource<Opaque> {

constructor(
readonly websocket: WebSocket,
Expand Down Expand Up @@ -95,13 +103,18 @@ export class WebSocketSource implements UnderlyingDefaultSource<Opaque> {
this.websocket.addEventListener("message", onMessage, { passive: true })
this.websocket.addEventListener("error", onError, { passive: true })
this.websocket.addEventListener("close", onClose, { passive: true })

return Ok.void()
}

async cancel() {
if (!this.params.shouldCloseOnCancel) return
if (!this.params.shouldCloseOnCancel)
return Ok.void()

this.websocket.close()
return Ok.void()
}

}

export interface WebSocketSinkParams {
Expand All @@ -119,7 +132,7 @@ export interface WebSocketSinkParams {
shouldCloseOnAbort?: boolean
}

export class WebSocketSink implements UnderlyingSink<Writable> {
export class WebSocketSink implements ResultableUnderlyingSink<Writable> {

constructor(
readonly websocket: WebSocket,
Expand All @@ -146,23 +159,35 @@ export class WebSocketSink implements UnderlyingSink<Writable> {

this.websocket.addEventListener("error", onError, { passive: true })
this.websocket.addEventListener("close", onClose, { passive: true })

return Ok.void()
}

async write(chunk: Writable) {
const bytes = Writable.toBytes(chunk)
async write(chunk: Writable): Promise<Result<void, unknown>> {
const bytes = Writable.tryWriteToBytes(chunk)

if (bytes.isErr())
return bytes

// console.debug("ws ->", bytes)
this.websocket.send(bytes)
this.websocket.send(bytes.inner)

return Ok.void()
}

async abort() {
if (!this.params.shouldCloseOnAbort) return
if (!this.params.shouldCloseOnAbort)
return Ok.void()

await tryClose(this.websocket)
return Ok.void()
}

async close() {
if (!this.params.shouldCloseOnClose) return
if (!this.params.shouldCloseOnClose)
return Ok.void()

await tryClose(this.websocket)
return Ok.void()
}
}
63 changes: 41 additions & 22 deletions src/mods/kcp/reader.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { Cursor, Empty, Opaque } from "@hazae41/binary";
import { Empty, Opaque, Readable } from "@hazae41/binary";
import { SuperTransformStream } from "@hazae41/cascade";
import { AsyncEventTarget, CloseAndErrorEvents } from "@hazae41/plume";
import { Cursor } from "@hazae41/cursor";
import { EventError, StreamEvents, SuperEventTarget } from "@hazae41/plume";
import { Ok, Result } from "@hazae41/result";
import { KcpSegment } from "./segment.js";
import { SecretKcpDuplex } from "./stream.js";

export type SecretKcpReaderEvents = CloseAndErrorEvents & {
ack: MessageEvent<KcpSegment<Opaque>>
export type SecretKcpReaderEvents = StreamEvents & {
ack: KcpSegment<Opaque>
}

export class SecretKcpReader {

readonly events = new AsyncEventTarget<SecretKcpReaderEvents>()
readonly events = new SuperEventTarget<SecretKcpReaderEvents>()

readonly stream: SuperTransformStream<Opaque, Opaque>

Expand All @@ -24,52 +26,64 @@ export class SecretKcpReader {
})
}

async #onRead(chunk: Opaque) {
async #onRead(chunk: Opaque): Promise<Result<void, EventError>> {
const cursor = new Cursor(chunk.bytes)

while (cursor.remaining) {
const segment = KcpSegment.tryRead(cursor)
const segment = Readable.tryReadOrRollback(KcpSegment, cursor)

if (!segment) {
if (segment.isErr()) {
console.warn(`Not a KCP segment`)
break
}

await this.#onSegment(segment)
const result = await this.#onSegment(segment.inner)

if (result.isErr())
return result

continue
}

return Ok.void()
}

async #onSegment(segment: KcpSegment<Opaque>) {
async #onSegment(segment: KcpSegment<Opaque>): Promise<Result<void, EventError>> {
if (segment.conversation !== this.parent.conversation)
return
return Ok.void()

if (segment.command === KcpSegment.commands.push)
return await this.#onPushSegment(segment)
if (segment.command === KcpSegment.commands.ack)
return await this.#onAckSegment(segment)
if (segment.command === KcpSegment.commands.wask)
return await this.#onWaskSegment(segment)

console.warn(`Unknown KCP command`)
return Ok.void()
}

async #onPushSegment(segment: KcpSegment<Opaque>) {
async #onPushSegment(segment: KcpSegment<Opaque>): Promise<Result<void, never>> {
const conversation = this.parent.conversation
const command = KcpSegment.commands.ack
const timestamp = segment.timestamp
const serial = segment.serial
const unackSerial = this.parent.recv_counter
const fragment = new Empty()
const ack = KcpSegment.new({ conversation, command, timestamp, serial, unackSerial, fragment })
this.parent.writer.stream.enqueue(ack.prepare())

const ack = KcpSegment.tryNew({ conversation, command, timestamp, serial, unackSerial, fragment }).inner

this.parent.writer.stream.enqueue(ack)

if (segment.serial < this.parent.recv_counter) {
console.warn(`Received previous KCP segment`)
return
return Ok.void()
}

if (segment.serial > this.parent.recv_counter) {
console.warn(`Received next KCP segment`)
this.#buffer.set(segment.serial, segment)
return
return Ok.void()
}

this.stream.enqueue(segment.fragment)
Expand All @@ -83,21 +97,26 @@ export class SecretKcpReader {
this.#buffer.delete(this.parent.recv_counter)
this.parent.recv_counter++
}

return Ok.void()
}

async #onAckSegment(segment: KcpSegment<Opaque>) {
const msgEvent = new MessageEvent("ack", { data: segment })
await this.events.dispatchEvent(msgEvent, "ack")
async #onAckSegment(segment: KcpSegment<Opaque>): Promise<Result<void, EventError>> {
return await this.events.tryEmit("ack", segment).then(r => r.clear())
}

async #onWaskSegment(segment: KcpSegment<Opaque>) {
async #onWaskSegment(segment: KcpSegment<Opaque>): Promise<Result<void, never>> {
const conversation = this.parent.conversation
const command = KcpSegment.commands.wins
const serial = 0
const unackSerial = this.parent.recv_counter
const fragment = new Empty()
const wins = KcpSegment.new({ conversation, command, serial, unackSerial, fragment })
this.parent.writer.stream.enqueue(wins.prepare())

const wins = KcpSegment.tryNew<Empty>({ conversation, command, serial, unackSerial, fragment }).inner

this.parent.writer.stream.enqueue(wins)

return Ok.void()
}

}
18 changes: 14 additions & 4 deletions src/mods/kcp/segment.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,19 @@ console.log(relative(directory, pathname.replace(".mjs", ".ts")))
globalThis.crypto = webcrypto as any

test("kcp segment", async ({ test }) => {
const frame = new KcpSegment(12345, KcpSegment.commands.push, 0, 65535, Date.now() / 1000, 0, 0, Opaque.random(130))
const bytes = Writable.toBytes(frame.prepare())
const frame2 = Readable.fromBytes(KcpSegment, bytes)
const conversation = 12345
const command = KcpSegment.commands.push
const count = 0
const window = 65_535
const timestamp = Date.now() / 1000
const serial = 0
const unackSerial = 0
const fragment = Opaque.random(130)

assert(Bytes.equals(frame.fragment.bytes, frame2.fragment.bytes))
const segment = KcpSegment.tryNew({ conversation, command, count, window, timestamp, serial, unackSerial, fragment }).unwrap()

const bytes = Writable.tryWriteToBytes(segment).unwrap()
const frame2 = Readable.tryReadFromBytes(KcpSegment, bytes).unwrap()

assert(Bytes.equals2(segment.fragment.bytes, frame2.fragment.bytes))
})
Loading

0 comments on commit fc35938

Please sign in to comment.