Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for realtime calling subscription multiple time - Appwrite 1.4.x #713

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 46 additions & 36 deletions templates/flutter/lib/src/realtime_mixin.dart.twig
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart';
import 'package:web_socket_channel/status.dart' as status;
import 'exception.dart';
import 'realtime_subscription.dart';
import 'client.dart';
Expand All @@ -15,15 +15,20 @@ typedef GetFallbackCookie = String? Function();

mixin RealtimeMixin {
late Client client;
final Map<String, List<StreamController<RealtimeMessage>>> _channels = {};
final Set<String> _channels = {};
WebSocketChannel? _websok;
String? _lastUrl;
late WebSocketFactory getWebSocket;
GetFallbackCookie? getFallbackCookie;
int? get closeCode => _websok?.closeCode;
int _subscriptionsCounter = 0;
Map<int, RealtimeSubscription> _subscriptions = {};
bool _notifyDone = true;
StreamSubscription? _websocketSubscription;

Future<dynamic> _closeConnection() async {
await _websok?.sink.close(normalClosure);
await _websocketSubscription?.cancel();
await _websok?.sink.close(status.normalClosure, 'Ending session');
_lastUrl = null;
}

Expand All @@ -36,14 +41,16 @@ mixin RealtimeMixin {
if (_lastUrl == uri.toString() && _websok?.closeCode == null) {
return;
}
_notifyDone = false;
await _closeConnection();
_lastUrl = uri.toString();
_websok = await getWebSocket(uri);
_notifyDone = true;
}
debugPrint('subscription: $_lastUrl');

try {
_websok?.stream.listen((response) {
_websocketSubscription = _websok?.stream.listen((response) {
final data = RealtimeResponse.fromJson(response);
switch (data.type) {
case 'error':
Expand All @@ -67,48 +74,45 @@ mixin RealtimeMixin {
break;
case 'event':
final message = RealtimeMessage.fromMap(data.data);
for(var channel in message.channels) {
if (_channels[channel] != null) {
for( var stream in _channels[channel]!) {
stream.sink.add(message);
for (var subscription in _subscriptions.values) {
for (var channel in message.channels) {
if (subscription.channels.contains(channel)) {
subscription.controller.add(message);
}
}
}
break;
}
}, onDone: () {
for (var list in _channels.values) {
for (var stream in list) {
stream.close();
}
if (!_notifyDone) return;
for (var subscription in _subscriptions.values) {
subscription.close();
}
_channels.clear();
_closeConnection();
}, onError: (err, stack) {
for (var list in _channels.values) {
for (var stream in list) {
stream.sink.addError(err, stack);
}
for (var subscription in _subscriptions.values) {
subscription.controller.addError(err, stack);
}
if (_websok?.closeCode != null && _websok?.closeCode != 1008) {
debugPrint("Reconnecting in one second.");
Future.delayed(Duration(seconds: 1), _createSocket);
}
});
} catch (e) {
if (e is {{spec.title | caseUcfirst}}Exception) {
if (e is AppwriteException) {
rethrow;
}
if (e is WebSocketChannelException) {
throw {{spec.title | caseUcfirst}}Exception(e.message);
throw AppwriteException(e.message);
}
throw {{spec.title | caseUcfirst}}Exception(e.toString());
throw AppwriteException(e.toString());
}
}

Uri _prepareUri() {
if (client.endPointRealtime == null) {
throw {{spec.title | caseUcfirst}}Exception(
throw AppwriteException(
"Please set endPointRealtime to connect to realtime server");
}
var uri = Uri.parse(client.endPointRealtime!);
Expand All @@ -118,43 +122,49 @@ mixin RealtimeMixin {
port: uri.port,
queryParameters: {
"project": client.config['project'],
"channels[]": _channels.keys.toList(),
"channels[]": _channels.toList(),
},
path: uri.path + "/realtime",
);
}

RealtimeSubscription subscribeTo(List<String> channels) {
StreamController<RealtimeMessage> controller = StreamController.broadcast();
for(var channel in channels) {
if (!_channels.containsKey(channel)) {
_channels[channel] = [];
}
_channels[channel]!.add(controller);
}
_channels.addAll(channels);
Future.delayed(Duration.zero, () => _createSocket());
int counter = _subscriptionsCounter++;
RealtimeSubscription subscription = RealtimeSubscription(
stream: controller.stream,
controller: controller,
channels: channels,
close: () async {
_subscriptions.remove(counter);
_subscriptionsCounter--;
controller.close();
for(var channel in channels) {
_channels[channel]!.remove(controller);
if (_channels[channel]!.isEmpty) {
_channels.remove(channel);
}
}
if(_channels.isNotEmpty) {
_cleanup(channels);

if (_channels.isNotEmpty) {
await Future.delayed(Duration.zero, () => _createSocket());
} else {
await _closeConnection();
}
});
_subscriptions[counter] = subscription;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using a counter can lead to problems. For example:

  1. start (counter = 0, subscriptions map is empty)
  2. subscribe() (counter = 1, subscriptions[0] is set)
  3. subscribe() (counter = 2, subscriptions[0] and [1] are set)
  4. the first one gets removed (counter = 1, subscriptions[1] is set)
  5. a new subscription added (counter = 2, subscriptions[1] is set and replaces the subscription from step 2)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I've updated to use the time in microseconds to overcome this. please let me know what you think.

return subscription;
}

void _cleanup(List<String> channels) {
for (var channel in channels) {
bool found = _subscriptions.values
.any((subscription) => subscription.channels.contains(channel));
if (!found) {
_channels.remove(channel);
}
}
}

void handleError(RealtimeResponse response) {
if (response.data['code'] == 1008) {
throw {{spec.title | caseUcfirst}}Exception(response.data["message"], response.data["code"]);
throw AppwriteException(response.data["message"], response.data["code"]);
} else {
debugPrint("Reconnecting in one second.");
Future.delayed(const Duration(seconds: 1), () {
Expand Down
11 changes: 10 additions & 1 deletion templates/flutter/lib/src/realtime_subscription.dart.twig
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import 'dart:async';

import 'realtime_message.dart';

/// Realtime Subscription
class RealtimeSubscription {
/// Stream of [RealtimeMessage]s
final Stream<RealtimeMessage> stream;

final StreamController<RealtimeMessage> controller;

/// List of channels
List<String> channels;

/// Closes the subscription
final Future<void> Function() close;

/// Initializes a [RealtimeSubscription]
RealtimeSubscription({required this.stream, required this.close});
RealtimeSubscription(
{required this.close, required this.channels, required this.controller})
: stream = controller.stream;
}
20 changes: 10 additions & 10 deletions templates/flutter/test/src/realtime_subscription_test.dart.twig
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import 'package:mockito/mockito.dart';
import 'package:{{language.params.packageName}}/src/realtime_message.dart';
import 'package:{{language.params.packageName}}/src/realtime_subscription.dart';
import 'package:appwrite/src/realtime_message.dart';
import 'package:appwrite/src/realtime_subscription.dart';
import 'package:flutter_test/flutter_test.dart';

class MockStream<T> extends Mock implements Stream<T> {}


import 'dart:async';

void main() {
group('RealtimeSubscription', () {
final mockStream = MockStream<RealtimeMessage>();
final mockStream = StreamController<RealtimeMessage>.broadcast();
final mockCloseFunction = () async {};
final subscription = RealtimeSubscription(stream: mockStream, close: mockCloseFunction);
final subscription = RealtimeSubscription(
controller: mockStream,
close: mockCloseFunction,
channels: ['documents']);

test('should have the correct stream and close function', () {
expect(subscription.stream, equals(mockStream));
expect(subscription.controller, equals(mockStream));
expect(subscription.stream, equals(mockStream.stream));
expect(subscription.close, equals(mockCloseFunction));
});
});
Expand Down