From 3b724958d1a1e434542c2482020aa2bf1be9da9c Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Mon, 9 Sep 2024 09:25:50 -0700 Subject: [PATCH] Test fixups/polish --- lib/browser/mqtt_request_response.spec.ts | 19 ++++++-------- lib/browser/mqtt_request_response.ts | 25 +++++++++++-------- .../mqtt_request_response_impl.spec.ts | 4 +-- lib/native/mqtt_request_response.ts | 4 +-- 4 files changed, 25 insertions(+), 27 deletions(-) diff --git a/lib/browser/mqtt_request_response.spec.ts b/lib/browser/mqtt_request_response.spec.ts index 70fa1af7..b4559f6e 100644 --- a/lib/browser/mqtt_request_response.spec.ts +++ b/lib/browser/mqtt_request_response.spec.ts @@ -8,10 +8,9 @@ import * as auth from "./auth"; import * as test_env from "@test/test_env" import * as aws_iot_311 from "./aws_iot"; import * as aws_iot_5 from "./aws_iot_mqtt5"; -//import * as mqtt5 from "./mqtt5"; +import * as mqtt5 from "./mqtt5"; import * as mqtt_request_response from "./mqtt_request_response"; -//import {v4 as uuid} from "uuid"; -//import {once} from "events"; +import {once} from "events"; import * as mrr_test from "@test/mqtt_request_response"; import {v4 as uuid} from "uuid"; import * as test_utils from "../../test/mqtt5"; @@ -558,7 +557,6 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('ShadowUpd await mrr_test.do_streaming_operation_incoming_publish_test(mrr_test.ProtocolVersion.Mqtt311); }); -/* // We only have a 5-based test because there's no way to stop the 311 client without destroying it in the process. test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('ShadowUpdated Streaming Operation Success Subscription Events MQTT5', async () => { @@ -634,33 +632,30 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Null Options', async () => { // @ts-ignore - await mrr_test.do_invalid_streaming_operation_config_test(null, "invalid configuration"); + await mrr_test.do_invalid_streaming_operation_config_test(null, "Invalid streaming options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Undefined Options', async () => { // @ts-ignore - await mrr_test.do_invalid_streaming_operation_config_test(undefined, "invalid configuration"); + await mrr_test.do_invalid_streaming_operation_config_test(undefined, "Invalid streaming options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Null Filter', async () => { await mrr_test.do_invalid_streaming_operation_config_test({ // @ts-ignore subscriptionTopicFilter : null, - }, "invalid configuration"); + }, "Invalid streaming options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Invalid Filter Type', async () => { await mrr_test.do_invalid_streaming_operation_config_test({ // @ts-ignore subscriptionTopicFilter : 5, - }, "invalid configuration"); + }, "Invalid streaming options"); }); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Invalid Filter Value', async () => { await mrr_test.do_invalid_streaming_operation_config_test({ subscriptionTopicFilter : "#/hello/#", - }, "Failed to create"); + }, "Invalid streaming options"); }); - - - */ \ No newline at end of file diff --git a/lib/browser/mqtt_request_response.ts b/lib/browser/mqtt_request_response.ts index 29efbec6..7f1f8251 100644 --- a/lib/browser/mqtt_request_response.ts +++ b/lib/browser/mqtt_request_response.ts @@ -16,7 +16,6 @@ import * as subscription_manager from "./mqtt_request_response/subscription_mana import {MqttClientConnection} from "./mqtt"; import {Mqtt5Client} from "./mqtt5"; import * as mqtt_request_response from "../common/mqtt_request_response"; -import {SubscriptionStatusEventType} from "../common/mqtt_request_response"; import * as mqtt_request_response_internal from "../common/mqtt_request_response_internal"; import {BufferedEventEmitter} from "../common/event"; import {CrtError} from "./error"; @@ -163,14 +162,13 @@ export class StreamingOperationBase extends BufferedEventEmitter implements mqtt if (this.state == mqtt_request_response_internal.StreamingOperationState.None) { this.internalOptions.open(); this.state = mqtt_request_response_internal.StreamingOperationState.Open; - } else if (this.state != mqtt_request_response_internal.StreamingOperationState.Open) { - throw new CrtError("MQTT streaming operation not in an openable state"); + } else if (this.state == mqtt_request_response_internal.StreamingOperationState.Closed) { + throw new CrtError("MQTT streaming operation already closed"); } } /** - * Stops a streaming operation from listening to the configured stream of events and releases all native - * resources associated with the stream. + * Stops a streaming operation from listening to the configured stream of events */ close(): void { if (this.state != mqtt_request_response_internal.StreamingOperationState.Closed) { @@ -524,7 +522,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_ let streamingOperation = operation as StreamingOperation; streamingOperation.operation.triggerSubscriptionStatusUpdateEvent({ - type: SubscriptionStatusEventType.SubscriptionEstablished + type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionEstablished }); } else { this.applyRequestResponsePublish(operation as RequestResponseOperation); @@ -722,12 +720,16 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_ let streamingOperation = operation as StreamingOperation; if (operation.state != OperationState.Terminal && operation.state != OperationState.None) { streamingOperation.operation.triggerSubscriptionStatusUpdateEvent({ - type: SubscriptionStatusEventType.SubscriptionHalted, + type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionHalted, error: err }); } this.changeOperationState(operation, OperationState.Terminal); + + // this is mostly a no-op except it's the only way we can guarantee that the streaming operation state also gets + // flipped to closed + streamingOperation.operation.close(); } private completeOperationWithError(id: number, err: CrtError) { @@ -934,7 +936,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_ let streamingOperation = operation as StreamingOperation; streamingOperation.operation.triggerSubscriptionStatusUpdateEvent({ - type: SubscriptionStatusEventType.SubscriptionEstablished + type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionEstablished }); this.changeOperationState(operation, OperationState.Subscribed); @@ -960,7 +962,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_ let streamingOperation = operation as StreamingOperation; streamingOperation.operation.triggerSubscriptionStatusUpdateEvent({ - type: SubscriptionStatusEventType.SubscriptionLost, + type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionLost, }); } @@ -984,7 +986,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_ let streamingOperation = operation as StreamingOperation; streamingOperation.operation.triggerSubscriptionStatusUpdateEvent({ - type: SubscriptionStatusEventType.SubscriptionHalted, + type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionHalted, error: new CrtError(`Subscription Failure for topic filter "${event.topicFilter}"`) }); @@ -1063,7 +1065,8 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_ private closeStreamingOperation(id: number) { let operation = this.operations.get(id); if (!operation) { - throw new CrtError(`Attempt to close untracked streaming operation with id "${id}"`); + // don't throw here intentionally; there's a bit of a recursive tangle with closing streaming operations + return; } this.haltStreamingOperationWithError(id, new CrtError("Streaming operation closed")); diff --git a/lib/browser/mqtt_request_response_impl.spec.ts b/lib/browser/mqtt_request_response_impl.spec.ts index 10ae4bc9..98fa02aa 100644 --- a/lib/browser/mqtt_request_response_impl.spec.ts +++ b/lib/browser/mqtt_request_response_impl.spec.ts @@ -13,7 +13,7 @@ import {LiftedPromise, newLiftedPromise} from "../common/promise"; import {SubscriptionStatusEventType} from "./mqtt_request_response"; import {v4 as uuid} from "uuid"; -jest.setTimeout(1000000); +jest.setTimeout(10000); interface TestContextOptions { clientOptions?: mqtt_request_response.RequestResponseClientOptions, @@ -956,7 +956,7 @@ test('streaming operation - close client before open', async () => { expect(false); } catch (e) { let err = e as Error; - expect(err.message).toContain("client closed"); + expect(err.message).toContain("already closed"); } cleanupTestContext(context); diff --git a/lib/native/mqtt_request_response.ts b/lib/native/mqtt_request_response.ts index ecf62f8f..f1bba522 100644 --- a/lib/native/mqtt_request_response.ts +++ b/lib/native/mqtt_request_response.ts @@ -69,8 +69,8 @@ export class StreamingOperationBase extends NativeResourceMixin(BufferedEventEmi if (this.state == mqtt_request_response_internal.StreamingOperationState.None) { this.state = mqtt_request_response_internal.StreamingOperationState.Open; crt_native.mqtt_streaming_operation_open(this.native_handle()); - } else if (this.state != mqtt_request_response_internal.StreamingOperationState.Open) { - throw new CrtError("MQTT streaming operation not in an openable state"); + } else if (this.state == mqtt_request_response_internal.StreamingOperationState.Closed) { + throw new CrtError("MQTT streaming operation already closed"); } }