Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: adding binary transfer with 3 MB/s performance #89

Open
wants to merge 59 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
4403d3b
Implement fresh UTP endpoint into IPv8
Eragoneq Mar 18, 2024
22c2199
Peer selection update
Eragoneq Mar 20, 2024
44906bc
First version transfer request
Eragoneq Mar 22, 2024
1c780c5
Add puncturing into the connection
Eragoneq Mar 24, 2024
b225e79
add possibility to listen for individual utp messages in UtpEndpoint.…
PieterCarton Apr 1, 2024
0fb5aab
fix missing method reference
PieterCarton Apr 4, 2024
fc5ba49
actually add missing reference
PieterCarton Apr 4, 2024
626ab8c
final fix
PieterCarton Apr 4, 2024
1197174
final fix for getServerSocket method
PieterCarton Apr 4, 2024
c6b64cb
First version of IPv8 integration
Eragoneq Apr 6, 2024
6141b53
Add channel for messages
Eragoneq Apr 6, 2024
19efe95
correctly set source ip and port of copied datagram packet in UtpIPv8…
PieterCarton Apr 6, 2024
7b7d7dd
Allow for IPv8 file transfer
Eragoneq Apr 7, 2024
f7e1867
Don't allow client to crash the app when not connected
Eragoneq Apr 7, 2024
0b2e2ee
Remove references to old endpoint
Eragoneq Apr 8, 2024
a6adba0
Add custom UTP community
Eragoneq Apr 8, 2024
854892c
Merge pull request #1 from Eragoneq/ipv8_integration
Eragoneq Apr 8, 2024
1843cec
Add comments to communication code
Eragoneq Apr 8, 2024
2832be0
Merge branch 'ipv8_integration'
Eragoneq Apr 8, 2024
3e4a42d
Update Serialization.kt with auto functionality
Eragoneq Apr 9, 2024
9b900b2
Add custom payloads and basic handlers
Eragoneq Apr 9, 2024
baaa761
Implement TransferRequest
Eragoneq Apr 9, 2024
dd3e50b
Move connection code to ipv8 library
Eragoneq Apr 11, 2024
0c1b03f
Fix missing closing of client connection
Eragoneq Apr 12, 2024
50f5fca
Allow internal transfer without request
Eragoneq Apr 12, 2024
8f766de
Fix infinite loop cancellation issue
Eragoneq Apr 12, 2024
20400fe
Add heartbeat loop
Eragoneq Apr 12, 2024
916b85e
Fix random data functionality
Eragoneq Apr 12, 2024
ab4db56
Fix Heartbeat code
Eragoneq Apr 14, 2024
f3fb94e
Modify random data sending
Eragoneq Apr 14, 2024
d5aa2f9
Merge pull request #2 from Eragoneq/community-extensions
Eragoneq Apr 14, 2024
452e6a1
Fix buffer overflow issue
Eragoneq Apr 14, 2024
02787d9
allow for listening to outgoing packets in UtpIPv8 endpoint
PieterCarton Apr 16, 2024
fea0456
test utp datagram prefix
PieterCarton Apr 16, 2024
1733f4e
add test for utp endpoint behaviour when sending file
PieterCarton Apr 16, 2024
c8a8060
Comment out unused print logs
Eragoneq Apr 16, 2024
9f9b119
Make heartbeat job explicit
Eragoneq Apr 16, 2024
9a577ae
Fix incoming data check
Eragoneq Apr 16, 2024
33f52a7
Fully test UtpCommunity class
Eragoneq Apr 16, 2024
fcffb52
Refactor data sending code
Eragoneq Apr 16, 2024
7094023
Test UtpHelper class
Eragoneq Apr 16, 2024
6a8e1c1
add receiver test for UtpIPv8Endpoint
PieterCarton Apr 17, 2024
9ca91d9
Merge branch 'utp-endpoint-unit-tests'
PieterCarton Apr 17, 2024
ff5053a
merge IPv8UtpEndpoint integration tests from branch utp-endpoint-unit…
PieterCarton Apr 17, 2024
4ec94f1
Merge pull request #3 from Eragoneq/misc-fix-test
Eragoneq Apr 17, 2024
c0b2424
Allow for buffer size mocking
Eragoneq Apr 17, 2024
af6ffa9
Add explicit heap size to stop test from throwing OOM error
Eragoneq Apr 17, 2024
2ddc25e
merge with master
PieterCarton Apr 18, 2024
765bb62
Add updates based on heartbeats
Eragoneq Apr 18, 2024
2ad644d
Use master branch of the library code
Eragoneq Apr 18, 2024
c0590f8
Merge branch 'master' of https://github.com/Eragoneq/kotlin-ipv8
PieterCarton Apr 18, 2024
d02720b
Add documentation
Eragoneq Apr 19, 2024
aba5aeb
Add a few more comments
Eragoneq Apr 19, 2024
3b0953d
Update docs
Eragoneq Apr 19, 2024
6454fa3
Merge pull request #4 from Eragoneq/documentation
Eragoneq Apr 19, 2024
4bef0c3
Use release version of library
Eragoneq Apr 19, 2024
49025f0
modify CWND_INCREASE_PER_RTT of UTP to 30000
PieterCarton Apr 19, 2024
15a134d
Merge branch 'master' of https://github.com/Eragoneq/kotlin-ipv8
PieterCarton Apr 19, 2024
a0f0da6
Update UtpBinaryTransfer.md
Eragoneq Apr 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions doc/UtpBinaryTransfer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# UTP in IPv8

uTP is a protocol based on the UDP used to share files in a peer-to-peer fashion. In this project, we have integrated an existing `utp4j` library with the IPv8 infrastructure to allow for binary data transfer to users on the existing network.

## UtpCommunity

To distinguish between the capabilities of different peers, we have created our own `UtpCommunity` to show that the peer supports uTP transfer. We use this community to implement some useful control messages in our infrastructure. These are:
- `UTP_HEARTBEAT` - used to signal that the peer has a working uTP endpoint and can accept connections.
- `UTP_TRANSFER_REQUEST` - used as a handshake for establishing a file transfer.

### UtpHelper

To separate some of the logic of sending messages over the protocol, we have created a `UtpHelper` class. It holds all the utility methods used by peers sending data and creates a coroutine to send heartbeat messages.

#### Heartbeats

The heartbeat service is started right after `UtpCommunity` initialization. It creates a separate coroutine which is active as long as the uTP endpoint is open. A heartbeat message is sent every 5 seconds to each peer in the community to signal own liveness.

For now the heartbeat has an empty payload, but could be used to hold some useful data to be sent to all the peers.

#### Data transfer

For file transfer we have two separate methods exposed to the other classes. One allows to send file data directly, the other is used to generate and send a random, hash verified data. The transfer works as follows:
- First the `UTP_TRANSFER_REQUEST` message is sent to request to the other peer that we want to send them a data with certain metadata (name, size, type (random/file)).
- We wait for the response from the other peer. They can either accept or decline.
- If they decline, we abort the transfer. If it is accepted, we send the data using the uTP endpoint.

Peers which have not established the handshake cannot transfer data. All the packets that would be sent directly to the endpoint, would be dropped. This mechanism is used as a basic protection against the DOS attacks. Current defaults always accept the connection, but further conditions could be added to combat malicious transfers.

## UtpIPv8Endpoint

This endpoint is the main element of the uTP transfer. It uses the previously mentioned existing library to establish transfer of data between two peers using UDP. It is an extension of the existing `UdpEndpoint`. The existing endpoint creates an instance of `UtpIPv8Endpoint` similar to what TFTP has done. Then every packet that is intended to be sent to our endpoint is prefixed with a special byte (`0x42`) and routed to be further processed.

In the actual endpoint code, we have a logic for both sending (client) and receiving (server) data. Most of the code there is based on the examples shown in the library already, just refactored to use Kotlin features.

Every routed packed is processed in the `onPacket` function. Stripped off the prefix, checked for the handshake details and finally forwarded to the custom client and server socket.

### UtpSocket

As we are using IPv8 layer for our transfer, we could not use usual implementations of the `DatagramSocket`. We have created a middle layer in-between the IPv8 and `utp4j` library to allow for a seamless integration. It uses the raw IPv8 socket to send messages prefixed with out special byte. For receiving it has a separate receiving channel which is filled by the `onPacket` function in the endpoint. This gives us an elegant solution, which does not interfere with the code of the library.

## Testing

All the code has been tested and achieved 86% instruction coverage.

## Remaining issues

Unfortunately due to the timeframe we were not able to fully check the quality of every element and some solutions could be done better. Smaller existing issues are mentioned directly in the code.

The main improvements would be:
- code structure
- missing listener API (which would allow for displaying transfer data progress)
- more robust handshake architecture
- test code seems to be flaky due to OutOfMemory errors when run in bulk

# See also


| Name | Link |
| :-: | :-: |
| Main project issue | https://github.com/Tribler/tribler/issues/7911 |
| Main app description | https://github.com/Eragoneq/trustchain-superapp/blob/master/doc/BlockchainNetworkingProject/UtpTesting.md |
| Benchmarking done over 10 weeks | https://github.com/Eragoneq/trustchain-superapp/blob/master/doc/BlockchainNetworkingProject/Benchmarking.md |
| utp4j changes and benchmarks | https://github.com/PieterCarton/utp4j/blob/master/CHANGES.md |
| kotlin-ipv8 changes | https://github.com/Eragoneq/kotlin-ipv8/blob/master/doc/UtpBinaryTransfer.md |
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ object IPv8Android {
connectivityManager,
)


val bluetoothManager =
application.getSystemService<BluetoothManager>()
?: throw IllegalStateException("BluetoothManager not found")
Expand Down
6 changes: 6 additions & 0 deletions ipv8/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ ktlint {
}
}

test {
minHeapSize = "512M"
maxHeapSize = "2048M"
}

jacocoTestReport {
reports {
xml.required = true
Expand Down Expand Up @@ -81,6 +86,7 @@ dependencies {
// https://mvnrepository.com/artifact/org.bouncycastle/bcprov-jdk15on
implementation group: 'org.bouncycastle', name: 'bcprov-jdk15to18', version: '1.63'

api 'com.github.PieterCarton:utp4j:v0.0.2'

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.util.*
*/
class EndpointAggregator(
val udpEndpoint: UdpEndpoint?,
val bluetoothEndpoint: BluetoothEndpoint?
val bluetoothEndpoint: BluetoothEndpoint?,
) : Endpoint<Peer>(), EndpointListener {
private var isOpen: Boolean = false

Expand Down
140 changes: 128 additions & 12 deletions ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

import java.nio.Buffer
import java.nio.ByteBuffer
import kotlin.reflect.full.isSubclassOf
import kotlin.reflect.full.memberProperties
import kotlin.reflect.full.primaryConstructor

const val SERIALIZED_USHORT_SIZE = 2
const val SERIALIZED_UINT_SIZE = 4
const val SERIALIZED_INT_SIZE = 4
const val SERIALIZED_ULONG_SIZE = 8
const val SERIALIZED_LONG_SIZE = 4
const val SERIALIZED_LONG_SIZE = 8
const val SERIALIZED_UBYTE_SIZE = 1

const val SERIALIZED_PUBLIC_KEY_SIZE = 74
Expand All @@ -20,6 +24,80 @@

interface Deserializable<T> {
fun deserialize(buffer: ByteArray, offset: Int = 0): Pair<T, Int>

}

/**
* Serializes the object and returns the buffer.
* Alternative to manually defining the serialize function.
*/
interface AutoSerializable : Serializable {
override fun serialize(): ByteArray {
return this::class.primaryConstructor!!.parameters.map { param ->
val value =
this.javaClass.kotlin.memberProperties.find { it.name == param.name }!!.get(this)
simpleSerialize(value)
}.reduce { acc, bytes -> acc + bytes }
}
}

///**
// * Deserializes the object from the buffer and returns the object and the new offset.
// * Alternative to manually defining the deserialize function.
// */
//inline fun <reified T> Deserializable<T>.autoDeserialize(buffer: ByteArray, offset: Int = 0): Pair<T, Int> {
// TODO()
//}

fun <U> simpleSerialize(data: U): ByteArray {
return when (data) {
is Int -> serializeInt(data)
is Long -> serializeLong(data)

Check warning on line 55 in ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt

View check run for this annotation

Codecov / codecov/patch

ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt#L55

Added line #L55 was not covered by tests
is UByte -> serializeUChar(data)
is UInt -> serializeUInt(data)
is UShort -> serializeUShort(data.toInt())
is ULong -> serializeULong(data)
is String -> serializeVarLen(data.toByteArray())
is ByteArray -> serializeVarLen(data)
is Boolean -> serializeBool(data)
is Enum<*> -> serializeUInt(data.ordinal.toUInt())
is Serializable -> data.serialize()
else -> throw IllegalArgumentException("Unsupported serialization type")
}
}

inline fun <reified U> simpleDeserialize(buffer: ByteArray, offset: Int = 0): Pair<U, Int> {
val (value, off) = when (U::class) {
Int::class -> Pair(deserializeInt(buffer, offset) as U, SERIALIZED_INT_SIZE)
Long::class -> Pair(deserializeLong(buffer, offset) as U, SERIALIZED_LONG_SIZE)

Check warning on line 72 in ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt

View check run for this annotation

Codecov / codecov/patch

ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt#L72

Added line #L72 was not covered by tests
UByte::class -> Pair(deserializeUChar(buffer, offset) as U, SERIALIZED_UBYTE_SIZE)
UShort::class -> Pair(
deserializeUShort(buffer, offset).toUShort() as U,
SERIALIZED_USHORT_SIZE
)

UInt::class -> Pair(deserializeUInt(buffer, offset) as U, SERIALIZED_UINT_SIZE)
ULong::class -> Pair(deserializeULong(buffer, offset) as U, SERIALIZED_ULONG_SIZE)
String::class -> {
val (data, len) = deserializeVarLen(buffer, offset)
Pair(data.decodeToString() as U, len)
}

ByteArray::class -> {
val (data, len) = deserializeVarLen(buffer, offset)

Check warning on line 87 in ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt

View check run for this annotation

Codecov / codecov/patch

ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt#L87

Added line #L87 was not covered by tests
Pair(data as U, len)
}

Boolean::class -> Pair(deserializeBool(buffer, offset) as U, 1)
else -> {
if (U::class.isSubclassOf(Enum::class)) {
val ordinal = deserializeUInt(buffer, offset).toInt()
val values = U::class.java.enumConstants
Pair(values[ordinal] as U, SERIALIZED_UINT_SIZE)

Check warning on line 96 in ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt

View check run for this annotation

Codecov / codecov/patch

ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt#L96

Added line #L96 was not covered by tests
} else throw IllegalArgumentException("Unsupported deserialization type")
}
}
return (value to (offset + off))
}

fun serializeBool(data: Boolean): ByteArray {
Expand All @@ -40,10 +118,21 @@
return bytes
}

fun serializeUShort(value: UShort): ByteArray {
val bytes = ByteBuffer.allocate(SERIALIZED_USHORT_SIZE)
bytes.putShort(value.toShort())

Check warning on line 123 in ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt

View check run for this annotation

Codecov / codecov/patch

ipv8/src/main/java/nl/tudelft/ipv8/messaging/Serialization.kt#L121-L123

Added lines #L121 - L123 were not covered by tests
return bytes.array()
}

fun deserializeUShort(buffer: ByteArray, offset: Int = 0): Int {
return (((buffer[offset].toInt() and 0xFF) shl 8) or (buffer[offset + 1].toInt() and 0xFF))
}

fun deserializeRealUShort(buffer: ByteArray, offset: Int = 0): UShort {
val buf = ByteBuffer.wrap(buffer, offset, SERIALIZED_USHORT_SIZE)
return buf.short.toUShort()
}

fun serializeUInt(value: UInt): ByteArray {
val bytes = UByteArray(SERIALIZED_UINT_SIZE)
for (i in 0 until SERIALIZED_UINT_SIZE) {
Expand Down Expand Up @@ -80,7 +169,7 @@

fun serializeLong(value: Long): ByteArray {
val buffer = ByteBuffer.allocate(SERIALIZED_LONG_SIZE)
buffer.putInt(value.toInt())
buffer.putLong(value)
return buffer.array()
}

Expand All @@ -89,7 +178,20 @@
buffer.put(bytes.copyOfRange(offset, offset + SERIALIZED_LONG_SIZE))
// In JDK 8 this returns a Buffer.
(buffer as Buffer).flip()
return buffer.int.toLong()
return buffer.long
}

fun serializeInt(value: Int): ByteArray {
val buffer = ByteBuffer.allocate(SERIALIZED_INT_SIZE)
buffer.putInt(value)
return buffer.array()
}

fun deserializeInt(bytes: ByteArray, offset: Int = 0): Int {
val buffer = ByteBuffer.allocate(SERIALIZED_INT_SIZE)
buffer.put(bytes.copyOfRange(offset, offset + SERIALIZED_INT_SIZE))
buffer.flip()
return buffer.int
}

fun serializeUChar(char: UByte): ByteArray {
Expand All @@ -107,8 +209,10 @@

fun deserializeVarLen(buffer: ByteArray, offset: Int = 0): Pair<ByteArray, Int> {
val len = deserializeUInt(buffer, offset).toInt()
val payload = buffer.copyOfRange(offset + SERIALIZED_UINT_SIZE,
offset + SERIALIZED_UINT_SIZE + len)
val payload = buffer.copyOfRange(
offset + SERIALIZED_UINT_SIZE,
offset + SERIALIZED_UINT_SIZE + len
)
return Pair(payload, SERIALIZED_UINT_SIZE + len)
}

Expand All @@ -117,19 +221,31 @@
return arrayOf()
}
val len = deserializeUInt(buffer, offset).toInt()
val payload = buffer.copyOfRange(offset + SERIALIZED_UINT_SIZE,
offset + SERIALIZED_UINT_SIZE + len)
return arrayOf(payload) + deserializeRecursively(buffer.copyOfRange(offset + SERIALIZED_UINT_SIZE + len,
buffer.size), offset)
val payload = buffer.copyOfRange(
offset + SERIALIZED_UINT_SIZE,
offset + SERIALIZED_UINT_SIZE + len
)
return arrayOf(payload) + deserializeRecursively(
buffer.copyOfRange(
offset + SERIALIZED_UINT_SIZE + len,
buffer.size
), offset
)
}

fun deserializeAmount(buffer: ByteArray, amount: Int, offset: Int = 0): Pair<Array<ByteArray>, ByteArray> {
fun deserializeAmount(
buffer: ByteArray,
amount: Int,
offset: Int = 0
): Pair<Array<ByteArray>, ByteArray> {
val returnValues = arrayListOf<ByteArray>()
var localOffset = offset
for (i in 0 until amount) {
val len = deserializeUInt(buffer, localOffset).toInt()
val payload = buffer.copyOfRange(localOffset + SERIALIZED_UINT_SIZE,
localOffset + SERIALIZED_UINT_SIZE + len)
val payload = buffer.copyOfRange(
localOffset + SERIALIZED_UINT_SIZE,
localOffset + SERIALIZED_UINT_SIZE + len
)
localOffset += SERIALIZED_UINT_SIZE + len
returnValues.add(payload)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package nl.tudelft.ipv8.messaging.payload

import nl.tudelft.ipv8.messaging.AutoSerializable
import nl.tudelft.ipv8.messaging.Deserializable
import nl.tudelft.ipv8.messaging.Serializable
import nl.tudelft.ipv8.messaging.simpleDeserialize

data class TransferRequestPayload(
val filename: String,
val status: TransferStatus,
val type: TransferType,
val dataSize: Int = 0
) : AutoSerializable {

companion object Deserializer : Deserializable<TransferRequestPayload> {
override fun deserialize(
buffer: ByteArray,
offset: Int
): Pair<TransferRequestPayload, Int> {
val (filename, newOffset) = simpleDeserialize<String>(buffer, offset)
val (status, newOffset1) = simpleDeserialize<TransferStatus>(buffer, newOffset)
val (type, newOffset2) = simpleDeserialize<TransferType>(buffer, newOffset1)
val (dataSize, newOffset3) = simpleDeserialize<Int>(buffer, newOffset2)
return Pair(TransferRequestPayload(filename, status, type, dataSize), newOffset3)
}
}

enum class TransferStatus {
REQUEST,
ACCEPT,
DECLINE
}

enum class TransferType {
FILE,
RANDOM_DATA
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package nl.tudelft.ipv8.messaging.payload

import nl.tudelft.ipv8.messaging.Deserializable
import nl.tudelft.ipv8.messaging.Serializable

class UtpHeartbeatPayload(

) : Serializable {
override fun serialize(): ByteArray {
return byteArrayOf()
}

companion object Deserializer : Deserializable<UtpHeartbeatPayload> {
override fun deserialize(buffer: ByteArray, offset: Int): Pair<UtpHeartbeatPayload, Int> {
return Pair(UtpHeartbeatPayload(), 0)
}
}
}
Loading
Loading