diff --git a/doc/UtpBinaryTransfer.md b/doc/UtpBinaryTransfer.md new file mode 100644 index 00000000..75e0db58 --- /dev/null +++ b/doc/UtpBinaryTransfer.md @@ -0,0 +1,65 @@ +# UTP in IPv8 + +uTP is a protocol based on the UDP used to share files in a peer-to-peer fashion. In this project, we have integrated an existing `utp4j` library with the IPv8 infrastructure to allow for binary data transfer to users on the existing network. + +## UtpCommunity + +To distinguish between the capabilities of different peers, we have created our own `UtpCommunity` to show that the peer supports uTP transfer. We use this community to implement some useful control messages in our infrastructure. These are: +- `UTP_HEARTBEAT` - used to signal that the peer has a working uTP endpoint and can accept connections. +- `UTP_TRANSFER_REQUEST` - used as a handshake for establishing a file transfer. + +### UtpHelper + +To separate some of the logic of sending messages over the protocol, we have created a `UtpHelper` class. It holds all the utility methods used by peers sending data and creates a coroutine to send heartbeat messages. + +#### Heartbeats + +The heartbeat service is started right after `UtpCommunity` initialization. It creates a separate coroutine which is active as long as the uTP endpoint is open. A heartbeat message is sent every 5 seconds to each peer in the community to signal own liveness. + +For now the heartbeat has an empty payload, but could be used to hold some useful data to be sent to all the peers. + +#### Data transfer + +For file transfer we have two separate methods exposed to the other classes. One allows to send file data directly, the other is used to generate and send a random, hash verified data. The transfer works as follows: +- First the `UTP_TRANSFER_REQUEST` message is sent to request to the other peer that we want to send them a data with certain metadata (name, size, type (random/file)). +- We wait for the response from the other peer. They can either accept or decline. +- If they decline, we abort the transfer. If it is accepted, we send the data using the uTP endpoint. + +Peers which have not established the handshake cannot transfer data. All the packets that would be sent directly to the endpoint, would be dropped. This mechanism is used as a basic protection against the DOS attacks. Current defaults always accept the connection, but further conditions could be added to combat malicious transfers. + +## UtpIPv8Endpoint + +This endpoint is the main element of the uTP transfer. It uses the previously mentioned existing library to establish transfer of data between two peers using UDP. It is an extension of the existing `UdpEndpoint`. The existing endpoint creates an instance of `UtpIPv8Endpoint` similar to what TFTP has done. Then every packet that is intended to be sent to our endpoint is prefixed with a special byte (`0x42`) and routed to be further processed. + +In the actual endpoint code, we have a logic for both sending (client) and receiving (server) data. Most of the code there is based on the examples shown in the library already, just refactored to use Kotlin features. + +Every routed packed is processed in the `onPacket` function. Stripped off the prefix, checked for the handshake details and finally forwarded to the custom client and server socket. + +### UtpSocket + +As we are using IPv8 layer for our transfer, we could not use usual implementations of the `DatagramSocket`. We have created a middle layer in-between the IPv8 and `utp4j` library to allow for a seamless integration. It uses the raw IPv8 socket to send messages prefixed with out special byte. For receiving it has a separate receiving channel which is filled by the `onPacket` function in the endpoint. This gives us an elegant solution, which does not interfere with the code of the library. + +## Testing + +All the code has been tested and achieved 86% instruction coverage. + +## Remaining issues + +Unfortunately due to the timeframe we were not able to fully check the quality of every element and some solutions could be done better. Smaller existing issues are mentioned directly in the code. + +The main improvements would be: +- code structure +- missing listener API (which would allow for displaying transfer data progress) +- more robust handshake architecture +- test code seems to be flaky due to OutOfMemory errors when run in bulk + +# See also + + +| Name | Link | +| :-: | :-: | +| Main project issue | https://github.com/Tribler/tribler/issues/7911 | +| Main app description | https://github.com/Eragoneq/trustchain-superapp/blob/master/doc/BlockchainNetworkingProject/UtpTesting.md | +| Benchmarking done over 10 weeks | https://github.com/Eragoneq/trustchain-superapp/blob/master/doc/BlockchainNetworkingProject/Benchmarking.md | +| utp4j changes and benchmarks | https://github.com/PieterCarton/utp4j/blob/master/CHANGES.md | +| kotlin-ipv8 changes | https://github.com/Eragoneq/kotlin-ipv8/blob/master/doc/UtpBinaryTransfer.md | diff --git a/ipv8-android/src/main/java/nl/tudelft/ipv8/android/IPv8Android.kt b/ipv8-android/src/main/java/nl/tudelft/ipv8/android/IPv8Android.kt index b7a99bfd..99cf7b7a 100644 --- a/ipv8-android/src/main/java/nl/tudelft/ipv8/android/IPv8Android.kt +++ b/ipv8-android/src/main/java/nl/tudelft/ipv8/android/IPv8Android.kt @@ -109,6 +109,7 @@ object IPv8Android { connectivityManager, ) + val bluetoothManager = application.getSystemService<BluetoothManager>() ?: throw IllegalStateException("BluetoothManager not found") diff --git a/ipv8/build.gradle b/ipv8/build.gradle index 5a00e0e0..71f3adea 100644 --- a/ipv8/build.gradle +++ b/ipv8/build.gradle @@ -35,6 +35,11 @@ ktlint { } } +test { + minHeapSize = "512M" + maxHeapSize = "2048M" +} + jacocoTestReport { reports { xml.required = true @@ -81,6 +86,7 @@ dependencies { // https://mvnrepository.com/artifact/org.bouncycastle/bcprov-jdk15on implementation group: 'org.bouncycastle', name: 'bcprov-jdk15to18', version: '1.63' + api 'com.github.PieterCarton:utp4j:v0.0.2' } diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/EndpointAggregator.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/EndpointAggregator.kt index d42cb9af..8c6f5d62 100644 --- a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/EndpointAggregator.kt +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/EndpointAggregator.kt @@ -12,7 +12,7 @@ import java.util.* */ class EndpointAggregator( val udpEndpoint: UdpEndpoint?, - val bluetoothEndpoint: BluetoothEndpoint? + val bluetoothEndpoint: BluetoothEndpoint?, ) : Endpoint<Peer>(), EndpointListener { private var isOpen: Boolean = false diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt index de3edf4b..a9e2b73a 100644 --- a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt @@ -2,11 +2,15 @@ package nl.tudelft.ipv8.messaging import java.nio.Buffer import java.nio.ByteBuffer +import kotlin.reflect.full.isSubclassOf +import kotlin.reflect.full.memberProperties +import kotlin.reflect.full.primaryConstructor const val SERIALIZED_USHORT_SIZE = 2 const val SERIALIZED_UINT_SIZE = 4 +const val SERIALIZED_INT_SIZE = 4 const val SERIALIZED_ULONG_SIZE = 8 -const val SERIALIZED_LONG_SIZE = 4 +const val SERIALIZED_LONG_SIZE = 8 const val SERIALIZED_UBYTE_SIZE = 1 const val SERIALIZED_PUBLIC_KEY_SIZE = 74 @@ -20,6 +24,80 @@ interface Serializable { interface Deserializable<T> { fun deserialize(buffer: ByteArray, offset: Int = 0): Pair<T, Int> + +} + +/** + * Serializes the object and returns the buffer. + * Alternative to manually defining the serialize function. + */ +interface AutoSerializable : Serializable { + override fun serialize(): ByteArray { + return this::class.primaryConstructor!!.parameters.map { param -> + val value = + this.javaClass.kotlin.memberProperties.find { it.name == param.name }!!.get(this) + simpleSerialize(value) + }.reduce { acc, bytes -> acc + bytes } + } +} + +///** +// * Deserializes the object from the buffer and returns the object and the new offset. +// * Alternative to manually defining the deserialize function. +// */ +//inline fun <reified T> Deserializable<T>.autoDeserialize(buffer: ByteArray, offset: Int = 0): Pair<T, Int> { +// TODO() +//} + +fun <U> simpleSerialize(data: U): ByteArray { + return when (data) { + is Int -> serializeInt(data) + is Long -> serializeLong(data) + is UByte -> serializeUChar(data) + is UInt -> serializeUInt(data) + is UShort -> serializeUShort(data.toInt()) + is ULong -> serializeULong(data) + is String -> serializeVarLen(data.toByteArray()) + is ByteArray -> serializeVarLen(data) + is Boolean -> serializeBool(data) + is Enum<*> -> serializeUInt(data.ordinal.toUInt()) + is Serializable -> data.serialize() + else -> throw IllegalArgumentException("Unsupported serialization type") + } +} + +inline fun <reified U> simpleDeserialize(buffer: ByteArray, offset: Int = 0): Pair<U, Int> { + val (value, off) = when (U::class) { + Int::class -> Pair(deserializeInt(buffer, offset) as U, SERIALIZED_INT_SIZE) + Long::class -> Pair(deserializeLong(buffer, offset) as U, SERIALIZED_LONG_SIZE) + UByte::class -> Pair(deserializeUChar(buffer, offset) as U, SERIALIZED_UBYTE_SIZE) + UShort::class -> Pair( + deserializeUShort(buffer, offset).toUShort() as U, + SERIALIZED_USHORT_SIZE + ) + + UInt::class -> Pair(deserializeUInt(buffer, offset) as U, SERIALIZED_UINT_SIZE) + ULong::class -> Pair(deserializeULong(buffer, offset) as U, SERIALIZED_ULONG_SIZE) + String::class -> { + val (data, len) = deserializeVarLen(buffer, offset) + Pair(data.decodeToString() as U, len) + } + + ByteArray::class -> { + val (data, len) = deserializeVarLen(buffer, offset) + Pair(data as U, len) + } + + Boolean::class -> Pair(deserializeBool(buffer, offset) as U, 1) + else -> { + if (U::class.isSubclassOf(Enum::class)) { + val ordinal = deserializeUInt(buffer, offset).toInt() + val values = U::class.java.enumConstants + Pair(values[ordinal] as U, SERIALIZED_UINT_SIZE) + } else throw IllegalArgumentException("Unsupported deserialization type") + } + } + return (value to (offset + off)) } fun serializeBool(data: Boolean): ByteArray { @@ -40,10 +118,21 @@ fun serializeUShort(value: Int): ByteArray { return bytes } +fun serializeUShort(value: UShort): ByteArray { + val bytes = ByteBuffer.allocate(SERIALIZED_USHORT_SIZE) + bytes.putShort(value.toShort()) + return bytes.array() +} + fun deserializeUShort(buffer: ByteArray, offset: Int = 0): Int { return (((buffer[offset].toInt() and 0xFF) shl 8) or (buffer[offset + 1].toInt() and 0xFF)) } +fun deserializeRealUShort(buffer: ByteArray, offset: Int = 0): UShort { + val buf = ByteBuffer.wrap(buffer, offset, SERIALIZED_USHORT_SIZE) + return buf.short.toUShort() +} + fun serializeUInt(value: UInt): ByteArray { val bytes = UByteArray(SERIALIZED_UINT_SIZE) for (i in 0 until SERIALIZED_UINT_SIZE) { @@ -80,7 +169,7 @@ fun deserializeULong(buffer: ByteArray, offset: Int = 0): ULong { fun serializeLong(value: Long): ByteArray { val buffer = ByteBuffer.allocate(SERIALIZED_LONG_SIZE) - buffer.putInt(value.toInt()) + buffer.putLong(value) return buffer.array() } @@ -89,7 +178,20 @@ fun deserializeLong(bytes: ByteArray, offset: Int = 0): Long { buffer.put(bytes.copyOfRange(offset, offset + SERIALIZED_LONG_SIZE)) // In JDK 8 this returns a Buffer. (buffer as Buffer).flip() - return buffer.int.toLong() + return buffer.long +} + +fun serializeInt(value: Int): ByteArray { + val buffer = ByteBuffer.allocate(SERIALIZED_INT_SIZE) + buffer.putInt(value) + return buffer.array() +} + +fun deserializeInt(bytes: ByteArray, offset: Int = 0): Int { + val buffer = ByteBuffer.allocate(SERIALIZED_INT_SIZE) + buffer.put(bytes.copyOfRange(offset, offset + SERIALIZED_INT_SIZE)) + buffer.flip() + return buffer.int } fun serializeUChar(char: UByte): ByteArray { @@ -107,8 +209,10 @@ fun serializeVarLen(bytes: ByteArray): ByteArray { fun deserializeVarLen(buffer: ByteArray, offset: Int = 0): Pair<ByteArray, Int> { val len = deserializeUInt(buffer, offset).toInt() - val payload = buffer.copyOfRange(offset + SERIALIZED_UINT_SIZE, - offset + SERIALIZED_UINT_SIZE + len) + val payload = buffer.copyOfRange( + offset + SERIALIZED_UINT_SIZE, + offset + SERIALIZED_UINT_SIZE + len + ) return Pair(payload, SERIALIZED_UINT_SIZE + len) } @@ -117,19 +221,31 @@ fun deserializeRecursively(buffer: ByteArray, offset: Int = 0): Array<ByteArray> return arrayOf() } val len = deserializeUInt(buffer, offset).toInt() - val payload = buffer.copyOfRange(offset + SERIALIZED_UINT_SIZE, - offset + SERIALIZED_UINT_SIZE + len) - return arrayOf(payload) + deserializeRecursively(buffer.copyOfRange(offset + SERIALIZED_UINT_SIZE + len, - buffer.size), offset) + val payload = buffer.copyOfRange( + offset + SERIALIZED_UINT_SIZE, + offset + SERIALIZED_UINT_SIZE + len + ) + return arrayOf(payload) + deserializeRecursively( + buffer.copyOfRange( + offset + SERIALIZED_UINT_SIZE + len, + buffer.size + ), offset + ) } -fun deserializeAmount(buffer: ByteArray, amount: Int, offset: Int = 0): Pair<Array<ByteArray>, ByteArray> { +fun deserializeAmount( + buffer: ByteArray, + amount: Int, + offset: Int = 0 +): Pair<Array<ByteArray>, ByteArray> { val returnValues = arrayListOf<ByteArray>() var localOffset = offset for (i in 0 until amount) { val len = deserializeUInt(buffer, localOffset).toInt() - val payload = buffer.copyOfRange(localOffset + SERIALIZED_UINT_SIZE, - localOffset + SERIALIZED_UINT_SIZE + len) + val payload = buffer.copyOfRange( + localOffset + SERIALIZED_UINT_SIZE, + localOffset + SERIALIZED_UINT_SIZE + len + ) localOffset += SERIALIZED_UINT_SIZE + len returnValues.add(payload) } diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/payload/TransferRequestPayload.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/payload/TransferRequestPayload.kt new file mode 100644 index 00000000..ef0d0a0e --- /dev/null +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/payload/TransferRequestPayload.kt @@ -0,0 +1,38 @@ +package nl.tudelft.ipv8.messaging.payload + +import nl.tudelft.ipv8.messaging.AutoSerializable +import nl.tudelft.ipv8.messaging.Deserializable +import nl.tudelft.ipv8.messaging.Serializable +import nl.tudelft.ipv8.messaging.simpleDeserialize + +data class TransferRequestPayload( + val filename: String, + val status: TransferStatus, + val type: TransferType, + val dataSize: Int = 0 +) : AutoSerializable { + + companion object Deserializer : Deserializable<TransferRequestPayload> { + override fun deserialize( + buffer: ByteArray, + offset: Int + ): Pair<TransferRequestPayload, Int> { + val (filename, newOffset) = simpleDeserialize<String>(buffer, offset) + val (status, newOffset1) = simpleDeserialize<TransferStatus>(buffer, newOffset) + val (type, newOffset2) = simpleDeserialize<TransferType>(buffer, newOffset1) + val (dataSize, newOffset3) = simpleDeserialize<Int>(buffer, newOffset2) + return Pair(TransferRequestPayload(filename, status, type, dataSize), newOffset3) + } + } + + enum class TransferStatus { + REQUEST, + ACCEPT, + DECLINE + } + + enum class TransferType { + FILE, + RANDOM_DATA + } +} diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/payload/UtpHeartbeatPayload.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/payload/UtpHeartbeatPayload.kt new file mode 100644 index 00000000..297d3cbf --- /dev/null +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/payload/UtpHeartbeatPayload.kt @@ -0,0 +1,18 @@ +package nl.tudelft.ipv8.messaging.payload + +import nl.tudelft.ipv8.messaging.Deserializable +import nl.tudelft.ipv8.messaging.Serializable + +class UtpHeartbeatPayload( + +) : Serializable { + override fun serialize(): ByteArray { + return byteArrayOf() + } + + companion object Deserializer : Deserializable<UtpHeartbeatPayload> { + override fun deserialize(buffer: ByteArray, offset: Int): Pair<UtpHeartbeatPayload, Int> { + return Pair(UtpHeartbeatPayload(), 0) + } + } +} diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/udp/UdpEndpoint.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/udp/UdpEndpoint.kt index 38779672..06105916 100644 --- a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/udp/UdpEndpoint.kt +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/udp/UdpEndpoint.kt @@ -9,6 +9,7 @@ import nl.tudelft.ipv8.messaging.Endpoint import nl.tudelft.ipv8.messaging.EndpointListener import nl.tudelft.ipv8.messaging.Packet import nl.tudelft.ipv8.messaging.tftp.TFTPEndpoint +import nl.tudelft.ipv8.messaging.utp.UtpIPv8Endpoint import java.io.IOException import java.net.* @@ -18,6 +19,7 @@ open class UdpEndpoint( private val port: Int, private val ip: InetAddress, private val tftpEndpoint: TFTPEndpoint = TFTPEndpoint(), + val utpIPv8Endpoint: UtpIPv8Endpoint = UtpIPv8Endpoint(), ) : Endpoint<Peer>() { private var socket: DatagramSocket? = null @@ -41,6 +43,7 @@ open class UdpEndpoint( } }, ) + this.addListener(utpIPv8Endpoint) } override fun isOpen(): Boolean { @@ -88,6 +91,16 @@ open class UdpEndpoint( } } + /** + * Send whole binary data to the given address using UTP. + */ + fun sendUtp( + address: IPv4Address, + data: ByteArray, + ) { + utpIPv8Endpoint.send(address, data) + } + override fun open() { val socket = getDatagramSocket() this.socket = socket @@ -95,6 +108,9 @@ open class UdpEndpoint( tftpEndpoint.socket = socket tftpEndpoint.open() + utpIPv8Endpoint.udpSocket = socket + utpIPv8Endpoint.open() + logger.info { "Opened UDP socket on port ${socket.localPort}" } startLanEstimation() @@ -126,6 +142,7 @@ open class UdpEndpoint( bindJob = null tftpEndpoint.close() + utpIPv8Endpoint.close() socket?.close() socket = null @@ -185,7 +202,7 @@ open class UdpEndpoint( "${receivePacket.address.hostAddress}:${receivePacket.port}", ) - // Check whether prefix is IPv8 or TFTP + // Check whether prefix is IPv8 or TFTP or UTP when (receivePacket.data[0]) { Community.PREFIX_IPV8 -> { val sourceAddress = @@ -201,6 +218,9 @@ open class UdpEndpoint( TFTPEndpoint.PREFIX_TFTP -> { tftpEndpoint.onPacket(receivePacket) } + UtpIPv8Endpoint.PREFIX_UTP -> { + utpIPv8Endpoint.onPacket(receivePacket) + } else -> { logger.warn { "Invalid packet prefix" } } diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpCommunity.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpCommunity.kt new file mode 100644 index 00000000..4cda6adb --- /dev/null +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpCommunity.kt @@ -0,0 +1,90 @@ +package nl.tudelft.ipv8.messaging.utp + +import nl.tudelft.ipv8.Community +import nl.tudelft.ipv8.IPv4Address +import nl.tudelft.ipv8.Peer +import nl.tudelft.ipv8.exception.PacketDecodingException +import nl.tudelft.ipv8.messaging.Packet +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferStatus +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferType +import nl.tudelft.ipv8.messaging.payload.UtpHeartbeatPayload +import nl.tudelft.ipv8.util.toHex +import java.util.Date + +/** + * A community for the UTP protocol. + */ +class UtpCommunity : Community() { + override val serviceId = "450ded7389134595dadb6b2549f431ad60156931" + + val lastHeartbeat = mutableMapOf<String, Date>() + val transferRequests = mutableMapOf<String, TransferRequestPayload>() + val utpHelper = UtpHelper(this) + + object MessageId { + const val UTP_HEARTBEAT = 1 + const val UTP_TRANSFER_REQUEST = 2 + } + init { + messageHandlers[MessageId.UTP_HEARTBEAT] = ::onHeartbeat + messageHandlers[MessageId.UTP_TRANSFER_REQUEST] = ::onTransferRequest + utpHelper.startHeartbeat() + } + + fun sendHeartbeat() { + val payload = UtpHeartbeatPayload() + val packet = serializePacket(MessageId.UTP_HEARTBEAT, payload, sign = false) + + println("Sending heartbeat ${packet.toHex()}") + + for (peer in getPeers()) { + send(peer, packet) + } + } + + internal fun onHeartbeat(p: Packet) { + val peer = getPeers().find { it.address == p.source } + val payload = p.getPayload(UtpHeartbeatPayload.Deserializer) + + if (peer != null) { + lastHeartbeat[peer.mid] = Date() + } + + println("Received heartbeat from $peer: $payload") + println("Last heartbeat: ${lastHeartbeat[peer?.mid]}") + } + + fun sendTransferRequest(peer: Peer, filename: String, dataSize: Int, dataType: TransferType) { + val payload = TransferRequestPayload(filename, TransferStatus.REQUEST, dataType, dataSize) + val packet = serializePacket(MessageId.UTP_TRANSFER_REQUEST, payload) + + send(peer, packet) + } + + fun sendTransferResponse(peer: Peer, payload: TransferRequestPayload) { + val packet = serializePacket(MessageId.UTP_TRANSFER_REQUEST, payload) + send(peer, packet) + } + + /** + * Allows the user to accept or decline a transfer request, or handle any custom logic. + */ + internal fun onTransferRequest(p: Packet) { + try { + val (peer, payload) = p.getAuthPayload(TransferRequestPayload.Deserializer) + if (payload.status == TransferStatus.REQUEST) { + // Accept the transfer request by default + val acceptedPayload = payload.copy(status = TransferStatus.ACCEPT) + sendTransferResponse(peer, acceptedPayload) + endpoint.udpEndpoint?.utpIPv8Endpoint!!.permittedTransfers[peer.address] = acceptedPayload + } + transferRequests[peer.mid] = payload + + println("Received transfer request from $peer: $payload") + } catch (e: PacketDecodingException) { + println("Failed to handle transfer request: ${e.message}") + } + } + +} diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpHelper.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpHelper.kt new file mode 100644 index 00000000..c28c6a7f --- /dev/null +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpHelper.kt @@ -0,0 +1,123 @@ +package nl.tudelft.ipv8.messaging.utp + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeoutOrNull +import kotlinx.coroutines.yield +import nl.tudelft.ipv8.IPv4Address +import nl.tudelft.ipv8.Peer +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferType +import java.nio.ByteBuffer +import java.security.MessageDigest +import kotlin.random.Random + +class UtpHelper( + private val utpCommunity: UtpCommunity +) { + + /** + * Scope used for network operations + */ + private val job = SupervisorJob() + private val scope = CoroutineScope(Dispatchers.IO + job) + private lateinit var heartbeatJob : Job + + fun startHeartbeat() { + heartbeatJob = scope.launch(Dispatchers.IO) { + while (isActive) { + if (utpCommunity.endpoint.udpEndpoint?.utpIPv8Endpoint?.isOpen() == true) { + utpCommunity.sendHeartbeat() + delay(5000) + } else { + this.cancel() + } + } + } + } + + fun stopHeartbeat() { + heartbeatJob.cancel() + } + + fun sendFileData(peer: Peer, metadata: NamedResource, data: ByteArray) { + sendData(peer, metadata, data, TransferType.FILE) + } + + fun sendRandomData(peer: Peer, size: Int = UtpIPv8Endpoint.getBufferSize()) { + sendData(peer, NamedResource("random.tmp", 0, size), generateRandomDataBuffer(size), TransferType.RANDOM_DATA) + } + + private fun sendData(peer: Peer, metadata: NamedResource, data: ByteArray, type: TransferType) { + scope.launch(Dispatchers.IO) { + utpCommunity.sendTransferRequest( + peer, + metadata.name, + metadata.size, + type + ) + if (!waitForTransferResponse(peer)) return@launch + println("Sending data to $peer") + utpCommunity.endpoint.udpEndpoint?.sendUtp( + IPv4Address( + peer.address.ip, + peer.address.port + ), data + ) + } + } + + private suspend fun waitForTransferResponse(peer: Peer): Boolean { + // Wait for response or timeout + // TODO: This should be handled with a proper pattern instead of a timeout + println("Waiting for response from $peer") + val payload = withTimeoutOrNull(5000) { + while (!utpCommunity.transferRequests.containsKey(peer.mid)) { + yield() + } + println("Received response from $peer") + return@withTimeoutOrNull utpCommunity.transferRequests.remove(peer.mid) + } + if (payload == null) { + println("No response from $peer, aborting transfer") + return false + } + if (payload.status == TransferRequestPayload.TransferStatus.DECLINE) { + println("Peer $peer declined transfer") + return false + } + return true + } + + data class NamedResource( + val name: String, + val id: Int, + val size: Int = 0, + ) { + override fun toString() = name + } + + companion object { + fun generateRandomDataBuffer(size: Int = UtpIPv8Endpoint.getBufferSize()): ByteArray { + if (size < 32) { + throw IllegalArgumentException("Buffer size must be at least 32 bytes") + } else if (size > UtpIPv8Endpoint.getBufferSize()) { + throw IllegalArgumentException("Buffer size must be at most ${UtpIPv8Endpoint.getBufferSize()} bytes") + } + val rngByteArray = ByteArray(size) + Random.nextBytes(rngByteArray, 0, size - 32) + val buffer = ByteBuffer.wrap(rngByteArray) + val hash = MessageDigest.getInstance("SHA-256").digest(rngByteArray) + buffer.position(size - 32) + buffer.put(hash) + return buffer.array() + } + } +} diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpIPv8Endpoint.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpIPv8Endpoint.kt new file mode 100644 index 00000000..f0d30eed --- /dev/null +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpIPv8Endpoint.kt @@ -0,0 +1,239 @@ +package nl.tudelft.ipv8.messaging.utp + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import net.utp4j.channels.UtpServerSocketChannel +import net.utp4j.channels.UtpSocketChannel +import net.utp4j.channels.UtpSocketState +import net.utp4j.channels.impl.UtpServerSocketChannelImpl +import net.utp4j.channels.impl.UtpSocketChannelImpl +import net.utp4j.channels.impl.alg.UtpAlgConfiguration +import net.utp4j.channels.impl.recieve.UtpRecieveRunnable +import nl.tudelft.ipv8.IPv4Address +import nl.tudelft.ipv8.Peer +import nl.tudelft.ipv8.messaging.Endpoint +import nl.tudelft.ipv8.messaging.EndpointListener +import nl.tudelft.ipv8.messaging.Packet +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload +import nl.tudelft.ipv8.messaging.utp.listener.BaseDataListener +import nl.tudelft.ipv8.messaging.utp.listener.RawResourceListener +import nl.tudelft.ipv8.messaging.utp.listener.TransferListener +import java.io.IOException +import java.net.BindException +import java.net.DatagramPacket +import java.net.DatagramSocket +import java.net.InetSocketAddress +import java.nio.ByteBuffer + +class UtpIPv8Endpoint : Endpoint<IPv4Address>(), EndpointListener { + + /** + * Scope used for network operations + */ + private val job = SupervisorJob() + private val scope = CoroutineScope(Dispatchers.IO + job) + + /** + * Job for the UTP server listener + */ + private var listenerJob: Job = Job() + + /** + * Listener for raw resources used by the UTP server (receiver) + */ + var listener: TransferListener = RawResourceListener() + + private val sendBuffer = ByteBuffer.allocate(getBufferSize()) + private val receiveBuffer = ByteBuffer.allocate(getBufferSize()) + + private var serverSocket: CustomUtpServerSocket? = null; + var clientSocket: CustomUtpSocket? = null; + + /** + * The underlying UDP socket used by IPv8 + */ + var udpSocket: DatagramSocket? = null + private var clientUtpSocket: UtpSocket? = null + private var serverUtpSocket: UtpSocket? = null + + var rawPacketListeners: MutableList<(DatagramPacket, Boolean) -> Unit> = ArrayList() + val permittedTransfers = mutableMapOf<IPv4Address, TransferRequestPayload?>() + + private var currentLan: IPv4Address? = null + + /** + * Initializes the UTP IPv8 endpoint and the UTP configuration in the library + */ + init { + UtpAlgConfiguration.MAX_CWND_INCREASE_PACKETS_PER_RTT = 30000 + UtpAlgConfiguration.MAX_PACKET_SIZE = MAX_UTP_PACKET_SIZE + println("Utp IPv8 endpoint initialized!") + } + + override fun isOpen(): Boolean = serverSocket != null && clientSocket != null + + override fun open() { + clientUtpSocket = UtpSocket(udpSocket) + serverUtpSocket = UtpSocket(udpSocket) + + serverSocket = CustomUtpServerSocket() + serverSocket?.bind(serverUtpSocket!!) + + val c = CustomUtpSocket() + try { + c.dgSocket = clientUtpSocket + c.state = UtpSocketState.CLOSED + } catch (exp: IOException) { + throw IOException("Could not open UtpSocketChannel: " + exp.message) + } + clientSocket = c + + println("UTP server started listening!") + serverListen() + } + + override fun close() { + println("Stopping the server!") + listenerJob.cancel() + serverSocket?.close() + clientSocket?.close() + } + + override fun send(peer: IPv4Address, data: ByteArray) { + // Refresh the buffer + sendBuffer.clear() + sendBuffer.put(data) + + scope.launch(Dispatchers.IO) { + val future = clientSocket?.connect(InetSocketAddress(peer.ip, peer.port)) + ?.apply { block() } + if (future != null) { + if (future.isSuccessfull) { + clientSocket?.write(sendBuffer)?.apply { block() } + println("Sent buffer") + } else println("Did not manage to connect to the server!") + } else { + println("Future is null!") + } + clientSocket?.close() + } + } + + private fun serverListen() { + listenerJob = scope.launch(Dispatchers.IO) { + while (isActive) { + serverSocket?.accept()?.run { + block() + println("Receiving new data!") + channel.let { + it.read(receiveBuffer)?.run { + setListener(listener) + block() + } + println("Finished receiving data!") + } + permittedTransfers.clear() + channel.close() + } + } + } + } + + /** + * This method is called when a packet is received by the IPv8 UDP socket. + * It strips the UTP prefix and sends the packet to the UTP socket. + */ + fun onPacket(receivePacket: DatagramPacket) { + val packet = DatagramPacket(ByteArray(receivePacket.length - 1), receivePacket.length - 1) + val data = receivePacket.data.copyOfRange(1, receivePacket.length) + packet.setData(data, 0, data.size) + packet.address = receivePacket.address + packet.port = receivePacket.port + + val receiverIp = IPv4Address(receivePacket.address.hostAddress, receivePacket.port) + + // Send the packet to the UTP socket on both sides + // TODO: Should probably distinguish between client and server connection + clientUtpSocket?.buffer?.trySend(packet)?.isSuccess + + // Only allow transfers for accepted files + if (permittedTransfers.containsKey(receiverIp) || receiverIp == currentLan) { + val payload = permittedTransfers[receiverIp] + if (payload != null) { + // Ensure if the transfer is accepted and the data size is within the buffer size + if (payload.dataSize > getBufferSize() || payload.status != TransferRequestPayload.TransferStatus.ACCEPT) + return + receiveBuffer.limit(payload.dataSize) + listener = when (payload.type) { + TransferRequestPayload.TransferType.FILE -> { + RawResourceListener() + } + TransferRequestPayload.TransferType.RANDOM_DATA -> { + BaseDataListener() + } + } + permittedTransfers[receiverIp] = null + } + // TODO: In some cases this check is printed, but the buffer should have space (needs more investigation) + if (receiveBuffer.remaining() < packet.length) { + println("Buffer overflow!") + return + } + serverUtpSocket?.buffer?.trySend(packet)?.isSuccess + } + + // send packet to rawPacketListeners + rawPacketListeners.forEach { listener -> listener.invoke(packet, true)} + } + + companion object { + const val PREFIX_UTP: Byte = 0x42 + // 1500 - 20 (IPv4 header) - 8 (UDP header) - 1 (UTP prefix) + const val MAX_UTP_PACKET_SIZE = 1471 + // Hardcoded maximum buffer size of 50 MB + UTP packet size (for processing) + private const val BUFFER_SIZE = 50_000_000 + MAX_UTP_PACKET_SIZE + + fun getBufferSize(): Int { + return BUFFER_SIZE + } + } + + + + /** + * A custom UTP server socket implementation to change the bind method to use an existing socket. + * The original method throws an exception as we don't want to bind a new socket. + */ + class CustomUtpServerSocket : UtpServerSocketChannelImpl() { + override fun bind(addr: InetSocketAddress?) { + throw BindException("Cannot bind new socket, use existing one!") + } + + fun bind(utpSocket: DatagramSocket) { + this.socket = utpSocket + listenRunnable = UtpRecieveRunnable(utpSocket, this) + } + } + + /** + * A custom UTP socket implementation to allow for logging of raw packets. + */ + class CustomUtpSocket: UtpSocketChannelImpl() { + var rawPacketListeners: MutableList<(DatagramPacket, Boolean) -> Unit> = ArrayList() + + override fun sendPacket(pkt: DatagramPacket?) { + rawPacketListeners.forEach { listener -> listener.invoke(pkt!!, false) } + super.sendPacket(pkt) + } + } + + override fun onPacket(packet: Packet) {} + + override fun onEstimatedLanChanged(address: IPv4Address) { + currentLan = address + } +} diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpSocket.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpSocket.kt new file mode 100644 index 00000000..08be98da --- /dev/null +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/UtpSocket.kt @@ -0,0 +1,58 @@ +package nl.tudelft.ipv8.messaging.utp + +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout +import nl.tudelft.ipv8.messaging.utp.UtpIPv8Endpoint.Companion.MAX_UTP_PACKET_SIZE +import nl.tudelft.ipv8.messaging.utp.UtpIPv8Endpoint.Companion.PREFIX_UTP +import java.io.IOException +import java.net.DatagramPacket +import java.net.DatagramSocket +import java.net.SocketTimeoutException + +/** + * A custom UTP socket that wraps around a regular DatagramSocket. + * + * This socket is used to send raw packets and receive packets from the custom UTP buffer. + */ +class UtpSocket(private val socket: DatagramSocket?) : DatagramSocket() { + val buffer = Channel<DatagramPacket>(Channel.UNLIMITED) + + /** + * Sends a packet to the buffer. + * + * Removes the custom prefix from the packet and sends it to the buffer + */ + override fun send(packet: DatagramPacket) { + + val data = packet.data.copyOfRange(packet.offset, packet.offset + packet.length) + val wrappedData = byteArrayOf(PREFIX_UTP) + data + packet.setData(wrappedData, 0, wrappedData.size) + + if (socket != null) { + socket.send(packet) +// println("Sending $packet") + } else { + println("UTP socket is missing") + } + + } + + /** + * Receives a packet from the buffer. + */ + override fun receive(packet: DatagramPacket) { + try { + runBlocking { + val received = buffer.receive() + packet.address = received.address + packet.port = received.port + packet.setData(received.data, received.offset, received.length) + } + } catch (e: InterruptedException) { + // This exception is thrown when the connection is not established + throw IOException("Interrupted while waiting for packet") + } + } +} diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/listener/BaseDataListener.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/listener/BaseDataListener.kt new file mode 100644 index 00000000..989cd630 --- /dev/null +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/listener/BaseDataListener.kt @@ -0,0 +1,49 @@ +package nl.tudelft.ipv8.messaging.utp.listener + +import mu.KotlinLogging +import java.security.MessageDigest + +private val logger = KotlinLogging.logger {} + +/** + * A listener for random hashed data. + * It expects a SHA256 hash at the end of the received data to check for integrity. + * + * Used for testing purposes. + */ +class BaseDataListener : TransferListener() { + + override val queue: ArrayDeque<ByteArray> = ArrayDeque() + override fun actionAfterReading() { + if (exception == null && byteBuffer != null) { + try { + byteBuffer.flip() + // Unpack received hash + val receivedHashData = ByteArray(32) + val dataLength = byteBuffer.remaining() + val data = ByteArray(dataLength) + byteBuffer.get(data, 0, dataLength - 32) + byteBuffer.get(receivedHashData) + byteBuffer.clear() + + // Hash the received data + val hash = MessageDigest.getInstance("SHA-256").digest(data) + + if (MessageDigest.isEqual(hash, receivedHashData)) { + println("Correct hash received") + queue.add(data) + } else { + println("Invalid hash received!") + queue.add(ByteArray(32) { _ -> 0x00 }) + } + + // Display the received data + + } catch (e: Exception) { + e.printStackTrace() + } + } + } + + override fun getThreadName(): String = "BaseDataListenerThread" +} diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/listener/RawResourceListener.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/listener/RawResourceListener.kt new file mode 100644 index 00000000..8dd9bfa9 --- /dev/null +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/listener/RawResourceListener.kt @@ -0,0 +1,44 @@ +package nl.tudelft.ipv8.messaging.utp.listener + +import mu.KotlinLogging +import net.utp4j.channels.futures.UtpReadListener + +private val logger = KotlinLogging.logger {} + +/** + * A listener for raw resources such as text files. + * Currently, it only prints the received data. + * The exact data should be saved to some internal storage. + */ +class RawResourceListener : TransferListener() { + + override val queue: ArrayDeque<ByteArray> = ArrayDeque() + + override fun actionAfterReading() { + if (exception == null && byteBuffer != null) { + try { + byteBuffer.flip() + + // Print the received text file data + val buf = StringBuffer().apply { + while (byteBuffer.hasRemaining()) { + append(byteBuffer.get().toInt().toChar()) + } + println("Received data: ${this.substring(0..minOf(200, this.length - 1))}") + } + + byteBuffer.clear() + queue.add(buf.toString().toByteArray()) + // Save the received data to a file + + + } catch (e: Exception) { + e.printStackTrace() + } + } + } + + + + override fun getThreadName(): String = "RawResourceListenerThread" +} diff --git a/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/listener/TransferListener.kt b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/listener/TransferListener.kt new file mode 100644 index 00000000..9b8719e5 --- /dev/null +++ b/ipv8/src/main/java/nl/tudelft/ipv8/messaging/utp/listener/TransferListener.kt @@ -0,0 +1,11 @@ +package nl.tudelft.ipv8.messaging.utp.listener + +import net.utp4j.channels.futures.UtpReadListener + +abstract class TransferListener : UtpReadListener() { + + abstract val queue: ArrayDeque<ByteArray> + +// fun onTransferComplete(): ByteArray + +} diff --git a/ipv8/src/test/java/nl/tudelft/ipv8/messaging/SerializationTest.kt b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/SerializationTest.kt index 66b1e678..fb9323a6 100644 --- a/ipv8/src/test/java/nl/tudelft/ipv8/messaging/SerializationTest.kt +++ b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/SerializationTest.kt @@ -6,6 +6,170 @@ import org.junit.Test import org.junit.Assert.* class SerializationTest { + + private enum class TestEnum { + A, B, C + } + @Test + fun simpleSerializeInt() { + val value = 248375682 + val simple = simpleSerialize(value) + val explicit = serializeInt(value) + assertEquals(simple.toHex(), explicit.toHex()) + } + + @Test + fun simpleSerializeLong() { + val value = -3483756823489756836 + val simple = simpleSerialize(value) + val explicit = serializeLong(value) + assertEquals(simple.toHex(), explicit.toHex()) + } + + @Test + fun simpleSerializeUInt() { + val value = 248375682u + val simple = simpleSerialize(value) + val explicit = serializeUInt(value) + assertEquals(simple.toHex(), explicit.toHex()) + } + + @Test + fun simpleSerializeULong() { + val value = 9483756823489756836u + val simple = simpleSerialize(value) + val explicit = serializeULong(value) + assertEquals(simple.toHex(), explicit.toHex()) + } + + @Test + fun simpleSerializeUShort() { + val value = 1025.toUShort() + val simple = simpleSerialize(value) + val explicit = serializeUShort(value.toInt()) + assertEquals(simple.toHex(), explicit.toHex()) + } + + @Test + fun simpleSerializeUByte() { + val value = 248u.toUByte() + val simple = simpleSerialize(value) + val explicit = serializeUChar(value) + assertEquals(simple.toHex(), explicit.toHex()) + } + + @Test + fun simpleSerializeByteArray() { + val value = byteArrayOf(0x01, 0x02, 0x03, 0x04) + val simple = simpleSerialize(value) + val explicit = serializeVarLen(value) + assertEquals(simple.toHex(), explicit.toHex()) + } + + @Test + fun simpleSerializeBoolean() { + val value = true + val simple = simpleSerialize(value) + val explicit = serializeBool(value) + assertEquals(simple.toHex(), explicit.toHex()) + } + + @Test + fun simpleSerializeString() { + val value = "Hello, World!" + val simple = simpleSerialize(value) + val explicit = serializeVarLen(value.toByteArray()) + assertEquals(simple.toHex(), explicit.toHex()) + } + + @Test + fun simpleSerializeEnum() { + val value = TestEnum.B + val simple = simpleSerialize(value) + val explicit = serializeUInt(value.ordinal.toUInt()) + assertEquals(simple.toHex(), explicit.toHex()) + } + + @Test + fun simpleDeserializeString() { + val value = "Hello, World!" + val serialized = serializeVarLen(value.toByteArray()) + val (deserialized, _) = simpleDeserialize<String>(serialized) + assertEquals(value, deserialized) + } + + @Test + fun simpleDeserializeEnum() { + val value = TestEnum.B + val serialized = simpleSerialize(value) + val deserialized = simpleDeserialize<UInt>(serialized) + assertEquals(value, TestEnum.entries[deserialized.first.toInt()]) + } + + @Test + fun simpleDeserializeBoolean() { + val value = true + val serialized = simpleSerialize(value) + val deserialized = simpleDeserialize<Boolean>(serialized) + assertEquals(value, deserialized.first) + } + + @Test + fun simpleDeserializeByteArray() { + val value = byteArrayOf(0x01, 0x02, 0x03, 0x04) + val serialized = serializeVarLen(value) + val (deserialized, _) = simpleDeserialize<ByteArray>(serialized) + assertArrayEquals(value, deserialized) + } + + @Test + fun simpleDeserializeUByte() { + val value = 248u.toUByte() + val serialized = serializeUChar(value) + val (deserialized, _) = simpleDeserialize<UByte>(serialized) + assertEquals(value, deserialized) + } + + @Test + fun simpleDeserializeUShort() { + val value = 1025.toUShort() + val serialized = serializeUShort(value.toInt()) + val (deserialized, _) = simpleDeserialize<UShort>(serialized) + assertEquals(value, deserialized) + } + + @Test + fun simpleDeserializeUInt() { + val value = 248375682u + val serialized = serializeUInt(value) + val (deserialized, _) = simpleDeserialize<UInt>(serialized) + assertEquals(value, deserialized) + } + + @Test + fun simpleDeserializeULong() { + val value = 9483756823489756836u + val serialized = serializeULong(value) + val (deserialized, _) = simpleDeserialize<ULong>(serialized) + assertEquals(value, deserialized) + } + + @Test + fun simpleDeserializeLong() { + val value: Long = -3483756823489756836L + val serialized = serializeLong(value) + val (deserialized, _) = simpleDeserialize<Long>(serialized) + assertEquals(value, deserialized) + } + + @Test + fun simpleDeserializeInt() { + val value = 248375682 + val serialized = serializeInt(value) + val (deserialized, _) = simpleDeserialize<Int>(serialized) + assertEquals(value, deserialized) + } + @Test fun serializeBool_true() { val serialized = serializeBool(true) @@ -36,6 +200,20 @@ class SerializationTest { assertEquals("0401", serialized.toHex()) } + @Test + fun serializeUShort_max() { + val uShort = UShort.MAX_VALUE + val serialized = serializeUShort(uShort) + assertEquals("ffff", serialized.toHex()) + } + + @Test + fun deserializeRealUShort() { + val uShort = UShort.MAX_VALUE + val serialized = serializeUShort(uShort) + assertEquals(uShort, deserializeRealUShort(serialized)) + } + @Test fun deserializeUShort_simple() { val value = 1025 @@ -62,6 +240,45 @@ class SerializationTest { assertEquals("ffffffffffffffff", serialized.toHex()) } + @Test + fun serializeUInt() { + val serialized = serializeUInt(UInt.MAX_VALUE) + assertEquals("ffffffff", serialized.toHex()) + } + @Test + fun deserializeUInt_simple() { + val value = 248375682u + val serialized = serializeUInt(value) + assertEquals(value, deserializeUInt(serialized)) + } + + @Test + fun deserializeUInt_max() { + val value = UInt.MAX_VALUE + val serialized = serializeUInt(value) + assertEquals(value, deserializeUInt(serialized)) + } + + @Test + fun serializeInt() { + val serialized = serializeInt(Int.MAX_VALUE) + assertEquals("7fffffff", serialized.toHex()) + } + + @Test + fun deserializeInt_simple() { + val value = 248375682 + val serialized = serializeInt(value) + assertEquals(value, deserializeInt(serialized)) + } + + @Test + fun deserializeInt_max() { + val value = Int.MAX_VALUE + val serialized = serializeInt(value) + assertEquals(value, deserializeInt(serialized)) + } + @Test fun deserializeULong_test() { val value = 18446744073709551615uL diff --git a/ipv8/src/test/java/nl/tudelft/ipv8/messaging/payload/TransferRequestPayloadTest.kt b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/payload/TransferRequestPayloadTest.kt new file mode 100644 index 00000000..adc577b3 --- /dev/null +++ b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/payload/TransferRequestPayloadTest.kt @@ -0,0 +1,61 @@ +package nl.tudelft.ipv8.messaging.payload + +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferStatus.* +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferType.* +import nl.tudelft.ipv8.messaging.serializeInt +import nl.tudelft.ipv8.messaging.serializeLong +import nl.tudelft.ipv8.messaging.serializeUInt +import nl.tudelft.ipv8.messaging.serializeVarLen +import nl.tudelft.ipv8.util.toHex +import org.junit.Assert.assertEquals +import org.junit.Test + +class TransferRequestPayloadTest { + + @Test + fun testSerialize() { + val payload = TransferRequestPayload("test1.txt", REQUEST, FILE, 1024) + val serialized = payload.serialize() + + // Manual serialization + val filename = serializeVarLen(payload.filename.toByteArray()) + val status = serializeUInt(REQUEST.ordinal.toUInt()) + val type = serializeUInt(FILE.ordinal.toUInt()) + val dataSize = serializeInt(payload.dataSize) + + assertEquals((filename + status + type + dataSize).toHex(), serialized.toHex()) + } + + @Test + fun testSerializeEmpty() { + val payload = TransferRequestPayload("test2.txt", REQUEST, FILE) + val serialized = payload.serialize() + + // Manual serialization + val filename = serializeVarLen(payload.filename.toByteArray()) + val status = serializeUInt(payload.status.ordinal.toUInt()) + val type = serializeUInt(payload.type.ordinal.toUInt()) + val dataSize = serializeInt(payload.dataSize) + + assertEquals(serialized.toHex(), (filename + status + type + dataSize).toHex()) + } + + @Test + fun testDeserialize() { + val payload = TransferRequestPayload("test3.txt", DECLINE, RANDOM_DATA, 1024) + + val serialized = payload.serialize() + val deserialized = TransferRequestPayload.Deserializer.deserialize(serialized, 0).first + assertEquals(payload, deserialized) + } + + @Test + fun testDeserializeEmpty() { + val payload = TransferRequestPayload("test4.txt", REQUEST, FILE) + + val serialized = payload.serialize() + val deserialized = TransferRequestPayload.Deserializer.deserialize(serialized, 0).first + assertEquals(payload, deserialized) + } + +} diff --git a/ipv8/src/test/java/nl/tudelft/ipv8/messaging/utp/UtpIPv8EndpointTest.kt b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/utp/UtpIPv8EndpointTest.kt new file mode 100644 index 00000000..2588bb11 --- /dev/null +++ b/ipv8/src/test/java/nl/tudelft/ipv8/messaging/utp/UtpIPv8EndpointTest.kt @@ -0,0 +1,220 @@ +package nl.tudelft.ipv8.messaging.utp + +import io.mockk.* +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import net.utp4j.data.UtpPacket +import net.utp4j.data.UtpPacketUtils +import net.utp4j.data.bytes.UnsignedTypesUtil +import nl.tudelft.ipv8.IPv4Address +import nl.tudelft.ipv8.messaging.utp.UtpIPv8Endpoint.Companion.PREFIX_UTP +import org.junit.Assert +import org.junit.Before +import org.junit.Test +import java.net.DatagramPacket +import java.net.DatagramSocket +import java.net.InetSocketAddress +import java.net.SocketAddress +import kotlin.time.Duration + +class UtpIPv8EndpointTest { + + @Before + fun setUp() { + mockkObject(UtpIPv8Endpoint.Companion) + every { UtpIPv8Endpoint.Companion.getBufferSize() } returns 10_000 + } + + @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) + @Test + fun connectionTest() = runTest{ + val endpoint = UtpIPv8Endpoint() + val socket = mockk<DatagramSocket>(relaxed = true) + endpoint.udpSocket = socket + endpoint.open() + + val datagramSlot = slot<DatagramPacket>() + every { + socket.send(capture(datagramSlot)) + } answers { + Unit + } + + val peer = IPv4Address("127.0.0.1", 8090) + val payload = "payload".toByteArray(Charsets.US_ASCII) + + // start sending file + endpoint.send(peer, payload) + runBlocking { delay(10) } + + // first packet to be sent should be SYN packet + verify { + // verify whether first packet was sent + socket.send(any()) + + // verify it has the correct prefix for UTP + val prefixedPacket = datagramSlot.captured + val prefix = prefixedPacket.data[0] + + Assert.assertEquals(prefix, PREFIX_UTP) + + // verify first packet was formatted correctly + val packet = extractUtpPacket(prefixedPacket) + Assert.assertTrue(UtpPacketUtils.isSynPkt(packet)) + } + + endpoint.close() + } + + @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) + @Test + fun fileSendingTest() = runTest{ + val endpoint = UtpIPv8Endpoint() + val socket = mockk<DatagramSocket>(relaxed = true) + endpoint.udpSocket = socket + endpoint.open() + + // capture all sent datagrams + val datagramsSent = mutableListOf<DatagramPacket>() + every { + socket.send(capture(datagramsSent)) + } answers { + Unit + } + + val peer = IPv4Address("127.0.0.1", 8090) + val payload = "Hello World".toByteArray(Charsets.US_ASCII) + + // start sending file + endpoint.send(peer, payload) + + // wait until SYN packet is sent + runBlocking { while(datagramsSent.isEmpty()){ delay(1)} } + val synPacket = extractUtpPacket(datagramsSent.get(0)); + + // then send back ack to complete connection + val ackPacket = createAckPacket(synPacket, 10, 40000, true) + val datagramAckPacket = createDatagramPacket(ackPacket!!, datagramsSent.get(0).socketAddress) + endpoint.onPacket(datagramAckPacket) + + // wait until first data packet is sent + runBlocking { while(datagramsSent.size == 1){ delay(1)} } + val dataPacket = extractUtpPacket(datagramsSent.get(1)); + println(synPacket) + println(dataPacket) + // verify that received packet is final packet + Assert.assertTrue(dataPacket.windowSize == 0) + // verify that payload is as expected + Assert.assertArrayEquals(payload, dataPacket.payload) + } + + @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) + @Test + fun fileReceivingTest() = runTest { + val endpoint = UtpIPv8Endpoint() + val socket = mockk<DatagramSocket>(relaxed = true) + endpoint.udpSocket = socket + endpoint.open() + endpoint.permittedTransfers.put(IPv4Address("127.0.0.2", 8090), null) + + // capture all sent datagrams + val datagramsSent = mutableListOf<DatagramPacket>() + every { + socket.send(capture(datagramsSent)) + } answers { + Unit + } + + val senderSocketAddress = InetSocketAddress("127.0.0.2", 8090) + val payload = "Hello World".toByteArray(Charsets.US_ASCII) + val connectionIdSend: Short = 43 + val connectionIdReceive: Short = 42 + + // send syn packet to endpoint + val synPacket = UtpPacket() + synPacket.sequenceNumber = 1 + synPacket.timestampDifference = 0 + synPacket.timestamp = 0 + synPacket.connectionId = connectionIdReceive + synPacket.typeVersion = UtpPacketUtils.SYN + synPacket.windowSize = 60000 + + endpoint.onPacket(createDatagramPacket(synPacket, senderSocketAddress)) + + // wait until ack packet is sent + runBlocking { while(datagramsSent.isEmpty()){ delay(1)} } + val ackPacket = extractUtpPacket(datagramsSent.get(0)); + + // verify this packet acknowledges start of connection with id 42 + Assert.assertEquals(connectionIdReceive, ackPacket.connectionId) + Assert.assertEquals(1.toShort(), ackPacket.ackNumber) + + + // send data packet containing entire payload to endpoint + val dataPacket = UtpPacket() + dataPacket.sequenceNumber = 2 + dataPacket.ackNumber = 1 + dataPacket.timestampDifference = 10 + dataPacket.timestamp = 100 + dataPacket.connectionId = connectionIdSend + dataPacket.typeVersion = UtpPacketUtils.DATA + // only data packet, so set windowsize to 0 to indicate this is final packet + dataPacket.windowSize = 0 + dataPacket.payload = payload + + // Utp uses selective acks, so we need to send datapacket twice before ack is produced + // This can be considered a bug in utp4j, as we should probably directly acknowledge + // the final data packet + endpoint.onPacket(createDatagramPacket(dataPacket, senderSocketAddress)) + endpoint.onPacket(createDatagramPacket(dataPacket, senderSocketAddress)) + + // wait until ack packet is sent + runBlocking { while(datagramsSent.size == 1){ delay(1)} } + val ackPacket2 = extractUtpPacket(datagramsSent.get(1)); + + // verify this packet acknowledges the previous data packet + Assert.assertEquals(connectionIdReceive, ackPacket2.connectionId) + Assert.assertEquals(2.toShort(), ackPacket2.ackNumber) + } + + private fun extractUtpPacket(prefixedPacket: DatagramPacket): UtpPacket { + val packet = DatagramPacket(ByteArray(prefixedPacket.length - 1), prefixedPacket.length - 1) + val data = prefixedPacket.data.copyOfRange(1, prefixedPacket.length) + packet.setData(data, 0, data.size) + packet.address = prefixedPacket.address + packet.port = prefixedPacket.port + + return UtpPacketUtils.extractUtpPacket(packet) + } + + private fun createAckPacket( + pkt: UtpPacket, timedifference: Int, + advertisedWindow: Long, + syn: Boolean = false + ): UtpPacket? { + val ackPacket = UtpPacket() + + ackPacket.ackNumber = pkt.ackNumber + ackPacket.timestampDifference = timedifference + ackPacket.timestamp = pkt.timestamp + 10 + if (syn) { + ackPacket.connectionId = (pkt.connectionId).toShort() + } else { + ackPacket.connectionId = (pkt.connectionId + 1).toShort() + } + + ackPacket.typeVersion = UtpPacketUtils.STATE + ackPacket.windowSize = UnsignedTypesUtil.longToUint(advertisedWindow) + return ackPacket + } + + private fun createDatagramPacket(packet: UtpPacket, address: SocketAddress): DatagramPacket { + val utpPacketBytes: ByteArray = byteArrayOf(PREFIX_UTP) + packet.toByteArray() + val length: Int = packet.getPacketLength() + 1 + return DatagramPacket( + utpPacketBytes, length, + address + ) + } +} diff --git a/ipv8/src/test/java/nl/tudelft/ipv8/utp/UtpCommunityTest.kt b/ipv8/src/test/java/nl/tudelft/ipv8/utp/UtpCommunityTest.kt new file mode 100644 index 00000000..9cbe7dc7 --- /dev/null +++ b/ipv8/src/test/java/nl/tudelft/ipv8/utp/UtpCommunityTest.kt @@ -0,0 +1,190 @@ +package nl.tudelft.ipv8.utp + +import io.mockk.every +import io.mockk.mockk +import io.mockk.mockkObject +import io.mockk.spyk +import io.mockk.verify +import nl.tudelft.ipv8.BaseCommunityTest +import nl.tudelft.ipv8.IPv4Address +import nl.tudelft.ipv8.Peer +import nl.tudelft.ipv8.exception.PacketDecodingException +import nl.tudelft.ipv8.keyvault.defaultCryptoProvider +import nl.tudelft.ipv8.messaging.Packet +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferStatus.ACCEPT +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferStatus.REQUEST +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferType.FILE +import nl.tudelft.ipv8.messaging.payload.UtpHeartbeatPayload +import nl.tudelft.ipv8.messaging.utp.UtpCommunity +import nl.tudelft.ipv8.messaging.utp.UtpCommunity.MessageId.UTP_HEARTBEAT +import nl.tudelft.ipv8.messaging.utp.UtpCommunity.MessageId.UTP_TRANSFER_REQUEST +import nl.tudelft.ipv8.messaging.utp.UtpIPv8Endpoint +import nl.tudelft.ipv8.peerdiscovery.Network +import nl.tudelft.ipv8.util.hexToBytes +import org.junit.After +import org.junit.Assert.assertEquals +import org.junit.Before +import org.junit.Test + +class UtpCommunityTest : BaseCommunityTest() { + + private val peer1: Peer = Peer(defaultCryptoProvider.generateKey(), IPv4Address("5.2.3.4", 5234)) + private val peer2: Peer = Peer(defaultCryptoProvider.generateKey(), IPv4Address("1.2.3.4", 2342)) + + private fun getCommunity(): UtpCommunity { + val myPrivateKey = getPrivateKey() + val myPeer = Peer(myPrivateKey) + val endpoint = getEndpoint() + val network = Network() + val community = UtpCommunity() + community.myPeer = myPeer + community.endpoint = endpoint + community.network = network + return community + } + + @Before + fun setup() { + mockkObject(UtpIPv8Endpoint.Companion) + every { UtpIPv8Endpoint.Companion.getBufferSize() } returns 10_000 + } + + @Test + fun sendHeartbeatTest() { + val community = spyk(getCommunity(), recordPrivateCalls = true) + every { community.getPeers() } returns listOf(peer1, peer2) + + community.sendHeartbeat() + + verify { + community.serializePacket( + UTP_HEARTBEAT, + any<UtpHeartbeatPayload>(), + sign = false, + peer = any(), + prefix = any() + ) + } + verify(exactly = 1) { community.getPeers() } + // I can't verify the send method because it's protected (using this as a workaround) + verify(exactly = 2) { community.endpoint } + } + + @Test + fun heartbeatUpdatesLastHeartbeatForPeer() { + val community = spyk(getCommunity(), recordPrivateCalls = true) + every { community.getPeers() } returns listOf(peer1, peer2) + + + val payload = "0002450ded7389134595dadb6b2549f431ad60156931010000000000000001" + val packet = Packet(peer1.address, payload.hexToBytes()) + community.onHeartbeat(packet) + assert(community.lastHeartbeat.containsKey(peer1.mid)) + } + + @Test + fun heartbeatDoesNotUpdateLastHeartbeatForUnknownPeer() { + val community = spyk(getCommunity(), recordPrivateCalls = true) + + val payload = "0002450ded7389134595dadb6b2549f431ad60156931010000000000000001" + val unknownPeer = Peer(defaultCryptoProvider.generateKey(), IPv4Address("1.1.1.1", 1111)) + val packet = Packet(unknownPeer.address, payload.hexToBytes()) + community.onHeartbeat(packet) + assert(!community.lastHeartbeat.containsKey(unknownPeer.mid)) + } + + @Test + fun onHeartbeatPacketTest() { + val community = spyk(getCommunity(), recordPrivateCalls = true) + + val payload = "0002450ded7389134595dadb6b2549f431ad60156931010000000000000001" + val handler = mockk<(Packet) -> Unit>(relaxed = true) + community.messageHandlers[UTP_HEARTBEAT] = handler + + community.onPacket(Packet(peer1.address, payload.hexToBytes())) + verify { handler(any()) } + } + + @Test + fun sendTransferRequestTest() { + val community = spyk(getCommunity(), recordPrivateCalls = true) + + community.sendTransferRequest(peer1, "test.txt", 100, FILE) + + verify { + community.serializePacket( + UTP_TRANSFER_REQUEST, + any<TransferRequestPayload>(), + sign = true, + peer = any(), + prefix = any() + ) + } + // I can't verify the send method because it's protected (using this as a workaround) + verify(exactly = 1) { community.endpoint } + } + + + @Test + fun sendTransferRequestResponseTest() { + val community = spyk(getCommunity(), recordPrivateCalls = true) + + val payload = TransferRequestPayload("test.txt", REQUEST, FILE, 100) + community.sendTransferResponse(peer1, payload) + + verify { + community.serializePacket( + UTP_TRANSFER_REQUEST, + payload, + sign = true, + peer = any(), + prefix = any() + ) + } + // I can't verify the send method because it's protected (using this as a workaround) + verify(exactly = 1) { community.endpoint } + } + + @Test + fun onTransferRequestPacketTest() { + val community = spyk(getCommunity(), recordPrivateCalls = true) + + val payload = "0002450ded7389134595dadb6b2549f431ad60156931020000000000000001" + val handler = mockk<(Packet) -> Unit>(relaxed = true) + community.messageHandlers[UTP_TRANSFER_REQUEST] = handler + + community.onPacket(Packet(peer1.address, payload.hexToBytes())) + verify { handler(any()) } + } + + @Test + fun transferRequestUpdatesTransferRequestsForPeer() { + val community = spyk(getCommunity(), recordPrivateCalls = true) + + val packet = mockk<Packet>() + val payload = TransferRequestPayload("test.txt", REQUEST, FILE, 100) + + every { packet.source } returns peer1.address + every { packet.getAuthPayload(TransferRequestPayload.Deserializer) } returns (peer1 to payload) + + community.onTransferRequest(packet) + verify { community.sendTransferResponse(any(), payload.copy(status = ACCEPT)) } + assert(community.transferRequests.containsKey(peer1.mid)) + } + + @Test + fun transferRequestDoesNotUpdateTransferRequestsForUnknownPeer() { + val community = spyk(getCommunity(), recordPrivateCalls = true) + + val packet = mockk<Packet>() + + every { packet.source } returns peer2.address + every { packet.getAuthPayload(TransferRequestPayload.Deserializer) } throws PacketDecodingException("Invalid signature!") + + community.onTransferRequest(packet) + verify(exactly = 0) { community.sendTransferResponse(any(), any()) } + assert(!community.transferRequests.containsKey(peer2.mid)) + } + +} diff --git a/ipv8/src/test/java/nl/tudelft/ipv8/utp/UtpHelperTest.kt b/ipv8/src/test/java/nl/tudelft/ipv8/utp/UtpHelperTest.kt new file mode 100644 index 00000000..7451b146 --- /dev/null +++ b/ipv8/src/test/java/nl/tudelft/ipv8/utp/UtpHelperTest.kt @@ -0,0 +1,221 @@ +package nl.tudelft.ipv8.utp + +import io.mockk.every +import io.mockk.mockk +import io.mockk.mockkObject +import io.mockk.verify +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import nl.tudelft.ipv8.IPv4Address +import nl.tudelft.ipv8.Peer +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferStatus.ACCEPT +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferStatus.DECLINE +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferType.FILE +import nl.tudelft.ipv8.messaging.payload.TransferRequestPayload.TransferType.RANDOM_DATA +import nl.tudelft.ipv8.messaging.utp.UtpCommunity +import nl.tudelft.ipv8.messaging.utp.UtpHelper +import nl.tudelft.ipv8.messaging.utp.UtpIPv8Endpoint +import org.junit.Assert.assertArrayEquals +import org.junit.Assert.assertEquals +import org.junit.Assert.assertNotEquals +import org.junit.Before +import org.junit.Test +import java.security.MessageDigest +import kotlin.time.Duration.Companion.seconds + +class UtpHelperTest { + + private val community = mockk<UtpCommunity>(relaxed = true) + private val utpHelper = UtpHelper(community) + + @Before + fun setup() { + mockkObject(UtpIPv8Endpoint) + every { UtpIPv8Endpoint.Companion.getBufferSize() } returns 10_000 + every { community.sendHeartbeat() } returns Unit + } + + @Test + fun testHeartbeatCoroutineOpen() { + every { community.endpoint.udpEndpoint?.utpIPv8Endpoint?.isOpen() } returns true + + runBlocking { + utpHelper.startHeartbeat() + delay(6000) + utpHelper.stopHeartbeat() + } + + verify(exactly = 2) { community.sendHeartbeat() } + } + + @Test + fun testHeartbeatCoroutineClosed() { + every { community.endpoint.udpEndpoint?.utpIPv8Endpoint?.isOpen() } returns false + + utpHelper.startHeartbeat() + + verify(exactly = 0) { community.sendHeartbeat() } + } + + @Test + fun testSendFileData() { + val peer = mockk<Peer>() + + every { peer.mid } returns "test_mid" + every { peer.address } returns IPv4Address("1.1.1.1",1111) + every { community.transferRequests } returns mutableMapOf(("test_mid" to TransferRequestPayload("test", ACCEPT, FILE, 10))) + + val metadata = UtpHelper.NamedResource("test", 10) + val data = ByteArray(10) + + utpHelper.sendFileData(peer, metadata, data) + + verify(exactly = 1) { community.sendTransferRequest(peer, metadata.name, metadata.size, FILE) } + verify(exactly = 2) { community.transferRequests } + verify(exactly = 1) { community.endpoint.udpEndpoint?.sendUtp(any(), any()) } + } + + @Test + fun testSendFileDataNoResponse() { + val peer = mockk<Peer>() + + every { peer.mid } returns "test_mid" + every { community.transferRequests } returns mutableMapOf() + + val metadata = UtpHelper.NamedResource("test", 10) + val data = ByteArray(10) + + utpHelper.sendFileData(peer, metadata, data) + + verify(exactly = 1) { community.sendTransferRequest(peer, metadata.name, metadata.size, FILE) } + verify(exactly = 0) { community.endpoint.udpEndpoint?.sendUtp(any(), any()) } + } + + @Test + fun testSendFileDataDecline() { + val peer = mockk<Peer>() + + every { peer.mid } returns "test_mid" + every { community.transferRequests } returns mutableMapOf(("test_mid" to TransferRequestPayload("test", DECLINE, FILE, 10))) + + val metadata = UtpHelper.NamedResource("test", 10) + val data = ByteArray(10) + + utpHelper.sendFileData(peer, metadata, data) + + verify(exactly = 1) { community.sendTransferRequest(peer, metadata.name, metadata.size, FILE) } + verify(exactly = 2) { community.transferRequests } + verify(exactly = 0) { community.endpoint.udpEndpoint?.sendUtp(any(), any()) } + } + + @Test + fun testSendRandomData() { + val peer = mockk<Peer>() + + every { peer.mid } returns "test_mid" + every { peer.address } returns IPv4Address("1.1.1.1",1111) + every { community.transferRequests } returns mutableMapOf(("test_mid" to TransferRequestPayload("test", ACCEPT, FILE, 10))) + + val size = 5_000 + val metadata = UtpHelper.NamedResource("random.tmp", 0, size) + + utpHelper.sendRandomData(peer, size) + + verify(exactly = 1) { community.sendTransferRequest(peer, metadata.name, metadata.size, RANDOM_DATA) } + verify(exactly = 2) { community.transferRequests } + verify(exactly = 1) { community.endpoint.udpEndpoint?.sendUtp(any(), any()) } + } + + @Test + fun testSendRandomDataDefault() { + val peer = mockk<Peer>() + + every { peer.mid } returns "test_mid" + every { peer.address } returns IPv4Address("1.1.1.1",1111) + every { community.transferRequests } returns mutableMapOf(("test_mid" to TransferRequestPayload("test", ACCEPT, FILE, 10))) + + val metadata = UtpHelper.NamedResource("random.tmp", 0, UtpIPv8Endpoint.getBufferSize()) + + utpHelper.sendRandomData(peer) + + verify(exactly = 1) { community.sendTransferRequest(peer, metadata.name, metadata.size, RANDOM_DATA) } + verify(exactly = 2) { community.transferRequests } + verify(exactly = 1) { community.endpoint.udpEndpoint?.sendUtp(any(), any()) } + } + + @Test + fun testSendRandomDataNoResponse() { + val peer = mockk<Peer>() + + every { peer.mid } returns "test_mid" + every { community.transferRequests } returns mutableMapOf() + + val size = 5_000 + val metadata = UtpHelper.NamedResource("random.tmp", 0, size) + + utpHelper.sendRandomData(peer, size) + + verify(exactly = 1) { community.sendTransferRequest(peer, metadata.name, metadata.size, RANDOM_DATA) } + verify(exactly = 0) { community.endpoint.udpEndpoint?.sendUtp(any(), any()) } + } + + @Test + fun testSendRandomDataDecline() { + val peer = mockk<Peer>() + + every { peer.mid } returns "test_mid" + every { community.transferRequests } returns mutableMapOf(("test_mid" to TransferRequestPayload("test", DECLINE, FILE, 10))) + + val size = 5_000 + val metadata = UtpHelper.NamedResource("random.tmp", 0, size) + + utpHelper.sendRandomData(peer, size) + + verify(exactly = 1) { community.sendTransferRequest(peer, metadata.name, metadata.size, RANDOM_DATA) } + verify(exactly = 2) { community.transferRequests } + verify(exactly = 0) { community.endpoint.udpEndpoint?.sendUtp(any(), any()) } + } + + @Test + fun generateRandomDataBufferReturnsCorrectSize() { + val size = 1000 + val data = UtpHelper.generateRandomDataBuffer(size) + assertEquals(size, data.size) + } + + @Test + fun generateRandomDataBufferReturnsUniqueData() { + val data1 = UtpHelper.generateRandomDataBuffer() + val data2 = UtpHelper.generateRandomDataBuffer() + assertNotEquals(data1, data2) + } + + @Test + fun generateRandomDataBufferContainsSHA256Hash() { + val size = 1000 + val data = UtpHelper.generateRandomDataBuffer(size) + val messageDigest = MessageDigest.getInstance("SHA-256") + val hash = messageDigest.digest(data.copyInto(ByteArray(size), 0, 0, size - 32)) + val dataHash = data.copyOfRange(size - 32, size) + assertArrayEquals(hash, dataHash) + } + + @Test(expected = IllegalArgumentException::class) + fun generateRandomDataBufferThrowsExceptionForSizeLessThan32() { + UtpHelper.generateRandomDataBuffer(31) + } + + @Test(expected = IllegalArgumentException::class) + fun generateRandomDataBufferThrowsExceptionForSizeGreaterThanBufferSize() { + UtpHelper.generateRandomDataBuffer(UtpIPv8Endpoint.getBufferSize() + 1) + } + + @Test + fun namedResourceToString() { + val namedResource = UtpHelper.NamedResource("test", 10) + assertEquals("test", namedResource.toString()) + } + +}