Skip to content

Commit

Permalink
ReactiveSocket d'tor can be called from signal handler of a stream. (#37
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Mateusz Machalica committed May 18, 2016
1 parent f88b352 commit a68eee4
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 77 deletions.
9 changes: 9 additions & 0 deletions reactivesocket-cpp/src/ConnectionAutomaton.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ void ConnectionAutomaton::connect() {
connection_->setInput(*this);
}

void ConnectionAutomaton::disconnect() {
// Send terminal signals to the DuplexConnection's input and output before
// tearing it down. We must do this per DuplexConnection specification (see
// interface definition).
connectionOutput_.onComplete();
connectionInputSub_.cancel();
connection_.reset();
}

ConnectionAutomaton::~ConnectionAutomaton() {
// We rely on SubscriptionPtr and SubscriberPtr to dispatch appropriate
// terminal signals.
Expand Down
25 changes: 18 additions & 7 deletions reactivesocket-cpp/src/ConnectionAutomaton.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ using StreamId = uint32_t;
using StreamAutomatonFactory = std::function<bool(StreamId, Payload&)>;

/// Handles connection-level frames and (de)multiplexes streams.
///
/// Instances of this class should be accessed and managed via shared_ptr,
/// instead of the pattern reflected in MemoryMixin and IntrusiveDeleter.
/// The reason why such a simple memory management story is possible lies in the
/// fact that there is no request(n)-based flow control between stream
/// automata and ConnectionAutomaton.
class ConnectionAutomaton :
/// Registered as an input in the DuplexConnection.
public Subscriber<Payload>,
Expand All @@ -48,11 +54,17 @@ class ConnectionAutomaton :
/// processing of one or more frames.
void connect();

/// Terminates underlying connection.
///
/// This may synchronously deliver terminal signals to all
/// AbstractStreamAutomaton attached to this ConnectionAutomaton.
void disconnect();

~ConnectionAutomaton();

/// @{
/// A contract exposed to AbstractStreamAutomaton, modelled after Subscriber
/// and Subscription contracts, while omitting flow control related signals
/// and Subscription contracts, while omitting flow control related signals.

/// Adds a stream automaton to the connection.
///
Expand Down Expand Up @@ -82,11 +94,14 @@ class ConnectionAutomaton :
/// Per ReactiveStreams specification:
/// 1. no other signal can be delivered during or after this one,
/// 2. "unsubscribe handshake" guarantees that the signal will be delivered
/// exactly once, even if the automaton initiated stream closure,
/// at least once, even if the automaton initiated stream closure,
/// 3. per "unsubscribe handshake", the automaton must deliver corresponding
/// terminal signal to the connection.
///
/// Additionally, the signal is idempotent.
/// Additionally, in order to simplify implementation of stream automaton:
/// 4. the signal bound with a particular StreamId is idempotent and may be
/// delivered multiple times as long as the caller holds shared_ptr to
/// ConnectionAutomaton.
void endStream(StreamId streamId, StreamCompletionSignal signal);
/// @}

Expand Down Expand Up @@ -127,10 +142,6 @@ class ConnectionAutomaton :
connectionOutput_;
reactivestreams::SubscriptionPtr<Subscription> connectionInputSub_;

enum class State : uint8_t {
OPEN,
CLOSED,
} state_{State::OPEN};
std::unordered_map<StreamId, AbstractStreamAutomaton*> streams_;
reactivestreams::AllowanceSemaphore writeAllowance_;
std::deque<Payload> pendingWrites_; // TODO(stupaq): two vectors?
Expand Down
35 changes: 20 additions & 15 deletions reactivesocket-cpp/src/ReactiveSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <folly/Memory.h>
#include <folly/MoveWrapper.h>

#include "reactivesocket-cpp/src/ConnectionAutomaton.h"
#include "reactivesocket-cpp/src/DuplexConnection.h"
#include "reactivesocket-cpp/src/Frame.h"
#include "reactivesocket-cpp/src/Payload.h"
Expand All @@ -22,14 +23,18 @@
namespace lithium {
namespace reactivesocket {

ReactiveSocket::~ReactiveSocket() {}
ReactiveSocket::~ReactiveSocket() {
// Force connection closure, this will trigger terminal signals to be
// delivered to all stream automata.
connection_->disconnect();
}

std::unique_ptr<ReactiveSocket> ReactiveSocket::fromClientConnection(
std::unique_ptr<DuplexConnection> connection,
std::unique_ptr<RequestHandler> handler) {
std::unique_ptr<ReactiveSocket> socket(
new ReactiveSocket(false, std::move(connection), std::move(handler)));
socket->connection_.connect();
socket->connection_->connect();
return socket;
}

Expand All @@ -38,7 +43,7 @@ std::unique_ptr<ReactiveSocket> ReactiveSocket::fromServerConnection(
std::unique_ptr<RequestHandler> handler) {
std::unique_ptr<ReactiveSocket> socket(
new ReactiveSocket(true, std::move(connection), std::move(handler)));
socket->connection_.connect();
socket->connection_->connect();
return socket;
}

Expand All @@ -47,9 +52,9 @@ Subscriber<Payload>& ReactiveSocket::requestChannel(
// TODO(stupaq): handle any exceptions
StreamId streamId = nextStreamId_;
nextStreamId_ += 2;
ChannelResponder::Parameters params = {&connection_, streamId};
ChannelResponder::Parameters params = {connection_, streamId};
auto automaton = new ChannelRequester(params);
connection_.addStream(streamId, *automaton);
connection_->addStream(streamId, *automaton);
automaton->subscribe(responseSink);
responseSink.onSubscribe(*automaton);
automaton->start();
Expand All @@ -62,9 +67,9 @@ void ReactiveSocket::requestSubscription(
// TODO(stupaq): handle any exceptions
StreamId streamId = nextStreamId_;
nextStreamId_ += 2;
SubscriptionRequester::Parameters params = {&connection_, streamId};
SubscriptionRequester::Parameters params = {connection_, streamId};
auto automaton = new SubscriptionRequester(params);
connection_.addStream(streamId, *automaton);
connection_->addStream(streamId, *automaton);
automaton->subscribe(responseSink);
responseSink.onSubscribe(*automaton);
automaton->onNext(std::move(request));
Expand All @@ -75,15 +80,15 @@ ReactiveSocket::ReactiveSocket(
bool isServer,
std::unique_ptr<DuplexConnection> connection,
std::unique_ptr<RequestHandler> handler)
: handler_(std::move(handler)),
nextStreamId_(isServer ? 1 : 2),
connection_(
: connection_(new ConnectionAutomaton(
std::move(connection),
std::bind(
&ReactiveSocket::createResponder,
this,
std::placeholders::_1,
std::placeholders::_2)) {}
std::placeholders::_2))),
handler_(std::move(handler)),
nextStreamId_(isServer ? 1 : 2) {}

bool ReactiveSocket::createResponder(
StreamId streamId,
Expand All @@ -95,9 +100,9 @@ bool ReactiveSocket::createResponder(
if (!frame.deserializeFrom(std::move(serializedFrame))) {
return false;
}
ChannelResponder::Parameters params = {&connection_, streamId};
ChannelResponder::Parameters params = {connection_, streamId};
auto automaton = new ChannelResponder(params);
connection_.addStream(streamId, *automaton);
connection_->addStream(streamId, *automaton);
auto& requestSink =
handler_->handleRequestChannel(std::move(frame.data_), *automaton);
automaton->subscribe(requestSink);
Expand All @@ -111,9 +116,9 @@ bool ReactiveSocket::createResponder(
if (!frame.deserializeFrom(std::move(serializedFrame))) {
return false;
}
SubscriptionResponder::Parameters params = {&connection_, streamId};
SubscriptionResponder::Parameters params = {connection_, streamId};
auto automaton = new SubscriptionResponder(params);
connection_.addStream(streamId, *automaton);
connection_->addStream(streamId, *automaton);
handler_->handleRequestSubscription(std::move(frame.data_), *automaton);
automaton->onNextFrame(frame);
automaton->start();
Expand Down
4 changes: 2 additions & 2 deletions reactivesocket-cpp/src/ReactiveSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
#include <functional>
#include <memory>

#include "reactivesocket-cpp/src/ConnectionAutomaton.h"
#include "reactivesocket-cpp/src/Payload.h"
#include "reactivesocket-cpp/src/ReactiveStreamsCompat.h"

namespace lithium {
namespace reactivesocket {

class ConnectionAutomaton;
class DuplexConnection;
class RequestHandler;
enum class FrameType : uint16_t;
Expand Down Expand Up @@ -58,9 +58,9 @@ class ReactiveSocket {

bool createResponder(StreamId streamId, Payload& frame);

const std::shared_ptr<ConnectionAutomaton> connection_;
std::unique_ptr<RequestHandler> handler_;
StreamId nextStreamId_;
ConnectionAutomaton connection_;
};
}
}
24 changes: 12 additions & 12 deletions reactivesocket-cpp/src/automata/ChannelRequester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void ChannelRequesterBase::onNext(Payload request) {
// We must inform ConsumerMixin about an implicit allowance we have
// requested from the remote end.
addImplicitAllowance(initialN);
connection_.onNextFrame(frame);
connection_->onNextFrame(frame);
// Pump the remaining allowance into the ConsumerMixin _after_ sending the
// initial request.
if (remainingN) {
Expand All @@ -64,13 +64,13 @@ void ChannelRequesterBase::onComplete() {
switch (state_) {
case State::NEW:
state_ = State::CLOSED;
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
break;
case State::REQUESTED: {
state_ = State::CLOSED;
Frame_REQUEST_CHANNEL frame(streamId_, FrameFlags_COMPLETE, 0, nullptr);
connection_.onNextFrame(frame);
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->onNextFrame(frame);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
} break;
case State::CLOSED:
break;
Expand All @@ -81,13 +81,13 @@ void ChannelRequesterBase::onError(folly::exception_wrapper ex) {
switch (state_) {
case State::NEW:
state_ = State::CLOSED;
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
break;
case State::REQUESTED: {
state_ = State::CLOSED;
Frame_CANCEL frame(streamId_);
connection_.onNextFrame(frame);
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->onNextFrame(frame);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
} break;
case State::CLOSED:
break;
Expand Down Expand Up @@ -115,13 +115,13 @@ void ChannelRequesterBase::cancel() {
switch (state_) {
case State::NEW:
state_ = State::CLOSED;
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
break;
case State::REQUESTED: {
state_ = State::CLOSED;
Frame_CANCEL frame(streamId_);
connection_.onNextFrame(frame);
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->onNextFrame(frame);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
} break;
case State::CLOSED:
break;
Expand Down Expand Up @@ -160,7 +160,7 @@ void ChannelRequesterBase::onNextFrame(Frame_RESPONSE& frame) {
}
Base::onNextFrame(frame);
if (end) {
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
}
}

Expand All @@ -172,7 +172,7 @@ void ChannelRequesterBase::onNextFrame(Frame_ERROR& frame) {
break;
case State::REQUESTED:
state_ = State::CLOSED;
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
break;
case State::CLOSED:
break;
Expand Down
16 changes: 8 additions & 8 deletions reactivesocket-cpp/src/automata/ChannelResponder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ void ChannelResponderBase::onComplete() {
case State::RESPONDING: {
state_ = State::CLOSED;
Frame_RESPONSE frame(streamId_, FrameFlags_COMPLETE, nullptr);
connection_.onNextFrame(frame);
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->onNextFrame(frame);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
} break;
case State::CLOSED:
break;
Expand All @@ -42,8 +42,8 @@ void ChannelResponderBase::onError(folly::exception_wrapper ex) {
case State::RESPONDING: {
state_ = State::CLOSED;
Frame_ERROR frame(streamId_, ErrorCode::APPLICATION_ERROR);
connection_.onNextFrame(frame);
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->onNextFrame(frame);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
} break;
case State::CLOSED:
break;
Expand All @@ -64,8 +64,8 @@ void ChannelResponderBase::cancel() {
case State::RESPONDING: {
state_ = State::CLOSED;
Frame_RESPONSE frame(streamId_, FrameFlags_COMPLETE, nullptr);
connection_.onNextFrame(frame);
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->onNextFrame(frame);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
} break;
case State::CLOSED:
break;
Expand Down Expand Up @@ -99,15 +99,15 @@ void ChannelResponderBase::onNextFrame(Frame_REQUEST_CHANNEL& frame) {
}
Base::onNextFrame(frame);
if (end) {
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
}
}

void ChannelResponderBase::onNextFrame(Frame_CANCEL& frame) {
switch (state_) {
case State::RESPONDING:
state_ = State::CLOSED;
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
break;
case State::CLOSED:
break;
Expand Down
12 changes: 6 additions & 6 deletions reactivesocket-cpp/src/automata/SubscriptionRequester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void SubscriptionRequesterBase::onNext(Payload request) {
// We must inform ConsumerMixin about an implicit allowance we have
// requested from the remote end.
addImplicitAllowance(initialN);
connection_.onNextFrame(frame);
connection_->onNextFrame(frame);
// Pump the remaining allowance into the ConsumerMixin _after_ sending the
// initial request.
if (remainingN) {
Expand Down Expand Up @@ -73,13 +73,13 @@ void SubscriptionRequesterBase::cancel() {
switch (state_) {
case State::NEW:
state_ = State::CLOSED;
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
break;
case State::REQUESTED: {
state_ = State::CLOSED;
Frame_CANCEL frame(streamId_);
connection_.onNextFrame(frame);
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->onNextFrame(frame);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
} break;
case State::CLOSED:
break;
Expand Down Expand Up @@ -118,7 +118,7 @@ void SubscriptionRequesterBase::onNextFrame(Frame_RESPONSE& frame) {
}
Base::onNextFrame(frame);
if (end) {
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
}
}

Expand All @@ -130,7 +130,7 @@ void SubscriptionRequesterBase::onNextFrame(Frame_ERROR& frame) {
break;
case State::REQUESTED:
state_ = State::CLOSED;
connection_.endStream(streamId_, StreamCompletionSignal::GRACEFUL);
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
break;
case State::CLOSED:
break;
Expand Down
Loading

0 comments on commit a68eee4

Please sign in to comment.