diff --git a/backend/src/index.ts b/backend/src/index.ts index f9f93ba3ea..795c3bd2d1 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -18,6 +18,7 @@ import HomeMetricsService from './service/home-metrics.service'; import CommandService from './service/oia/command.service'; import RegistrationService from './service/oia/registration.service'; import ProxyServer from './web-server/proxy-server'; +import ConnectionService from './service/connection.service'; import OIAnalyticsMessageService from './service/oia/message.service'; const CONFIG_DATABASE = 'oibus.db'; @@ -88,8 +89,9 @@ const LOG_DB_NAME = 'logs.db'; repositoryService.registrationRepository.getRegistrationSettings() ); + const connectionService = new ConnectionService(loggerService.logger!); const northService = new NorthService(encryptionService, repositoryService); - const southService = new SouthService(encryptionService, repositoryService); + const southService = new SouthService(encryptionService, repositoryService, connectionService); const historyQueryService = new HistoryQueryService(repositoryService); const engineMetricsService = new EngineMetricsService(loggerService.logger!, oibusSettings.id, repositoryService.engineMetricsRepository); diff --git a/backend/src/repository/south-connector.repository.spec.ts b/backend/src/repository/south-connector.repository.spec.ts index bd26f7e557..fa43eba98d 100644 --- a/backend/src/repository/south-connector.repository.spec.ts +++ b/backend/src/repository/south-connector.repository.spec.ts @@ -82,7 +82,7 @@ describe('South connector repository', () => { ]); const southConnectors = repository.getSouthConnectors(); expect(database.prepare).toHaveBeenCalledWith( - 'SELECT id, name, type, description, enabled, history_max_instant_per_item AS maxInstantPerItem, ' + + 'SELECT id, name, type, description, enabled, history_max_instant_per_item AS maxInstantPerItem, shared_connection as sharedConnection, ' + 'history_max_read_interval AS maxReadInterval, history_read_delay AS readDelay, history_read_overlap AS overlap, settings FROM south_connectors;' ); expect(southConnectors).toEqual(expectedValue); @@ -117,7 +117,7 @@ describe('South connector repository', () => { }); const southConnector = repository.getSouthConnector('id1'); expect(database.prepare).toHaveBeenCalledWith( - 'SELECT id, name, type, description, enabled, history_max_instant_per_item AS maxInstantPerItem, ' + + 'SELECT id, name, type, description, enabled, history_max_instant_per_item AS maxInstantPerItem, shared_connection as sharedConnection, ' + 'history_max_read_interval AS maxReadInterval, history_read_delay AS readDelay, history_read_overlap AS overlap, settings FROM south_connectors WHERE id = ?;' ); expect(get).toHaveBeenCalledWith('id1'); @@ -128,7 +128,7 @@ describe('South connector repository', () => { get.mockReturnValueOnce(null); const southConnector = repository.getSouthConnector('id1'); expect(database.prepare).toHaveBeenCalledWith( - 'SELECT id, name, type, description, enabled, history_max_instant_per_item AS maxInstantPerItem, ' + + 'SELECT id, name, type, description, enabled, history_max_instant_per_item AS maxInstantPerItem, shared_connection as sharedConnection, ' + 'history_max_read_interval AS maxReadInterval, history_read_delay AS readDelay, history_read_overlap AS overlap, settings FROM south_connectors WHERE id = ?;' ); expect(get).toHaveBeenCalledWith('id1'); diff --git a/backend/src/repository/south-connector.repository.ts b/backend/src/repository/south-connector.repository.ts index 0e1370df14..97e75aa423 100644 --- a/backend/src/repository/south-connector.repository.ts +++ b/backend/src/repository/south-connector.repository.ts @@ -15,7 +15,7 @@ export default class SouthConnectorRepository { */ getSouthConnectors(): Array { const query = - `SELECT id, name, type, description, enabled, history_max_instant_per_item AS maxInstantPerItem, ` + + `SELECT id, name, type, description, enabled, history_max_instant_per_item AS maxInstantPerItem, shared_connection as sharedConnection, ` + `history_max_read_interval AS maxReadInterval, history_read_delay AS readDelay, history_read_overlap AS overlap, ` + `settings FROM ${SOUTH_CONNECTORS_TABLE};`; return this.database @@ -27,6 +27,7 @@ export default class SouthConnectorRepository { type: result.type, description: result.description, enabled: Boolean(result.enabled), + sharedConnection: result.sharedConnection, history: { maxInstantPerItem: Boolean(result.maxInstantPerItem), maxReadInterval: result.maxReadInterval, @@ -42,7 +43,7 @@ export default class SouthConnectorRepository { */ getSouthConnector(id: string): SouthConnectorDTO | null { const query = - `SELECT id, name, type, description, enabled, history_max_instant_per_item AS maxInstantPerItem, ` + + `SELECT id, name, type, description, enabled, history_max_instant_per_item AS maxInstantPerItem, shared_connection as sharedConnection, ` + `history_max_read_interval AS maxReadInterval, history_read_delay AS readDelay, history_read_overlap AS overlap, ` + `settings FROM ${SOUTH_CONNECTORS_TABLE} WHERE id = ?;`; const result: any = this.database.prepare(query).get(id); @@ -57,6 +58,7 @@ export default class SouthConnectorRepository { type: result.type, description: result.description, enabled: Boolean(result.enabled), + sharedConnection: result.sharedConnection, history: { maxInstantPerItem: Boolean(result.maxInstantPerItem), maxReadInterval: result.maxReadInterval, diff --git a/backend/src/service/connection.service.spec.ts b/backend/src/service/connection.service.spec.ts new file mode 100644 index 0000000000..cd18a1d55c --- /dev/null +++ b/backend/src/service/connection.service.spec.ts @@ -0,0 +1,365 @@ +import ConnectionService, { ManagedConnectionDTO, ManagedConnection } from './connection.service'; +import PinoLogger from '../tests/__mocks__/logger.mock'; +import pino from 'pino'; +import { SouthSettings } from '../../../shared/model/south-settings.model'; + +const promiseReject = jest.fn(); +const promiseResolve = jest.fn(); +const promiseFn = jest.fn(); + +jest.mock('./deferred-promise', () => { + return { + __esModule: true, + default: class { + reject = () => promiseReject(); + resolve = () => promiseResolve(); + get promise() { + return promiseFn(); + } + } + }; +}); + +const serviceLogger: pino.Logger = new PinoLogger(); +const connectionLogger: pino.Logger = new PinoLogger(); + +const closeFn = jest.fn(() => Promise.resolve()); +const mockSession = { close: closeFn }; +const connectionDTO: ManagedConnectionDTO = { + type: 'opcua', + connectorSettings: { + foo: 'bar', + bar: 'baz' + } as unknown as SouthSettings, + createSessionFn: jest.fn(() => Promise.resolve(mockSession)), + settings: { + closeFnName: 'close', + sharedConnection: false + } +}; + +const sharedConnectionDTO: ManagedConnectionDTO = { + type: 'opcua', + connectorSettings: { + foo: 'bar', + bar: 'baz' + } as unknown as SouthSettings, + createSessionFn: jest.fn(() => Promise.resolve(mockSession)), + settings: { + closeFnName: 'close', + sharedConnection: true + } +}; + +describe('ConnectionService', () => { + let connectionService: ConnectionService; + + beforeEach(() => { + jest.clearAllMocks(); + (serviceLogger.child as jest.Mock).mockReturnValue(serviceLogger); + connectionService = new ConnectionService(serviceLogger); + (serviceLogger.child as jest.Mock).mockReturnValue(connectionLogger); + }); + + it('should create a new connection', () => { + const connection = connectionService.create('connectorId', connectionDTO); + const connectionMetadata = connectionService['findConnection']('connectorId', connectionDTO); + + expect(serviceLogger.debug).toHaveBeenCalledWith('New connection of type "opcua" created'); + expect(connectionMetadata).toStrictEqual({ + instance: connection, + reliantConnectorIds: new Set(['connectorId']), + connectorSettings: connectionDTO.connectorSettings, + sharedConnection: false + }); + }); + + it('should return an existing connection if it matches the settings', () => { + const connection = connectionService.create('connectorId', connectionDTO); + const newConnection: ManagedConnection = connectionService.create('connectorId2', connectionDTO); + + const sharedConnection = connectionService.create('sharedConnectorId', sharedConnectionDTO); + const newSharedConnection: ManagedConnection = connectionService.create('sharedConnectorId2', sharedConnectionDTO); + + expect(connection).not.toEqual(newConnection); // Not to be equal, since these are not shared connections + expect(sharedConnection).toStrictEqual(newSharedConnection); // To be equal, since these are shared connections + + expect(connection).not.toEqual(sharedConnection); + expect(newConnection).not.toEqual(newSharedConnection); + + expect((serviceLogger.debug as jest.Mock).mock.calls).toEqual([ + ['New connection of type "opcua" created'], // First connection (not shared) + ['New connection of type "opcua" created'], // Second connection (not shared) + ['New connection of type "opcua" created'] // First shared connection + ]); + expect((serviceLogger.trace as jest.Mock).mock.calls).toEqual([ + ['Connection of type "opcua" already exists, returning existing connection'] // Second shared connection + ]); + }); + + it('should remove a connection used by one connector', async () => { + const connection = connectionService.create('connectorId', connectionDTO); + const sharedConnection = connectionService.create('sharedConnectorId', sharedConnectionDTO); + + const connectionCloseSpy = jest.spyOn(connection, 'close'); + const sharedConnectionCloseSpy = jest.spyOn(sharedConnection, 'close'); + + await connectionService.remove('opcua', 'connectorId'); + await connectionService.remove('opcua', 'sharedConnectorId'); + + // Ensure that the connections are removed + expect(connectionService['findConnection']('connectorId', connectionDTO)).toBeNull(); + expect(connectionService['findConnection']('sharedConnectorId', sharedConnectionDTO)).toBeNull(); + + // Ensure that the close function is called + expect(connectionCloseSpy).toHaveBeenCalled(); + expect(sharedConnectionCloseSpy).toHaveBeenCalled(); + + expect((serviceLogger.debug as jest.Mock).mock.calls).toEqual([ + ['New connection of type "opcua" created'], // Connection (not shared) + ['New connection of type "opcua" created'], // Shared connection + + ['Connector "connectorId" removed from connection of type "opcua"'], // Connection (not shared) + ['Connection of type "opcua" removed'], // Connection (not shared) + ['Connector "sharedConnectorId" removed from connection of type "opcua"'], // Shared connection + ['Connection of type "opcua" removed'] // Shared connection + ]); + + expect((serviceLogger.trace as jest.Mock).mock.calls).toEqual([ + ['No reliant connectors left for connection of type "opcua", closing connection'], + ['No reliant connectors left for connection of type "opcua", closing connection'] + ]); + }); + + it('should not remove a connection with unknown type', async () => { + await connectionService.remove('unknown', 'connectorId'); + expect(serviceLogger.trace).toHaveBeenCalledWith('Connection type "unknown" not found. Nothing to remove'); + }); + + it('should not remove a connection with unknown connector id', async () => { + connectionService.create('connectorId', connectionDTO); + await connectionService.remove('opcua', 'unknown'); + expect(serviceLogger.trace).toHaveBeenCalledWith('No connection found for connector "unknown" of type "opcua". Nothing to remove'); + }); + + it('should not close a connection with reliant connectors', async () => { + connectionService.create('sharedConnectorId', sharedConnectionDTO); + const sharedConnection2 = connectionService.create('sharedConnectorId2', sharedConnectionDTO); + + const sharedConnectionCloseSpy = jest.spyOn(sharedConnection2, 'close'); + + await connectionService.remove('opcua', 'sharedConnectorId2'); + + // Ensure that the connection is removed + const connectionMeta = connectionService['findConnection']('sharedConnectorId', sharedConnectionDTO); + expect(connectionMeta?.reliantConnectorIds).toEqual(new Set(['sharedConnectorId'])); + + // Ensure that the close function is not called + expect(sharedConnectionCloseSpy).not.toHaveBeenCalled(); + + expect((serviceLogger.debug as jest.Mock).mock.calls).toEqual([ + ['New connection of type "opcua" created'], // Shared connection + + ['Connector "sharedConnectorId2" removed from connection of type "opcua"'] // Shared connection + ]); + + expect((serviceLogger.trace as jest.Mock).mock.calls).not.toEqual([ + ['No reliant connectors left for connection of type "opcua", closing connection'] + ]); + }); + + it('should not find a connection which does not exist', () => { + const foundConnection = connectionService['findConnection']('unknown', connectionDTO); + expect(foundConnection).toBeNull(); + }); +}); + +describe('ManagedConnectionClass', () => { + let connectionService: ConnectionService; + + beforeEach(() => { + jest.useFakeTimers(); + jest.clearAllMocks(); + (serviceLogger.child as jest.Mock).mockReturnValue(serviceLogger); + connectionService = new ConnectionService(serviceLogger); + (serviceLogger.child as jest.Mock).mockReturnValue(connectionLogger); + }); + + it('should create a new session', async () => { + const connection = connectionService.create('connectorId', connectionDTO); + const session = await connection.getSession(); + + expect(session).toBeDefined(); + expect(connectionDTO.createSessionFn).toHaveBeenCalled(); + expect(promiseResolve).toHaveBeenCalled(); + expect((connectionLogger.trace as jest.Mock).mock.calls).toEqual([['Getting session'], ['Creating new session'], ['Session created']]); + }); + + it('should return an existing session', async () => { + const connection = connectionService.create('connectorId', connectionDTO); + const session1 = await connection.getSession(); + const session2 = await connection.getSession(); + + expect(session1).toStrictEqual(session2); + expect(connectionDTO.createSessionFn).toHaveBeenCalledTimes(1); + expect(promiseResolve).toHaveBeenCalledTimes(1); + expect((connectionLogger.trace as jest.Mock).mock.calls).toEqual([ + ['Getting session'], + ['Creating new session'], + ['Session created'], + + ['Getting session'], + ['Session already exists, returning existing session'] + ]); + }); + + it('should throw an error when creating a session fails', async () => { + (connectionDTO.createSessionFn as jest.Mock).mockReturnValueOnce(null); + const connection = connectionService.create('connectorId', connectionDTO); + + await expect(connection.getSession()).rejects.toThrow('Session create function returned null'); + expect((connectionLogger.error as jest.Mock).mock.calls).toEqual([ + ['Session could not be created: Session create function returned null'] + ]); + }); + + it('should throw an error when creating a session throws an error', async () => { + (connectionDTO.createSessionFn as jest.Mock).mockImplementationOnce(() => { + throw new Error('Dummy error'); + }); + const connection = connectionService.create('connectorId', connectionDTO); + + await expect(connection.getSession()).rejects.toThrow('Dummy error'); + expect((connectionLogger.error as jest.Mock).mock.calls).toEqual([['Session could not be created: Dummy error']]); + }); + + it('should wait when two sessions are created at the same time', async () => { + // promiseFn will be called when promise is awaited + // and we assign that resolve function to promiseResolve + // so getSession will be able to resolve the promise + promiseFn.mockReturnValueOnce( + new Promise(resolve => { + promiseResolve.mockImplementationOnce(resolve); + }) + ); + + // mock the creation time of the session + (connectionDTO.createSessionFn as jest.Mock).mockImplementationOnce(() => { + return new Promise(resolve => setTimeout(() => resolve(mockSession), 1000)); + }); + + const connection = connectionService.create('connectorId', connectionDTO); + connection.getSession(); + connection.getSession(); + + jest.advanceTimersByTime(1000); + await jest.runAllTimersAsync(); + + expect((connectionLogger.trace as jest.Mock).mock.calls).toEqual([ + ['Getting session'], // First getSession + ['Creating new session'], // First getSession + ['Getting session'], // Second getSession + ['Session created'], // First getSession + ['Session already exists, returning existing session'] // Second getSession + ]); + }); + + it('should close a session', async () => { + const connection = connectionService.create('connectorId', connectionDTO); + await connection.getSession(); + // Mock having no reliant connectors + connectionService['findConnection']('connectorId', connectionDTO)!.reliantConnectorIds.clear(); + (connectionLogger.trace as jest.Mock).mockClear(); // Clear the trace calls + + await connection.close(); + + expect(closeFn).toHaveBeenCalled(); + expect(connection['_session']).toBeNull(); + expect(promiseResolve).toHaveBeenCalled(); + expect((connectionLogger.trace as jest.Mock).mock.calls).toEqual([['Closing session'], ['Session closed']]); + }); + + it('should not close an already closed session', async () => { + const connection = connectionService.create('connectorId', connectionDTO); + await connection.getSession(); + // Mock having no reliant connectors + connectionService['findConnection']('connectorId', connectionDTO)!.reliantConnectorIds.clear(); + + await connection.close(); + (connectionLogger.trace as jest.Mock).mockClear(); // Clear the trace calls + promiseResolve.mockClear(); // Clear the resolve calls + + await connection.close(); + + expect(closeFn).toHaveBeenCalledTimes(1); + expect(connection['_session']).toBeNull(); + expect(promiseResolve).not.toHaveBeenCalled(); + expect((connectionLogger.trace as jest.Mock).mock.calls).toEqual([['Closing session'], ['Session does not exist, nothing to close']]); + }); + + it('should not close a session with reliant connectors', async () => { + const connection = connectionService.create('connectorId', connectionDTO); + await connection.getSession(); + (connectionLogger.trace as jest.Mock).mockClear(); // Clear the trace calls + promiseResolve.mockClear(); // Clear the resolve calls + + await connection.close(); + + expect(closeFn).not.toHaveBeenCalled(); + expect(connection['_session']).not.toBeNull(); + expect(promiseResolve).not.toHaveBeenCalled(); + expect((connectionLogger.trace as jest.Mock).mock.calls).toEqual([ + ['Closing session'], + ['Session is still being used by other connectors, not closing the session'] + ]); + }); + + it('should throw an error when closing a session fails', async () => { + closeFn.mockImplementationOnce(() => { + throw new Error('Dummy error'); + }); + + const connection = connectionService.create('connectorId', connectionDTO); + await connection.getSession(); + // Mock having no reliant connectors + connectionService['findConnection']('connectorId', connectionDTO)!.reliantConnectorIds.clear(); + + await expect(connection.close()).rejects.toThrow('Dummy error'); + expect(promiseResolve).toHaveBeenCalled(); + expect((connectionLogger.error as jest.Mock).mock.calls).toEqual([['Session could not be closed: Dummy error']]); + }); + + it('should wait when two sessions are being closed at the same time', async () => { + // promiseFn will be called when promise is awaited + // and we assign that resolve function to promiseResolve + // so getSession will be able to resolve the promise + promiseFn.mockReturnValueOnce( + new Promise(resolve => { + promiseResolve.mockImplementationOnce(resolve); + }) + ); + + // mock the creation time of the session + closeFn.mockImplementation(() => { + return new Promise(resolve => setTimeout(() => resolve(), 1000)); + }); + + const connection = connectionService.create('connectorId', connectionDTO); + // manually setting the session, because using "await connection.getSession()" will cause weird behaviors + connection['_session'] = mockSession; + connection['reliantConnectorIds'].clear(); + + connection.close(); + connection.close(); + + jest.advanceTimersByTime(1000); + await jest.runAllTimersAsync(); + + expect((connectionLogger.trace as jest.Mock).mock.calls).toEqual([ + ['Closing session'], + ['Closing session'], + ['Session closed'], + ['Session does not exist, nothing to close'] + ]); + }); +}); diff --git a/backend/src/service/connection.service.ts b/backend/src/service/connection.service.ts new file mode 100644 index 0000000000..f2b72b61e0 --- /dev/null +++ b/backend/src/service/connection.service.ts @@ -0,0 +1,268 @@ +import { SouthSettings } from '../../../shared/model/south-settings.model'; +import { isDeepStrictEqual } from 'util'; +import DeferredPromise from './deferred-promise'; +import pino from 'pino'; + +/** + * DTO for creating a managed connection + */ +export type ManagedConnectionDTO = { + /** + * The type of the connection (usually the type of the connector eg. 'opcua') + */ + type: string; + /** + * Connector settings to distinguish between different connections + */ + connectorSettings: SouthSettings; + /** + * The function to call when creating a new session for the connection + * + * Note: The function has to bind the proper context beforehand + */ + createSessionFn: () => Promise; + /** + * Additional settings for the managed connection + */ + settings: ManagedConnectionSettings; +}; + +/** + * Settings for a managed connection + */ +export type ManagedConnectionSettings = { + /** + * The name of the function to call when closing the connection + */ + closeFnName: keyof TSession; + /** + * Whether the connection can be shared or not + */ + sharedConnection: boolean; +}; + +type ManagedConnectionMetadata = { + instance: ManagedConnection; + reliantConnectorIds: Set; + connectorSettings: SouthSettings; + sharedConnection: boolean; +}; + +/** + * Service to manage connections within OIBus + */ +export default class ConnectionService { + private readonly connections = new Map(); + private readonly logger: pino.Logger; + + constructor(logger: pino.Logger) { + this.logger = logger.child({ scopeType: 'internal', scopeName: 'ConnectionService' }); + } + + /** + * Creates a new connection and returns it + * + * If a connection with the same settings already exists and is shared, the existing connection will be returned + */ + create(connectorId: string, connectionDTO: ManagedConnectionDTO): ManagedConnection { + const foundConnection = this.findConnection(connectorId, connectionDTO); + + if (foundConnection) { + this.logger.trace(`Connection of type "${connectionDTO.type}" already exists, returning existing connection`); + foundConnection.reliantConnectorIds.add(connectorId); + return foundConnection.instance; + } + + const reliantConnectorIds = new Set([connectorId]); + const instance = new ManagedConnectionClass( + connectionDTO, + reliantConnectorIds, + this.logger.child({ scopeType: 'internal', scopeName: `ManagedConnection (${connectionDTO.type})` }) + ); + + // Initialize the array if it does not exist + if (!this.connections.has(connectionDTO.type)) { + this.connections.set(connectionDTO.type, []); + } + + // Add the new connection to the array + this.connections.get(connectionDTO.type)!.push({ + instance, + reliantConnectorIds, + connectorSettings: connectionDTO.connectorSettings, + sharedConnection: connectionDTO.settings.sharedConnection + }); + + this.logger.debug(`New connection of type "${connectionDTO.type}" created`); + + return instance; + } + + /** + * Removes the connection used by the given connector + * + * If the connection is shared, it will not be removed until all reliant connectors are removed + */ + async remove(type: string, connectorId: string) { + const connectionsOfType = this.connections.get(type); + + if (!connectionsOfType) { + this.logger.trace(`Connection type "${type}" not found. Nothing to remove`); + return; + } + + const connectionIndex = connectionsOfType.findIndex(c => c.reliantConnectorIds.has(connectorId)); + + if (connectionIndex === -1) { + this.logger.trace(`No connection found for connector "${connectorId}" of type "${type}". Nothing to remove`); + return; + } + + const connection = connectionsOfType[connectionIndex]; + connection.reliantConnectorIds.delete(connectorId); + this.logger.debug(`Connector "${connectorId}" removed from connection of type "${type}"`); + + if (connection.reliantConnectorIds.size === 0) { + this.logger.trace(`No reliant connectors left for connection of type "${type}", closing connection`); + await connection.instance.close(); + connectionsOfType.splice(connectionIndex, 1); + this.logger.debug(`Connection of type "${type}" removed`); + } + } + + /** + * Finds a connection that satisfies the given settings, + * or returns null if no such connection exists + */ + private findConnection(connectorId: string, connectionDTO: ManagedConnectionDTO): ManagedConnectionMetadata | null { + const { + type, + connectorSettings, + settings: { sharedConnection } + } = connectionDTO; + const connectionsOfType = this.connections.get(type); + + if (!connectionsOfType) { + return null; + } + + // If the connection is shared, it can be used by any connector, + // so we only need to check if the connector settings are the same + if (sharedConnection) { + const connectionMeta = connectionsOfType.find(c => c.sharedConnection && isDeepStrictEqual(c.connectorSettings, connectorSettings)); + return connectionMeta ?? null; + } + + // If the connection is not shared, it can only be used by the connector that created it, + // so we need to check if the connector settings are the same and if the connector is the creator + const connectionMeta = connectionsOfType.find( + c => !c.sharedConnection && c.reliantConnectorIds.has(connectorId) && isDeepStrictEqual(c.connectorSettings, connectorSettings) + ); + return connectionMeta ?? null; + } +} + +class ManagedConnectionClass { + protected _session: TSession | null = null; + protected createPromise$: DeferredPromise | undefined; + protected closePromise$: DeferredPromise | undefined; + + constructor( + private readonly dto: ManagedConnectionDTO, + private readonly reliantConnectorIds: Set, + private readonly logger: pino.Logger + ) {} + + /** + * Gets the session, creating it if it does not exist + * @throws Error if the session could not be created + */ + async getSession(): Promise { + this.logger.trace('Getting session'); + + if (this._session) { + this.logger.trace('Session already exists, returning existing session'); + return this._session; + } + + // Lock the creation of a new session + if (this.createPromise$) { + // Wait for any other session creation to finish + await this.createPromise$.promise; + + // Check if the session was created while waiting + if (this._session) { + this.logger.trace('Session already exists, returning existing session'); + return this._session; + } + } + + this.createPromise$ = new DeferredPromise(); + this.logger.trace('Creating new session'); + + try { + this._session = await this.dto.createSessionFn(); + if (!this._session) { + throw new Error('Session create function returned null'); + } + + this.logger.trace('Session created'); + return this._session; + } catch (error: any) { + this.logger.error(`Session could not be created: ${error.message}`); + throw error; + } finally { + // Unlock the creation of a new session + this.createPromise$.resolve(); + this.createPromise$ = undefined; + } + } + + /** + * Closes the session + * @throws Error if the session close function throws an error + */ + async close(): Promise { + this.logger.trace('Closing session'); + + if (!this._session) { + this.logger.trace('Session does not exist, nothing to close'); + return; + } + + if (this.reliantConnectorIds.size > 0) { + this.logger.trace('Session is still being used by other connectors, not closing the session'); + return; + } + + // Lock the closing of a session + if (this.closePromise$) { + // Wait for any other session closing to finish + await this.closePromise$.promise; + + // Check if the session was closed while waiting + if (!this._session) { + this.logger.trace('Session does not exist, nothing to close'); + return; + } + } + + this.closePromise$ = new DeferredPromise(); + + try { + await (this._session[this.dto.settings.closeFnName] as any).call(this._session); + this._session = null; + this.logger.trace('Session closed'); + } catch (error: any) { + this.logger.error(`Session could not be closed: ${error.message}`); + throw error; + } finally { + // Unlock the closing of a session + this.closePromise$.resolve(); + this.closePromise$ = undefined; + } + } +} + +// Only export the type, to prevent the class from being used directly +export type ManagedConnection = ManagedConnectionClass; diff --git a/backend/src/service/south.service.spec.ts b/backend/src/service/south.service.spec.ts index a8625acd82..4765c1efe2 100644 --- a/backend/src/service/south.service.spec.ts +++ b/backend/src/service/south.service.spec.ts @@ -5,10 +5,12 @@ import EncryptionService from './encryption.service'; import RepositoryService from './repository.service'; import pino from 'pino'; import SouthService from './south.service'; +import ConnectionService from './connection.service'; jest.mock('./encryption.service'); jest.mock('./south-cache.service'); jest.mock('./south-connector-metrics.service'); +jest.mock('./connection.service'); const encryptionService: EncryptionService = new EncryptionServiceMock('', ''); const repositoryRepository: RepositoryService = new RepositoryServiceMock('', ''); @@ -18,7 +20,8 @@ let service: SouthService; describe('south service', () => { beforeEach(() => { jest.clearAllMocks(); - service = new SouthService(encryptionService, repositoryRepository); + const connectionService = new ConnectionService(logger); + service = new SouthService(encryptionService, repositoryRepository, connectionService); }); it('should get a South connector settings', () => { diff --git a/backend/src/service/south.service.ts b/backend/src/service/south.service.ts index c2421808c6..75fb3274e7 100644 --- a/backend/src/service/south.service.ts +++ b/backend/src/service/south.service.ts @@ -41,6 +41,7 @@ import opchdaManifest from '../south/south-opchda/manifest'; import oledbManifest from '../south/south-oledb/manifest'; import piManifest from '../south/south-pi/manifest'; import sftpManifest from '../south/south-sftp/manifest'; +import ConnectionService from './connection.service'; const southList: Array<{ class: typeof SouthConnector; manifest: SouthConnectorManifest }> = [ { class: SouthFolderScanner, manifest: folderScannerManifest }, @@ -65,7 +66,8 @@ const southList: Array<{ class: typeof SouthConnector; manifest: South export default class SouthService { constructor( private readonly encryptionService: EncryptionService, - private readonly repositoryService: RepositoryService + private readonly repositoryService: RepositoryService, + private readonly _connectionService: ConnectionService ) {} /** @@ -82,7 +84,15 @@ export default class SouthService { if (!SouthConnector) { throw Error(`South connector of type ${settings.type} not installed`); } - return new SouthConnector.class(settings, addValues, addFile, this.encryptionService, this.repositoryService, logger, baseFolder); + return new SouthConnector.class(settings, + addValues, + addFile, + this.encryptionService, + this.repositoryService, + logger, + baseFolder, + this._connectionService + ); } /** diff --git a/backend/src/south/south-connector.ts b/backend/src/south/south-connector.ts index e89c08531a..58b398ac55 100644 --- a/backend/src/south/south-connector.ts +++ b/backend/src/south/south-connector.ts @@ -12,11 +12,12 @@ import DeferredPromise from '../service/deferred-promise'; import { DateTime } from 'luxon'; import SouthCacheService from '../service/south-cache.service'; import { PassThrough } from 'node:stream'; -import { QueriesFile, QueriesHistory, QueriesLastPoint, QueriesSubscription } from './south-interface'; +import { QueriesFile, QueriesHistory, QueriesLastPoint, QueriesSubscription, DelegatesConnection } from './south-interface'; import SouthConnectorMetricsService from '../service/south-connector-metrics.service'; import { SouthItemSettings, SouthSettings } from '../../../shared/model/south-settings.model'; import { OIBusDataValue } from '../../../shared/model/engine.model'; import path from 'node:path'; +import ConnectionService, { ManagedConnectionDTO } from '../service/connection.service'; /** * Class SouthConnector : provides general attributes and methods for south connectors. @@ -64,7 +65,9 @@ export default class SouthConnector = { + type: this.connector.type, + connectorSettings: this.connector.settings, + createSessionFn: this.createSession.bind(this), + settings: this.connectionSettings + }; + + this.connection = this.connectionService!.create(this.connector.id, connectionDTO); + } + this.logger.info(`South connector "${this.connector.name}" of type ${this.connector.type} started`); for (const cronJob of this.cronByScanModeIds.values()) { @@ -499,6 +514,12 @@ export default class SouthConnector { this.logger.warn('testConnection must be override'); } diff --git a/backend/src/south/south-interface.ts b/backend/src/south/south-interface.ts index 641bb597e2..6746b5041a 100644 --- a/backend/src/south/south-interface.ts +++ b/backend/src/south/south-interface.ts @@ -1,3 +1,4 @@ +import { ManagedConnection, ManagedConnectionSettings } from '../service/connection.service'; import { SouthConnectorItemDTO } from '../../../shared/model/south-connector.model'; import { Instant } from '../../../shared/model/types'; @@ -30,3 +31,21 @@ export interface QueriesSubscription { subscribe(items: Array): Promise; unsubscribe(items: Array): Promise; } + +export interface DelegatesConnection { + /** + * The connection that is being managed + */ + connection: ManagedConnection; + /** + * The settings for the connection + */ + connectionSettings: ManagedConnectionSettings; + /** + * The function to create a new session + * + * This function will be called by the connection service to create a new session using + * the context of the class that implements this interface + */ + createSession(): Promise; +} diff --git a/backend/src/south/south-opcua/manifest.ts b/backend/src/south/south-opcua/manifest.ts index 7804b89e99..2b4395d166 100644 --- a/backend/src/south/south-opcua/manifest.ts +++ b/backend/src/south/south-opcua/manifest.ts @@ -10,7 +10,8 @@ const manifest: SouthConnectorManifest = { lastPoint: true, lastFile: false, history: true, - forceMaxInstantPerItem: false + forceMaxInstantPerItem: false, + sharedConnection: true }, settings: [ { diff --git a/backend/src/south/south-opcua/south-opcua.spec.ts b/backend/src/south/south-opcua/south-opcua.spec.ts index 5f09929c0e..2dd83c7820 100644 --- a/backend/src/south/south-opcua/south-opcua.spec.ts +++ b/backend/src/south/south-opcua/south-opcua.spec.ts @@ -29,6 +29,7 @@ import { import { HistoryReadValueIdOptions } from 'node-opcua-types/source/_generated_opcua_types'; import Stream from 'node:stream'; import { createFolder } from '../../service/utils'; +import ConnectionService from '../../service/connection.service'; class CustomStream extends Stream { constructor() { @@ -40,7 +41,7 @@ class CustomStream extends Stream { // Mock node-opcua-client jest.mock('node-opcua-client', () => ({ - OPCUAClient: { createSession: jest.fn() }, + OPCUAClient: { createSession: jest.fn(() => ({})) }, ClientSubscription: { create: jest.fn() }, ClientMonitoredItem: { create: jest.fn() }, MessageSecurityMode: { None: 1 }, @@ -222,7 +223,17 @@ describe('SouthOPCUA', () => { jest.useFakeTimers().setSystemTime(new Date(nowDateString)); repositoryService.southConnectorRepository.getSouthConnector = jest.fn().mockReturnValue(configuration); - south = new SouthOPCUA(configuration, addValues, addFile, encryptionService, repositoryService, logger, 'baseFolder'); + const connectionService = new ConnectionService(logger); + south = new SouthOPCUA( + configuration, + addValues, + addFile, + encryptionService, + repositoryService, + logger, + 'baseFolder', + connectionService + ); }); it('should be properly initialized', async () => { @@ -231,6 +242,9 @@ describe('SouthOPCUA', () => { await south.start(); await south.start(); expect(south.initOpcuaCertificateFolders).toHaveBeenCalledTimes(2); + // createSession should not be called right after starting, because + // it will be eventually called when the first session is needed + expect(south.createSession).not.toHaveBeenCalled(); expect(south.connect).toHaveBeenCalledTimes(2); }); @@ -257,6 +271,8 @@ describe('SouthOPCUA', () => { await south.start(); + // retrieving a session to trigger the creation of a session + await south.connection.getSession(); expect(nodeOPCUAClient.OPCUAClient.createSession).toHaveBeenCalledWith( configuration.settings.url, expectedUserIdentity, @@ -267,27 +283,18 @@ describe('SouthOPCUA', () => { }); it('should properly manage connection error', async () => { - const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); - const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout'); - - (nodeOPCUAClient.OPCUAClient.createSession as jest.Mock) - .mockImplementationOnce(() => { - throw new Error('connection error'); - }) - .mockImplementationOnce(() => { - throw new Error('connection error'); - }); + (nodeOPCUAClient.OPCUAClient.createSession as jest.Mock).mockImplementationOnce(() => { + throw new Error('connection error'); + }); await south.start(); - expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), configuration.settings.retryInterval); - expect(logger.error).toHaveBeenCalledWith(`Error while connecting to the OPCUA server. ${new Error('connection error')}`); - - await south.connect(); - expect(clearTimeoutSpy).toHaveBeenCalledTimes(1); - - await south.disconnect(); - expect(clearTimeoutSpy).toHaveBeenCalledTimes(2); + try { + await south.connection.getSession(); + } catch (error) { + expect(logger.error).toHaveBeenCalledWith(`Error while connecting to the OPCUA server. ${new Error('connection error')}`); + await south.disconnect(); + } }); it('should properly manage history query', async () => { @@ -571,6 +578,10 @@ describe('SouthOPCUA', () => { south.addValues = jest.fn(); await south.start(); + // In order to trigger the call to the 'close' function, there needs to be a session created, + // because otherwise the 'close' function will not be called + // If this is not called, the 'disconnect' function will resolve right away, without calling the 'close' function + await south.connection.getSession(); south.disconnect(); await expect( south.historyQuery( @@ -665,6 +676,10 @@ describe('SouthOPCUA', () => { south.addValues = jest.fn(); await south.start(); + // In order to trigger the call to the 'close' function, there needs to be a session created, + // because otherwise the 'close' function will not be called + // If this is not called, the 'disconnect' function will resolve right away, without calling the 'close' function + await south.connection.getSession(); south.disconnect(); await expect(south.lastPointQuery(items)).rejects.toThrow('opcua read error'); const expectedItemsToRead = items.filter(item => item.settings.mode === 'DA'); @@ -835,7 +850,17 @@ describe('SouthOPCUA with basic auth', () => { jest.useFakeTimers(); repositoryService.southConnectorRepository.getSouthConnector = jest.fn().mockReturnValue(configuration); - south = new SouthOPCUA(configuration, addValues, addFile, encryptionService, repositoryService, logger, 'baseFolder'); + const connectionService = new ConnectionService(logger); + south = new SouthOPCUA( + configuration, + addValues, + addFile, + encryptionService, + repositoryService, + logger, + 'baseFolder', + connectionService + ); }); it('should properly connect to OPCUA server with basic auth', async () => { @@ -860,7 +885,7 @@ describe('SouthOPCUA with basic auth', () => { password: configuration.settings.authentication.password }; - await south.connect(); + await south.createSession(); expect(nodeOPCUAClient.OPCUAClient.createSession).toHaveBeenCalledWith( configuration.settings.url, @@ -906,7 +931,7 @@ describe('SouthOPCUA with certificate', () => { jest.useFakeTimers(); repositoryService.southConnectorRepository.getSouthConnector = jest.fn().mockReturnValue(configuration); - south = new SouthOPCUA(configuration, addValues, addFile, encryptionService, repositoryService, logger, 'baseFolder'); + south = new SouthOPCUA(configuration, addValues, addFile, encryptionService, repositoryService, logger, 'baseFolder', connectionService); }); it('should properly connect to OPCUA server with basic auth', async () => { @@ -936,7 +961,7 @@ describe('SouthOPCUA with certificate', () => { privateKey: Buffer.from('key content').toString('utf8') }; - await south.connect(); + await south.createSession(); expect(nodeOPCUAClient.OPCUAClient.createSession).toHaveBeenCalledWith( configuration.settings.url, @@ -1094,7 +1119,17 @@ describe('SouthOPCUA test connection', () => { jest.useFakeTimers().setSystemTime(); repositoryService.southConnectorRepository.getSouthConnector = jest.fn().mockReturnValue(configuration); - south = new SouthOPCUA(configuration, addValues, addFile, encryptionService, repositoryService, logger, 'baseFolder'); + const connectionService = new ConnectionService(logger); + south = new SouthOPCUA( + configuration, + addValues, + addFile, + encryptionService, + repositoryService, + logger, + 'baseFolder', + connectionService + ); }); it('Connection settings are correct', async () => { @@ -1224,3 +1259,74 @@ describe('SouthOPCUA test connection', () => { await expect(south.testConnection()).rejects.toThrow(new Error('Unknown error')); }); }); + +describe('SouthOPCUA with shared connection', () => { + const connector: SouthConnectorDTO = { + id: 'southId', + name: 'south', + type: 'test', + description: 'my test connector', + enabled: true, + sharedConnection: true, + history: { + maxInstantPerItem: true, + maxReadInterval: 3600, + readDelay: 0, + overlap: 0 + }, + settings: { + url: 'opc.tcp://localhost:666/OPCUA/SimulationServer', + retryInterval: 10000, + readTimeout: 15000, + authentication: { + type: 'none', + username: null, + password: null, + certFilePath: null, + keyFilePath: null + } as unknown as SouthOPCUASettingsAuthentication, + securityMode: 'None', + securityPolicy: 'None', + keepSessionAlive: false + } + }; + + beforeEach(async () => { + jest.clearAllMocks(); + jest.useFakeTimers().setSystemTime(); + + const connectionService = new ConnectionService(logger); + south = new SouthOPCUA( + connector, + items, + addValues, + addFile, + encryptionService, + repositoryService, + logger, + 'baseFolder', + connectionService + ); + }); + + it('should initialize connectionSettings', () => { + // Initially sharedConnection is true + expect(south.connectionSettings).toEqual({ + closeFnName: 'close', + sharedConnection: true + }); + }); + + it('should properly name the connection', async () => { + const createSessionConfigsSpy = jest.spyOn(south, 'createSessionConfigs'); + + await south.createSession(); + + expect(createSessionConfigsSpy).toHaveBeenCalledWith( + connector.settings, + south['clientCertificateManager'], + encryptionService, + 'Shared session' + ); + }); +}); diff --git a/backend/src/south/south-opcua/south-opcua.ts b/backend/src/south/south-opcua/south-opcua.ts index 7f3c63eea2..4c89b00e0b 100644 --- a/backend/src/south/south-opcua/south-opcua.ts +++ b/backend/src/south/south-opcua/south-opcua.ts @@ -30,13 +30,14 @@ import { OPCUAClientOptions } from 'node-opcua-client/source/opcua_client'; import { DateTime } from 'luxon'; import fs from 'node:fs/promises'; import path from 'node:path'; -import { QueriesHistory, QueriesLastPoint, QueriesSubscription } from '../south-interface'; +import { QueriesHistory, QueriesLastPoint, QueriesSubscription, DelegatesConnection } from '../south-interface'; import { SouthOPCUAItemSettings, SouthOPCUASettings } from '../../../../shared/model/south-settings.model'; import { randomUUID } from 'crypto'; import { HistoryReadValueIdOptions } from 'node-opcua-types/source/_generated_opcua_types'; import { createFolder } from '../../service/utils'; import { OPCUACertificateManager } from 'node-opcua-certificate-manager'; import { OIBusDataValue } from '../../../../shared/model/engine.model'; +import ConnectionService, { ManagedConnection, ManagedConnectionSettings } from '../../service/connection.service'; export const MAX_NUMBER_OF_NODE_TO_LOG = 10; export const NUM_VALUES_PER_NODE = 1000; @@ -46,16 +47,17 @@ export const NUM_VALUES_PER_NODE = 1000; */ export default class SouthOPCUA extends SouthConnector - implements QueriesHistory, QueriesLastPoint, QueriesSubscription + implements QueriesHistory, QueriesLastPoint, QueriesSubscription, DelegatesConnection { static type = manifest.id; private clientCertificateManager: OPCUACertificateManager | null = null; - private session: ClientSession | null = null; private reconnectTimeout: NodeJS.Timeout | null = null; private disconnecting = false; private monitoredItems = new Map(); private subscription: ClientSubscription | null = null; + connectionSettings: ManagedConnectionSettings; + connection!: ManagedConnection; constructor( connector: SouthConnectorDTO, @@ -64,9 +66,25 @@ export default class SouthOPCUA encryptionService: EncryptionService, repositoryService: RepositoryService, logger: pino.Logger, - baseFolder: string + baseFolder: string, + connectionService: ConnectionService ) { - super(connector, engineAddValuesCallback, engineAddFileCallback, encryptionService, repositoryService, logger, baseFolder); + super( + connector, + + engineAddValuesCallback, + engineAddFileCallback, + encryptionService, + repositoryService, + logger, + baseFolder, + connectionService + ); + + this.connectionSettings = { + closeFnName: 'close', + sharedConnection: connector.sharedConnection ?? false + }; } override async start(dataStream = true): Promise { @@ -83,7 +101,7 @@ export default class SouthOPCUA await super.start(dataStream); } - override async connect(): Promise { + override async createSession(): Promise { await this.session?.close(); // close the session if it already exists if (this.reconnectTimeout) { @@ -91,23 +109,22 @@ export default class SouthOPCUA this.reconnectTimeout = null; } try { + const clientName = this.connectionSettings.sharedConnection ? 'Shared session' : this.connector.id; + const { options, userIdentity } = await this.createSessionConfigs( this.connector.settings, this.clientCertificateManager!, this.encryptionService, - this.connector.id // the id of the connector + clientName ); this.logger.debug(`Connecting to OPCUA on ${this.connector.settings.url}`); this.session = await OPCUAClient.createSession(this.connector.settings.url, userIdentity, options); this.logger.info(`OPCUA ${this.connector.name} connected`); - await super.connect(); + return session; } catch (error) { this.logger.error(`Error while connecting to the OPCUA server. ${error}`); - await this.disconnect(); - if (!this.disconnecting && this.connector.enabled && !this.reconnectTimeout) { - this.reconnectTimeout = setTimeout(this.connect.bind(this), this.connector.settings.retryInterval); - } + return null; } } @@ -190,12 +207,17 @@ export default class SouthOPCUA startTime: Instant, endTime: Instant ): Promise { + // Try to get a session + let session; + try { + session = await this.connection.getSession(); + } catch (error) { + this.logger.error('OPCUA session not set. The connector cannot read values'); + return null; + } + try { let maxTimestamp: number | null = null; - if (!this.session) { - this.logger.error('OPCUA session not set. The connector cannot read values'); - return startTime; - } const itemsByAggregates = new Map>>(); items.forEach(item => { @@ -239,7 +261,7 @@ export default class SouthOPCUA request.requestHeader.timeoutHint = this.connector.settings.readTimeout; // @ts-ignore - const response = await this.session.performMessageTransaction(request); + const response = await session.performMessageTransaction(request); if (response.responseHeader.serviceResult.isNot(StatusCodes.Good)) { this.logger.error(`Error while reading history: ${response.responseHeader.serviceResult.description}`); } @@ -314,7 +336,7 @@ export default class SouthOPCUA })); // @ts-ignore - const response = await this.session.performMessageTransaction( + const response = await session.performMessageTransaction( this.getHistoryReadRequest(startTime, endTime, aggregate, resampling, nodesToRead) ); @@ -354,9 +376,6 @@ export default class SouthOPCUA await this.subscription?.terminate(); this.subscription = null; - await this.session?.close(); - this.session = null; - await super.disconnect(); this.disconnecting = false; } @@ -490,10 +509,15 @@ export default class SouthOPCUA } async lastPointQuery(items: Array>): Promise { - if (!this.session) { + // Try to get a session + let session; + try { + session = await this.connection.getSession(); + } catch (error) { this.logger.error('OPCUA session not set. The connector cannot read values'); return; } + const itemsToRead = items.filter(item => item.settings.mode === 'DA'); if (itemsToRead.length === 0) { return; @@ -509,10 +533,9 @@ export default class SouthOPCUA } const startRequest = DateTime.now().toMillis(); - const dataValues = await this.session.read(itemsToRead.map(item => ({ nodeId: item.settings.nodeId }))); + const dataValues = await session.read(itemsToRead.map(item => ({ nodeId: item.settings.nodeId }))); const requestDuration = DateTime.now().toMillis() - startRequest; this.logger.debug(`Found ${dataValues.length} results for ${itemsToRead.length} items (DA mode) in ${requestDuration} ms`); - if (dataValues.length !== itemsToRead.length) { this.logger.error( `Received ${dataValues.length} node results, requested ${itemsToRead.length} nodes. Request done in ${requestDuration} ms` @@ -542,12 +565,18 @@ export default class SouthOPCUA if (!items.length) { return; } - if (!this.session) { + + // Try to get a session + let session; + try { + session = await this.connection.getSession(); + } catch (error) { this.logger.error('OPCUA client could not subscribe to items: session not set'); return; } + if (!this.subscription) { - this.subscription = ClientSubscription.create(this.session, { + this.subscription = ClientSubscription.create(session, { requestedPublishingInterval: 150, requestedLifetimeCount: 10 * 60 * 10, requestedMaxKeepAliveCount: 10, diff --git a/backend/src/tests/__mocks__/logger.mock.ts b/backend/src/tests/__mocks__/logger.mock.ts index 0686ca4cc6..40889ad179 100644 --- a/backend/src/tests/__mocks__/logger.mock.ts +++ b/backend/src/tests/__mocks__/logger.mock.ts @@ -1,7 +1,7 @@ /** * Create a mock object for Pino logger */ -export default jest.fn().mockImplementation(() => { +const mockFn = jest.fn().mockImplementation(() => { return { trace: jest.fn(), debug: jest.fn(), @@ -9,6 +9,8 @@ export default jest.fn().mockImplementation(() => { warn: jest.fn(), error: jest.fn(), fatal: jest.fn(), - child: jest.fn() + child: jest.fn().mockImplementation(mockFn) }; }); + +export default mockFn; diff --git a/shared/model/south-connector.model.ts b/shared/model/south-connector.model.ts index 466945ad68..c82bc1ef00 100644 --- a/shared/model/south-connector.model.ts +++ b/shared/model/south-connector.model.ts @@ -30,6 +30,7 @@ export interface SouthConnectorDTO extends BaseEn type: string; description: string; enabled: boolean; + sharedConnection?: boolean; settings: T; history: SouthConnectorHistorySettings; } @@ -104,6 +105,7 @@ export interface SouthConnectorManifest { lastFile: boolean; history: boolean; forceMaxInstantPerItem: boolean; + sharedConnection?: boolean; }; settings: Array; items: SouthConnectorItemManifest;