Skip to content

Commit

Permalink
Better discovering
Browse files Browse the repository at this point in the history
  • Loading branch information
devgianlu committed May 11, 2021
1 parent 21006a0 commit b3492b0
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 75 deletions.
11 changes: 10 additions & 1 deletion src/main/java/xyz/gianlu/zeroconf/DiscoveredService.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ public final class DiscoveredService {
public final String service;
public final String protocol;
public final String domain;
public final String serviceName;
private final long expiration;

DiscoveredService(@NotNull RecordSRV record) {
expiration = System.currentTimeMillis() + record.ttl * 1000L;

target = record.getTarget();
port = record.getPort();
serviceName = record.getName();

String[] split = record.getName().split("\\.");
String[] split = serviceName.split("\\.");
if (split.length != 4) throw new IllegalArgumentException("Invalid service name: " + record.getName());

name = split[0];
Expand All @@ -26,6 +31,10 @@ public final class DiscoveredService {
domain = "." + split[3];
}

public boolean isExpired() {
return System.currentTimeMillis() > expiration;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/xyz/gianlu/zeroconf/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ public static void main(String[] args) throws IOException {
.setUseIpv4(true)
.setUseIpv6(false);

Runtime.getRuntime().addShutdownHook(new Thread(zeroconf::close));

// Announce service
Service service = new Service(args[0], args[1], Integer.parseInt(args[2]));
zeroconf.announce(service);
Runtime.getRuntime().addShutdownHook(new Thread(zeroconf::close));

// Start discovering
Zeroconf.DiscoveredServices services = zeroconf.discover(args[1], "tcp", ".local");
while (true) {
System.out.println(zeroconf.discover(args[1], "tcp", ".local"));
System.out.println(services.getServices());

try {
Thread.sleep(1000);
Expand Down
22 changes: 0 additions & 22 deletions src/main/java/xyz/gianlu/zeroconf/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,26 +160,4 @@ void write(ByteBuffer out, Packet packet) {
if (len > 0) out.putShort(pos, (short) len);
else out.position(pos);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Record record = (Record) o;
if (type != record.type) return false;
if (ttl != record.ttl) return false;
if (clazz != record.clazz) return false;
if (!name.equals(record.name)) return false;
return Arrays.equals(data, record.data);
}

@Override
public int hashCode() {
int result = type;
result = 31 * result + ttl;
result = 31 * result + name.hashCode();
result = 31 * result + clazz;
result = 31 * result + Arrays.hashCode(data);
return result;
}
}
132 changes: 82 additions & 50 deletions src/main/java/xyz/gianlu/zeroconf/Zeroconf.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -47,6 +48,7 @@ public final class Zeroconf implements Closeable {
private final ListenerThread thread;
private final List<Record> registry;
private final Collection<Service> services;
private final CopyOnWriteArrayList<DiscoveredServices> discoverers;
private final CopyOnWriteArrayList<PacketListener> receiveListeners;
private final CopyOnWriteArrayList<PacketListener> sendListeners;
private boolean useIpv4 = true;
Expand All @@ -67,6 +69,7 @@ public Zeroconf() {

receiveListeners = new CopyOnWriteArrayList<>();
sendListeners = new CopyOnWriteArrayList<>();
discoverers = new CopyOnWriteArrayList<>();
thread = new ListenerThread();
registry = new ArrayList<>();
services = new HashSet<>();
Expand Down Expand Up @@ -101,8 +104,15 @@ public Zeroconf setUseIpv6(boolean ipv6) {
*/
@Override
public void close() {
List<Service> list = new ArrayList<>(services);
for (Service service : list) unannounce(service);
for (Service service : new ArrayList<>(services))
unannounce(service);

services.clear();

for (DiscoveredServices discoverer : new ArrayList<>(discoverers))
discoverer.stop();

discoverers.clear();

try {
thread.close();
Expand Down Expand Up @@ -390,61 +400,19 @@ private void handlePacket(@NotNull Packet packet) {
}

/**
* Try to discover services.
* Create a background thread that continuously searches for the given service.
*
* @param service the service name, eg "_http"
* @param protocol the protocol, eg "_tcp"
* @param domain the domain, eg ".local"
* @return a list of discovered services
*/
@NotNull
public Collection<DiscoveredService> discover(@NotNull String service, @NotNull String protocol, @NotNull String domain) {
String serviceName = "_" + service + "._" + protocol + domain;

Packet probe = new Packet();
probe.setResponse(false);
probe.addQuestion(new RecordPTR(serviceName));

Set<DiscoveredService> matches = Collections.synchronizedSet(new HashSet<>());
PacketListener probeListener = packet -> {
if (packet.isResponse()) {
boolean notify = false;
for (Record r : packet.getAnswers()) {
if (r instanceof RecordSRV && r.getName().endsWith(serviceName)) {
matches.add(new DiscoveredService((RecordSRV) r));
notify = true;
}
}

for (Record r : packet.getAdditionals()) {
if (r instanceof RecordSRV && r.getName().endsWith(serviceName)) {
matches.add(new DiscoveredService((RecordSRV) r));
notify = true;
}
}

if (notify) {
synchronized (matches) {
matches.notifyAll();
}
}
}
};

addReceiveListener(probeListener);
for (int i = 0; i < 3 && matches.isEmpty(); i++) {
send(probe);
synchronized (matches) {
try {
matches.wait(250);
} catch (InterruptedException ex) {
// ignore
}
}
}

removeReceiveListener(probeListener);
return matches;
public DiscoveredServices discover(@NotNull String service, @NotNull String protocol, @NotNull String domain) {
DiscoveredServices discoverer = new DiscoveredServices("_" + service + "._" + protocol + domain);
new Thread(discoverer, "zeroconf-discover-" + service + "-" + protocol + "-" + domain).start();
discoverers.add(discoverer);
return discoverer;
}

/**
Expand Down Expand Up @@ -557,6 +525,70 @@ public void unannounce(@NotNull Service service) {
LOGGER.info("Unannounced {}.", service);
}

public class DiscoveredServices implements Runnable {
private final String serviceName;
private final PacketListener listener;
private final Set<DiscoveredService> services = Collections.synchronizedSet(new HashSet<>());
private volatile boolean shouldStop = false;
private int nextInterval = 1000;

DiscoveredServices(@NotNull String serviceName) {
this.serviceName = serviceName;

addReceiveListener(listener = packet -> {
if (!packet.isResponse())
return;

for (Record r : packet.getAnswers())
if (r instanceof RecordSRV)
addService((RecordSRV) r);

for (Record r : packet.getAdditionals())
if (r instanceof RecordSRV)
addService((RecordSRV) r);
});
}

private void addService(@NotNull RecordSRV record) {
if (!record.getName().endsWith(serviceName))
return;

services.removeIf(s -> s.isExpired() || s.serviceName.equals(record.getName()));
services.add(new DiscoveredService(record));
}

public void stop() {
shouldStop = true;
}

@NotNull
public Collection<DiscoveredService> getServices() {
return Collections.unmodifiableCollection(services);
}

@Override
public void run() {
while (!shouldStop) {
Packet probe = new Packet();
probe.setResponse(false);
probe.addQuestion(new RecordPTR(serviceName));
send(probe);

try {
//noinspection BusyWait
Thread.sleep(nextInterval);
nextInterval *= 2;
if (nextInterval >= TimeUnit.MINUTES.toMillis(60))
nextInterval = (int) (TimeUnit.MINUTES.toMillis(60) + 20 + Math.random() * 100);
} catch (InterruptedException ex) {
break;
}
}

removeReceiveListener(listener);
}
}

/**
* The thread that listens to one or more Multicast DatagramChannels using a Selector,
* waiting for incoming packets. This wait can be also interrupted and a packet sent.
Expand Down

0 comments on commit b3492b0

Please sign in to comment.