Skip to content

Commit

Permalink
Test fixups/polish
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Sep 9, 2024
1 parent c7a2ce2 commit 3b72495
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 27 deletions.
19 changes: 7 additions & 12 deletions lib/browser/mqtt_request_response.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import * as auth from "./auth";
import * as test_env from "@test/test_env"
import * as aws_iot_311 from "./aws_iot";
import * as aws_iot_5 from "./aws_iot_mqtt5";
//import * as mqtt5 from "./mqtt5";
import * as mqtt5 from "./mqtt5";
import * as mqtt_request_response from "./mqtt_request_response";
//import {v4 as uuid} from "uuid";
//import {once} from "events";
import {once} from "events";
import * as mrr_test from "@test/mqtt_request_response";
import {v4 as uuid} from "uuid";
import * as test_utils from "../../test/mqtt5";
Expand Down Expand Up @@ -558,7 +557,6 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('ShadowUpd
await mrr_test.do_streaming_operation_incoming_publish_test(mrr_test.ProtocolVersion.Mqtt311);
});

/*
// We only have a 5-based test because there's no way to stop the 311 client without destroying it in the process.
test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('ShadowUpdated Streaming Operation Success Subscription Events MQTT5', async () => {

Expand Down Expand Up @@ -634,33 +632,30 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming

test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Null Options', async () => {
// @ts-ignore
await mrr_test.do_invalid_streaming_operation_config_test(null, "invalid configuration");
await mrr_test.do_invalid_streaming_operation_config_test(null, "Invalid streaming options");
});

test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Undefined Options', async () => {
// @ts-ignore
await mrr_test.do_invalid_streaming_operation_config_test(undefined, "invalid configuration");
await mrr_test.do_invalid_streaming_operation_config_test(undefined, "Invalid streaming options");
});

test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Null Filter', async () => {
await mrr_test.do_invalid_streaming_operation_config_test({
// @ts-ignore
subscriptionTopicFilter : null,
}, "invalid configuration");
}, "Invalid streaming options");
});

test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Invalid Filter Type', async () => {
await mrr_test.do_invalid_streaming_operation_config_test({
// @ts-ignore
subscriptionTopicFilter : 5,
}, "invalid configuration");
}, "Invalid streaming options");
});

test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt5_is_valid_cred())('Streaming Operation Creation Failure Invalid Filter Value', async () => {
await mrr_test.do_invalid_streaming_operation_config_test({
subscriptionTopicFilter : "#/hello/#",
}, "Failed to create");
}, "Invalid streaming options");
});
*/
25 changes: 14 additions & 11 deletions lib/browser/mqtt_request_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import * as subscription_manager from "./mqtt_request_response/subscription_mana
import {MqttClientConnection} from "./mqtt";
import {Mqtt5Client} from "./mqtt5";
import * as mqtt_request_response from "../common/mqtt_request_response";
import {SubscriptionStatusEventType} from "../common/mqtt_request_response";
import * as mqtt_request_response_internal from "../common/mqtt_request_response_internal";
import {BufferedEventEmitter} from "../common/event";
import {CrtError} from "./error";
Expand Down Expand Up @@ -163,14 +162,13 @@ export class StreamingOperationBase extends BufferedEventEmitter implements mqtt
if (this.state == mqtt_request_response_internal.StreamingOperationState.None) {
this.internalOptions.open();
this.state = mqtt_request_response_internal.StreamingOperationState.Open;
} else if (this.state != mqtt_request_response_internal.StreamingOperationState.Open) {
throw new CrtError("MQTT streaming operation not in an openable state");
} else if (this.state == mqtt_request_response_internal.StreamingOperationState.Closed) {
throw new CrtError("MQTT streaming operation already closed");
}
}

/**
* Stops a streaming operation from listening to the configured stream of events and releases all native
* resources associated with the stream.
* Stops a streaming operation from listening to the configured stream of events
*/
close(): void {
if (this.state != mqtt_request_response_internal.StreamingOperationState.Closed) {
Expand Down Expand Up @@ -524,7 +522,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_

let streamingOperation = operation as StreamingOperation;
streamingOperation.operation.triggerSubscriptionStatusUpdateEvent({
type: SubscriptionStatusEventType.SubscriptionEstablished
type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionEstablished
});
} else {
this.applyRequestResponsePublish(operation as RequestResponseOperation);
Expand Down Expand Up @@ -722,12 +720,16 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
let streamingOperation = operation as StreamingOperation;
if (operation.state != OperationState.Terminal && operation.state != OperationState.None) {
streamingOperation.operation.triggerSubscriptionStatusUpdateEvent({
type: SubscriptionStatusEventType.SubscriptionHalted,
type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionHalted,
error: err
});
}

this.changeOperationState(operation, OperationState.Terminal);

// this is mostly a no-op except it's the only way we can guarantee that the streaming operation state also gets
// flipped to closed
streamingOperation.operation.close();
}

private completeOperationWithError(id: number, err: CrtError) {
Expand Down Expand Up @@ -934,7 +936,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_

let streamingOperation = operation as StreamingOperation;
streamingOperation.operation.triggerSubscriptionStatusUpdateEvent({
type: SubscriptionStatusEventType.SubscriptionEstablished
type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionEstablished
});

this.changeOperationState(operation, OperationState.Subscribed);
Expand All @@ -960,7 +962,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_

let streamingOperation = operation as StreamingOperation;
streamingOperation.operation.triggerSubscriptionStatusUpdateEvent({
type: SubscriptionStatusEventType.SubscriptionLost,
type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionLost,
});
}

Expand All @@ -984,7 +986,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_

let streamingOperation = operation as StreamingOperation;
streamingOperation.operation.triggerSubscriptionStatusUpdateEvent({
type: SubscriptionStatusEventType.SubscriptionHalted,
type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionHalted,
error: new CrtError(`Subscription Failure for topic filter "${event.topicFilter}"`)
});

Expand Down Expand Up @@ -1063,7 +1065,8 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
private closeStreamingOperation(id: number) {
let operation = this.operations.get(id);
if (!operation) {
throw new CrtError(`Attempt to close untracked streaming operation with id "${id}"`);
// don't throw here intentionally; there's a bit of a recursive tangle with closing streaming operations
return;
}

this.haltStreamingOperationWithError(id, new CrtError("Streaming operation closed"));
Expand Down
4 changes: 2 additions & 2 deletions lib/browser/mqtt_request_response_impl.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {LiftedPromise, newLiftedPromise} from "../common/promise";
import {SubscriptionStatusEventType} from "./mqtt_request_response";
import {v4 as uuid} from "uuid";

jest.setTimeout(1000000);
jest.setTimeout(10000);

interface TestContextOptions {
clientOptions?: mqtt_request_response.RequestResponseClientOptions,
Expand Down Expand Up @@ -956,7 +956,7 @@ test('streaming operation - close client before open', async () => {
expect(false);
} catch (e) {
let err = e as Error;
expect(err.message).toContain("client closed");
expect(err.message).toContain("already closed");
}

cleanupTestContext(context);
Expand Down
4 changes: 2 additions & 2 deletions lib/native/mqtt_request_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ export class StreamingOperationBase extends NativeResourceMixin(BufferedEventEmi
if (this.state == mqtt_request_response_internal.StreamingOperationState.None) {
this.state = mqtt_request_response_internal.StreamingOperationState.Open;
crt_native.mqtt_streaming_operation_open(this.native_handle());
} else if (this.state != mqtt_request_response_internal.StreamingOperationState.Open) {
throw new CrtError("MQTT streaming operation not in an openable state");
} else if (this.state == mqtt_request_response_internal.StreamingOperationState.Closed) {
throw new CrtError("MQTT streaming operation already closed");
}
}

Expand Down

0 comments on commit 3b72495

Please sign in to comment.