Skip to content

Commit

Permalink
[Java] Allow spy subscriptions to be one-to-many for different sessio…
Browse files Browse the repository at this point in the history
…ns and simplify unlinking code.
  • Loading branch information
mjpt777 committed Mar 2, 2017
1 parent 7f9d0f6 commit 8001590
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 38 deletions.
20 changes: 4 additions & 16 deletions aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,7 @@ void cleanupPublication(final NetworkPublication publication)

for (int i = 0, size = subscriptionLinks.size(); i < size; i++)
{
final SubscriptionLink subscription = subscriptionLinks.get(i);
if (subscription.matches(publication))
{
subscription.unlink(publication);
}
subscriptionLinks.get(i).unlink(publication);
}
}

Expand Down Expand Up @@ -408,23 +404,15 @@ void cleanupImage(final PublicationImage image)
{
for (int i = 0, size = subscriptionLinks.size(); i < size; i++)
{
final SubscriptionLink subscription = subscriptionLinks.get(i);
if (image.matches(subscription.channelEndpoint(), subscription.streamId()))
{
subscription.unlink(image);
}
subscriptionLinks.get(i).unlink(image);
}
}

void cleanupIpcPublication(final IpcPublication publication)
{
for (int i = 0, size = subscriptionLinks.size(); i < size; i++)
{
final SubscriptionLink subscription = subscriptionLinks.get(i);
if (subscription.matches(publication.streamId()))
{
subscription.unlink(publication);
}
subscriptionLinks.get(i).unlink(publication);
}
}

Expand Down Expand Up @@ -1102,7 +1090,7 @@ private void linkSpies(final NetworkPublication publication)
for (int i = 0, size = links.size(); i < size; i++)
{
final SubscriptionLink subscription = links.get(i);
if (subscription.matches(publication))
if (subscription.matches(publication) && !subscription.isLinked(publication))
{
linkSpy(publication, subscription);
}
Expand Down
22 changes: 0 additions & 22 deletions aeron-driver/src/main/java/io/aeron/driver/SubscriptionLink.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,6 @@ public boolean matches(final int streamId)
class SpySubscriptionLink extends SubscriptionLink
{
private final UdpChannel udpChannel;
private NetworkPublication publication = null;
private ReadablePosition position = null;

SpySubscriptionLink(
final long registrationId,
Expand All @@ -211,29 +209,9 @@ class SpySubscriptionLink extends SubscriptionLink
this.udpChannel = spiedChannel;
}

public void link(final Subscribable subscribable, final ReadablePosition position)
{
this.publication = (NetworkPublication)subscribable;
this.position = position;
}

public void unlink(final Subscribable subscribable)
{
publication = null;
position = null;
}

public boolean matches(final NetworkPublication publication)
{
return streamId == publication.streamId() &&
udpChannel.canonicalForm().equals(publication.sendChannelEndpoint().udpChannel().canonicalForm());
}

public void close()
{
if (null != publication)
{
publication.removeSubscriber(position);
}
}
}

0 comments on commit 8001590

Please sign in to comment.