Skip to content

Commit

Permalink
Merge branch 'main' into NET-201-brubeck-client
Browse files Browse the repository at this point in the history
* main:
  broker: Storage node cluster (#180)
  ci: workaround for actions/labeler#136
  style: fix label.yml indentation
  • Loading branch information
timoxley committed Oct 4, 2021
2 parents e2ea1b1 + 92cd55e commit adc3c7e
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 76 deletions.
30 changes: 30 additions & 0 deletions packages/broker/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/broker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
"eslint-plugin-promise": "^5.1.0",
"jest": "^27.0.5",
"jest-circus": "^27.0.5",
"nock": "^13.1.3",
"sinon": "^11.1.1",
"stream-to-array": "^2.3.0",
"supertest": "^6.1.3",
Expand Down
75 changes: 54 additions & 21 deletions packages/broker/src/plugins/storage/StorageConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ const getKeyFromStream = (streamId: string, streamPartition: number): StreamKey
return `${streamId}::${streamPartition}`
}

const getKeysFromStream = (streamId: string, partitions: number) => {
const keys = new Set<StreamKey>()
const getKeysFromStream = (streamId: string, partitions: number): StreamKey[] => {
const keys: StreamKey[] = []
for (let i = 0; i < partitions; i++) {
keys.add(getKeyFromStream(streamId, i))
keys.push(getKeyFromStream(streamId, i))
}
return keys
}
Expand All @@ -51,22 +51,26 @@ export class StorageConfig {

streamKeys: Set<StreamKey>
listeners: StorageConfigListener[]
nodeId: string
clusterId: string
clusterSize: number
myIndexInCluster: number
apiUrl: string
private _poller!: ReturnType<typeof setTimeout>
private _stopPoller: boolean

// use createInstance method instead: it fetches the up-to-date config from API
constructor(nodeId: string, apiUrl: string) {
constructor(clusterId: string, clusterSize: number, myIndexInCluster: number, apiUrl: string) {
this.streamKeys = new Set<StreamKey>()
this.listeners = []
this.nodeId = nodeId
this.clusterId = clusterId
this.clusterSize = clusterSize
this.myIndexInCluster = myIndexInCluster
this.apiUrl = apiUrl
this._stopPoller = false
}

static async createInstance(nodeId: string, apiUrl: string, pollInterval: number): Promise<StorageConfig> {
const instance = new StorageConfig(nodeId, apiUrl)
static async createInstance(clusterId: string, clusterSize: number, myIndexInCluster: number, apiUrl: string, pollInterval: number): Promise<StorageConfig> {
const instance = new StorageConfig(clusterId, clusterSize, myIndexInCluster, apiUrl)
// eslint-disable-next-line no-underscore-dangle
if (pollInterval !== 0) {
await instance._poll(pollInterval)
Expand Down Expand Up @@ -104,7 +108,7 @@ export class StorageConfig {
}

async refresh(): Promise<void> {
const res = await fetch(`${this.apiUrl}/storageNodes/${this.nodeId}/streams`)
const res = await fetch(`${this.apiUrl}/storageNodes/${this.clusterId}/streams`)
if (!res.ok) {
throw new Error(`Refresh failed: ${res.status} ${await res.text()}`)
}
Expand All @@ -113,9 +117,11 @@ export class StorageConfig {
throw new Error(`Invalid response. Refresh failed: ${json}`)
}

const streamKeys = new Set<StreamKey>(json.flatMap((stream: { id: string, partitions: number }) => ([
...getKeysFromStream(stream.id, stream.partitions)
])))
const streamKeys = new Set<StreamKey>(
json.flatMap((stream: { id: string, partitions: number }) => ([
...getKeysFromStream(stream.id, stream.partitions)
])).filter ((key: StreamKey) => this.belongsToMeInCluster(key))
)
this._setStreams(streamKeys)
}

Expand All @@ -133,22 +139,27 @@ export class StorageConfig {
}
}

private _addStreams(streamKeys: Set<StreamKey>): void {
logger.info('Add %d streams to storage config: %s', streamKeys.size, Array.from(streamKeys).join(','))
this.streamKeys = new Set([...this.streamKeys, ...streamKeys])
private _addStreams(keysToAdd: Set<StreamKey>): void {
logger.info('Add %d streams to storage config: %s', keysToAdd.size, Array.from(keysToAdd).join(','))
this.streamKeys = new Set([...this.streamKeys, ...keysToAdd])
this.listeners.forEach((listener) => {
streamKeys.forEach((key: StreamKey) => listener.onStreamAdded(getStreamFromKey(key)))
keysToAdd.forEach((key: StreamKey) => listener.onStreamAdded(getStreamFromKey(key)))
})
}

private _removeStreams(streamKeys: Set<StreamKey>): void {
logger.info('Remove %d streams from storage config: %s', streamKeys.size, Array.from(streamKeys).join(','))
this.streamKeys = new Set([...this.streamKeys].filter((x) => !streamKeys.has(x)))
private _removeStreams(keysToRemove: Set<StreamKey>): void {
logger.info('Remove %d streams from storage config: %s', keysToRemove.size, Array.from(keysToRemove).join(','))
this.streamKeys = new Set([...this.streamKeys].filter((x) => !keysToRemove.has(x)))
this.listeners.forEach((listener) => {
streamKeys.forEach((key: StreamKey) => listener.onStreamRemoved(getStreamFromKey(key)))
keysToRemove.forEach((key: StreamKey) => listener.onStreamRemoved(getStreamFromKey(key)))
})
}

private belongsToMeInCluster(key: StreamKey): boolean {
const hashedIndex = Protocol.Utils.keyToArrayIndex(this.clusterSize, key.toString())
return hashedIndex === this.myIndexInCluster
}

startAssignmentEventListener(streamrAddress: string, subscriptionManager: SubscriptionManager): (msg: StreamMessage<AssignmentMessage>) => void {
const assignmentStreamId = this.getAssignmentStreamId(streamrAddress)
const messageListener = (msg: StreamMessage<AssignmentMessage>) => {
Expand All @@ -167,7 +178,29 @@ export class StorageConfig {
return messageListener
}

stopAssignmentEventListener(messageListener: (msg: StreamMessage<AssignmentMessage>) => void, streamrAddress: string, subscriptionManager: SubscriptionManager) {
onAssignmentEvent(content: { storageNode: string, stream: { id: string, partitions: number }, event: string }) {
if (content.storageNode && content.storageNode.toLowerCase() == this.clusterId.toLowerCase()) {
logger.trace('Received storage assignment message: %o', content)
const keys = new Set(
getKeysFromStream(content.stream.id, content.stream.partitions)
.filter ((key: StreamKey) => this.belongsToMeInCluster(key))
)

logger.trace('Adding %d of %d partitions in stream %s to this instance', keys.size, content.stream.partitions, content.stream.id)

if (content.event === 'STREAM_ADDED') {
this._addStreams(keys)
} else if (content.event === 'STREAM_REMOVED') {
this._removeStreams(keys)
}
} else if (!content.storageNode) {
logger.error('Received storage assignment message with no storageNode field present: %o', content)
} else {
logger.trace('Received storage assignment message for another storage node: %o', content)
}
}

stopAssignmentEventListener(messageListener: (msg: StreamMessage) => void, streamrAddress: string, subscriptionManager: SubscriptionManager) {
subscriptionManager.networkNode.removeMessageListener(messageListener)
const assignmentStreamId = this.getAssignmentStreamId(streamrAddress)
subscriptionManager.unsubscribe(assignmentStreamId, 0)
Expand Down
13 changes: 12 additions & 1 deletion packages/broker/src/plugins/storage/StoragePlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ export interface StoragePluginConfig {
storageConfig: {
refreshInterval: number
}
cluster: {
// If clusterAddress is null, the broker's address will be used
clusterAddress: string | null,
clusterSize: number,
myIndexInCluster: number
}
}

export class StoragePlugin extends Plugin<StoragePluginConfig> {
Expand Down Expand Up @@ -78,7 +84,12 @@ export class StoragePlugin extends Plugin<StoragePluginConfig> {
private async createStorageConfig() {
const brokerAddress = new Wallet(this.brokerConfig.ethereumPrivateKey).address
const apiUrl = this.brokerConfig.streamrUrl + '/api/v1'
const storageConfig = await StorageConfig.createInstance(brokerAddress, apiUrl, this.pluginConfig.storageConfig.refreshInterval)
const storageConfig = await StorageConfig.createInstance(
this.pluginConfig.cluster.clusterAddress || brokerAddress,
this.pluginConfig.cluster.clusterSize,
this.pluginConfig.cluster.myIndexInCluster,
apiUrl,
this.pluginConfig.storageConfig.refreshInterval)
this.assignmentMessageListener = storageConfig.startAssignmentEventListener(this.brokerConfig.streamrAddress, this.subscriptionManager)
return storageConfig
}
Expand Down
28 changes: 27 additions & 1 deletion packages/broker/src/plugins/storage/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,32 @@
"default": {
"refreshInterval": 600000
}
}
},
"cluster": {
"type": "object",
"description": "Storage node cluster config",
"required": [
"clusterAddress",
"clusterSize",
"myIndexInCluster"
],
"additionalProperties": false,
"properties": {
"clusterAddress": {
"type": ["string", "null"]
},
"clusterSize": {
"type": "number"
},
"myIndexInCluster": {
"type": "number"
}
},
"default": {
"clusterAddress": null,
"clusterSize": 1,
"myIndexInCluster": 0
}
}
}
}
8 changes: 4 additions & 4 deletions packages/broker/test/integration/broker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ describe('broker: end-to-end', () => {
const user1 = Wallet.createRandom()
const user2 = Wallet.createRandom()
client1 = createClient(tracker, user1.privateKey, {
storageNodeRegistry: storageNodeRegistry
storageNodeRegistry,
})
client2 = createClient(tracker, user1.privateKey, {
storageNodeRegistry: storageNodeRegistry
storageNodeRegistry,
})
client3 = createClient(tracker, user2.privateKey, {
storageNodeRegistry: storageNodeRegistry
storageNodeRegistry,
})
assignmentEventManager = new StorageAssignmentEventManager(tracker, engineAndEditorAccount)
assignmentEventManager = new StorageAssignmentEventManager(tracker, engineAndEditorAccount, storageNodeAccount)
await assignmentEventManager.createStream()

// Set up stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ describe('DataMetadataEndpoints', () => {
client1 = createClient(tracker, undefined, {
storageNodeRegistry: storageNodeRegistry,
})
assignmentEventManager = new StorageAssignmentEventManager(tracker, engineAndEditorAccount)
assignmentEventManager = new StorageAssignmentEventManager(tracker, engineAndEditorAccount, storageNodeAccount)
await assignmentEventManager.createStream()
}, 10 * 1000)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe('StorageConfig', () => {
enableCassandra: false
})
client = createClient(tracker, publisherAccount.privateKey)
assignmentEventManager = new StorageAssignmentEventManager(tracker, engineAndEditorAccount)
assignmentEventManager = new StorageAssignmentEventManager(tracker, engineAndEditorAccount, storageNodeAccount)
await assignmentEventManager.createStream()
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,27 @@ it.skip('resend cancellation', () => {
const storagePlugin: StoragePlugin = storageNode.plugins.find((p) => p.name === 'storage')
//@ts-expect-error .cassandra is private
storagePlugin.cassandra.requestLast = mockStorageData
assignmentEventManager = new StorageAssignmentEventManager(tracker, engineAndEditorAccount)
assignmentEventManager = new StorageAssignmentEventManager(tracker, engineAndEditorAccount, storageNodeAccount)
await assignmentEventManager.createStream()
}, 10 * 1000)

afterAll(async () => {
afterEach(async () => {
await destroy()
await networkNode.stop()
await websocketServer.close()
await tracker.stop()
await client.destroy()
await storageNode.stop()
await assignmentEventManager.close()
})

beforeAll(async () => {
mockDataQueryServer = await createMockDataServer()
})

afterAll(async () => {
mockDataQueryServer.close()
await once(mockDataQueryServer, 'close')
})


async function setUpStream(): Promise<Stream> {
const freshStream = await createTestStream(client, module)
await freshStream.addToStorageNode(storageNodeAccount.address)
Expand All @@ -112,5 +122,4 @@ it.skip('resend cancellation', () => {
await p
expect(mockStorageData.destroyed).toBe(true)
})
*/
})
Loading

0 comments on commit adc3c7e

Please sign in to comment.