Skip to content

Commit

Permalink
RATIS-1942. GrpcLogAppender has ILLEGAL TRANSITION: STARTING -> STARTING
Browse files Browse the repository at this point in the history
  • Loading branch information
adoroszlai committed Dec 13, 2023
1 parent 8944035 commit 47fc81d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class GrpcServerProtocolClient implements Closeable {
public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
TimeDuration requestTimeout, GrpcTlsConfig tlsConfig, boolean separateHBChannel) {
raftPeerId = target.getId();
LOG.info("Build channel for {}", raftPeerId);
LOG.info("Build channel for {}", target);
useSeparateHBChannel = separateHBChannel;
channel = buildChannel(target, flowControlWindow, tlsConfig);
blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,27 +250,27 @@ public PeerChanges(RaftPeer[] all, RaftPeer[] newPeers, RaftPeer[] removed) {
public static RaftGroup initRaftGroup(Collection<String> ids, Collection<String> listenerIds) {
Iterator<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(4 * (ids.size() + listenerIds.size())).iterator();
Stream<RaftPeer> peer = ids.stream()
.map(RaftPeerId::valueOf)
.map(id -> RaftPeer.newBuilder().setId(id)
.setAddress(addresses.next())
.setAdminAddress(addresses.next())
.setClientAddress(addresses.next())
.setDataStreamAddress(addresses.next())
.build());
.map(id -> RaftPeer.newBuilder().setId(id))
.map(p -> assignAddresses(p, addresses))
.map(RaftPeer.Builder::build);
Stream<RaftPeer> listener = listenerIds.stream()
.map(RaftPeerId::valueOf)
.map(id -> RaftPeer.newBuilder().setId(id)
.setAddress(addresses.next())
.setAdminAddress(addresses.next())
.setClientAddress(addresses.next())
.setDataStreamAddress(addresses.next())
.setStartupRole(RaftProtos.RaftPeerRole.LISTENER)
.build());
.map(id -> RaftPeer.newBuilder().setId(id))
.map(p -> assignAddresses(p, addresses))
.map(p -> p.setStartupRole(RaftProtos.RaftPeerRole.LISTENER))
.map(RaftPeer.Builder::build);
final RaftPeer[] peers = Stream.concat(peer, listener).toArray(RaftPeer[]::new);

return RaftGroup.valueOf(RaftGroupId.randomId(), peers);
}

private static RaftPeer.Builder assignAddresses(RaftPeer.Builder builder, Iterator<InetSocketAddress> addresses) {
return builder
.setAddress(addresses.next())
.setAdminAddress(addresses.next())
.setClientAddress(addresses.next())
.setDataStreamAddress(addresses.next());
}

private final Supplier<File> rootTestDir = JavaUtils.memoize(
() -> new File(BaseTest.getRootTestDir(),
JavaUtils.getClassSimpleName(getClass()) + Integer.toHexString(ThreadLocalRandom.current().nextInt())));
Expand Down Expand Up @@ -468,10 +468,13 @@ public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
if (emptyPeer) {
raftGroup = RaftGroup.valueOf(group.getGroupId(), Collections.emptyList());
} else {
Iterator<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(4 * ids.length).iterator();
final Collection<RaftPeer> newPeers = StreamSupport.stream(peerIds.spliterator(), false)
.map(id -> RaftPeer.newBuilder().setId(id)
.setStartupRole(startRole)
.build()).collect(Collectors.toSet());
.setStartupRole(startRole))
.map(p -> assignAddresses(p, addresses))
.map(RaftPeer.Builder::build)
.collect(Collectors.toSet());
newPeers.addAll(group.getPeers());
raftGroup = RaftGroup.valueOf(group.getGroupId(), newPeers);
}
Expand Down

0 comments on commit 47fc81d

Please sign in to comment.