From 1d2695ee808cb4d6154257c67218479530a70cdf Mon Sep 17 00:00:00 2001 From: Nemanja Boric Date: Thu, 6 Apr 2017 14:55:46 +0200 Subject: [PATCH] Initial commit for public release of swarm v4.0.0 --- Build.mak | 5 +- README.rst | 13 +- docker/swarm.sh | 2 - src/swarm/README_client.rst | 0 src/swarm/client/request/model/INodeInfo.d | 0 .../client/request/params/IRequestParams.d | 4 +- src/swarm/neo/IPAddress.d | 0 src/swarm/neo/authentication/Credentials.d | 0 .../neo/authentication/CredentialsFile.d | 24 +- src/swarm/neo/authentication/HmacAuthCode.d | 7 +- src/swarm/neo/authentication/HmacDef.d | 0 src/swarm/neo/client/ClientSocket.d | 0 src/swarm/neo/client/Connection.d | 9 +- src/swarm/neo/client/ConnectionSet.d | 48 ++- src/swarm/neo/client/IRequestSet.d | 0 src/swarm/neo/client/NotifierTypes.d | 230 +++++++++++-- src/swarm/neo/client/RequestOnConn.d | 0 src/swarm/neo/client/RequestSet.d | 0 src/swarm/neo/client/RetryTimer.d | 0 src/swarm/neo/client/mixins/ClientCore.d | 8 + src/swarm/neo/client/mixins/RequestCore.d | 29 +- .../neo/client/mixins/TaskBlockingCore.d | 81 ++++- .../client/requests/NotificationFormatter.d | 309 ++++++++++++++++++ src/swarm/neo/connection/ConnectionBase.d | 8 +- src/swarm/neo/connection/RequestOnConnBase.d | 38 ++- src/swarm/neo/node/Connection.d | 4 +- src/swarm/neo/node/RequestOnConn.d | 1 + src/swarm/neo/node/RequestSet.d | 14 +- src/swarm/neo/protocol/Message.d | 0 src/swarm/neo/protocol/MessageParser.d | 0 src/swarm/neo/protocol/ProtocolError.d | 0 .../neo/protocol/connect/ClientConnect.d | 0 .../neo/protocol/connect/ConnectProtocol.d | 0 src/swarm/neo/protocol/connect/NodeConnect.d | 9 +- .../neo/protocol/socket/MessageGenerator.d | 0 .../neo/protocol/socket/MessageReceiver.d | 3 + src/swarm/neo/request/Command.d | 0 src/swarm/neo/util/FiberTokenHashGenerator.d | 0 src/swarm/neo/util/FieldSizeSum.d | 0 src/swarm/neo/util/FixedSizeMap.d | 0 src/swarm/neo/util/MessageFiber.d | 5 +- src/swarm/neo/util/TreeMap.d | 2 +- src/swarm/neo/util/TreeQueue.d | 2 +- src/swarm/neo/util/Util.d | 0 src/swarm/node/connection/ConnectionHandler.d | 30 -- src/swarm/node/model/RecordActionCounters.d | 0 .../node/storage/model/IStorageChannels.d | 2 +- src/swarm/util/RecordStream.d | 17 +- src/swarm/util/log/ClientStats.d | 2 +- src/swarm/util/node/log/Stats.d | 1 + submodules/ocean | 2 +- test/neo/client/Client.d | 260 +++++++++++++++ test/neo/client/request/Get.d | 71 ++++ test/neo/client/request/Put.d | 69 ++++ test/neo/client/request/internal/Get.d | 250 ++++++++++++++ test/neo/client/request/internal/Put.d | 226 +++++++++++++ test/neo/common/Get.d | 30 ++ test/neo/common/Put.d | 29 ++ test/neo/common/RequestCodes.d | 20 ++ test/neo/main.d | 117 +++++++ test/neo/node/Node.d | 111 +++++++ test/neo/node/Storage.d | 21 ++ test/neo/node/request/Get.d | 118 +++++++ test/neo/node/request/Put.d | 101 ++++++ 64 files changed, 2174 insertions(+), 158 deletions(-) mode change 100755 => 100644 src/swarm/README_client.rst mode change 100755 => 100644 src/swarm/client/request/model/INodeInfo.d mode change 100755 => 100644 src/swarm/neo/IPAddress.d mode change 100755 => 100644 src/swarm/neo/authentication/Credentials.d mode change 100755 => 100644 src/swarm/neo/authentication/HmacDef.d mode change 100755 => 100644 src/swarm/neo/client/ClientSocket.d mode change 100755 => 100644 src/swarm/neo/client/Connection.d mode change 100755 => 100644 src/swarm/neo/client/ConnectionSet.d mode change 100755 => 100644 src/swarm/neo/client/IRequestSet.d mode change 100755 => 100644 src/swarm/neo/client/RequestOnConn.d mode change 100755 => 100644 src/swarm/neo/client/RequestSet.d mode change 100755 => 100644 src/swarm/neo/client/RetryTimer.d mode change 100755 => 100644 src/swarm/neo/client/mixins/RequestCore.d create mode 100644 src/swarm/neo/client/requests/NotificationFormatter.d mode change 100755 => 100644 src/swarm/neo/connection/ConnectionBase.d mode change 100755 => 100644 src/swarm/neo/connection/RequestOnConnBase.d mode change 100755 => 100644 src/swarm/neo/node/Connection.d mode change 100755 => 100644 src/swarm/neo/node/RequestOnConn.d mode change 100755 => 100644 src/swarm/neo/node/RequestSet.d mode change 100755 => 100644 src/swarm/neo/protocol/Message.d mode change 100755 => 100644 src/swarm/neo/protocol/MessageParser.d mode change 100755 => 100644 src/swarm/neo/protocol/ProtocolError.d mode change 100755 => 100644 src/swarm/neo/protocol/connect/ClientConnect.d mode change 100755 => 100644 src/swarm/neo/protocol/connect/ConnectProtocol.d mode change 100755 => 100644 src/swarm/neo/protocol/connect/NodeConnect.d mode change 100755 => 100644 src/swarm/neo/protocol/socket/MessageGenerator.d mode change 100755 => 100644 src/swarm/neo/request/Command.d mode change 100755 => 100644 src/swarm/neo/util/FiberTokenHashGenerator.d mode change 100755 => 100644 src/swarm/neo/util/FieldSizeSum.d mode change 100755 => 100644 src/swarm/neo/util/FixedSizeMap.d mode change 100755 => 100644 src/swarm/neo/util/MessageFiber.d mode change 100755 => 100644 src/swarm/neo/util/TreeMap.d mode change 100755 => 100644 src/swarm/neo/util/Util.d mode change 100755 => 100644 src/swarm/node/model/RecordActionCounters.d mode change 100755 => 100644 src/swarm/util/RecordStream.d mode change 100755 => 100644 src/swarm/util/node/log/Stats.d create mode 100644 test/neo/client/Client.d create mode 100644 test/neo/client/request/Get.d create mode 100644 test/neo/client/request/Put.d create mode 100644 test/neo/client/request/internal/Get.d create mode 100644 test/neo/client/request/internal/Put.d create mode 100644 test/neo/common/Get.d create mode 100644 test/neo/common/Put.d create mode 100644 test/neo/common/RequestCodes.d create mode 100644 test/neo/main.d create mode 100644 test/neo/node/Node.d create mode 100644 test/neo/node/Storage.d create mode 100644 test/neo/node/request/Get.d create mode 100644 test/neo/node/request/Put.d diff --git a/Build.mak b/Build.mak index 89578857..a80fbe87 100644 --- a/Build.mak +++ b/Build.mak @@ -1,4 +1,5 @@ override DFLAGS += -w +override LDFLAGS += -lebtree -llzo2 -lrt -lgcrypt -lgpg-error -lglib-2.0 ifeq ($(DVER),1) override DFLAGS += -v2 -v2=-static-arr-params -v2=-volatile @@ -9,9 +10,5 @@ endif # Remove deprecated modules from testing: TEST_FILTER_OUT += \ - -# Add link flags for unittests -$O/%unittests: override LDFLAGS += -lebtree -llzo2 -lrt -lgcrypt -lgpg-error -lglib-2.0 - .PHONY: d2conv d2conv: $O/d2conv.stamp diff --git a/README.rst b/README.rst index 8abda87a..3d15defd 100644 --- a/README.rst +++ b/README.rst @@ -40,10 +40,17 @@ An overview of the features of the legacy and neo client architecture can be found here `Legacy client documentation -`_. +`_. `Neo client documentation -`_. +`_. + +Example +------- + +A simple example of how to construct a client and node using the neo protocol +can be found `here +`_. Build / Use =========== @@ -54,7 +61,7 @@ Dependencies ========== ======= Dependency Version ========== ======= -ocean v2.5.x +ocean v3.1.x makd v1.5.x ========== ======= diff --git a/docker/swarm.sh b/docker/swarm.sh index e9eb23e7..2ff920bb 100755 --- a/docker/swarm.sh +++ b/docker/swarm.sh @@ -8,5 +8,3 @@ apt-get install -y \ libgcrypt-dev \ libglib2.0-dev \ libgpg-error-dev - - \ No newline at end of file diff --git a/src/swarm/README_client.rst b/src/swarm/README_client.rst old mode 100755 new mode 100644 diff --git a/src/swarm/client/request/model/INodeInfo.d b/src/swarm/client/request/model/INodeInfo.d old mode 100755 new mode 100644 diff --git a/src/swarm/client/request/params/IRequestParams.d b/src/swarm/client/request/params/IRequestParams.d index 51c1bc9c..0e3c9e90 100644 --- a/src/swarm/client/request/params/IRequestParams.d +++ b/src/swarm/client/request/params/IRequestParams.d @@ -35,7 +35,7 @@ import ocean.core.Traits; import ocean.io.select.EpollSelectDispatcher; -import ocean.io.serialize.SimpleSerializer; +import ocean.io.serialize.SimpleStreamSerializer; import ocean.io.model.IConduit: IOStream, InputStream, OutputStream; @@ -89,7 +89,7 @@ public abstract class IRequestParams protected alias .IOStream IOStream; protected alias .InputStream InputStream; protected alias .OutputStream OutputStream; - protected alias .SimpleSerializer SimpleSerializer; + protected alias .SimpleStreamSerializer SimpleStreamSerializer; protected alias .copyFields copyFields; protected alias .SizeofTuple SizeofTuple; diff --git a/src/swarm/neo/IPAddress.d b/src/swarm/neo/IPAddress.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/authentication/Credentials.d b/src/swarm/neo/authentication/Credentials.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/authentication/CredentialsFile.d b/src/swarm/neo/authentication/CredentialsFile.d index 9cd0e76c..1a40f181 100644 --- a/src/swarm/neo/authentication/CredentialsFile.d +++ b/src/swarm/neo/authentication/CredentialsFile.d @@ -52,6 +52,7 @@ extern (C) private int g_ascii_xdigit_value ( char c ); // glib-2.0 class CredentialsFile { import swarm.neo.authentication.Credentials; + import HmacDef = swarm.neo.authentication.HmacDef; import ocean.io.device.File; @@ -66,7 +67,7 @@ class CredentialsFile ***************************************************************************/ - private Credentials.Key[istring] credentials_; + private HmacDef.Key[istring] credentials_; /*************************************************************************** @@ -105,7 +106,7 @@ class CredentialsFile ***************************************************************************/ - public Const!(Credentials.Key[istring])* credentials ( ) + public Const!(HmacDef.Key[istring])* credentials ( ) { return &this.credentials_; } @@ -175,8 +176,8 @@ class CredentialsFile ***************************************************************************/ - private static Credentials.Key[istring] parse ( istring content, - cstring filepath = null ) + private static HmacDef.Key[istring] parse ( istring content, + cstring filepath = null ) { int line = 0; // The current line in the input file. @@ -199,7 +200,7 @@ class CredentialsFile scope parser = new QueryParams('\n', ':'); - Credentials.Key[istring] keys; + HmacDef.Key[istring] keys; foreach (name, hex_key; parser.set(content)) { @@ -222,7 +223,7 @@ class CredentialsFile size_t i = 0; - Credentials.Key key; + HmacDef.Key key; foreach (ref b; key.content) { @@ -296,6 +297,7 @@ class CredentialsFile version ( UnitTest ) { import swarm.neo.authentication.Credentials; + import HmacDef = swarm.neo.authentication.HmacDef; import ocean.core.Test; import ocean.core.ByteSwap; @@ -312,7 +314,7 @@ version ( UnitTest ) ***************************************************************************/ - void registryTest ( istring name, istring file_content, Credentials.Key[istring] expected ) + void registryTest ( istring name, istring file_content, HmacDef.Key[istring] expected ) { auto t = new NamedTest(name); auto reg = CredentialsFile.parse(file_content, name); @@ -339,14 +341,14 @@ version ( UnitTest ) ***************************************************************************/ - Credentials.Key makeKey ( ulong[] words ... ) + HmacDef.Key makeKey ( ulong[] words ... ) in { - assert(words.length == Credentials.Key.length / words[0].sizeof); + assert(words.length == HmacDef.Key.length / words[0].sizeof); } body { - Credentials.Key key; + HmacDef.Key key; (cast(ulong[])key.content)[] = words; ByteSwap.swap64(key.content); return key; @@ -364,7 +366,7 @@ version ( UnitTest ) unittest { // Empty - registryTest("Empty", [], (Credentials.Key[istring]).init); + registryTest("Empty", [], (HmacDef.Key[istring]).init); // One entry then EOF registryTest( diff --git a/src/swarm/neo/authentication/HmacAuthCode.d b/src/swarm/neo/authentication/HmacAuthCode.d index 8870acc4..27db10e0 100644 --- a/src/swarm/neo/authentication/HmacAuthCode.d +++ b/src/swarm/neo/authentication/HmacAuthCode.d @@ -32,8 +32,9 @@ struct HmacAuthCode import swarm.neo.authentication.HmacDef: Key, Code, Nonce; import ocean.util.cipher.gcrypt.HMAC; - import ocean.util.cipher.gcrypt.c.random; import ocean.util.cipher.gcrypt.c.gpgerror; + import ocean.util.cipher.gcrypt.c.md; + import ocean.util.cipher.gcrypt.c.random; import core.stdc.time: time_t; @@ -49,7 +50,7 @@ struct HmacAuthCode ***************************************************************************/ - const hash_algoritm = HMAC.gcry_md_algos.GCRY_MD_SHA512; + const hash_algoritm = gcry_md_algos.GCRY_MD_SHA512; private HMAC hmac; @@ -203,7 +204,7 @@ struct HmacAuthCode ***********************************************************************/ ulong timestamp; - + Nonce nonce; char[] name; diff --git a/src/swarm/neo/authentication/HmacDef.d b/src/swarm/neo/authentication/HmacDef.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/client/ClientSocket.d b/src/swarm/neo/client/ClientSocket.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/client/Connection.d b/src/swarm/neo/client/Connection.d old mode 100755 new mode 100644 index c237553c..a8c561e7 --- a/src/swarm/neo/client/Connection.d +++ b/src/swarm/neo/client/Connection.d @@ -33,6 +33,7 @@ public final class Connection: ConnectionBase import swarm.neo.client.RetryTimer; import ocean.core.Enforce; + import ocean.io.select.EpollSelectDispatcher; import ocean.transition; debug ( SwarmConn ) import ocean.io.Stdout; @@ -448,13 +449,9 @@ public final class Connection: ConnectionBase case this.status_.Disconnected: /* - * If the shutdown was requested during startup, report that - * we are now stopping further connection attempts. + * Shutdown was requested during startup, stopping further + * connection attempts. */ - // TODO: do we need to notify the user at this stage? - // they are probably the one who requested the shutdown... - //~ if (this.startup_notifier !is null) - //~ this.startup_notifier(this); return false; default: diff --git a/src/swarm/neo/client/ConnectionSet.d b/src/swarm/neo/client/ConnectionSet.d old mode 100755 new mode 100644 index 612c1079..3f091536 --- a/src/swarm/neo/client/ConnectionSet.d +++ b/src/swarm/neo/client/ConnectionSet.d @@ -26,10 +26,12 @@ import swarm.neo.client.RequestOnConn; /// ditto public final class ConnectionSet : RequestOnConn.IConnectionGetter { - import ocean.core.Exception_tango; + import ocean.core.ExceptionDefinitions; + import ocean.core.SmartUnion; import swarm.neo.client.Connection; import swarm.neo.client.RequestSet; + import swarm.neo.client.NotifierTypes; import swarm.neo.IPAddress; import swarm.neo.authentication.Credentials; import ocean.io.select.EpollSelectDispatcher; @@ -100,22 +102,42 @@ public final class ConnectionSet : RequestOnConn.IConnectionGetter private uint n_nodes_starting = 0; + /*************************************************************************** + + Union of notifications about a connection. + + ***************************************************************************/ + + private union ConnNotificationUnion + { + /// The connection has been successfully established. + NodeInfo connected; + + /// An error (indicated by the `e` field) occurred while connecting. The + /// connection attempt will automatically be retried. + NodeExceptionInfo error_while_connecting; + } + + /*************************************************************************** + + Smart-union of notifications about a connection. + + ***************************************************************************/ + + public alias SmartUnion!(ConnNotificationUnion) ConnNotification; + /*************************************************************************** User callback to be notified when a connection, which was added with `start()`, - 1. finished connecting, because either the connection was established - or because it was removed with `stop()` while connecting, or + 1. the connection was established, or 2. detected an error -- for example, the node is unreachable or a socket I/O error, but also a failed protocol handshake or authentication -- and will try connecting again. - In case 1 `e is null`, in case 2 it reflects the error. - ***************************************************************************/ - public alias void delegate ( IPAddress node_address, Exception e ) - ConnectionNotifier; + public alias void delegate ( ConnNotification info ) ConnectionNotifier; /// ditto private ConnectionNotifier conn_notifier; @@ -416,10 +438,20 @@ public final class ConnectionSet : RequestOnConn.IConnectionGetter private void notifyConnectResult ( Connection connection, Exception e = null ) { + ConnNotification info; + if (e is null) + { this.n_nodes_starting--; + info.connected = NodeInfo(connection.remote_address); + } + else + { + info.error_while_connecting = + NodeExceptionInfo(connection.remote_address, e); + } - this.conn_notifier(connection.remote_address, e); + this.conn_notifier(info); } } diff --git a/src/swarm/neo/client/IRequestSet.d b/src/swarm/neo/client/IRequestSet.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/client/NotifierTypes.d b/src/swarm/neo/client/NotifierTypes.d index 31341b85..d5a215b8 100644 --- a/src/swarm/neo/client/NotifierTypes.d +++ b/src/swarm/neo/client/NotifierTypes.d @@ -18,6 +18,7 @@ module swarm.neo.client.NotifierTypes; *******************************************************************************/ import ocean.transition; +import Formatter = ocean.text.convert.Formatter; /******************************************************************************* @@ -31,6 +32,27 @@ public struct NodeInfo /// Address of the remote node for which the notification is occurring. IPAddress node_addr; + + /*************************************************************************** + + Formats a description of the notification to the provided sink delegate. + + Params: + sink = delegate to feed formatted strings to + + ***************************************************************************/ + + public void toString ( void delegate ( cstring chunk ) sink ) + { + Formatter.sformat( + ( cstring chunk ) + { + sink(chunk); + return chunk.length; + }, + "Node {}:{}", + this.node_addr.address_bytes, this.node_addr.port); + } } /******************************************************************************* @@ -49,30 +71,27 @@ public struct RequestNodeInfo /// Address of the remote node for which the notification is occurring. IPAddress node_addr; -} -/******************************************************************************* + /*************************************************************************** - The address of a node plus an enum indicating a type of unsupported error. + Formats a description of the notification to the provided sink delegate. -*******************************************************************************/ + Params: + sink = delegate to feed formatted strings to -deprecated("Use RequestNodeUnsupportedInfo instead") -public struct NodeUnsupportedInfo -{ - import swarm.neo.IPAddress; + ***************************************************************************/ - /// Address of the remote node for which the notification is occurring. - IPAddress node_addr; - - enum Type + public void toString ( void delegate ( cstring chunk ) sink ) { - RequestNotSupported, - RequestVersionNotSupported + Formatter.sformat( + ( cstring chunk ) + { + sink(chunk); + return chunk.length; + }, + "Request #{}, node {}:{}", + this.request_id, this.node_addr.address_bytes, this.node_addr.port); } - - /// Type of unsupported error. - Type type; } /******************************************************************************* @@ -100,6 +119,49 @@ public struct RequestNodeUnsupportedInfo /// Type of unsupported error. Type type; + + /*************************************************************************** + + Formats a description of the notification to the provided sink delegate. + + Params: + sink = delegate to feed formatted strings to + + ***************************************************************************/ + + public void toString ( void delegate ( cstring chunk ) sink ) + { + Formatter.sformat( + ( cstring chunk ) + { + sink(chunk); + return chunk.length; + }, + "Request #{}, node {}:{} reported that the {} is not supported", + this.request_id, this.node_addr.address_bytes, this.node_addr.port, + this.type_explanation); + } + + /*************************************************************************** + + Returns: + a string explaining this.type + + ***************************************************************************/ + + private istring type_explanation ( ) + { + with ( Type ) switch ( this.type ) + { + case RequestNotSupported: + return "request"; + case RequestVersionNotSupported: + return "request version"; + default: + assert(false); + } + assert(false); + } } /******************************************************************************* @@ -117,6 +179,40 @@ public struct NodeExceptionInfo /// Exception associated with notification. Exception e; + + /*************************************************************************** + + Formats a description of the notification to the provided sink delegate. + + Params: + sink = delegate to feed formatted strings to + + ***************************************************************************/ + + public void toString ( void delegate ( cstring chunk ) sink ) + { + size_t size_sink ( cstring chunk ) + { + sink(chunk); + return chunk.length; + } + + if ( this.e !is null ) + { + Formatter.sformat(&size_sink, + "Exception '{}' @ {}:{} occurred in the client while handling the " + "request on node {}:{}", + getMsg(this.e), this.e.file, this.e.line, + this.node_addr.address_bytes, this.node_addr.port); + } + else + { + Formatter.sformat(&size_sink, + "An undefined error (null Exception) occurred in the client " + "while handling the request on node {}:{}", + this.node_addr.address_bytes, this.node_addr.port); + } + } } /******************************************************************************* @@ -138,19 +234,41 @@ public struct RequestNodeExceptionInfo /// Exception associated with notification. Exception e; -} -/******************************************************************************* + /*************************************************************************** - A chunk of untyped data. + Formats a description of the notification to the provided sink delegate. -*******************************************************************************/ + Params: + sink = delegate to feed formatted strings to -deprecated("Use RequestDataInfo instead") -public struct DataInfo -{ - /// Data value associated with notification. - Const!(void)[] value; + ***************************************************************************/ + + public void toString ( void delegate ( cstring chunk ) sink ) + { + size_t size_sink ( cstring chunk ) + { + sink(chunk); + return chunk.length; + } + + if ( this.e !is null ) + { + Formatter.sformat(&size_sink, + "Exception '{}' @ {}:{} occurred in the client while handling " + "request #{} on node {}:{}", + getMsg(this.e), this.e.file, this.e.line, this.request_id, + this.node_addr.address_bytes, this.node_addr.port); + } + else + { + Formatter.sformat(&size_sink, + "An undefined error (null Exception) occurred in the client " + "while handling request #{} on node {}:{}", + this.request_id, this.node_addr.address_bytes, + this.node_addr.port); + } + } } /******************************************************************************* @@ -168,6 +286,27 @@ public struct RequestDataInfo /// Data value associated with notification. Const!(void)[] value; + + /*************************************************************************** + + Formats a description of the notification to the provided sink delegate. + + Params: + sink = delegate to feed formatted strings to + + ***************************************************************************/ + + public void toString ( void delegate ( cstring chunk ) sink ) + { + Formatter.sformat( + ( cstring chunk ) + { + sink(chunk); + return chunk.length; + }, + "Request #{} provided the value {}", + this.request_id, this.value); + } } /******************************************************************************* @@ -176,7 +315,22 @@ public struct RequestDataInfo *******************************************************************************/ -public struct NoInfo {} +public struct NoInfo +{ + /*************************************************************************** + + Formats a description of the notification to the provided sink delegate. + + Params: + sink = delegate to feed formatted strings to + + ***************************************************************************/ + + public void toString ( void delegate ( cstring chunk ) sink ) + { + sink("(empty notification)"); + } +} /******************************************************************************* @@ -190,5 +344,25 @@ public struct RequestInfo /// ID of the request for which the notification is occurring. RequestId request_id; -} + /*************************************************************************** + + Formats a description of the notification to the provided sink delegate. + + Params: + sink = delegate to feed formatted strings to + + ***************************************************************************/ + + public void toString ( void delegate ( cstring chunk ) sink ) + { + Formatter.sformat( + ( cstring chunk ) + { + sink(chunk); + return chunk.length; + }, + "Request #{}", + this.request_id); + } +} diff --git a/src/swarm/neo/client/RequestOnConn.d b/src/swarm/neo/client/RequestOnConn.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/client/RequestSet.d b/src/swarm/neo/client/RequestSet.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/client/RetryTimer.d b/src/swarm/neo/client/RetryTimer.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/client/mixins/ClientCore.d b/src/swarm/neo/client/mixins/ClientCore.d index edc337d8..f2626948 100644 --- a/src/swarm/neo/client/mixins/ClientCore.d +++ b/src/swarm/neo/client/mixins/ClientCore.d @@ -37,6 +37,14 @@ template ClientCore ( ) import swarm.client.helper.NodesConfigReader; + /*************************************************************************** + + Convenience alias for the connection notification union. + + ***************************************************************************/ + + public alias ConnectionSet.ConnNotification ConnNotification; + /*************************************************************************** Convenience alias for the connection notifier. diff --git a/src/swarm/neo/client/mixins/RequestCore.d b/src/swarm/neo/client/mixins/RequestCore.d old mode 100755 new mode 100644 index bec0793e..96e4c2c3 --- a/src/swarm/neo/client/mixins/RequestCore.d +++ b/src/swarm/neo/client/mixins/RequestCore.d @@ -120,6 +120,7 @@ public template RequestCore ( RequestType request_type_, ubyte request_code, import swarm.neo.request.Command; import swarm.neo.IPAddress; + import swarm.neo.client.NotifierTypes; /*************************************************************************** @@ -263,29 +264,12 @@ public template RequestCore ( RequestType request_type_, ubyte request_code, private static bool handleGlobalStatusCodes ( StatusCode status, Context* context, IPAddress remote_address ) { - static if ( is(typeof( - { NotificationUnion n; n.unsupported = RequestNodeUnsupportedInfo(); } )) ) - { - const bool include_request_id = true; - alias RequestNodeUnsupportedInfo UnsupportedInfo; - } - else - { - static assert(is(typeof( - { NotificationUnion n; n.unsupported = NodeUnsupportedInfo(); } ))); - - const bool include_request_id = false; - alias NodeUnsupportedInfo UnsupportedInfo; - } - switch ( status ) { case GlobalStatusCode.RequestNotSupported: NotificationUnion n; - n.unsupported = UnsupportedInfo(); - static if ( include_request_id ) - n.unsupported.request_id = context.request_id; - + n.unsupported = RequestNodeUnsupportedInfo(); + n.unsupported.request_id = context.request_id; n.unsupported.node_addr = remote_address; n.unsupported.type = n.unsupported.type.RequestNotSupported; @@ -294,11 +278,8 @@ public template RequestCore ( RequestType request_type_, ubyte request_code, case GlobalStatusCode.RequestVersionNotSupported: NotificationUnion n; - n.unsupported = UnsupportedInfo(); - - static if ( include_request_id ) - n.unsupported.request_id = context.request_id; - + n.unsupported = RequestNodeUnsupportedInfo(); + n.unsupported.request_id = context.request_id; n.unsupported.node_addr = remote_address; n.unsupported.type = n.unsupported.type.RequestVersionNotSupported; diff --git a/src/swarm/neo/client/mixins/TaskBlockingCore.d b/src/swarm/neo/client/mixins/TaskBlockingCore.d index 34fbfe40..4dd404e2 100644 --- a/src/swarm/neo/client/mixins/TaskBlockingCore.d +++ b/src/swarm/neo/client/mixins/TaskBlockingCore.d @@ -6,6 +6,10 @@ connected. * A method to block the current task until at least one registered node is connected. + * A Task-derivative which connects to all registered nodes. (Intended + for use with Scheduler.await().) + * A Task-derivative which connects to at least the specified number of + nodes. (Intended for use with Scheduler.await().) Copyright: Copyright (c) 2016-2017 sociomantic labs GmbH. All rights reserved @@ -62,6 +66,79 @@ template TaskBlockingCore ( ) this.waitConnect(&finished); } + /*************************************************************************** + + Task class which connects to all registered nodes. Intended for use + with Scheduler.await(). + + ***************************************************************************/ + + public class AllNodesConnected : Task + { + /*********************************************************************** + + Task main method. Exits when the client has established a connection + to all registered nodes. + + ***********************************************************************/ + + public override void run ( ) + { + this.outer.waitAllNodesConnected(); + } + } + + /*************************************************************************** + + Task class which connects to at least the specified number of nodes. + Intended for use with Scheduler.await() or Scheduler.awaitResult(). + + ***************************************************************************/ + + public class NodesConnected : Task + { + /// When the task exits, holds the number of nodes which are connected. + public size_t result; + + /// The minimum number of nodes to connect to. + private size_t minimum_connected; + + /*********************************************************************** + + Constructor. + + Params: + minimum_connected = the minimum number of nodes to connect to + + ***********************************************************************/ + + public this ( size_t minimum_connected ) + { + this.minimum_connected = minimum_connected; + } + + /*********************************************************************** + + Task main method. Exits only when the client has established + connections to at least this.minimum_connected nodes. + + ***********************************************************************/ + + public override void run ( ) + { + scope stats = this.outer.outer.neo.new Stats; + + bool finished ( ) + { + return stats.num_connected_nodes >= this.minimum_connected; + } + + this.outer.waitConnect(&finished); + + this.result = stats.num_connected_nodes; + } + } + /*************************************************************************** Suspends the current task until the specified finished condition is @@ -81,9 +158,9 @@ template TaskBlockingCore ( ) ConnectionSet.ConnectionNotifier user_conn_notifier; - void notifier ( IPAddress node_address, Exception e ) + void notifier ( ConnectionSet.ConnNotification info ) { - user_conn_notifier(node_address, e); + user_conn_notifier(info); task.resume(); } diff --git a/src/swarm/neo/client/requests/NotificationFormatter.d b/src/swarm/neo/client/requests/NotificationFormatter.d new file mode 100644 index 00000000..72215366 --- /dev/null +++ b/src/swarm/neo/client/requests/NotificationFormatter.d @@ -0,0 +1,309 @@ +/******************************************************************************* + + Helper functions to format human-readable information about a request + notification. + + copyright: Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module swarm.neo.client.requests.NotificationFormatter; + +import ocean.transition; +import ocean.core.SmartUnion; +import ocean.core.Traits : TemplateInstanceArgs, hasMethod; +import Formatter = ocean.text.convert.Formatter; + +import swarm.neo.client.NotifierTypes; + +/// Alias for a formatting sink delegate. +private alias void delegate ( cstring chunk ) Sink; + +/******************************************************************************* + + Formats human-readable information -- suitable for logging -- about the + provided request notification into the specified buffer. + + Params: + SU = type of the notification (must be an instantiation of ocean's + SmartUnion) + notification = request notification to format info about + buf = buffer to format into + + Returns: + formatted buffer + +*******************************************************************************/ + +public cstring formatNotification ( SU ) ( SU notification, ref mstring buf ) +{ + buf.length = 0; + enableStomping(buf); + + formatNotification(notification, + ( cstring chunk ) + { + buf ~= chunk; + } + ); + + return buf; +} + +/******************************************************************************* + + Formats human-readable information -- suitable for logging -- about the + provided request notification, via the specified sink delegate. + + Params: + SU = type of the notification (must be an instantiation of ocean's + SmartUnion) + notification = request notification to format info about + sink = sink delegate to receive formatted info + +*******************************************************************************/ + +public void formatNotification ( SU ) ( SU notification, Sink sink ) +{ + static assert(is(TemplateInstanceArgs!(SmartUnion, SU))); + + Formatter.sformat( + ( cstring chunk ) + { + sink(chunk); + return chunk.length; + }, + "<{}>", notification.active_name + ); + + .format_sink = sink; + callWithActive!(.format)(notification); +} + +/// +unittest +{ + // Imaginary request arguments. + struct Args + { + } + + // Class in your application that assigns and handles swarm client requests. + static class RequestHandler + { + import swarm.neo.client.requests.NotificationFormatter; + + // Reused formatting buffer. + mstring format_buf; + + /*********************************************************************** + + Request notification callback for a request which we imagine has + been assigned. + + Params: + info = notification smart-union + args = original request arguments + + ***********************************************************************/ + + void request_notifier ( Notification info, Args args ) + { + // Format a description of the notification `info` + auto formatted = formatNotification(info, this.format_buf); + + // ... log or print `formatted` + } + } +} + +/******************************************************************************* + + Sink delegate to use in `format`, below. Must be set before `format` is + called. + + Stored as a global variable as a workaround for the fact that the + SmartUnion helper template, `callWithActive`, cannot access the frame of + a nested function. (The intuitive location for the function to pass to + `callWithActive` would be inside `formatNotification`, above, where the + function is called and where the user-provided sink delegate is + available in context.) + +*******************************************************************************/ + +private Sink format_sink; + +/******************************************************************************* + + If the provided notification has a `format` method, calls it, passing the + sink delegate above (`format_sink`, which was provided by the user in + `formatNotification`, above) to do the actual writing. + + Note that this method is only declared public so that `callWithActive` + (in SmartUnion) can access it. It is not intended to be called directly + by the user. + + Params: + T = type of notification + notification = notification to format info about, using + `format_sink` + +*******************************************************************************/ + +public void format ( T ) ( T notification ) +in +{ + assert(.format_sink !is null); +} +body +{ + static if ( hasMethod!(T, "toString", void delegate ( Sink )) ) + { + .format_sink(": "); + notification.toString(.format_sink); + } + else + { + .format_sink(": notification without toString method: " ~ T.stringof); + } +} + +version ( UnitTest ) +{ + import ocean.core.Test; + + // Dummy notification type with no toString method + struct Unknown { } + + // Imaginary union of notification types. + union NotificationUnion + { + NoInfo hello; // 1 + NodeInfo connected; // 2 + NodeExceptionInfo connect_error; // 3 + RequestInfo request_succeeded; // 4 + RequestNodeInfo request_node_error; // 5 + RequestNodeUnsupportedInfo request_unsupported; // 6 + RequestNodeExceptionInfo request_client_error; // 7 + RequestDataInfo request_data; // 8 + Unknown unknown; // 9 + } + + // Imaginary smart-union of notification types. + alias SmartUnion!(NotificationUnion) Notification; +} + +// Test the results of `formatNotification` +unittest +{ + Notification notification; + mstring buf; + + // NoInfo + { + NoInfo n; + notification.hello = n; + formatNotification(notification, buf); + test!("==")(buf, ": (empty notification)"); + } + + // NodeInfo + { + NodeInfo n; + n.node_addr.setAddress("127.0.0.1"); + n.node_addr.port = 23; + notification.connected = n; + formatNotification(notification, buf); + test!("==")(buf, ": Node [127, 0, 0, 1]:23"); + } + + // NodeExceptionInfo without an exception (weird, but just to test it works) + { + NodeExceptionInfo n; + notification.connect_error = n; + formatNotification(notification, buf); + test!("==")(buf, ": An undefined error (null Exception) occurred in the client while handling the request on node [0, 0, 0, 0]:0"); + } + + // NodeExceptionInfo with an exception + { + NodeExceptionInfo n; + n.e = new Exception("Error", "file.d", 23); + notification.connect_error = n; + formatNotification(notification, buf); + test!("==")(buf, ": Exception 'Error' @ file.d:23 occurred in the client while handling the request on node [0, 0, 0, 0]:0"); + } + + // RequestInfo + { + RequestInfo n; + notification.request_succeeded = n; + formatNotification(notification, buf); + test!("==")(buf, ": Request #0"); + } + + // RequestNodeInfo + { + RequestNodeInfo n; + n.node_addr.setAddress("127.0.0.1"); + n.node_addr.port = 23; + notification.request_node_error = n; + formatNotification(notification, buf); + test!("==")(buf, ": Request #0, node [127, 0, 0, 1]:23"); + } + + // RequestNodeUnsupportedInfo with an unsupported request + { + RequestNodeUnsupportedInfo n; + n.type = n.type.RequestNotSupported; + notification.request_unsupported = n; + formatNotification(notification, buf); + test!("==")(buf, ": Request #0, node [0, 0, 0, 0]:0 reported that the request is not supported"); + } + + // RequestNodeUnsupportedInfo with an unsupported request version + { + RequestNodeUnsupportedInfo n; + n.type = n.type.RequestVersionNotSupported; + notification.request_unsupported = n; + formatNotification(notification, buf); + test!("==")(buf, ": Request #0, node [0, 0, 0, 0]:0 reported that the request version is not supported"); + } + + // RequestNodeExceptionInfo without an exception (weird, but just to test it works) + { + RequestNodeExceptionInfo n; + notification.request_client_error = n; + formatNotification(notification, buf); + test!("==")(buf, ": An undefined error (null Exception) occurred in the client while handling request #0 on node [0, 0, 0, 0]:0"); + } + + // RequestNodeExceptionInfo with an exception + { + RequestNodeExceptionInfo n; + n.e = new Exception("Error", "file.d", 23); + notification.request_client_error = n; + formatNotification(notification, buf); + test!("==")(buf, ": Exception 'Error' @ file.d:23 occurred in the client while handling request #0 on node [0, 0, 0, 0]:0"); + } + + // RequestDataInfo + { + RequestDataInfo n; + n.value = [1, 2, 3, 4]; + notification.request_data = n; + formatNotification(notification, buf); + test!("==")(buf, ": Request #0 provided the value [1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]"); + } + + // Unknown + { + Unknown n; + notification.unknown = n; + formatNotification(notification, buf); + test!("==")(buf, ": notification without toString method: Unknown"); + } +} diff --git a/src/swarm/neo/connection/ConnectionBase.d b/src/swarm/neo/connection/ConnectionBase.d old mode 100755 new mode 100644 index f995873f..6b98fcb4 --- a/src/swarm/neo/connection/ConnectionBase.d +++ b/src/swarm/neo/connection/ConnectionBase.d @@ -26,7 +26,7 @@ abstract class ConnectionBase: ISelectClient import swarm.neo.util.TreeQueue; - import swarm.neo.protocol.Message: MessageType, RequestId; + import Message = swarm.neo.protocol.Message; import swarm.neo.protocol.socket.MessageReceiver; import swarm.neo.protocol.socket.MessageSender; import swarm.neo.protocol.MessageParser; @@ -49,6 +49,12 @@ abstract class ConnectionBase: ISelectClient debug ( SwarmConn ) import ocean.io.Stdout; + + /// Convenience aliases for derived class + protected alias Message.RequestId RequestId; + /// Ditto + protected alias Message.MessageType MessageType; + /*************************************************************************** Stack size of sender/receiver fibers diff --git a/src/swarm/neo/connection/RequestOnConnBase.d b/src/swarm/neo/connection/RequestOnConnBase.d old mode 100755 new mode 100644 index 58a6afad..a7c00c8b --- a/src/swarm/neo/connection/RequestOnConnBase.d +++ b/src/swarm/neo/connection/RequestOnConnBase.d @@ -76,13 +76,16 @@ module swarm.neo.connection.RequestOnConnBase; abstract class RequestOnConnBase { - import swarm.neo.protocol.Message: RequestId; + import Message = swarm.neo.protocol.Message; import swarm.neo.protocol.ProtocolError; import swarm.neo.connection.YieldedRequestOnConns; import Util = swarm.neo.util.Util; import ocean.transition; + /// Convenience alias for derived classes + protected alias Message.RequestId RequestId; + /*************************************************************************** Codes returned by `suspendFiber()` to indicate that a message has been @@ -244,7 +247,7 @@ abstract class RequestOnConnBase ***********************************************************************/ - private scope class Payload + public scope class Payload { import ocean.core.Traits : hasIndirections; @@ -271,7 +274,7 @@ abstract class RequestOnConnBase *******************************************************************/ - public this ( ) + private this ( ) { assert(this.outer.outer.send_payload.length == 0); } @@ -704,6 +707,26 @@ abstract class RequestOnConnBase return value; } + /*********************************************************************** + + Waits until a message for this request is received from the node. + + Do not resume the fiber before this method has returned or thrown. + + Params: + received = called with the payload of the next message that + arrives for this request + + Throws: + Exception on protocol or I/O error. + + ***********************************************************************/ + + public void receive ( void delegate ( in void[] payload ) received ) + { + int resume_code = this.receiveAndHandleEvents(received); + assert(resume_code <= 0, "receive: User unexpectedy resumed the fiber"); + } /*********************************************************************** @@ -727,15 +750,6 @@ abstract class RequestOnConnBase ***********************************************************************/ - // TODO: when this deprecated method is removed, replace it with a - // receive() which asserts on the return value of receiveAndHandleEvents - deprecated("Replace calls to EventDispatcher.receive with receiveAndHandleEvents") - public int receive ( void delegate ( in void[] payload ) received ) - { - return this.receiveAndHandleEvents(received); - } - - /// ditto public int receiveAndHandleEvents ( void delegate ( in void[] payload ) received ) in diff --git a/src/swarm/neo/node/Connection.d b/src/swarm/neo/node/Connection.d old mode 100755 new mode 100644 index 715360b2..4ab1055f --- a/src/swarm/neo/node/Connection.d +++ b/src/swarm/neo/node/Connection.d @@ -25,9 +25,9 @@ class Connection: ConnectionBase import swarm.neo.authentication.HmacDef: Key; import swarm.neo.connection.YieldedRequestOnConns; - import ocean.sys.socket.AddressIPSocket; - import ocean.core.Enforce; + import ocean.io.select.EpollSelectDispatcher; + import ocean.sys.socket.AddressIPSocket; import core.sys.posix.netinet.in_: SOL_SOCKET, IPPROTO_TCP, SO_KEEPALIVE; diff --git a/src/swarm/neo/node/RequestOnConn.d b/src/swarm/neo/node/RequestOnConn.d old mode 100755 new mode 100644 index 6e98542c..ce92779f --- a/src/swarm/neo/node/RequestOnConn.d +++ b/src/swarm/neo/node/RequestOnConn.d @@ -32,6 +32,7 @@ abstract class RequestOnConn: RequestOnConnBase { import swarm.neo.protocol.Message: RequestId; import swarm.neo.connection.ConnectionBase; + import swarm.neo.connection.YieldedRequestOnConns; import swarm.neo.util.MessageFiber; import ocean.transition; diff --git a/src/swarm/neo/node/RequestSet.d b/src/swarm/neo/node/RequestSet.d old mode 100755 new mode 100644 index a3be441a..e03ec570 --- a/src/swarm/neo/node/RequestSet.d +++ b/src/swarm/neo/node/RequestSet.d @@ -116,6 +116,18 @@ class RequestSet return this.request_id = request_id; } + /*********************************************************************** + + Returns: + the id of this request + + ***********************************************************************/ + + public RequestId id ( ) /* d1to2fix_inject: const */ + { + return this.request_id; + } + /*********************************************************************** Called when ready to send a message for this request. @@ -395,7 +407,7 @@ class RequestSet out (request) { assert(request); - assert(request.request_id == id); + assert(request.id == id); assert(this.n_active_requests <= max_requests); } body diff --git a/src/swarm/neo/protocol/Message.d b/src/swarm/neo/protocol/Message.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/protocol/MessageParser.d b/src/swarm/neo/protocol/MessageParser.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/protocol/ProtocolError.d b/src/swarm/neo/protocol/ProtocolError.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/protocol/connect/ClientConnect.d b/src/swarm/neo/protocol/connect/ClientConnect.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/protocol/connect/ConnectProtocol.d b/src/swarm/neo/protocol/connect/ConnectProtocol.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/protocol/connect/NodeConnect.d b/src/swarm/neo/protocol/connect/NodeConnect.d old mode 100755 new mode 100644 index c9466c28..2e1d398a --- a/src/swarm/neo/protocol/connect/NodeConnect.d +++ b/src/swarm/neo/protocol/connect/NodeConnect.d @@ -21,6 +21,7 @@ class NodeConnect import swarm.neo.protocol.ProtocolError; import swarm.neo.authentication.HmacAuthCode; + import HmacDef = swarm.neo.authentication.HmacDef; import swarm.neo.authentication.Credentials; import swarm.neo.util.MessageFiber; @@ -40,7 +41,7 @@ class NodeConnect ***************************************************************************/ - private Const!(HmacAuthCode.Key[istring])* credentials; + private Const!(HmacDef.Key[istring])* credentials; /*************************************************************************** @@ -68,7 +69,7 @@ class NodeConnect ***************************************************************************/ - public this ( ref Const!(HmacAuthCode.Key[istring]) credentials ) + public this ( ref Const!(HmacDef.Key[istring]) credentials ) { this.credentials = &credentials; this.e_auth_rejected = new HmacAuthCode.RejectedException; @@ -113,7 +114,7 @@ class NodeConnect fiber, socket_fd, receiver, sender); scope ( exit ) this.protocol.unregisterEpoll(); - const request_type = this.protocol.MessageType.Authentication, + const request_type = MessageType.Authentication, protocol_version = 1; ubyte client_protocol_version = this.protocol.receiveProtocolVersion(); @@ -157,7 +158,7 @@ class NodeConnect this.e_auth_rejected.resetAuthParams(); // receive() callback, does the authentication. - void authenticate ( Const!(char)[] client_name, HmacAuthCode.Code client_code ) + void authenticate ( Const!(char)[] client_name, HmacDef.Code client_code ) { try { diff --git a/src/swarm/neo/protocol/socket/MessageGenerator.d b/src/swarm/neo/protocol/socket/MessageGenerator.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/protocol/socket/MessageReceiver.d b/src/swarm/neo/protocol/socket/MessageReceiver.d index b16e1947..86a4eb93 100644 --- a/src/swarm/neo/protocol/socket/MessageReceiver.d +++ b/src/swarm/neo/protocol/socket/MessageReceiver.d @@ -440,6 +440,8 @@ private class MessageReceiverBase class MessageReceiver: MessageReceiverBase { + import swarm.neo.protocol.ProtocolError; + import ocean.io.select.protocol.generic.ErrnoIOException: IOError; import unistd = core.sys.posix.unistd: read; @@ -598,6 +600,7 @@ unittest static class MessageReceiverTest: MessageReceiverBase { import swarm.neo.protocol.Message: MessageType, MessageHeader; + import swarm.neo.protocol.ProtocolError; import ocean.core.Test; import core.sys.posix.stdlib: erand48; diff --git a/src/swarm/neo/request/Command.d b/src/swarm/neo/request/Command.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/util/FiberTokenHashGenerator.d b/src/swarm/neo/util/FiberTokenHashGenerator.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/util/FieldSizeSum.d b/src/swarm/neo/util/FieldSizeSum.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/util/FixedSizeMap.d b/src/swarm/neo/util/FixedSizeMap.d old mode 100755 new mode 100644 diff --git a/src/swarm/neo/util/MessageFiber.d b/src/swarm/neo/util/MessageFiber.d old mode 100755 new mode 100644 index aa8668b8..df7bec3d --- a/src/swarm/neo/util/MessageFiber.d +++ b/src/swarm/neo/util/MessageFiber.d @@ -591,7 +591,10 @@ public class OceanMessageFiber // throw `msg.exc` if it isn't caught by the fiber. However, this method // should throw only exceptions passed to the `suspend()` call that // continues the call of this method. - auto throwable = this.fiber.call(false); + version (D_Version2) + auto throwable = this.fiber.call(Fiber.Rethrow.no); + else + auto throwable = this.fiber.call(false); // If the fiber terminated with an unhandled Error, rethrow it. if (cast(RethrowType)throwable) diff --git a/src/swarm/neo/util/TreeMap.d b/src/swarm/neo/util/TreeMap.d old mode 100755 new mode 100644 index e37b474a..d4edfb74 --- a/src/swarm/neo/util/TreeMap.d +++ b/src/swarm/neo/util/TreeMap.d @@ -141,7 +141,7 @@ struct TreeMap ( Node = eb64_node ) static if (is(typeof(Node.user_element_with_treemap_backlink) UserElement)) { - import ocean.core.Traits_tango: isReferenceType; + import ocean.core.Traits: isReferenceType; static assert(isReferenceType!(UserElement)); /*********************************************************************** diff --git a/src/swarm/neo/util/TreeQueue.d b/src/swarm/neo/util/TreeQueue.d index 16555533..5a3b48f5 100644 --- a/src/swarm/neo/util/TreeQueue.d +++ b/src/swarm/neo/util/TreeQueue.d @@ -37,7 +37,7 @@ template TreeQueue ( T ) // Provide a wrapper for all public TreeQueueCore methods. struct TreeQueue { - import ocean.core.Traits_tango: isAssocArrayType; + import ocean.core.Traits: isAssocArrayType; /******************************************************************* diff --git a/src/swarm/neo/util/Util.d b/src/swarm/neo/util/Util.d old mode 100755 new mode 100644 diff --git a/src/swarm/node/connection/ConnectionHandler.d b/src/swarm/node/connection/ConnectionHandler.d index 3749bfcc..461fef51 100644 --- a/src/swarm/node/connection/ConnectionHandler.d +++ b/src/swarm/node/connection/ConnectionHandler.d @@ -341,36 +341,6 @@ public abstract class ConnectionHandlerTemplate ( Commands : ICommandCodes ) } - /*************************************************************************** - - Handles the request according to handleRequest(), above, with the - addition of stats tracking for the request. - - Template params: - Resources = type of struct defining the types and names of resources - which a request can acquire from the shared pools - Acquired = type of class with getters for the resources acquired by - a request. Assumed to be generated by instantiating the - SharedResources_T template (see - swarm.common.connection.ISharedResources) with Resources. - - Params: - request = request handler to run - acquired = resources acquired while handling the request - rq_name = name of request for stats tracking - - ***************************************************************************/ - - deprecated("Call handleRequest() with the `stats` template argument set to " - "RequestStatsTracking.Count and the `rq_name` argument set") - protected void handleRequestTrackStats ( Resources, Acquired ) - ( IRequest request, Acquired acquired, cstring rq_name ) - { - this.handleRequest!(Resources, Acquired, RequestStatsTracking.Count)( - request, acquired, rq_name); - } - - /*************************************************************************** Mix-in protected abstract methods to handle individual commands. diff --git a/src/swarm/node/model/RecordActionCounters.d b/src/swarm/node/model/RecordActionCounters.d old mode 100755 new mode 100644 diff --git a/src/swarm/node/storage/model/IStorageChannels.d b/src/swarm/node/storage/model/IStorageChannels.d index 8ca65f58..3d37031c 100644 --- a/src/swarm/node/storage/model/IStorageChannels.d +++ b/src/swarm/node/storage/model/IStorageChannels.d @@ -34,7 +34,7 @@ import ocean.util.container.map.Map; import ocean.util.container.pool.ObjectPool; -import ocean.core.Exception_tango; +import ocean.core.ExceptionDefinitions; import ocean.util.log.Log; diff --git a/src/swarm/util/RecordStream.d b/src/swarm/util/RecordStream.d old mode 100755 new mode 100644 index dd1e795f..7ccde100 --- a/src/swarm/util/RecordStream.d +++ b/src/swarm/util/RecordStream.d @@ -20,7 +20,7 @@ import ocean.transition; *******************************************************************************/ import ocean.io.model.ISuspendable; -import ocean.io.serialize.SimpleSerializer; +import ocean.io.serialize.SimpleStreamSerializer; import ocean.io.Console : Cin, Cout; version ( UnitTest ) @@ -39,8 +39,7 @@ public struct Record { import swarm.util.Hash; - import ocean.core.Array : copy; - import ocean.core.Array_tango : find; + import ocean.core.Array : copy, find; import ocean.core.TypeConvert; import ocean.core.Enforce; @@ -126,13 +125,13 @@ public struct Record { if ( this.key.length ) { - SimpleSerializer.writeData(stream, this.key); + SimpleStreamSerializer.writeData(stream, this.key); } - SimpleSerializer.write(stream, Separator); + SimpleStreamSerializer.write(stream, Separator); buf.length = Base64.allocateEncodeSize(this.value); - SimpleSerializer.writeData(stream, Base64.encode(this.value, buf)); - SimpleSerializer.write(stream, '\n'); + SimpleStreamSerializer.writeData(stream, Base64.encode(this.value, buf)); + SimpleStreamSerializer.write(stream, '\n'); } @@ -243,12 +242,12 @@ public struct Record static assert(C.sizeof == char.sizeof); char c; - SimpleSerializer.read(stream, c); + SimpleStreamSerializer.read(stream, c); while ( c != sep ) { dst ~= cast(C)c; - SimpleSerializer.read(stream, c); + SimpleStreamSerializer.read(stream, c); } } diff --git a/src/swarm/util/log/ClientStats.d b/src/swarm/util/log/ClientStats.d index 86c813ea..1925f3ee 100644 --- a/src/swarm/util/log/ClientStats.d +++ b/src/swarm/util/log/ClientStats.d @@ -41,7 +41,7 @@ import ocean.io.select.EpollSelectDispatcher; import ocean.io.select.client.TimerEvent; -import ocean.util.log.Log; +import ocean.util.log.Appender; import ocean.util.log.Stats; diff --git a/src/swarm/util/node/log/Stats.d b/src/swarm/util/node/log/Stats.d old mode 100755 new mode 100644 index a7a2f5cd..7f358970 --- a/src/swarm/util/node/log/Stats.d +++ b/src/swarm/util/node/log/Stats.d @@ -234,6 +234,7 @@ private class ChannelsNodeStatsTemplate ( Logger = StatsLog ) : NodeStatsTemplate!(Logger) { import swarm.node.model.IChannelsNodeInfo; + import swarm.node.model.INodeInfo; /*************************************************************************** diff --git a/submodules/ocean b/submodules/ocean index 68199a60..bcbdd8a5 160000 --- a/submodules/ocean +++ b/submodules/ocean @@ -1 +1 @@ -Subproject commit 68199a60d87cc2e26ae44623eb29ab24e48aec90 +Subproject commit bcbdd8a5b159f9dbd7d7c02af5fa5803b3bd6bb3 diff --git a/test/neo/client/Client.d b/test/neo/client/Client.d new file mode 100644 index 00000000..745ad90a --- /dev/null +++ b/test/neo/client/Client.d @@ -0,0 +1,260 @@ +/******************************************************************************* + + Example client with the following features: + * Connects to a single node, specified in the constructor. + * Supports two simple requests: Put -- to write a value to the node -- + and Get -- to read a value from the node. + * A Task-blocking interface for connection and both requests. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.client.Client; + +import ocean.transition; + +/// ditto +public class Client +{ + import ocean.io.select.EpollSelectDispatcher; + import swarm.neo.authentication.HmacDef: Key; + + /*************************************************************************** + + Neo API. + + ***************************************************************************/ + + private class Neo + { + import swarm.neo.client.mixins.ClientCore; + import swarm.neo.client.IRequestSet : IRequestWorkingData; + + /*********************************************************************** + + Imports of requests' public APIs. + + ***********************************************************************/ + + public import Get = test.neo.client.request.Get; + public import Put = test.neo.client.request.Put; + + /*********************************************************************** + + Imports of requests' internal implementations. + + ***********************************************************************/ + + struct Internals + { + import test.neo.client.request.internal.Get; + import test.neo.client.request.internal.Put; + } + + /// Instantiation of ClientCore. + mixin ClientCore!(); + + /// Instantiation of RequestStatsTemplate. + alias RequestStatsTemplate!("Put", "Get") RequestStats; + + /*********************************************************************** + + Assigns a Put request, instructing the node to associate the + specified value and key. + + Params: + key = key of record to add to node + value = value of record to add to node + notifier = notifier, called when interesting events occur for + this request + + ***********************************************************************/ + + public void put ( hash_t key, cstring value, Put.Notifier notifier ) + { + Internals.Put.UserSpecifiedParams params; + params.args.key = key; + params.args.value = value; + params.notifier.set(notifier); + + this.assign!(Internals.Put)(params); + } + + /*********************************************************************** + + Assigns a Get request, retrieving the value associated in the node + with the specified key (if one exists). + + Params: + key = key of record to retrieve from node + notifier = notifier, called when interesting events occur for + this request + + ***********************************************************************/ + + public void get ( hash_t key, Get.Notifier notifier ) + { + Internals.Get.UserSpecifiedParams params; + params.args.key = key; + params.notifier.set(notifier); + + this.assign!(Internals.Get)(params); + } + } + + /*************************************************************************** + + Task-blocking neo API. + + ***************************************************************************/ + + private class Blocking + { + import swarm.neo.client.mixins.TaskBlockingCore; + import ocean.core.Array : copy; + + /// Instantiation of TaskBlockingCore. + mixin TaskBlockingCore!(); + + /*********************************************************************** + + Assigns a Put request, instructing the node to associate the + specified value and key. The calling Task is blocked until the + request finishes. + + Params: + key = key of record to add to node + value = value of record to add to node + notifier = notifier, called when interesting events occur for + this request + + Returns: + true if the Put request succeeded, false on error + + ***********************************************************************/ + + public bool put ( hash_t key, cstring value, Neo.Put.Notifier notifier ) + { + auto task = Task.getThis(); + assert(task !is null); + + bool succeeded, finished; + void internalNotifier ( Neo.Put.Notification info, Neo.Put.Args args ) + { + notifier(info, args); + + if ( info.active == info.active.succeeded ) + succeeded = true; + + finished = true; + if ( task.suspended ) + task.resume(); + } + + this.outer.neo.put(key, value, &internalNotifier); + if ( !finished ) + task.suspend(); + + return succeeded; + } + + /*********************************************************************** + + Assigns a Get request, retrieving the value associated in the node + with the specified key (if one exists). The calling Task is blocked + until the request finishes. + + Params: + key = key of record to retrive from node + value = output value, receives the value of the record (null, if + no value exists for the specified key) + notifier = notifier, called when interesting events occur for + this request + + Returns: + true if the Get request succeeded, false on error + + ***********************************************************************/ + + public bool get ( hash_t key, ref void[] value, Neo.Get.Notifier notifier ) + { + auto task = Task.getThis(); + assert(task !is null); + + bool succeeded, finished; + void internalNotifier ( Neo.Get.Notification info, Neo.Get.Args args ) + { + notifier(info, args); + + with ( info.Active ) switch ( info.active ) + { + case received: + value.copy(info.received.value); + goto case nothing; + + case nothing: + succeeded = true; + break; + + default: + succeeded = false; + break; + } + + finished = true; + if ( task.suspended ) + task.resume(); + } + + this.outer.neo.get(key, &internalNotifier); + if ( !finished ) + task.suspend(); + + return succeeded; + } + } + + /// Epoll instance used by client. + private EpollSelectDispatcher epoll; + + /// Public neo API. + public Neo neo; + + /// Public Task-blocking API. + public Blocking blocking; + + /// Connection notifier passed to the ctor. + private Neo.ConnectionNotifier conn_notifier; + + /*************************************************************************** + + Constructor. + + Params: + epoll = epoll instance to be used by the client + addr = address of node to connect to + port = port on which the node is listening for neo protocol + connections + conn_notifier = connection notifier + + ***************************************************************************/ + + public this ( EpollSelectDispatcher epoll, cstring addr, ushort port, + Neo.ConnectionNotifier conn_notifier ) + { + this.epoll = epoll; + this.conn_notifier = conn_notifier; + + auto auth_name = "dummy"; + ubyte[] auth_key = Key.init.content; + this.neo = new Neo(auth_name, auth_key, this.conn_notifier); + this.neo.addNode(addr, port); + + this.blocking = new Blocking; + } +} diff --git a/test/neo/client/request/Get.d b/test/neo/client/request/Get.d new file mode 100644 index 00000000..40a8479d --- /dev/null +++ b/test/neo/client/request/Get.d @@ -0,0 +1,71 @@ +/******************************************************************************* + + User-facing API for the client's Get request. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.client.request.Get; + +import ocean.transition; +import ocean.core.SmartUnion; +import swarm.neo.client.NotifierTypes; + +/******************************************************************************* + + Request-specific arguments provided by the user and passed to the notifier. + +*******************************************************************************/ + +public struct Args +{ + hash_t key; +} + +/******************************************************************************* + + Union of possible notifications. + +*******************************************************************************/ + +private union NotificationUnion +{ + /// The request succeeded, but no record was present. + NoInfo nothing; + + /// The request succeeded and a value was retrieved. + RequestDataInfo received; + + /// The request was tried and failed due to a connection error. + RequestNodeExceptionInfo node_disconnected; + + /// The request was tried and failed due to an internal node error. + RequestNodeInfo node_error; + + /// The request was tried and failed because it is unsupported. + RequestNodeUnsupportedInfo unsupported; + + /// The request failed for an unknown reason, presumably an internal error. + NoInfo error; +} + +/******************************************************************************* + + Notification smart union. + +*******************************************************************************/ + +public alias SmartUnion!(NotificationUnion) Notification; + +/******************************************************************************* + + Type of notifcation delegate. + +*******************************************************************************/ + +public alias void delegate ( Notification, Args ) Notifier; diff --git a/test/neo/client/request/Put.d b/test/neo/client/request/Put.d new file mode 100644 index 00000000..95bfdd67 --- /dev/null +++ b/test/neo/client/request/Put.d @@ -0,0 +1,69 @@ +/******************************************************************************* + + User-facing API for the client's Put request. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.client.request.Put; + +import ocean.transition; +import ocean.core.SmartUnion; +import swarm.neo.client.NotifierTypes; + +/******************************************************************************* + + Request-specific arguments provided by the user and passed to the notifier. + +*******************************************************************************/ + +public struct Args +{ + hash_t key; + cstring value; +} + +/******************************************************************************* + + Union of possible notifications. + +*******************************************************************************/ + +private union NotificationUnion +{ + /// The request succeeded. + NoInfo succeeded; + + /// The request was tried and failed due to a connection error. + RequestNodeExceptionInfo node_disconnected; + + /// The request was tried and failed due to an internal node error. + RequestNodeInfo node_error; + + /// The request was tried and failed because it is unsupported. + RequestNodeUnsupportedInfo unsupported; + + /// The request failed for an unknown reason, presumably an internal error. + NoInfo error; +} + +/******************************************************************************* + + Notification smart union. + +*******************************************************************************/ + +public alias SmartUnion!(NotificationUnion) Notification; + +/******************************************************************************* + + Type of notifcation delegate. + +*******************************************************************************/ + +public alias void delegate ( Notification, Args ) Notifier; diff --git a/test/neo/client/request/internal/Get.d b/test/neo/client/request/internal/Get.d new file mode 100644 index 00000000..3fb6711e --- /dev/null +++ b/test/neo/client/request/internal/Get.d @@ -0,0 +1,250 @@ +/******************************************************************************* + + Internal implementation of the client's Get request. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.client.request.internal.Get; + +import ocean.transition; + +/******************************************************************************* + + Get request implementation. + + Note that request structs act simply as namespaces for the collection of + symbols required to implement a request. They are never instantiated and + have no fields or non-static functions. + + The client expects several things to be present in a request struct: + 1. The static constants request_type and request_code + 2. The UserSpecifiedParams struct, containing all user-specified request + setup (including a notifier) + 3. The Notifier delegate type + 4. Optionally, the Controller type (if the request can be controlled, + after it has begun) + 5. The handler() function + 6. The all_finished_notifier() function + + The RequestCore mixin provides items 1 and 2. + +*******************************************************************************/ + +public struct Get +{ + import test.neo.common.Get; + import test.neo.client.request.Get; + import test.neo.common.RequestCodes; + import swarm.neo.client.mixins.RequestCore; + import swarm.neo.client.RequestHandlers : UseNodeDg; + + import ocean.io.select.protocol.generic.ErrnoIOException: IOError; + + /*************************************************************************** + + Data which the request needs while it is progress. An instance of this + struct is stored per connection on which the request runs and is passed + to the request handler. + + ***************************************************************************/ + + private static struct SharedWorking + { + /// Enum indicating the ways in which the request may end. + public enum Result + { + Failure, // Default value; unknown error (presumably in client) + Error, // Node or I/O error + Empty, // Record not found + Received // Received record + } + + /// The way in which the request ended. Used by the finished notifier to + /// decide what kind of notification (if any) to send to the user. + Result result; + } + + /*************************************************************************** + + Data which each request-on-conn needs while it is progress. An instance + of this struct is stored per connection on which the request runs and is + passed to the request handler. + + ***************************************************************************/ + + private static struct Working + { + // Dummy (not required by this request) + } + + /*************************************************************************** + + Request core. Mixes in the types `NotificationInfo`, `Notifier`, + `Params`, `Context` plus the static constants `request_type` and + `request_code`. + + ***************************************************************************/ + + mixin RequestCore!(RequestType.SingleNode, RequestCode.Get, 0, Args, + SharedWorking, Working, Notification); + + /*************************************************************************** + + Request handler. Called from RequestOnConn.runHandler(). + + Params: + use_node = delegate to be called to allow the request to send / + receive data over a specific connection. May be called as many + times as required by the request + context_blob = untyped chunk of data containing the serialized + context of the request which is to be handled + working_blob = untyped chunk of data containing the serialized + working data for the request on this connection + + ***************************************************************************/ + + public static void handler ( UseNodeDg use_node, void[] context_blob, + void[] working_blob ) + { + auto context = Get.getContext(context_blob); + context.shared_working.result = SharedWorking.Result.Failure; + + // In a real client, you'd have to have some way for a request to decide + // which node to operate on. In this simple example, we just hard-code + // the node's address/port. + IPAddress node; + node.setAddress("127.0.0.1"); + node.port = 10_000; + + use_node(node, + ( RequestOnConn.EventDispatcher conn ) + { + try + { + // Send request info to node + conn.send( + ( conn.Payload payload ) + { + payload.add(Get.cmd.code); + payload.add(Get.cmd.ver); + payload.add(context.user_params.args.key); + } + ); + + // Receive status from node + auto status = conn.receiveValue!(StatusCode)(); + if ( Get.handleGlobalStatusCodes(status, context, + conn.remote_address) ) + { + // Global codes (not supported / version not supported) + context.shared_working.result = SharedWorking.Result.Error; + } + else + { + // Get-specific codes + with ( RequestStatusCode ) switch ( status ) + { + case Value: + context.shared_working.result = + SharedWorking.Result.Received; + + // Receive record value from node. + conn.receive( + ( in void[] const_payload ) + { + Const!(void)[] payload = const_payload; + auto value = + conn.message_parser.getArray!(void)(payload); + + Notification n; + n.received = RequestDataInfo( + context.request_id, value); + Get.notify(context.user_params, n); + } + ); + break; + + case Empty: + context.shared_working.result = + SharedWorking.Result.Empty; + break; + + case Error: + context.shared_working.result = + SharedWorking.Result.Error; + + // The node returned an error code. Notify the user. + Notification n; + n.node_error = RequestNodeInfo( + context.request_id, conn.remote_address); + Get.notify(context.user_params, n); + break; + + default: + // Treat unknown codes as internal errors. + goto case Error; + } + } + } + catch ( IOError e ) + { + // A connection error occurred. Notify the user. + context.shared_working.result = + SharedWorking.Result.Error; + + Notification n; + n.node_disconnected = RequestNodeExceptionInfo( + context.request_id, conn.remote_address, e); + Get.notify(context.user_params, n); + } + }); + } + + /*************************************************************************** + + Request finished notifier. Called from Request.handlerFinished(). + + Params: + context_blob = untyped chunk of data containing the serialized + context of the request which is finishing + working_data_iter = iterator over the stored working data associated + with each connection on which this request was run + + ***************************************************************************/ + + public static void all_finished_notifier ( void[] context_blob, + IRequestWorkingData working_data_iter ) + { + auto context = Get.getContext(context_blob); + + Notification n; + + with ( SharedWorking.Result ) switch ( context.shared_working.result ) + { + case Empty: + n.nothing = NoInfo(); + break; + case Failure: + n.error = NoInfo(); + break; + case Received: + // Received notification was already handled in handle(), where + // the value received from the node is available. + return; + case Error: + // Error notification was already handled in handle(), where + // we have access to the node's address &/ exception. + return; + default: + assert(false); + } + + Get.notify(context.user_params, n); + } +} diff --git a/test/neo/client/request/internal/Put.d b/test/neo/client/request/internal/Put.d new file mode 100644 index 00000000..8dceebba --- /dev/null +++ b/test/neo/client/request/internal/Put.d @@ -0,0 +1,226 @@ +/******************************************************************************* + + Internal implementation of the client's Put request. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.client.request.internal.Put; + +import ocean.transition; + +/******************************************************************************* + + Put request implementation. + + Note that request structs act simply as namespaces for the collection of + symbols required to implement a request. They are never instantiated and + have no fields or non-static functions. + + The client expects several things to be present in a request struct: + 1. The static constants request_type and request_code + 2. The UserSpecifiedParams struct, containing all user-specified request + setup (including a notifier) + 3. The Notifier delegate type + 4. Optionally, the Controller type (if the request can be controlled, + after it has begun) + 5. The handler() function + 6. The all_finished_notifier() function + + The RequestCore mixin provides items 1 and 2. + +*******************************************************************************/ + +public struct Put +{ + import test.neo.common.Put; + import test.neo.client.request.Put; + import test.neo.common.RequestCodes; + import swarm.neo.client.mixins.RequestCore; + import swarm.neo.client.RequestHandlers : UseNodeDg; + + import ocean.io.select.protocol.generic.ErrnoIOException: IOError; + + /*************************************************************************** + + Data which the request needs while it is progress. An instance of this + struct is stored per connection on which the request runs and is passed + to the request handler. + + ***************************************************************************/ + + private static struct SharedWorking + { + /// Enum indicating the ways in which the request may end. + public enum Result + { + Failure, // Default value; unknown error (presumably in client) + Error, // Node or I/O error + Put // Put record + } + + /// The way in which the request ended. Used by the finished notifier to + /// decide what kind of notification (if any) to send to the user. + Result result; + } + + /*************************************************************************** + + Data which each request-on-conn needs while it is progress. An instance + of this struct is stored per connection on which the request runs and is + passed to the request handler. + + ***************************************************************************/ + + private static struct Working + { + // Dummy (not required by this request) + } + + /*************************************************************************** + + Request core. Mixes in the types `NotificationInfo`, `Notifier`, + `Params`, `Context` plus the static constants `request_type` and + `request_code`. + + ***************************************************************************/ + + mixin RequestCore!(RequestType.SingleNode, RequestCode.Put, 0, Args, + SharedWorking, Working, Notification); + + /*************************************************************************** + + Request handler. Called from RequestOnConn.runHandler(). + + Params: + use_node = delegate to be called to allow the request to send / + receive data over a specific connection. May be called as many + times as required by the request + context_blob = untyped chunk of data containing the serialized + context of the request which is to be handled + working_blob = untyped chunk of data containing the serialized + working data for the request on this connection + + ***************************************************************************/ + + public static void handler ( UseNodeDg use_node, void[] context_blob, + void[] working_blob ) + { + auto context = Put.getContext(context_blob); + context.shared_working.result = SharedWorking.Result.Failure; + + // In a real client, you'd have to have some way for a request to decide + // which node to operate on. In this simple example, we just hard-code + // the node's address/port. + IPAddress node; + node.setAddress("127.0.0.1"); + node.port = 10_000; + + use_node(node, + ( RequestOnConn.EventDispatcher conn ) + { + try + { + // Send request info to node + conn.send( + ( conn.Payload payload ) + { + payload.add(Put.cmd.code); + payload.add(Put.cmd.ver); + payload.add(context.user_params.args.key); + payload.addArray(context.user_params.args.value); + } + ); + + // Receive status from node + auto status = conn.receiveValue!(StatusCode)(); + if ( Put.handleGlobalStatusCodes(status, context, + conn.remote_address) ) + { + // Global codes (not supported / version not supported) + context.shared_working.result = SharedWorking.Result.Error; + } + else + { + // Put-specific codes + with ( RequestStatusCode ) switch ( status ) + { + case Succeeded: + context.shared_working.result = + SharedWorking.Result.Put; + break; + + case Error: + context.shared_working.result = + SharedWorking.Result.Error; + + // The node returned an error code. Notify the user. + Notification n; + n.node_error = RequestNodeInfo( + context.request_id, conn.remote_address); + Put.notify(context.user_params, n); + break; + + default: + // Treat unknown codes as internal errors. + goto case Error; + } + } + } + catch ( IOError e ) + { + // A connection error occurred. Notify the user. + context.shared_working.result = + SharedWorking.Result.Error; + + Notification n; + n.node_disconnected = RequestNodeExceptionInfo( + context.request_id, conn.remote_address, e); + Put.notify(context.user_params, n); + } + }); + } + + /*************************************************************************** + + Request finished notifier. Called from Request.handlerFinished(). + + Params: + context_blob = untyped chunk of data containing the serialized + context of the request which is finishing + working_data_iter = iterator over the stored working data associated + with each connection on which this request was run + + ***************************************************************************/ + + public static void all_finished_notifier ( void[] context_blob, + IRequestWorkingData working_data_iter ) + { + auto context = Put.getContext(context_blob); + + Notification n; + + with ( SharedWorking.Result ) switch ( context.shared_working.result ) + { + case Failure: + n.error = NoInfo(); + break; + case Put: + n.succeeded = NoInfo(); + break; + case Error: + // Error notification was already handled in handle(), where + // we have access to the node's address &/ exception. + return; + default: + assert(false); + } + + Put.notify(context.user_params, n); + } +} diff --git a/test/neo/common/Get.d b/test/neo/common/Get.d new file mode 100644 index 00000000..71058ae0 --- /dev/null +++ b/test/neo/common/Get.d @@ -0,0 +1,30 @@ +/******************************************************************************* + + Common protocol definitions for the Get request. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.common.Get; + +import swarm.neo.request.Command; + +/******************************************************************************* + + Status code enum. Sent from the node to the client. + +*******************************************************************************/ + +public enum RequestStatusCode : StatusCode +{ + None, // Invalid, default value + + Value, // Value fetched + Empty, // Record empty + Error // Internal node error occurred +} diff --git a/test/neo/common/Put.d b/test/neo/common/Put.d new file mode 100644 index 00000000..2235abe4 --- /dev/null +++ b/test/neo/common/Put.d @@ -0,0 +1,29 @@ +/******************************************************************************* + + Common protocol definitions for the Put request. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.common.Put; + +import swarm.neo.request.Command; + +/******************************************************************************* + + Status code enum. Sent from the node to the client. + +*******************************************************************************/ + +public enum RequestStatusCode : StatusCode +{ + None, // Invalid, default value + + Succeeded, // Value fetched + Error // Internal node error occurred +} diff --git a/test/neo/common/RequestCodes.d b/test/neo/common/RequestCodes.d new file mode 100644 index 00000000..e5055d41 --- /dev/null +++ b/test/neo/common/RequestCodes.d @@ -0,0 +1,20 @@ +/******************************************************************************* + + List of request codes. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.common.RequestCodes; + +public enum RequestCode : ubyte +{ + None, + Put, + Get +} diff --git a/test/neo/main.d b/test/neo/main.d new file mode 100644 index 00000000..9d974015 --- /dev/null +++ b/test/neo/main.d @@ -0,0 +1,117 @@ +/******************************************************************************* + + Simple test for the example client and node. Connects and runs two requests. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.main; + +import ocean.transition; +import ocean.task.Scheduler; +import ocean.task.Task; + +/******************************************************************************* + + Task which does the following: + 1. Constructs a node and registers its listening sockets with epoll. + 2. Constructs a client and sets it to connect to the node. + 3. Blocks until the connection has succeeded. + 4. Assigns a Put request to write a record to the node. Blocks until the + request has finished. + 5. Assigns a Get request to retrieve the record from the node. Blocks + until the request has finished. + 6. Shuts down the scheduler to exit. + +*******************************************************************************/ + +class Test : Task +{ + import test.neo.client.Client; + import test.neo.node.Node; + + import swarm.neo.client.requests.NotificationFormatter; + + import ocean.core.Enforce; + import ocean.io.Stdout; + + /// Example node. + Node node; + + /// Example client. + Client client; + + /*************************************************************************** + + Task method to be run in a worker fiber. + + ***************************************************************************/ + + override public void run ( ) + { + mstring msg_buf; + + this.node = new Node(theScheduler.epoll, "127.0.0.1", 10_000); + this.client = new Client(theScheduler.epoll, "127.0.0.1", 10_000, + &this.connNotifier); + + this.client.blocking.waitAllNodesConnected(); + + auto ok = this.client.blocking.put(23, "hello", + ( Client.Neo.Put.Notification info, Client.Neo.Put.Args args ) + { + formatNotification(info, msg_buf); + Stdout.formatln("Put: {}", msg_buf); + } + ); + enforce(ok, "Put request failed"); + + void[] value; + ok = this.client.blocking.get(23, value, + ( Client.Neo.Get.Notification info, Client.Neo.Get.Args args ) + { + formatNotification(info, msg_buf); + Stdout.formatln("Get: {}", msg_buf); + } + ); + enforce(ok, "Get request failed"); + enforce(value == cast(void[])"hello"); + + theScheduler.shutdown(); + } + + /*************************************************************************** + + Delegate called by the client when an event relating to a connection + (e.g. connection established or connection error) occurs. + + Params: + info = smart-union whose active member describes the notification + + ***************************************************************************/ + + private void connNotifier ( Client.Neo.ConnNotification info ) + { + mstring buf; + formatNotification(info, buf); + Stdout.formatln("Conn: {}", buf); + } +} + +/******************************************************************************* + + Initialises the scheduler and runs the test task. + +*******************************************************************************/ + +void main ( ) +{ + initScheduler(SchedulerConfiguration.init); + theScheduler.schedule(new Test); + theScheduler.eventLoop(); +} diff --git a/test/neo/node/Node.d b/test/neo/node/Node.d new file mode 100644 index 00000000..53d8cb12 --- /dev/null +++ b/test/neo/node/Node.d @@ -0,0 +1,111 @@ +/******************************************************************************* + + Example node with the following features: + * Listens on two ports: one with the neo protocol and one with the + legacy protocol. The latter protocol is unused in this example. + * Contains a simplistic key-value storage engine, with string values and + hash_t keys. + * Supports tow requests: Put -- to add or update a value in the storage + engine -- and Get -- to retrieve a value from the storage engine, if + it exists. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.node.Node; + +import ocean.transition; +import swarm.node.model.NeoNode; +import swarm.node.connection.ConnectionHandler; +import swarm.Const : ICommandCodes, NodeItem; + +/// ditto +public class Node : NodeBase!(ConnHandler) +{ + import ocean.io.select.EpollSelectDispatcher; + import swarm.neo.authentication.HmacDef: Key; + + import test.neo.common.RequestCodes; + import test.neo.node.Storage; + + import Get = test.neo.node.request.Get; + import Put = test.neo.node.request.Put; + + /// Storage engine. + private Storage storage; + + /*************************************************************************** + + Constructor. + + Params: + epoll = epoll instance with which to register connections + addr = address to bind to + neo_port = port to listen on for neo connections + + ***************************************************************************/ + + public this ( EpollSelectDispatcher epoll, cstring addr, ushort neo_port ) + in + { + assert(neo_port > 2); + } + body + { + this.storage = new Storage; + + Options options; + options.epoll = epoll; + options.cmd_handlers[RequestCode.Get] = &Get.handle; + options.cmd_handlers[RequestCode.Put] = &Put.handle; + options.credentials_map["dummy"] = Key.init; + options.shared_resources = this.storage; + + const backlog = 1_000; + auto legacy_port = NodeItem(addr.dup, cast(ushort)(neo_port - 1)); + super(legacy_port, neo_port, new ConnectionSetupParams, + options, backlog); + + // Register the listening ports with epoll. + this.register(epoll); + } + + /*************************************************************************** + + Returns: + string identifying the type of this node + + ***************************************************************************/ + + override protected cstring id ( ) + { + return "example"; + } +} + +/******************************************************************************* + + Legacy protocol connection handler. Required by NodeBase but unused in this + example. + +*******************************************************************************/ + +private class ConnHandler : ConnectionHandlerTemplate!(ICommandCodes) +{ + import ocean.net.server.connection.IConnectionHandler; + + public this ( void delegate(IConnectionHandler) finaliser, + ConnectionSetupParams params ) + { + super(finaliser, params); + } + + override protected void handleCommand () {} + + override protected void handleNone () {} +} diff --git a/test/neo/node/Storage.d b/test/neo/node/Storage.d new file mode 100644 index 00000000..f333e88d --- /dev/null +++ b/test/neo/node/Storage.d @@ -0,0 +1,21 @@ +/******************************************************************************* + + Super simplistic storage implementation of the example node. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.node.Storage; + +import ocean.transition; + +public class Storage +{ + /// Values are simply stored in an associative array, indexed by key. + public mstring[hash_t] map; +} diff --git a/test/neo/node/request/Get.d b/test/neo/node/request/Get.d new file mode 100644 index 00000000..136ef052 --- /dev/null +++ b/test/neo/node/request/Get.d @@ -0,0 +1,118 @@ +/******************************************************************************* + + Internal implementation of the node's Get request. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.node.request.Get; + +import ocean.transition; +import test.neo.node.Storage; +import swarm.neo.node.RequestOnConn; +import swarm.neo.request.Command; + +/******************************************************************************* + + The request handler for the table of handlers. When called, runs in a fiber + that can be controlled via `connection`. + + Params: + shared_resources = an opaque object containing resources owned by the + node which are required by the request + connection = performs connection socket I/O and manages the fiber + cmdver = the version number of the Consume command as specified by + the client + msg_payload = the payload of the first message of this request + +*******************************************************************************/ + +public void handle ( Object shared_resources, RequestOnConn connection, + Command.Version cmdver, Const!(void)[] msg_payload ) +{ + auto storage = cast(Storage)shared_resources; + assert(storage); + + switch ( cmdver ) + { + case 0: + scope rq = new GetImpl_v0; + rq.handle(storage, connection, msg_payload); + break; + + default: + auto ed = connection.event_dispatcher; + ed.send( + ( ed.Payload payload ) + { + payload.addConstant(GlobalStatusCode.RequestVersionNotSupported); + } + ); + break; + } +} + +/******************************************************************************* + + Implementation of the v0 Get request protocol. + +*******************************************************************************/ + +private scope class GetImpl_v0 +{ + import test.neo.common.Get; + + /*************************************************************************** + + Request handler. + + Params: + storage = storage engine instance to operate on + connection = connection to client + msg_payload = initial message read from client to begin the request + (the request code and version are assumed to be extracted) + + ***************************************************************************/ + + final public void handle ( Storage storage, RequestOnConn connection, + Const!(void)[] msg_payload ) + { + auto ed = connection.event_dispatcher; + auto parser = ed.message_parser; + + hash_t key; + parser.parseBody(msg_payload, key); + + auto record = key in storage.map; + if ( record is null ) + { + ed.send( + ( ed.Payload payload ) + { + payload.addConstant(RequestStatusCode.Empty); + } + ); + } + else + { + ed.send( + ( ed.Payload payload ) + { + payload.addConstant(RequestStatusCode.Value); + } + ); + + ed.send( + ( ed.Payload payload ) + { + payload.addArray(*record); + } + ); + } + } +} diff --git a/test/neo/node/request/Put.d b/test/neo/node/request/Put.d new file mode 100644 index 00000000..3ffb41ec --- /dev/null +++ b/test/neo/node/request/Put.d @@ -0,0 +1,101 @@ +/******************************************************************************* + + Internal implementation of the node's Put request. + + Copyright: + Copyright (c) 2017 sociomantic labs GmbH. All rights reserved + + License: + Boost Software License Version 1.0. See LICENSE_BOOST.txt for details. + +*******************************************************************************/ + +module test.neo.node.request.Put; + +import ocean.transition; +import test.neo.node.Storage; +import swarm.neo.node.RequestOnConn; +import swarm.neo.request.Command; + +/******************************************************************************* + + The request handler for the table of handlers. When called, runs in a fiber + that can be controlled via `connection`. + + Params: + shared_resources = an opaque object containing resources owned by the + node which are required by the request + connection = performs connection socket I/O and manages the fiber + cmdver = the version number of the Consume command as specified by + the client + msg_payload = the payload of the first message of this request + +*******************************************************************************/ + +public void handle ( Object shared_resources, RequestOnConn connection, + Command.Version cmdver, Const!(void)[] msg_payload ) +{ + auto storage = cast(Storage)shared_resources; + assert(storage); + + switch ( cmdver ) + { + case 0: + scope rq = new PutImpl_v0; + rq.handle(storage, connection, msg_payload); + break; + + default: + auto ed = connection.event_dispatcher; + ed.send( + ( ed.Payload payload ) + { + payload.addConstant(GlobalStatusCode.RequestVersionNotSupported); + } + ); + break; + } +} + +/******************************************************************************* + + Implementation of the v0 Put request protocol. + +*******************************************************************************/ + +private scope class PutImpl_v0 +{ + import test.neo.common.Put; + + /*************************************************************************** + + Request handler. + + Params: + storage = storage engine instance to operate on + connection = connection to client + msg_payload = initial message read from client to begin the request + (the request code and version are assumed to be extracted) + + ***************************************************************************/ + + final public void handle ( Storage storage, RequestOnConn connection, + Const!(void)[] msg_payload ) + { + auto ed = connection.event_dispatcher; + auto parser = ed.message_parser; + + hash_t key; + cstring value; + parser.parseBody(msg_payload, key, value); + + storage.map[key] = value.dup; + + ed.send( + ( ed.Payload payload ) + { + payload.addConstant(RequestStatusCode.Succeeded); + } + ); + } +}