Skip to content

Commit

Permalink
add test for p2p and zookeeper (#1958)
Browse files Browse the repository at this point in the history
  • Loading branch information
htynkn authored and beiwei30 committed Jun 19, 2018
1 parent 27e6fdb commit 9ee83e4
Show file tree
Hide file tree
Showing 10 changed files with 595 additions and 69 deletions.
9 changes: 8 additions & 1 deletion dubbo-remoting/dubbo-remoting-p2p/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba</groupId>
Expand All @@ -34,5 +35,11 @@
<artifactId>dubbo-remoting-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo-remoting-netty</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.p2p.exchange.support;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
import org.apache.dubbo.remoting.p2p.Group;
import org.apache.dubbo.remoting.p2p.Networkers;
import org.apache.dubbo.remoting.p2p.Peer;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;

public class MulticastExchangeNetworkerTest {
@Test
public void testJoin() throws RemotingException, InterruptedException {
final String groupURL = "multicast://224.5.6.7:1234";

MulticastExchangeNetworker multicastExchangeNetworker = new MulticastExchangeNetworker();

final CountDownLatch countDownLatch = new CountDownLatch(1);
Peer peer1 = multicastExchangeNetworker.lookup(URL.valueOf(groupURL))
.join(URL.valueOf("dubbo://0.0.0.0:" + NetUtils.getAvailablePort()), new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) throws RemotingException {
countDownLatch.countDown();
return super.reply(channel, msg);
}
});
Peer peer2 = multicastExchangeNetworker.lookup(URL.valueOf(groupURL))
.join(URL.valueOf("dubbo://0.0.0.0:" + NetUtils.getAvailablePort()), mock(ExchangeHandler.class));

while (true) {
for (Channel channel : peer1.getChannels()) {
channel.send("hello multicast exchange network!");
}
TimeUnit.MILLISECONDS.sleep(50);

long count = countDownLatch.getCount();
if (count > 0) {
break;
}
}

Group lookup = Networkers.lookup(groupURL);
assertThat(lookup, not(nullValue()));

assertThat(peer1, instanceOf(ExchangeServerPeer.class));

peer1.close();
peer2.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.p2p.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.p2p.Group;
import org.apache.dubbo.remoting.p2p.Peer;
import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;

public class FileNetworkerTest {
@Rule
public TemporaryFolder folder = new TemporaryFolder();

@Before
public void setUp() throws Exception {
folder.create();
}

@After
public void tearDown() throws Exception {
folder.delete();
}

@Test
public void testJoin() throws RemotingException, InterruptedException, IOException {
final String groupURL = "file://" + folder.newFile();

FileNetworker networker = new FileNetworker();
Group group = networker.lookup(URL.valueOf(groupURL));

final CountDownLatch countDownLatch = new CountDownLatch(1);
Peer peer1 = group.join(URL.valueOf("dubbo://0.0.0.0:" + NetUtils.getAvailablePort()), new ChannelHandlerAdapter() {
@Override
public void received(Channel channel, Object message) {
countDownLatch.countDown();
}
});
Peer peer2 = group.join(URL.valueOf("dubbo://0.0.0.0:" + NetUtils.getAvailablePort()),
mock(ChannelHandlerAdapter.class));

while (true) {
long count = countDownLatch.getCount();
if (count > 0) {
break;
}
for (Channel channel : peer1.getChannels()) {
channel.send(0, false);
channel.send("hello world!");
}
TimeUnit.MILLISECONDS.sleep(50);
}


peer2.close();
peer1.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.p2p.support;

import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.p2p.Group;
import org.apache.dubbo.remoting.p2p.Networkers;
import org.apache.dubbo.remoting.p2p.Peer;
import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;

public class MulticastNetworkerTest {

@Test
public void testJoin() throws RemotingException, InterruptedException {
final String groupURL = "multicast://224.5.6.7:1234";
final String peerURL = "dubbo://0.0.0.0:" + NetUtils.getAvailablePort();

final CountDownLatch countDownLatch = new CountDownLatch(1);
Peer peer1 = Networkers.join(groupURL, peerURL, new ChannelHandlerAdapter() {
@Override
public void received(Channel channel, Object message) {
countDownLatch.countDown();
}
});
Peer peer2 = Networkers.join(groupURL, "dubbo://0.0.0.0:" + NetUtils.getAvailablePort(),
mock(ChannelHandlerAdapter.class));

while (true) {
long count = countDownLatch.getCount();
if (count > 0) {
break;
}
for (Channel channel : peer1.getChannels()) {
channel.send("hello world!");
}
TimeUnit.MILLISECONDS.sleep(50);
}

Group lookup = Networkers.lookup(groupURL);
assertThat(lookup, not(nullValue()));

peer2.close();
peer1.close();
}
}
Loading

0 comments on commit 9ee83e4

Please sign in to comment.