diff --git a/lib/src/binding_coap/coap_client.dart b/lib/src/binding_coap/coap_client.dart index 2401f80a..1bfd4b3d 100644 --- a/lib/src/binding_coap/coap_client.dart +++ b/lib/src/binding_coap/coap_client.dart @@ -5,10 +5,10 @@ // SPDX-License-Identifier: BSD-3-Clause import 'dart:async'; -import 'dart:convert'; import 'package:coap/coap.dart' as coap; import 'package:coap/config/coap_config_default.dart'; +import 'package:typed_data/typed_buffers.dart'; import '../core/content.dart'; import '../core/credentials/psk_credentials.dart'; @@ -18,8 +18,6 @@ import '../core/security_provider.dart'; import '../core/thing_discovery.dart'; import '../definitions/form.dart'; import '../definitions/operation_type.dart'; -import '../definitions/security/psk_security_scheme.dart'; -import '../definitions/thing_description.dart'; import '../scripting_api/subscription.dart'; import 'coap_binding_exception.dart'; import 'coap_config.dart'; @@ -41,7 +39,7 @@ class _InternalCoapConfig extends CoapConfigDefault { return; } - if (_usesPskScheme(form) && coapConfig.useTinyDtls) { + if (form.usesPskScheme && coapConfig.useTinyDtls) { dtlsBackend = coap.DtlsBackend.TinyDtls; } else if (coapConfig.useOpenSsl) { dtlsBackend = coap.DtlsBackend.OpenSsl; @@ -59,176 +57,33 @@ class _InternalCoapConfig extends CoapConfigDefault { bool get _dtlsNeeded => _form?.resolvedHref.scheme == 'coaps'; } -bool _usesPskScheme(Form form) { - return form.securityDefinitions.whereType().isNotEmpty; -} - -class _CoapRequest { - /// Creates a new [_CoapRequest] - _CoapRequest( - this._form, - this._requestMethod, - CoapConfig _coapConfig, - ClientSecurityProvider? _clientSecurityProvider, [ - this._subprotocol, - ]) : _coapClient = coap.CoapClient( - _form.resolvedHref, - _InternalCoapConfig(_coapConfig, _form), - pskCredentialsCallback: - _createPskCallback(_form, _clientSecurityProvider), - ), - _requestUri = _form.resolvedHref; - - /// The [CoapClient] which sends out request messages. - final coap.CoapClient _coapClient; - - /// The [Uri] describing the endpoint for the request. - final Uri _requestUri; - - /// A reference to the [Form] that is the basis for this request. - final Form _form; - - /// The [CoapRequestMethod] used in the request message (e. g. - /// [CoapRequestMethod.get] or [CoapRequestMethod.post]). - final CoapRequestMethod _requestMethod; - - /// The subprotocol that should be used for requests. - final CoapSubprotocol? _subprotocol; +coap.PskCredentialsCallback? _createPskCallback( + Uri uri, + Form? form, + ClientSecurityProvider? clientSecurityProvider, +) { + final usesPskScheme = form?.usesPskScheme ?? false; + final pskCredentialsCallback = clientSecurityProvider?.pskCredentialsCallback; - // TODO(JKRhb): blockwise parameters cannot be handled at the moment due to - // limitations of the CoAP library - Future _makeRequest( - String? payload, { - int format = coap.CoapMediaType.textPlain, - int accept = coap.CoapMediaType.textPlain, - int? block1Size, - int? block2Size, - }) async { - final coap.CoapResponse? response; - switch (_requestMethod) { - case CoapRequestMethod.get: - response = await _coapClient.get( - _requestUri.path, - earlyBlock2Negotiation: true, - accept: accept, - ); - break; - case CoapRequestMethod.post: - response = await _coapClient.post( - _requestUri.path, - payload: payload ?? '', - format: format, - ); - break; - case CoapRequestMethod.put: - response = await _coapClient.put( - _requestUri.path, - payload: payload ?? '', - format: format, - ); - break; - case CoapRequestMethod.delete: - response = await _coapClient.delete(_requestUri.path); - break; - default: - throw UnimplementedError( - 'CoAP request method $_requestMethod is not supported yet.', - ); - } - _coapClient.close(); - if (response == null) { - throw CoapBindingException('Sending CoAP request to $_requestUri failed'); - } - return response; + if (!usesPskScheme || pskCredentialsCallback == null) { + return null; } - static coap.PskCredentialsCallback? _createPskCallback( - Form form, - ClientSecurityProvider? clientSecurityProvider, - ) { - final pskCredentialsCallback = - clientSecurityProvider?.pskCredentialsCallback; - if (!_usesPskScheme(form) || pskCredentialsCallback == null) { - return null; - } - - return (identityHint) { - final PskCredentials? pskCredentials = - pskCredentialsCallback(form.resolvedHref, form, identityHint); + return (identityHint) { + final PskCredentials? pskCredentials = + pskCredentialsCallback(uri, form, identityHint); - if (pskCredentials == null) { - throw CoapBindingException( - 'Missing PSK credentials for CoAPS request!', - ); - } - - return coap.PskCredentials( - identity: pskCredentials.identity, - preSharedKey: pskCredentials.preSharedKey, + if (pskCredentials == null) { + throw CoapBindingException( + 'Missing PSK credentials for CoAPS request!', ); - }; - } - - // TODO(JKRhb): Revisit name of this method - Future resolveInteraction(String? payload) async { - final response = await _makeRequest( - payload, - format: _form.format, - accept: _form.accept, - block1Size: _form.block1Size, - block2Size: _form.block2Size, - ); - final type = coap.CoapMediaType.name(response.contentFormat); - final body = _getPayloadFromResponse(response); - return Content(type, body); - } - - Future startObservation( - void Function(Content content) next, - void Function() complete, - ) async { - void handleResponse(coap.CoapResponse? response) { - if (response == null) { - return; - } - - final type = coap.CoapMediaType.name(response.contentFormat); - final body = _getPayloadFromResponse(response); - final content = Content(type, body); - next(content); - } - - final requestContentFormat = _form.format; - - if (_subprotocol == CoapSubprotocol.observe) { - final request = _requestMethod.generateRequest() - ..contentFormat = requestContentFormat; - final observeClientRelation = await _coapClient.observe(request); - observeClientRelation.stream.listen((event) { - handleResponse(event.resp); - }); - return CoapSubscription(_coapClient, observeClientRelation, complete); } - final response = await _makeRequest(null, format: requestContentFormat); - handleResponse(response); - return CoapSubscription(_coapClient, null, complete); - } - - /// Aborts the request and closes the client. - /// - // TODO(JKRhb): Check if this is actually enough - void abort() { - _coapClient.close(); - } -} - -Stream> _getPayloadFromResponse(coap.CoapResponse response) { - if (response.payload != null) { - return Stream.value(response.payload!); - } else { - return const Stream.empty(); - } + return coap.PskCredentials( + identity: pskCredentials.identity, + preSharedKey: pskCredentials.preSharedKey, + ); + }; } /// A [ProtocolClient] for the Constrained Application Protocol (CoAP). @@ -236,52 +91,110 @@ class CoapClient extends ProtocolClient { /// Creates a new [CoapClient] based on an optional [CoapConfig]. CoapClient([this._coapConfig, this._clientSecurityProvider]); - final List<_CoapRequest> _pendingRequests = []; final CoapConfig? _coapConfig; final ClientSecurityProvider? _clientSecurityProvider; - _CoapRequest _createRequest(Form form, OperationType operationType) { + Future _createRequest( + int code, + Uri uri, { + Content? content, + int? format, + int? accept, + int? block1Size, + int? block2Size, + }) async { + if (!coap.CoapCode.isRequest(code)) { + throw CoapBindingException( + '$code is not a valid request method code.', + ); + } + + final payload = Uint8Buffer(); + if (content != null) { + payload.addAll((await content.byteBuffer).asUint8List()); + } + + return coap.CoapRequest(code) + ..payload = payload + ..uriPath = uri.path + ..accept = accept ?? coap.CoapMediaType.undefined + ..contentFormat = format ?? coap.CoapMediaType.undefined; + } + + Future _sendRequestFromForm( + Form form, + OperationType operationType, [ + Content? content, + ]) async { final requestMethod = CoapRequestMethod.fromForm(form) ?? operationType.requestMethod; - final CoapSubprotocol? subprotocol = - form.coapSubprotocol ?? operationType.subprotocol; - final coapConfig = _coapConfig ?? CoapConfig(); - final request = _CoapRequest( - form, - requestMethod, - coapConfig, - _clientSecurityProvider, - subprotocol, + final code = requestMethod.code; + + return _sendRequest( + form.resolvedHref, + code, + content: content, + format: form.format, + accept: form.accept, + form: form, ); - _pendingRequests.add(request); - return request; } - Future _getInputFromContent(Content content) async { - final inputBuffer = await content.byteBuffer; - return utf8.decoder - .convert(inputBuffer.asUint8List().toList(growable: false)); + // TODO(JKRhb): blockwise parameters cannot be handled at the moment due to + // limitations of the CoAP library + Future _sendRequest( + Uri uri, + int method, { + Content? content, + required Form? form, + int? format, + int? accept, + int? block1Size, + int? block2Size, + coap.CoapMulticastResponseHandler? multicastResponseHandler, + }) async { + final coapClient = coap.CoapClient( + uri, + _InternalCoapConfig(_coapConfig ?? CoapConfig(), form), + pskCredentialsCallback: + _createPskCallback(uri, form, _clientSecurityProvider), + ); + + final request = await _createRequest( + method, + uri, + content: content, + format: format, + accept: accept, + block1Size: block1Size, + block2Size: block2Size, + ); + + final response = await coapClient.send( + request, + onMulticastResponse: multicastResponseHandler, + ); + coapClient.close(); + if (response == null) { + throw CoapBindingException('Sending CoAP request to $uri failed'); + } + return response.content; } @override Future readResource(Form form) async { - final request = _createRequest(form, OperationType.readproperty); - return request.resolveInteraction(null); + return _sendRequestFromForm(form, OperationType.readproperty); } @override Future writeResource(Form form, Content content) async { - final request = _createRequest(form, OperationType.writeproperty); - final input = await _getInputFromContent(content); - await request.resolveInteraction(input); + await _sendRequestFromForm(form, OperationType.writeproperty, content); } @override Future invokeResource(Form form, Content content) async { - final request = _createRequest(form, OperationType.invokeaction); - final input = await _getInputFromContent(content); - return request.resolveInteraction(input); + return _sendRequestFromForm(form, OperationType.invokeaction, content); } @override @@ -300,51 +213,71 @@ class CoapClient extends ProtocolClient { ), ); - final request = _createRequest(form, operationType); - - return request.startObservation(next, complete); + return _startObservation(form, operationType, next, complete); } - @override - Future start() async { - // Do nothing - } + Future _startObservation( + Form form, + OperationType operationType, + void Function(Content content) next, + void Function() complete, + ) async { + void handleResponse(coap.CoapResponse? response) { + if (response == null) { + return; + } - @override - Future stop() async { - for (final request in _pendingRequests) { - request.abort(); + next(response.content); } - _pendingRequests.clear(); - } - ThingDescription _handleDiscoveryResponse( - coap.CoapResponse? response, - Uri uri, - ) { - final rawThingDescription = response?.payloadString; + final requestMethod = + (CoapRequestMethod.fromForm(form) ?? CoapRequestMethod.get).code; - if (response == null) { - throw DiscoveryException('Direct CoAP Discovery from $uri failed!'); + final request = await _createRequest( + requestMethod, + form.resolvedHref, + format: form.format, + accept: form.accept, + ); + + final subprotocol = form.coapSubprotocol ?? operationType.subprotocol; + + final coapClient = coap.CoapClient( + form.resolvedHref, + _InternalCoapConfig(_coapConfig ?? CoapConfig(), form), + ); + + if (subprotocol == CoapSubprotocol.observe) { + final observeClientRelation = await coapClient.observe(request); + observeClientRelation.stream.listen((event) { + handleResponse(event.resp); + }); + return CoapSubscription(coapClient, observeClientRelation, complete); } - return ThingDescription(rawThingDescription); + final response = await coapClient.send(request); + handleResponse(response); + return CoapSubscription(coapClient, null, complete); + } + + @override + Future start() async { + // Do nothing } - Stream _discoverFromMulticast( + @override + Future stop() async {} + + Stream _discoverFromMulticast( coap.CoapClient client, Uri uri, ) async* { // TODO(JKRhb): This method currently does not work with block-wise transfer // due to a bug in the CoAP library. - final streamController = StreamController(); - final request = coap.CoapRequest(coap.CoapCode.get, confirmable: false) - ..uriPath = uri.path - ..accept = coap.CoapMediaType.applicationTdJson; + final streamController = StreamController(); final multicastResponseHandler = coap.CoapMulticastResponseHandler( (data) { - final thingDescription = _handleDiscoveryResponse(data.resp, uri); - streamController.add(thingDescription); + streamController.add(data.resp.content); }, onError: streamController.addError, onDone: () async { @@ -352,35 +285,31 @@ class CoapClient extends ProtocolClient { }, ); - final response = - client.send(request, onMulticastResponse: multicastResponseHandler); - unawaited(response); - unawaited( - Future.delayed( - _coapConfig?.multicastDiscoveryTimeout ?? const Duration(seconds: 20), - () { - client - ..cancel(request) - ..close(); - }), + final content = _sendRequest( + uri, + coap.CoapCode.get, + form: null, + accept: coap.CoapMediaType.applicationTdJson, + multicastResponseHandler: multicastResponseHandler, ); + unawaited(content); yield* streamController.stream; } - Stream _discoverFromUnicast( + Stream _discoverFromUnicast( coap.CoapClient client, Uri uri, ) async* { - final response = await client.get( - uri.path, + yield await _sendRequest( + uri, + coap.CoapCode.get, + form: null, accept: coap.CoapMediaType.applicationTdJson, ); - client.close(); - yield _handleDiscoveryResponse(response, uri); } @override - Stream discoverDirectly( + Stream discoverDirectly( Uri uri, { bool disableMulticast = false, }) async* { @@ -396,30 +325,9 @@ class CoapClient extends ProtocolClient { } } - @override - Stream discoverWithCoreLinkFormat(Uri uri) async* { - final discoveryUri = createCoreLinkFormatDiscoveryUri(uri); - final coapConfig = _coapConfig ?? CoapConfig(); - - final coapClient = - coap.CoapClient(discoveryUri, _InternalCoapConfig(coapConfig, null)); - - // TODO(JKRhb): Multicast could be supported here as well. - final request = coap.CoapRequest(coap.CoapCode.get) - ..uriPath = discoveryUri.path - ..accept = coap.CoapMediaType.applicationLinkFormat; - final response = await coapClient.send(request); - - coapClient.close(); - - if (response == null) { - throw DiscoveryException( - 'Got no CoRE Link Format Discovery response for $uri', - ); - } - - final actualContentFormat = response.contentFormat; - const expectedContentFormat = coap.CoapMediaType.applicationLinkFormat; + Content _handleCoreLinkFormatContent(Content content) { + final actualContentFormat = content.type; + const expectedContentFormat = 'application/link-format'; if (actualContentFormat != expectedContentFormat) { throw DiscoveryException( @@ -429,14 +337,41 @@ class CoapClient extends ProtocolClient { ); } - final payloadString = response.payloadString; + return content; + } - if (payloadString == null) { - throw DiscoveryException( - 'Received empty payload for CoRE Link Format Discovery from $uri', + @override + Stream discoverWithCoreLinkFormat(Uri uri) async* { + final discoveryUri = createCoreLinkFormatDiscoveryUri(uri); + coap.CoapMulticastResponseHandler? multicastResponseHandler; + final streamController = StreamController(); + + if (uri.isMulticastAddress) { + multicastResponseHandler = coap.CoapMulticastResponseHandler( + (data) { + final handledContent = + _handleCoreLinkFormatContent(data.resp.content); + streamController.add(handledContent); + }, + onError: streamController.addError, + onDone: () async { + await streamController.close(); + }, ); } - yield* Stream.fromIterable(parseCoreLinkFormat(payloadString, uri)); + final content = await _sendRequest( + discoveryUri, + coap.CoapCode.get, + form: null, + accept: coap.CoapMediaType.applicationLinkFormat, + multicastResponseHandler: multicastResponseHandler, + ); + + if (uri.isMulticastAddress) { + yield* streamController.stream; + } else { + yield _handleCoreLinkFormatContent(content); + } } } diff --git a/lib/src/binding_coap/coap_definitions.dart b/lib/src/binding_coap/coap_definitions.dart index 124c884c..62ad7c6b 100644 --- a/lib/src/binding_coap/coap_definitions.dart +++ b/lib/src/binding_coap/coap_definitions.dart @@ -4,6 +4,8 @@ // // SPDX-License-Identifier: BSD-3-Clause +import 'dart:collection'; + import 'package:coap/coap.dart'; import 'package:curie/curie.dart'; @@ -16,70 +18,55 @@ final coapPrefixMapping = /// Defines the available CoAP request methods. enum CoapRequestMethod { /// Corresponds with the GET request method. - get, + get(CoapCode.get, 'GET'), /// Corresponds with the PUT request method. - put, + put(CoapCode.put, 'PUT'), /// Corresponds with the POST request method. - post, + post(CoapCode.post, 'POST'), /// Corresponds with the DELETE request method. - delete, + delete(CoapCode.delete, 'DELETE'), /// Corresponds with the FETCH request method. - fetch, + fetch(CoapCode.notSet), /// Corresponds with the PATCH request method. - patch, + patch(CoapCode.notSet), /// Corresponds with the iPATCH request method. - ipatch; - - /// Generate a new [CoapRequest] based on this [CoapRequestMethod]. - CoapRequest generateRequest() { - switch (this) { - case CoapRequestMethod.get: - return CoapRequest.newGet(); - case CoapRequestMethod.post: - return CoapRequest.newPost(); - case CoapRequestMethod.put: - return CoapRequest.newPut(); - case CoapRequestMethod.delete: - return CoapRequest.newDelete(); - default: - throw UnimplementedError(); - } - } + ipatch(CoapCode.notSet); - static CoapRequestMethod? _fromString(String stringValue) { - switch (stringValue) { - case 'POST': - return CoapRequestMethod.post; - case 'PUT': - return CoapRequestMethod.put; - case 'DELETE': - return CoapRequestMethod.delete; - case 'GET': - return CoapRequestMethod.get; - default: - return null; - } - } + /// Constructor + const CoapRequestMethod(this.code, [this.stringValue]); + + /// The numeric code of this [CoapRequestMethod]. + final int code; + + /// The string value of this request method value (e.g., `GET` or `POST`). + final String? stringValue; + + static final _registry = HashMap.fromEntries( + values + .where((element) => element.stringValue != null) + .map((e) => MapEntry(e.stringValue, e)), + ); + + static CoapRequestMethod? _fromString(String stringValue) => + _registry[stringValue]; /// Determines the [CoapRequestMethod] to use based on a given [form]. static CoapRequestMethod? fromForm(Form form) { final curieString = coapPrefixMapping.expandCurie(Curie(reference: 'method')); final dynamic formDefinition = form.additionalFields[curieString]; - if (formDefinition is String) { - final requestMethod = CoapRequestMethod._fromString(formDefinition); - if (requestMethod != null) { - return requestMethod; - } + + if (formDefinition is! String) { + return null; } - return null; + return CoapRequestMethod._fromString(formDefinition); } } diff --git a/lib/src/binding_coap/coap_extensions.dart b/lib/src/binding_coap/coap_extensions.dart index 51f3355d..506779ff 100644 --- a/lib/src/binding_coap/coap_extensions.dart +++ b/lib/src/binding_coap/coap_extensions.dart @@ -2,8 +2,10 @@ import 'dart:io'; import 'package:coap/coap.dart'; +import '../core/content.dart'; import '../definitions/form.dart'; import '../definitions/operation_type.dart'; +import '../definitions/security/psk_security_scheme.dart'; import 'coap_definitions.dart'; const _validBlockwiseValues = [16, 32, 64, 128, 256, 512, 1024]; @@ -19,6 +21,10 @@ extension InternetAddressMethods on Uri { /// CoAP-specific extensions for the [Form] class. extension CoapFormExtension on Form { + /// Determines if this [Form] supports the [PskSecurityScheme]. + bool get usesPskScheme => + securityDefinitions.whereType().isNotEmpty; + /// Get the [CoapSubprotocol] for this [Form], if one is set. CoapSubprotocol? get coapSubprotocol { if (subprotocol == coapPrefixMapping.expandCurieString('observe')) { @@ -119,3 +125,29 @@ extension OperationTypeExtension on OperationType { return null; } } + +/// Extension for easily extracting the [content] from a [CoapResponse]. +extension ResponseExtension on CoapResponse { + Stream> get _payloadStream { + final payload = this.payload; + if (payload != null) { + return Stream.value(payload); + } else { + return const Stream.empty(); + } + } + + String get _contentType { + // FIXME: Replace once new CoAP library version has been released. + if (contentFormat == CoapMediaType.undefined) { + return 'application/json'; + } + + return CoapMediaType.name(contentFormat); + } + + /// Extract the [Content] of this [CoapResponse]. + Content get content { + return Content(_contentType, _payloadStream); + } +} diff --git a/lib/src/binding_http/http_client.dart b/lib/src/binding_http/http_client.dart index c397a55d..fde7c78c 100644 --- a/lib/src/binding_http/http_client.dart +++ b/lib/src/binding_http/http_client.dart @@ -19,7 +19,6 @@ import '../definitions/form.dart'; import '../definitions/operation_type.dart'; import '../definitions/security/basic_security_scheme.dart'; import '../definitions/security/bearer_security_scheme.dart'; -import '../definitions/thing_description.dart'; import '../scripting_api/subscription.dart'; import 'http_request_method.dart'; import 'http_security_exception.dart'; @@ -291,35 +290,43 @@ class HttpClient extends ProtocolClient { throw UnimplementedError(); } - Future _sendDiscoveryRequest(Request request) async { + Future _sendDiscoveryRequest( + Request request, { + required String acceptHeaderValue, + }) async { + request.headers['Accept'] = acceptHeaderValue; final response = await _client.send(request); final finalResponse = await _handleResponse(request, response); - return utf8.decode(await finalResponse.stream.toBytes()); + return Content( + response.headers['Content-Type'] ?? acceptHeaderValue, + finalResponse.stream, + ); } @override - Stream discoverDirectly( + Stream discoverDirectly( Uri uri, { bool disableMulticast = false, }) async* { - final request = Request(HttpRequestMethod.get.methodName, uri) - ..headers['Accept'] = 'application/td+json'; + final request = Request(HttpRequestMethod.get.methodName, uri); - final rawThingDescription = await _sendDiscoveryRequest(request); - yield ThingDescription(rawThingDescription); + yield await _sendDiscoveryRequest( + request, + acceptHeaderValue: 'application/td+json', + ); } @override - Stream discoverWithCoreLinkFormat(Uri uri) async* { + Stream discoverWithCoreLinkFormat(Uri uri) async* { final discoveryUri = createCoreLinkFormatDiscoveryUri(uri); - final request = Request(HttpRequestMethod.get.methodName, uri) - ..headers['Accept'] = 'application/link-format'; + final request = Request(HttpRequestMethod.get.methodName, discoveryUri); - final encodedLinks = await _sendDiscoveryRequest(request); - - yield* Stream.fromIterable( - parseCoreLinkFormat(encodedLinks, discoveryUri), + final encodedLinks = await _sendDiscoveryRequest( + request, + acceptHeaderValue: 'application/link-format', ); + + yield encodedLinks; } } diff --git a/lib/src/core/protocol_interfaces/protocol_client.dart b/lib/src/core/protocol_interfaces/protocol_client.dart index be1f9b11..3baa28ce 100644 --- a/lib/src/core/protocol_interfaces/protocol_client.dart +++ b/lib/src/core/protocol_interfaces/protocol_client.dart @@ -5,7 +5,6 @@ // SPDX-License-Identifier: BSD-3-Clause import '../../definitions/form.dart'; -import '../../definitions/thing_description.dart'; import '../../scripting_api/subscription.dart'; import '../content.dart'; @@ -17,17 +16,19 @@ abstract class ProtocolClient { /// Stops this [ProtocolClient]. Future stop(); - /// Discovers a [ThingDescription] from a [uri]. + /// Discovers one or more Thing Descriptions from a [uri], returning a + /// [Stream] of [Content]. /// /// Allows the caller to explicitly [disableMulticast], overriding the /// multicast settings in the config of the underlying binding implementation. - Stream discoverDirectly( + Stream discoverDirectly( Uri uri, { bool disableMulticast = false, }); - /// Discovers [ThingDescription] links from a [uri] using the CoRE Link - /// Format and Web Linking (see [RFC 6690]). + /// Discovers one or more Thing Descriptions from a [uri] using the CoRE Link + /// Format and Web Linking (see [RFC 6690]), returning a [Stream] of + /// [Content]. /// /// The [uri] must point to either the resource lookup interface of a CoRE /// Resource Directory (see [RFC 9176]) or to a CoRE Resource Discovery @@ -39,7 +40,7 @@ abstract class ProtocolClient { /// /// [RFC 9176]: https://datatracker.ietf.org/doc/html/rfc9176 /// [RFC 6690]: https://datatracker.ietf.org/doc/html/rfc6690 - Stream discoverWithCoreLinkFormat(Uri uri); + Stream discoverWithCoreLinkFormat(Uri uri); /// Requests the client to perform a `readproperty` operation on a [form]. Future readResource(Form form); diff --git a/lib/src/core/thing_discovery.dart b/lib/src/core/thing_discovery.dart index 1cdc3dff..a7da26d3 100644 --- a/lib/src/core/thing_discovery.dart +++ b/lib/src/core/thing_discovery.dart @@ -6,10 +6,12 @@ import 'dart:async'; +import 'package:coap/coap.dart'; + +import '../../core.dart'; import '../../scripting_api.dart' as scripting_api; import '../definitions/thing_description.dart'; -import 'protocol_interfaces/protocol_client.dart'; -import 'servient.dart'; +import 'content.dart'; /// Custom [Exception] that is thrown when the discovery process fails. class DiscoveryException implements Exception { @@ -29,11 +31,13 @@ class DiscoveryException implements Exception { class ThingDiscovery extends Stream implements scripting_api.ThingDiscovery { /// Creates a new [ThingDiscovery] object with a given [thingFilter]. - ThingDiscovery(this.thingFilter, Servient servient) - : _client = servient.clientFor(thingFilter.url.scheme) { + ThingDiscovery(this.thingFilter, this._servient) + : _client = _servient.clientFor(thingFilter.url.scheme) { _stream = _start(); } + final Servient _servient; + bool _active = true; @override @@ -51,7 +55,7 @@ class ThingDiscovery extends Stream switch (discoveryMethod) { case scripting_api.DiscoveryMethod.direct: - yield* _client.discoverDirectly(thingFilter.url); + yield* _discoverDirectly(thingFilter.url); break; case scripting_api.DiscoveryMethod.coreLinkFormat: yield* _discoverWithCoreLinkFormat(thingFilter.url); @@ -67,14 +71,57 @@ class ThingDiscovery extends Stream _active = false; } + Future _decodeThingDescription( + Content content, + Uri uri, + ) async { + final value = await _servient.contentSerdes.contentToValue(content, null); + if (value is! Map) { + throw DiscoveryException( + 'Could not parse Thing Description obtained from $uri', + ); + } + + return ThingDescription.fromJson(value); + } + + Stream _discoverDirectly(Uri uri) async* { + yield* _client + .discoverDirectly(uri, disableMulticast: true) + .asyncMap((content) => _decodeThingDescription(content, uri)); + } + Stream _discoverWithCoreLinkFormat(Uri uri) async* { final Set discoveredUris = {}; await for (final coreWebLink in _client.discoverWithCoreLinkFormat(uri)) { - if (discoveredUris.contains(coreWebLink)) { + final value = + await _servient.contentSerdes.contentToValue(coreWebLink, null); + + if (value is! CoapWebLink || + !(value.attributes.getContentTypes()?.contains('wot.thing') ?? + false)) { + continue; + } + + Uri? parsedUri = Uri.tryParse(value.uri); + + if (parsedUri == null) { + // TODO: Should an error be passed on here instead? + continue; + } + + if (!parsedUri.isAbsolute) { + parsedUri = parsedUri.replace( + scheme: uri.scheme, + host: uri.host, + port: uri.port, + ); + } + if (discoveredUris.contains(parsedUri)) { continue; } - discoveredUris.add(coreWebLink); - yield* _client.discoverDirectly(coreWebLink, disableMulticast: true); + discoveredUris.add(parsedUri); + yield* _discoverDirectly(parsedUri); } }