Skip to content

Commit

Permalink
Event delivery reliability using acknowledgements (#921)
Browse files Browse the repository at this point in the history
* enabling event_acks and sending acks on `signalwire.event`

* changeset

* Update packages/core/src/BaseSession.ts

Co-authored-by: Ammar Ansari <[email protected]>

* Update packages/core/src/BaseSession.ts

Co-authored-by: Ammar Ansari <[email protected]>

* Update packages/core/src/BaseSession.ts

Co-authored-by: Ammar Ansari <[email protected]>

---------

Co-authored-by: Ammar Ansari <[email protected]>
  • Loading branch information
jpsantosbh and iAmmar7 authored Dec 14, 2023
1 parent 9e9b6f5 commit 03f01c3
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 22 deletions.
5 changes: 5 additions & 0 deletions .changeset/beige-tomatoes-own.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@signalwire/core': minor
---

support for eventing acknowledge
78 changes: 56 additions & 22 deletions packages/core/src/BaseSession.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ describe('BaseSession', () => {
ws.on('connection', (socket: any) => {
socket.on('message', (data: any) => {
const parsedData = JSON.parse(data)
socket.send(
JSON.stringify({
jsonrpc: '2.0',
id: parsedData.id,
result: {},
})
)
if (parsedData.params) {
socket.send(
JSON.stringify({
jsonrpc: '2.0',
id: parsedData.id,
result: {},
})
)
}
})
})

Expand All @@ -58,6 +60,10 @@ describe('BaseSession', () => {
WS.clean()
})

it('should include events_ack on RPCConnect message', () => {
expect(rpcConnect.params.event_acks).toBeTruthy()
})

it('should connect and disconnect to/from the provided host', async () => {
session.connect()
await ws.connected
Expand Down Expand Up @@ -98,23 +104,51 @@ describe('BaseSession', () => {
expect(session.status).toEqual('idle')
})

it('should invoke dispatch with socketMessage action for any other message', async () => {
session.connect()
await ws.connected
describe('signalwire.event messages', () => {
it('should invoke dispatch with socketMessage action for any other message', async () => {
session.connect()
await ws.connected

await expect(ws).toReceiveMessage(JSON.stringify(rpcConnect))
const request = {
jsonrpc: '2.0' as const,
id: 'uuid',
method: 'signalwire.event' as const,
params: {
key: 'value',
},
}
ws.send(JSON.stringify(request))
await expect(ws).toReceiveMessage(JSON.stringify(rpcConnect))
const request = {
jsonrpc: '2.0' as const,
id: 'uuid',
method: 'signalwire.event' as const,
params: {
key: 'value',
},
}
ws.send(JSON.stringify(request))

expect(session.dispatch).toHaveBeenCalledTimes(1)
expect(session.dispatch).toHaveBeenCalledWith(
socketMessageAction(request)
)
})

expect(session.dispatch).toHaveBeenCalledTimes(1)
expect(session.dispatch).toHaveBeenCalledWith(socketMessageAction(request))
it('should send acknowledge message on signalwire.event', async () => {
session.connect()
await ws.connected

await expect(ws).toReceiveMessage(JSON.stringify(rpcConnect))
const request = {
jsonrpc: '2.0' as const,
id: 'uuid',
method: 'signalwire.event' as const,
params: {
key: 'value',
},
}
ws.send(JSON.stringify(request))

await expect(ws).toReceiveMessage(
JSON.stringify({
jsonrpc: '2.0' as const,
id: 'uuid',
result: {},
})
)
})
})

describe('signalwire.ping messages', () => {
Expand Down
14 changes: 14 additions & 0 deletions packages/core/src/BaseSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
DEFAULT_CONNECT_VERSION,
RPCDisconnectResponse,
RPCPingResponse,
RPCEventAckResponse,
} from './RPCMessages'
import {
SessionOptions,
Expand Down Expand Up @@ -470,6 +471,9 @@ export class BaseSession {
break
}
default:
this._eventAcknowledgingHandler(payload).catch((error) =>
this.logger.error('Event Acknowledging Error', error)
)
// If it's not a response, trigger the dispatch.
this.dispatch(socketMessageAction(payload))
}
Expand Down Expand Up @@ -556,6 +560,16 @@ export class BaseSession {
await this.execute(RPCPingResponse(payload.id, payload?.params?.timestamp))
}

private async _eventAcknowledgingHandler(
payload: JSONRPCRequest
): Promise<void> {
const { method, id } = payload
if (method === 'signalwire.event') {
return this.execute(RPCEventAckResponse(id))
}
return Promise.resolve()
}

/**
* Do something based on the current `this._status`
*/
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/RPCMessages/RPCConnect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export type RPCConnectParams = {
authorization_state?: string
contexts?: string[]
topics?: string[]
event_acks?: boolean
}

export const DEFAULT_CONNECT_VERSION = {
Expand All @@ -24,6 +25,7 @@ export const RPCConnect = (params: RPCConnectParams) => {
method: 'signalwire.connect',
params: {
version: DEFAULT_CONNECT_VERSION,
event_acks: true,
...params,
},
})
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/RPCMessages/RPCEventAck.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { makeRPCResponse } from './helpers';


export const RPCEventAckResponse = (id: string) => makeRPCResponse({ id, result: {} })
5 changes: 5 additions & 0 deletions packages/core/src/RPCMessages/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ describe('RPC Messages', () => {
params: {
authentication: { project: 'project', token: 'token' },
version: DEFAULT_CONNECT_VERSION,
event_acks: true,
},
})
})
Expand All @@ -51,6 +52,7 @@ describe('RPC Messages', () => {
version: DEFAULT_CONNECT_VERSION,
protocol: 'old-proto',
contexts: ['test'],
event_acks: true,
},
})
})
Expand All @@ -65,6 +67,7 @@ describe('RPC Messages', () => {
params: {
authentication: { project: 'project', jwt_token: 'jwt' },
version: DEFAULT_CONNECT_VERSION,
event_acks: true,
},
})
})
Expand All @@ -86,6 +89,7 @@ describe('RPC Messages', () => {
authentication: { project: 'project', jwt_token: 'jwt' },
version: DEFAULT_CONNECT_VERSION,
agent: 'Jest Random Test',
event_acks: true,
},
})
})
Expand All @@ -104,6 +108,7 @@ describe('RPC Messages', () => {
authentication: { jwt_token: 'jwt' },
version: DEFAULT_CONNECT_VERSION,
agent: 'Jest Random Test',
event_acks: true,
},
})
})
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/RPCMessages/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from './RPCPing'
export * from './RPCExecute'
export * from './RPCDisconnect'
export * from './VertoMessages'
export * from './RPCEventAck'

0 comments on commit 03f01c3

Please sign in to comment.