Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move InMemorySubscriber from L2 communication module to L3 client mo… #158

Merged
merged 6 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@
<include>**/uri/**</include>
<include>**/uuid/**</include>
<include>**/validation/**</include>
<include>**/client/**</include>
</includes>
</configuration>
</plugin>
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add copyright header

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Original file line number Diff line number Diff line change
@@ -1,16 +1,4 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication;
package org.eclipse.uprotocol.client.usubscription.v3;

import java.util.Objects;
import java.util.Optional;
Expand All @@ -19,6 +7,20 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

import org.eclipse.uprotocol.communication.CallOptions;
import org.eclipse.uprotocol.communication.InMemoryRpcClient;
import org.eclipse.uprotocol.communication.Notifier;
import org.eclipse.uprotocol.communication.RpcClient;
import org.eclipse.uprotocol.communication.RpcMapper;
import org.eclipse.uprotocol.communication.SimpleNotifier;
import org.eclipse.uprotocol.communication.UPayload;
import org.eclipse.uprotocol.communication.UStatusException;
import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscribersRequest;
import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscribersResponse;
import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscriptionsRequest;
import org.eclipse.uprotocol.core.usubscription.v3.FetchSubscriptionsResponse;
import org.eclipse.uprotocol.core.usubscription.v3.NotificationsRequest;
import org.eclipse.uprotocol.core.usubscription.v3.NotificationsResponse;
import org.eclipse.uprotocol.core.usubscription.v3.SubscriberInfo;
import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionRequest;
import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionResponse;
Expand All @@ -39,45 +41,84 @@
import com.google.protobuf.Descriptors.ServiceDescriptor;

/**
* The following is an in-memory implementation of the {@link Subscriber} interface that
* wraps the {@link UTransport} for implementing the Subscriber-side of the pub/sub
* messaging pattern to allow developers to subscribe and unsubscribe to topics. This
* implementation uses the {@link InMemoryRpcClient} and {@link SimpleNotifier} interfaces
* to invoke the subscription request message to the usubscription service, and register
* to receive notifications for changes from the uSubscription service.
*
* In memory client side of the uSubscription business logic that uses a transport and
* RpcClient and Notifier APIs.
*/
public class InMemorySubscriber implements Subscriber {

public class InMemoryUSubscriptionClient implements USubscriptionClient {
private final UTransport transport;
private final RpcClient rpcClient;
private final Notifier notifier;
private final CallOptions options;

private static final ServiceDescriptor USUBSCRIPTION = USubscriptionProto.getDescriptor().getServices().get(0);

// TODO: The following items need to be pulled from generated code
// TODO: The following items eventually need to be pulled from generated code
private static final UUri SUBSCRIBE_METHOD = UriFactory.fromProto(USUBSCRIPTION, 1);
private static final UUri UNSUBSCRIBE_METHOD = UriFactory.fromProto(USUBSCRIPTION, 2);
private static final UUri NOTIFICATION_TOPIC = UriFactory.fromProto(USUBSCRIPTION, 0x8000);
private static final UUri FETCH_SUBSCRIBERS_METHOD = UriFactory.fromProto(USUBSCRIPTION, 8);
private static final UUri FETCH_SUBSCRIPTIONS_METHOD = UriFactory.fromProto(USUBSCRIPTION, 3);
private static final UUri REGISTER_NOTIFICATIONS_METHOD = UriFactory.fromProto(USUBSCRIPTION, 6);
private static final UUri UNREGISTER_NOTIFICATIONS_METHOD = UriFactory.fromProto(USUBSCRIPTION, 7);

private static final UUri NOTIFICATION_TOPIC = UriFactory.fromProto(USUBSCRIPTION, 0x8000);


// Map to store subscription change notification handlers
private final ConcurrentHashMap<UUri, SubscriptionChangeHandler> mHandlers = new ConcurrentHashMap<>();

// transport Notification listener that will process subscription change notifications
private final UListener mNotificationListener = this::handleNotifications;



/**
* Creates a new USubscription client passing only a transport.
*
* @param transport the transport to use for sending the notifications
*/
public InMemoryUSubscriptionClient (UTransport transport) {
this(transport, CallOptions.DEFAULT);
}

/**
* Creates a new USubscription client passing {@link UTransport} and {@link CallOptions}
* used to provide additional options for the RPC requests to uSubscription service.
*
* @param transport the transport to use for sending the notifications
* @param options the call options to use for the RPC requests
*/
public InMemoryUSubscriptionClient (UTransport transport, CallOptions options) {
this(transport, new InMemoryRpcClient(transport), new SimpleNotifier(transport), options);
}


/**
* Creates a new subscriber for existing Communication Layer client implementations.
* Creates a new USubscription client passing {@link UTransport}, {@link RpcClient} and {@link Notifier}.
*
* @param transport the transport to use for sending the notifications
* @param rpcClient the rpc client to use for sending the RPC requests
* @param notifier the notifier to use for registering the notification listener
* @param options the call options to use for the RPC requests
*/
public InMemorySubscriber (UTransport transport, RpcClient rpcClient, Notifier notifier) {
public InMemoryUSubscriptionClient (UTransport transport, RpcClient rpcClient, Notifier notifier) {
this(transport, rpcClient, notifier, CallOptions.DEFAULT);
}


/**
* Creates a new USubscription client passing {@link UTransport}, {@link CallOptions},
* and an implementation of {@link RpcClient} and {@link Notifier}.
*
* @param transport the transport to use for sending the notifications
* @param rpcClient the rpc client to use for sending the RPC requests
* @param notifier the notifier to use for registering the notification listener
* @param options the call options to use for the RPC requests
*/
public InMemoryUSubscriptionClient (UTransport transport, RpcClient rpcClient,
Notifier notifier, CallOptions options) {
Objects.requireNonNull(transport, UTransport.TRANSPORT_NULL_ERROR);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CallOptions contains auth token and it is immutable. When token expires it has to be refreshed, so the current implementation will require to rebuild the whole instance just to update token, I think it is better to pass options into each RPC method.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok fixed

Objects.requireNonNull(rpcClient, "RpcClient missing");
Objects.requireNonNull(notifier, "Notifier missing");
this.options = Objects.requireNonNullElse(options, CallOptions.DEFAULT);
this.transport = transport;
this.rpcClient = rpcClient;
this.notifier = notifier;
Expand All @@ -99,18 +140,16 @@ public InMemorySubscriber (UTransport transport, RpcClient rpcClient, Notifier n
*
* @param topic The topic to subscribe to.
* @param listener The listener to be called when a message is received on the topic.
* @param options The call options for the subscription.
* @param handler {@link SubscriptionChangeHandler} to handle changes to subscription states.
* @return Returns the CompletionStage with {@link SubscriptionResponse} or exception with the failure
* reason as {@link UStatus}. {@link UCode.ALREADY_EXISTS} will be returned if you call this API multiple
* times passing a different handler.
*/
@Override
public CompletionStage<SubscriptionResponse> subscribe(UUri topic, UListener listener, CallOptions options,
public CompletionStage<SubscriptionResponse> subscribe(UUri topic, UListener listener,
SubscriptionChangeHandler handler) {
Objects.requireNonNull(topic, "Subscribe topic missing");
Objects.requireNonNull(listener, "Request listener missing");
options = Objects.requireNonNullElse(options, CallOptions.DEFAULT);

final SubscriptionRequest request = SubscriptionRequest.newBuilder()
.setTopic(topic)
Expand Down Expand Up @@ -162,11 +201,10 @@ public CompletionStage<SubscriptionResponse> subscribe(UUri topic, UListener lis
*
* @param topic The topic to unsubscribe to.
* @param listener The listener to be called when a message is received on the topic.
* @param options The call options for the subscription.
* @return Returns {@link UStatus} with the result from the unsubscribe request.
*/
@Override
public CompletionStage<UStatus> unsubscribe(UUri topic, UListener listener, CallOptions options) {
public CompletionStage<UStatus> unsubscribe(UUri topic, UListener listener) {
Objects.requireNonNull(topic, "Unsubscribe topic missing");
Objects.requireNonNull(listener, "listener missing");

Expand All @@ -190,7 +228,7 @@ public CompletionStage<UStatus> unsubscribe(UUri topic, UListener listener, Call


/**
* Unregisters a listener and removes any registered {@link SubscriptionChangeHandler} for the topic.
* Unregister the listener and removes any registered {@link SubscriptionChangeHandler} for the topic.
*
* This method is used to remove handlers/listeners without notifying the uSubscription service
* so that we can be persistently subscribed even when the uE is not running.
Expand All @@ -207,13 +245,103 @@ public CompletionStage<UStatus> unregisterListener(UUri topic, UListener listene
.whenComplete((status, exception) -> mHandlers.remove(topic));
}

/**
* Close the subscription client and clean up resources.
*/
public void close() {
mHandlers.clear();
notifier.unregisterNotificationListener(NOTIFICATION_TOPIC, mNotificationListener)
.toCompletableFuture().join();
}


/**
* Register for Subscription Change Notifications.
*
* This API allows producers to register to receive subscription change notifications for
* topics that they produce only.
*
* @param topic The topic to register for notifications.
* @param handler The {@link SubscriptionChangeHandler} to handle the subscription changes.
* @return {@link CompletionStage} completed successfully if uSubscription service accepts the
* request to register the caller to be notified of subscription changes, or
* the CompletionStage completes exceptionally with {@link UStatus} that indicates
* the failure reason.
*/
@Override
public CompletionStage<NotificationsResponse> registerForNotifications(UUri topic,
SubscriptionChangeHandler handler) {
Objects.requireNonNull(topic, "Topic missing");
Objects.requireNonNull(handler, "Handler missing");

NotificationsRequest request = NotificationsRequest.newBuilder()
.setTopic(topic)
.setSubscriber(SubscriberInfo.newBuilder().setUri(transport.getSource()).build())
.build();

return RpcMapper.mapResponse(rpcClient.invokeMethod(REGISTER_NOTIFICATIONS_METHOD,
UPayload.pack(request), options), NotificationsResponse.class)
// Then Add the handler (if the client provided one) so the client can be notified of
// changes to the subscription state.
.whenComplete( (response, exception) -> {
if (exception == null) {
mHandlers.compute(topic, (k, existingHandler) -> {
if (existingHandler != null && existingHandler != handler) {
throw new UStatusException(UCode.ALREADY_EXISTS, "Handler already registered");
}
return handler;
});
}
});
}


/**
* Unregister for subscription change notifications.
*
* @param topic The topic to unregister for notifications.
* @param handler The {@link SubscriptionChangeHandler} to handle the subscription changes.
* @return {@link CompletionStage} completed successfully with {@link NotificationResponse} with
* the status of the API call to uSubscription service, or completed unsuccessfully with
* {@link UStatus} with the reason for the failure. {@link UCode.PERMISSION_DENIED} is
* returned if the topic ue_id does not equal the callers ue_id.
*/
@Override
public CompletionStage<NotificationsResponse> unregisterForNotifications(UUri topic,
SubscriptionChangeHandler handler) {
Objects.requireNonNull(topic, "Topic missing");
Objects.requireNonNull(handler, "Handler missing");

NotificationsRequest request = NotificationsRequest.newBuilder()
.setTopic(topic)
.setSubscriber(SubscriberInfo.newBuilder().setUri(transport.getSource()).build())
.build();

return RpcMapper.mapResponse(rpcClient.invokeMethod(UNREGISTER_NOTIFICATIONS_METHOD,
UPayload.pack(request), options), NotificationsResponse.class)
.whenComplete((response, exception) -> mHandlers.remove(topic));
}


@Override
public CompletionStage<FetchSubscribersResponse> fetchSubscribers(UUri topic) {
Objects.requireNonNull(topic, "Topic missing");

FetchSubscribersRequest request = FetchSubscribersRequest.newBuilder().setTopic(topic).build();
return RpcMapper.mapResponse(rpcClient.invokeMethod(FETCH_SUBSCRIBERS_METHOD,
UPayload.pack(request), options), FetchSubscribersResponse.class);
}


@Override
public CompletionStage<FetchSubscriptionsResponse> fetchSubscriptions(FetchSubscriptionsRequest request) {
Objects.requireNonNull(request, "Request missing");

return RpcMapper.mapResponse(rpcClient.invokeMethod(FETCH_SUBSCRIPTIONS_METHOD,
UPayload.pack(request), options), FetchSubscriptionsResponse.class);
}


/**
* Handles incoming notifications from the USubscription service.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.communication;
package org.eclipse.uprotocol.client.usubscription.v3;

import org.eclipse.uprotocol.core.usubscription.v3.SubscriptionStatus;
import org.eclipse.uprotocol.v1.UUri;
Expand Down
Loading
Loading