Skip to content

Commit

Permalink
Merge branch 'JGRP-2767'
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Feb 22, 2024
2 parents 2c55cb7 + 92f36bc commit c6cc942
Show file tree
Hide file tree
Showing 13 changed files with 56 additions and 62 deletions.
2 changes: 1 addition & 1 deletion src/org/jgroups/blocks/cs/BaseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public void receive(Address sender, DataInput in, int len) throws Exception {
"max accepted length (%s): discarding the message",
Util.printBytes(len), sender, Util.printBytes(max_length)));
if(this.receiver != null)
this.receiver.receive(sender, in);
this.receiver.receive(sender, in, len);
else {
// discard len bytes (in.skip() is not guaranteed to discard *all* len bytes)
byte[] buf=new byte[len];
Expand Down
16 changes: 15 additions & 1 deletion src/org/jgroups/blocks/cs/Receiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,19 @@ default void receive(Address sender, ByteBuffer buf) {
Util.bufferToArray(sender, buf, this);
}

void receive(Address sender, DataInput in) throws Exception;
@Deprecated(since="5.3.3",forRemoval=true)
default void receive(Address sender, DataInput in) throws Exception {
receive(sender, in, -1);
}

/**
* Receive data from the given sender
* @param sender The sender
* @param in The data input from which to read
* @param length The number of bytes to read
* @throws Exception
*/
default void receive(Address sender, DataInput in, int length) throws Exception {

}
}
5 changes: 0 additions & 5 deletions src/org/jgroups/blocks/cs/ReceiverAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import org.jgroups.Address;

import java.io.DataInput;

/**
* An impl of {@link Receiver}. Will get removed with the switch to Java 8; instead we'll use a default impl in Receiver
* @author Bela Ban
Expand All @@ -14,7 +12,4 @@ public void receive(Address sender, byte[] buf, int offset, int length) {

}

public void receive(Address sender, DataInput in) throws Exception {

}
}
20 changes: 6 additions & 14 deletions src/org/jgroups/demos/PubClient.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.jgroups.demos;

import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.blocks.cs.*;
import org.jgroups.util.Bits;
import org.jgroups.util.Util;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -32,23 +30,20 @@ public PubClient(String name) {
@Override
public void receive(Address sender, ByteBuffer buf) {
byte[] buffer=buf.array();
int len=Bits.readInt(buffer, buf.arrayOffset());
String msg=new String(buffer, buf.arrayOffset()+Global.INT_SIZE, len);
String msg=new String(buffer, buf.arrayOffset(), buf.remaining());
System.out.printf("-- %s\n", msg);
}

@Override
public void receive(Address sender, byte[] buf, int offset, int length) {
int len=Bits.readInt(buf, offset);
String msg=new String(buf, offset+Global.INT_SIZE, len);
String msg=new String(buf, offset, length);
System.out.printf("-- %s\n", msg);
}

@Override public void receive(Address sender, DataInput in) throws Exception {
int len=in.readInt();
byte[] buf=new byte[len];
@Override public void receive(Address sender, DataInput in, int length) throws Exception {
byte[] buf=new byte[length];
in.readFully(buf);
String msg=new String(buf, 0, buf.length);
String msg=new String(buf);
System.out.printf("-- %s\n", msg);
}

Expand Down Expand Up @@ -98,10 +93,7 @@ protected void eventLoop() {

protected void send(String str) throws Exception {
byte[] buf=str.getBytes();
byte[] data=new byte[Global.INT_SIZE + buf.length];
Bits.writeInt(buf.length, data, 0);
System.arraycopy(buf, 0, data, Global.INT_SIZE, buf.length);
((Client)client).send(data, 0, data.length);
((Client)client).send(buf, 0, buf.length);
}

public static void main(String[] args) throws Exception {
Expand Down
15 changes: 7 additions & 8 deletions src/org/jgroups/demos/PubServer.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package org.jgroups.demos;

import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.blocks.cs.*;
import org.jgroups.blocks.cs.BaseServer;
import org.jgroups.blocks.cs.NioServer;
import org.jgroups.blocks.cs.Receiver;
import org.jgroups.blocks.cs.TcpServer;
import org.jgroups.jmx.JmxConfigurator;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Bits;
import org.jgroups.util.DefaultSocketFactory;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.Util;
Expand Down Expand Up @@ -63,11 +64,9 @@ public void receive(Address sender, byte[] buf, int offset, int length) {
}
}

public void receive(Address sender, DataInput in) throws Exception {
int len=in.readInt();
byte[] buf=new byte[len + Global.INT_SIZE];
Bits.writeInt(len, buf, 0);
in.readFully(buf, Global.INT_SIZE, len);
public void receive(Address sender, DataInput in, int length) throws Exception {
byte[] buf=new byte[length];
in.readFully(buf, 0, length);
server.send(null, buf, 0, buf.length);
}

Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/protocols/FD_SOCK2.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,14 @@ public Object down(Event evt) {

public void receive(Address sender, byte[] buf, int offset, int length) {
try {
receive(sender, new ByteArrayDataInputStream(buf, offset, length));
receive(sender, new ByteArrayDataInputStream(buf, offset, length), length);
}
catch(Exception e) {
log.error("failed handling message received from " + sender, e);
}
}

public void receive(Address sender, DataInput in) throws Exception {
public void receive(Address sender, DataInput in, int length) throws Exception {
Message msg=new EmptyMessage();
msg.readFrom(in);
FdHeader hdr=msg.getHeader(id);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ public void receive(Address sender, byte[] data, int offset, int length) {
handleSingleMessage(in, multicast);
}

public void receive(Address sender, DataInput in) throws Exception {
public void receive(Address sender, DataInput in, int ignoredLength) throws Exception {
if(in == null) return;

// drop message from self; it has already been looped back up (https://issues.redhat.com/browse/JGRP-1765)
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/stack/GossipRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public void receive(Address sender, ByteBuffer buf) {
}
}

public void receive(Address sender, DataInput in) throws Exception {
public void receive(Address sender, DataInput in, int length) throws Exception {
GossipType type=GossipType.values()[in.readByte()];

GossipData request=null;
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/stack/RouterStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ public void sendToMember(String group, Address dest, Address sender, byte[] data

@Override
public void receive(Address sender, byte[] buf, int offset, int length) {
receive(sender, new ByteArrayDataInputStream(buf, offset, length));
receive(sender, new ByteArrayDataInputStream(buf, offset, length), length);
}

@Override
public void receive(Address sender, DataInput in) {
public void receive(Address sender, DataInput in, int length) {
try {
GossipData data=new GossipData();
data.readFrom(in);
Expand Down
19 changes: 8 additions & 11 deletions tests/byteman/org/jgroups/tests/byteman/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import org.jboss.byteman.contrib.bmunit.BMScript;
import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.blocks.cs.*;
import org.jgroups.util.Bits;
import org.jgroups.blocks.cs.BaseServer;
import org.jgroups.blocks.cs.NioServer;
import org.jgroups.blocks.cs.ReceiverAdapter;
import org.jgroups.blocks.cs.TcpServer;
import org.jgroups.util.ResourceManager;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -137,11 +139,8 @@ protected static BaseServer create(boolean nio, int port) {

protected static void send(String str, BaseServer server, Address dest) {
byte[] request=str.getBytes();
byte[] data=new byte[request.length + Global.INT_SIZE];
Bits.writeInt(request.length, data, 0);
System.arraycopy(request, 0, data, Global.INT_SIZE, request.length);
try {
server.send(dest, data, 0, data.length);
server.send(dest, request, 0, request.length);
}
catch(Exception e) {
System.err.println("Failed sending a request to " + dest + ": " + e);
Expand Down Expand Up @@ -179,17 +178,15 @@ public MyReceiver(String name) {
public void clear() {reqs.clear();}

public void receive(Address sender, byte[] data, int offset, int length) {
int len=Bits.readInt(data, offset);
String str=new String(data, offset+Global.INT_SIZE, len);
String str=new String(data, offset, length);
System.out.println("[" + name + "] received request \"" + str + "\" from " + sender);
synchronized(reqs) {
reqs.add(str);
}
}

public void receive(Address sender, DataInput in) throws Exception {
int len=in.readInt();
byte[] data=new byte[len];
public void receive(Address sender, DataInput in, int length) throws Exception {
byte[] data=new byte[length];
in.readFully(data, 0, data.length);
String str=new String(data);
System.out.println("[" + name + "] received request \"" + str + "\" from " + sender);
Expand Down
15 changes: 9 additions & 6 deletions tests/junit-functional/org/jgroups/tests/ServerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,19 @@ protected static BaseServer create(boolean nio, int port) {
}
}

protected static void send(String str, BaseServer server, Address dest) throws Exception {
protected static void sendOld(String str, BaseServer server, Address dest) throws Exception {
byte[] request=str.getBytes();
byte[] data=new byte[request.length + Global.INT_SIZE];
Bits.writeInt(request.length, data, 0);
System.arraycopy(request, 0, data, Global.INT_SIZE, request.length);
server.send(dest, data, 0, data.length);
}

protected static void send(String str, BaseServer server, Address dest) throws Exception {
byte[] request=str.getBytes();
server.send(dest, request, 0, request.length);
}


protected static class Sender extends Thread {
protected final CountDownLatch latch;
Expand Down Expand Up @@ -230,18 +235,16 @@ public MyReceiver(String name) {


public void receive(Address sender, byte[] data, int offset, int length) {
int len=Bits.readInt(data, offset);
String str=new String(data, offset+Global.INT_SIZE, len);
String str=new String(data, offset, length);
if(verbose)
System.out.println("[" + name + "] received request \"" + str + "\" from " + sender);
synchronized(reqs) {
reqs.add(str);
}
}

public void receive(Address sender, DataInput in) throws Exception {
int len=in.readInt();
byte[] data=new byte[len];
public void receive(Address sender, DataInput in, int length) throws Exception {
byte[] data=new byte[length];
in.readFully(data, 0, data.length);
String str=new String(data);
if(verbose)
Expand Down
11 changes: 3 additions & 8 deletions tests/junit-functional/org/jgroups/tests/ServerUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.jgroups.blocks.cs.NioServer;
import org.jgroups.blocks.cs.ReceiverAdapter;
import org.jgroups.blocks.cs.TcpServer;
import org.jgroups.util.Bits;
import org.jgroups.util.CondVar;
import org.jgroups.util.Util;
import org.testng.Assert;
Expand Down Expand Up @@ -274,11 +273,8 @@ public void testAsyncConnectThenSend() throws Exception {
}

protected static void send(byte[] request, BaseServer server, Address dest) {
byte[] data=new byte[request.length + Global.INT_SIZE];
Bits.writeInt(request.length, data, 0);
System.arraycopy(request, 0, data, Global.INT_SIZE, request.length);
try {
server.send(dest, data, 0, data.length);
server.send(dest, request, 0, request.length);
}
catch(Exception e) {
System.err.println("Failed sending a request to " + dest + ": " + e);
Expand Down Expand Up @@ -369,9 +365,8 @@ public synchronized void receive(Address sender, byte[] data, int offset, int le
}
}

public synchronized void receive(Address sender, DataInput in) throws Exception {
int len=in.readInt();
byte[] buf=new byte[len];
public synchronized void receive(Address sender, DataInput in, int length) throws Exception {
byte[] buf=new byte[length];
in.readFully(buf);
// System.out.printf("[tcp] from %s: %d bytes\n", sender, len);
long tmp=num_received.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.jgroups.tests.rt.transports;

import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.blocks.cs.*;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
Expand Down Expand Up @@ -121,7 +120,7 @@ public void receive(Address sender, byte[] buf, int offset, int length) {
}
}

public void receive(Address sender, DataInput in) throws Exception {
public void receive(Address sender, DataInput in, int length) throws Exception {
if(receiver == null)
return;
byte[] buf=new byte[RoundTrip.PAYLOAD];
Expand Down

0 comments on commit c6cc942

Please sign in to comment.