TCP client API changes for 0.5.0 #317

Closed
NiteshKant opened this issue Jan 21, 2015 · 1 comment

This issue describes the API changes to the TCP client, based on issues #280 and #281.

Tcp Client

This client does not extend RxClient. I prefer delegation over inheritance for the clients, so all clients (HTTP/UDP/TCP) will be standalone.
Every mutation to this client creates a new instance.

/**
 * A TCP client for creating TCP connections.
 *
 * <h2>Immutability</h2>
 * An instance of this client is immutable and all mutations produce a new client instance. For this reason it is
 * recommended that the mutations are done during client creation and not during connection creation to avoid repeated
 * object creation overhead.
 *
 * @param <I> The type of objects written to this client.
 * @param <O> The type of objects read from this client.
 */
public abstract class TcpClient<I, O> {

    /**
     * Creates a new {@link ConnectionRequest} which should be subscribed to actually connect to the target server.
     *
     * @return A new {@link ConnectionRequest} which either can be subscribed directly or altered in various ways
     * before subscription.
     */
    public abstract ConnectionRequest<I, O> createConnectionRequest();

    /**
     * Creates a new {@link ConnectionRequest} which should be subscribed to actually connect to the target server.
     * This method overrides the default host and port configured for this client.
     *
     * @param host Target host to connect.
     * @param port Port on the host to connect.
     *
     * @return A new {@link ConnectionRequest} which either can be subscribed directly or altered in various ways
     * before subscription.
     */
    public abstract ConnectionRequest<I, O> createConnectionRequest(String host, int port);

    /**
     * Creates a new client instance, inheriting all configurations from this client and adding a
     * {@link ChannelOption} for the connections created by the newly created client instance.
     *
     * @param option Option to add.
     * @param value Value for the option.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract <T> TcpClient<I, O> channelOption(ChannelOption<T> option, T value);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for all connections created by this client. The specified
     * handler is added at the first position of the pipeline as specified by
     * {@link ChannelPipeline#addFirst(String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param name Name of the handler.
     * @param handler Handler instance to add.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract <II, OO> TcpClient<II, OO> addChannelHandlerFirst(String name, ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for all connections created by this client. The specified
     * handler is added at the first position of the pipeline as specified by
     * {@link ChannelPipeline#addFirst(EventExecutorGroup, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param group   the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
     *                 methods
     * @param name     the name of the handler to append
     * @param handler  the handler to append
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract <II, OO> TcpClient<II, OO> addChannelHandlerFirst(EventExecutorGroup group, String name,
                                                                      ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for all connections created by this client. The specified
     * handler is added at the last position of the pipeline as specified by
     * {@link ChannelPipeline#addLast(String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param name Name of the handler.
     * @param handler Handler instance to add.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract <II, OO> TcpClient<II, OO> addChannelHandlerLast(String name, ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for all connections created by this client. The specified
     * handler is added at the last position of the pipeline as specified by
     * {@link ChannelPipeline#addLast(EventExecutorGroup, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param group   the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
     *                 methods
     * @param name     the name of the handler to append
     * @param handler  the handler to append
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract <II, OO> TcpClient<II, OO> addChannelHandlerLast(EventExecutorGroup group, String name,
                                                                     ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for all connections created by this client. The specified
     * handler is added before an existing handler with the passed {@code baseName} in the pipeline as specified by
     * {@link ChannelPipeline#addBefore(String, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param baseName  the name of the existing handler
     * @param name Name of the handler.
     * @param handler Handler instance to add.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract <II, OO> TcpClient<II, OO> addChannelHandlerBefore(String baseName, String name,
                                                                       ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for all connections created by this client. The specified
     * handler is added before an existing handler with the passed {@code baseName} in the pipeline as specified by
     * {@link ChannelPipeline#addBefore(EventExecutorGroup, String, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param group   the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
     *                 methods
     * @param baseName  the name of the existing handler
     * @param name     the name of the handler to append
     * @param handler  the handler to append
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract <II, OO> TcpClient<II, OO> addChannelHandlerBefore(EventExecutorGroup group, String baseName,
                                                                       String name, ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for all connections created by this client. The specified
     * handler is added after an existing handler with the passed {@code baseName} in the pipeline as specified by
     * {@link ChannelPipeline#addAfter(String, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param baseName  the name of the existing handler
     * @param name Name of the handler.
     * @param handler Handler instance to add.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract <II, OO> TcpClient<II, OO> addChannelHandlerAfter(String baseName, String name,
                                                                      ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for all connections created by this client. The specified
     * handler is added after an existing handler with the passed {@code baseName} in the pipeline as specified by
     * {@link ChannelPipeline#addAfter(EventExecutorGroup, String, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param group   the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
     *                 methods
     * @param baseName  the name of the existing handler
     * @param name     the name of the handler to append
     * @param handler  the handler to append
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract <II, OO> TcpClient<II, OO> addChannelHandlerAfter(EventExecutorGroup group, String baseName,
                                                                      String name, ChannelHandler handler);

    /**
     * Removes the {@link ChannelHandler} with the passed {@code name} from the {@link ChannelPipeline} for all
     * connections created by this client.
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param name Name of the handler.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract <II, OO> TcpClient<II, OO> removeHandler(String name);

    /**
     * Creates a new client instance, inheriting all configurations from this client and using the passed
     * action to configure all the connections created by the newly created client instance.
     *
     * @param pipelineConfigurator Action to configure {@link ChannelPipeline}.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract <II, OO> TcpClient<II, OO> pipelineConfigurator(Action1<ChannelPipeline> pipelineConfigurator);

    /**
     * Creates a new client instance, inheriting all configurations from this client and using the passed
     * eventLoopGroup for all the connections created by the newly created client instance.
     *
     * @param eventLoopGroup {@link EventLoopGroup} to use.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract TcpClient<I, O> eventLoop(EventLoopGroup eventLoopGroup);

    /**
     * Creates a new client instance, inheriting all configurations from this client and using the passed
     * {@code maxConnections} as the maximum number of concurrent connections created by the newly created client instance.
     *
     * @param maxConnections Maximum number of concurrent connections to be created by this client.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract TcpClient<I, O> maxConnections(int maxConnections);

    /**
     * Creates a new client instance, inheriting all configurations from this client and using the passed
     * {@code idleConnectionsTimeoutMillis} as the time that must elapse before an idle connection is closed by the
     * newly created client instance.
     *
     * @param idleConnectionsTimeoutMillis Time, in milliseconds, that must elapse before an idle connection is closed
     * by the newly created client instance.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract TcpClient<I, O> withIdleConnectionsTimeoutMillis(long idleConnectionsTimeoutMillis);

    /**
     * Creates a new client instance, inheriting all configurations from this client and using the passed
     * {@code limitDeterminationStrategy} as the strategy to control the maximum concurrent connections created by the
     * newly created client instance.
     *
     * @param limitDeterminationStrategy Strategy to control the maximum concurrent connections created by the
     * newly created client instance.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract TcpClient<I, O> withConnectionPoolLimitStrategy(PoolLimitDeterminationStrategy limitDeterminationStrategy);

    /**
     * Creates a new client instance, inheriting all configurations from this client and using the passed
     * {@code poolIdleCleanupScheduler} for detecting and cleaning idle connections by the newly created client instance.
     *
     * @param poolIdleCleanupScheduler Scheduler used to schedule idle connection cleanup.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract TcpClient<I, O> withPoolIdleCleanupScheduler(ScheduledExecutorService poolIdleCleanupScheduler);

    /**
     * Creates a new client instance, inheriting all configurations from this client and disabling idle connection
     * cleanup for the newly created client instance.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract TcpClient<I, O> withNoIdleConnectionCleanup();

    /**
     * Creates a new client instance, inheriting all configurations from this client and disabling connection
     * pooling for the newly created client instance.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract TcpClient<I, O> withNoConnectionPooling();

    /**
     * Creates a new client instance, inheriting all configurations from this client and enabling wire logging at the
     * passed level for the newly created client instance.
     *
     * @param wireLoggingLevel Logging level at which the wire logs will be logged. The wire logging will only be done if
     *                         logging is enabled at this level for {@link LoggingHandler}
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract TcpClient<I, O> enableWireLogging(LogLevel wireLoggingLevel);

    /**
     * Creates a new client instance, inheriting all configurations from this client and using the passed
     * {@code sslEngineFactory} for all secured connections created by the newly created client instance.
     *
     * @param sslEngineFactory {@link SSLEngineFactory} for all secured connections created by the newly created client
     *                                                 instance.
     *
     * @return A new {@link TcpClient} instance.
     */
    public abstract TcpClient<I, O> withSslEngineFactory(SSLEngineFactory sslEngineFactory);
}
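
The copy-on-mutation contract described in the class documentation above can be illustrated with a minimal sketch. Everything here is hypothetical: `ImmutableClientSketch`, its fields, and the string-keyed `option` method are stand-ins chosen for illustration and are not RxNetty or Netty types. The point is only that each mutator returns a fresh instance and leaves the receiver unchanged.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed TcpClient; not RxNetty code.
final class ImmutableClientSketch {
    private final String host;
    private final int port;
    private final int maxConnections;
    private final Map<String, Object> options; // wrapped as unmodifiable in the constructor

    private ImmutableClientSketch(String host, int port, int maxConnections, Map<String, Object> options) {
        this.host = host;
        this.port = port;
        this.maxConnections = maxConnections;
        this.options = Collections.unmodifiableMap(options);
    }

    static ImmutableClientSketch create(String host, int port) {
        return new ImmutableClientSketch(host, port, Integer.MAX_VALUE, new LinkedHashMap<>());
    }

    // Returns a new instance with the new limit; this instance is left untouched.
    ImmutableClientSketch maxConnections(int newMax) {
        return new ImmutableClientSketch(host, port, newMax, options);
    }

    // Copies the option map before adding, so the receiver's options never change.
    ImmutableClientSketch option(String name, Object value) {
        Map<String, Object> copy = new LinkedHashMap<>(options);
        copy.put(name, value);
        return new ImmutableClientSketch(host, port, maxConnections, copy);
    }

    int getMaxConnections() {
        return maxConnections;
    }

    Object getOption(String name) {
        return options.get(name);
    }
}
```

Because every call allocates, a client configured like this is best built once and reused, which matches the recommendation to apply mutations at client creation rather than per connection.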

Connection Request

This folds the ClientConfig and ChannelPipeline methods into the request.
Every mutation creates a new instance of the request. When several mutations are needed, the cost of creating intermediary request instances can be avoided by using a ConnectionRequestUpdater, obtained from this ConnectionRequest via the newUpdater() method.

Every subscription to this request returns one connection.
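
The one-connection-per-subscription behaviour can be modelled without RxJava. The sketch below is an assumption-laden illustration: `ColdRequestSketch` and `ColdRequestDemo` are hypothetical names, and a `Supplier` stands in for whatever actually opens the socket. It only shows that nothing is created until `subscribe` is called, and that each call creates exactly one connection.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of a "cold" request: no work happens until subscribe() is called.
final class ColdRequestSketch<C> {
    private final Supplier<C> connectionFactory;

    ColdRequestSketch(Supplier<C> connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    // Each call asks the factory for exactly one connection and hands it to the subscriber.
    void subscribe(Consumer<? super C> onConnection) {
        onConnection.accept(connectionFactory.get());
    }
}

final class ColdRequestDemo {
    // Subscribes the given number of times and returns how many connections were created.
    static int connectionsCreated(int subscriptions) {
        AtomicInteger counter = new AtomicInteger();
        ColdRequestSketch<String> request =
                new ColdRequestSketch<>(() -> "connection-" + counter.incrementAndGet());
        for (int i = 0; i < subscriptions; i++) {
            request.subscribe(connection -> { });
        }
        return counter.get();
    }
}
```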

/**
 * A TCP connection request created via {@link TcpClient#createConnectionRequest()}.
 *
 * <h2>Mutations</h2>
 *
 * All mutations to this request create a brand new instance.
 *
 * <h2>Optimizing multiple mutations</h2>
 *
 * A connection creation may include multiple mutations on a {@link ConnectionRequest}. Each mutation creates a new
 * {@link ConnectionRequest} object, so a chain of mutations produces unnecessary garbage. To avoid this
 * memory overhead, this class provides a {@link ConnectionRequestUpdater} which can be obtained via
 * {@link #newUpdater()}.
 * There is no semantic difference between the two approaches; the updater only reduces the number of objects
 * created.
 *
 * <h2>Initiating connections</h2>
 *
 * A new connection is initiated every time {@link ConnectionRequest#subscribe()} is called; this is the only way of
 * creating connections.
 *
 * @param <I> The type of the objects that are read from this connection.
 * @param <O> The type of objects that are written to this connection.
 */
public abstract class ConnectionRequest<I, O> extends Observable<ObservableConnection<I, O>> {

    protected ConnectionRequest(OnSubscribe<ObservableConnection<I, O>> f) {
        super(f);
    }

    /**
     * Enables read timeout for all the connections created by this request.
     *
     * @param timeOut Read timeout duration.
     * @param timeUnit Read timeout time unit.
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract ConnectionRequest<I, O> readTimeOut(int timeOut, TimeUnit timeUnit);

    /**
     * Creates a new request instance, inheriting all configurations from this request and enabling wire logging at
     * the passed level for the connections created by the newly created request instance.
     *
     * @param wireLoggingLevel Logging level at which the wire logs will be logged. The wire logging will only be done if
     *                         logging is enabled at this level for {@link io.netty.handler.logging.LoggingHandler}
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract ConnectionRequest<I, O> enableWireLogging(LogLevel wireLoggingLevel);

    /**
     * Creates a new request instance, inheriting all configurations from this request and using the passed
     * {@code sslEngineFactory} for all secured connections created by the newly created request instance.
     *
     * @param sslEngineFactory {@link SSLEngineFactory} for all secured connections created by the newly created
     *                         request instance.
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract ConnectionRequest<I, O> sslEngineFactory(SSLEngineFactory sslEngineFactory);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for the connections created by
     * this request. The specified handler is added at the first position of the pipeline as specified by
     * {@link ChannelPipeline#addFirst(String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param name Name of the handler.
     * @param handler Handler instance to add.
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract <II, OO> ConnectionRequest<II, OO> addChannelHandlerFirst(String name, ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for the connections created by this request. The specified
     * handler is added at the first position of the pipeline as specified by
     * {@link ChannelPipeline#addFirst(EventExecutorGroup, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param group   the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
     *                 methods
     * @param name     the name of the handler to append
     * @param handler  the handler to append
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract <II, OO> ConnectionRequest<II, OO> addChannelHandlerFirst(EventExecutorGroup group,
                                                                              String name,
                                                                              ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for the connections created by this request. The specified
     * handler is added at the last position of the pipeline as specified by
     * {@link ChannelPipeline#addLast(String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param name Name of the handler.
     * @param handler Handler instance to add.
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract <II, OO> ConnectionRequest<II, OO> addChannelHandlerLast(String name, ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for the connections created by this request. The specified
     * handler is added at the last position of the pipeline as specified by
     * {@link ChannelPipeline#addLast(EventExecutorGroup, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param group   the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
     *                 methods
     * @param name     the name of the handler to append
     * @param handler  the handler to append
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract <II, OO> ConnectionRequest<II, OO> addChannelHandlerLast(EventExecutorGroup group, String name,
                                                                             ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for the connections created by this request. The specified
     * handler is added before an existing handler with the passed {@code baseName} in the pipeline as specified by
     * {@link ChannelPipeline#addBefore(String, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param baseName  the name of the existing handler
     * @param name Name of the handler.
     * @param handler Handler instance to add.
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract <II, OO> ConnectionRequest<II, OO> addChannelHandlerBefore(String baseName, String name,
                                                                               ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for the connections created by this request. The specified
     * handler is added before an existing handler with the passed {@code baseName} in the pipeline as specified by
     * {@link ChannelPipeline#addBefore(EventExecutorGroup, String, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param group   the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
     *                 methods
     * @param baseName  the name of the existing handler
     * @param name     the name of the handler to append
     * @param handler  the handler to append
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract <II, OO> ConnectionRequest<II, OO> addChannelHandlerBefore(EventExecutorGroup group,
                                                                               String baseName,
                                                                               String name, ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for the connections created by this request. The specified
     * handler is added after an existing handler with the passed {@code baseName} in the pipeline as specified by
     * {@link ChannelPipeline#addAfter(String, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param baseName  the name of the existing handler
     * @param name Name of the handler.
     * @param handler Handler instance to add.
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract <II, OO> ConnectionRequest<II, OO> addChannelHandlerAfter(String baseName, String name,
                                                                              ChannelHandler handler);

    /**
     * Adds a {@link ChannelHandler} to {@link ChannelPipeline} for the connections created by this request. The specified
     * handler is added after an existing handler with the passed {@code baseName} in the pipeline as specified by
     * {@link ChannelPipeline#addAfter(EventExecutorGroup, String, String, ChannelHandler)}
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param group   the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
     *                 methods
     * @param baseName  the name of the existing handler
     * @param name     the name of the handler to append
     * @param handler  the handler to append
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract <II, OO> ConnectionRequest<II, OO> addChannelHandlerAfter(EventExecutorGroup group,
                                                                              String baseName,
                                                                              String name,
                                                                              ChannelHandler handler);

    /**
     * Removes the {@link ChannelHandler} with the passed {@code name} from the {@link ChannelPipeline} for all
     * connections created by this request.
     *
     * <em>For better flexibility of pipeline modification, the method {@link #pipelineConfigurator(Action1)} will be more
     * convenient.</em>
     *
     * @param name Name of the handler.
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract <II, OO> ConnectionRequest<II, OO> removeHandler(String name);

    /**
     * Creates a new request instance, inheriting all configurations from this request and using the passed
     * action to configure all the connections created by the newly created request instance.
     *
     * @param pipelineConfigurator Action to configure {@link ChannelPipeline}.
     *
     * @return A new instance of the {@link ConnectionRequest} sharing all existing state from this request.
     * Use {@link #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused
     * intermediary {@link ConnectionRequest} objects.
     */
    public abstract <II, OO> ConnectionRequest<II, OO> pipelineConfigurator(Action1<ChannelPipeline> pipelineConfigurator);

    public abstract ConnectionRequestUpdater<I, O> newUpdater();
}
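
To make the trade-off behind newUpdater() concrete, here is a rough sketch of how a mutable updater can batch several mutations into a single new request. It is not the actual ConnectionRequestUpdater: `RequestSketch`, its two fields, the static allocation counter, and the `update()` method name are all assumptions made for illustration, since the issue does not show the updater's API.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical immutable request; fields and names are illustrative only.
final class RequestSketch {
    static int instancesCreated = 0; // demo-only counter of allocated requests

    final long readTimeoutMillis;
    final boolean wireLogging;

    RequestSketch(long readTimeoutMillis, boolean wireLogging) {
        this.readTimeoutMillis = readTimeoutMillis;
        this.wireLogging = wireLogging;
        instancesCreated++;
    }

    // Direct mutators: each call allocates a new request.
    RequestSketch readTimeOut(int timeOut, TimeUnit timeUnit) {
        return new RequestSketch(timeUnit.toMillis(timeOut), wireLogging);
    }

    RequestSketch enableWireLogging() {
        return new RequestSketch(readTimeoutMillis, true);
    }

    Updater newUpdater() {
        return new Updater(this);
    }

    // Mutable accumulator: collects changes in place and allocates one request at the end.
    static final class Updater {
        private long readTimeoutMillis;
        private boolean wireLogging;

        private Updater(RequestSketch base) {
            this.readTimeoutMillis = base.readTimeoutMillis;
            this.wireLogging = base.wireLogging;
        }

        Updater readTimeOut(int timeOut, TimeUnit timeUnit) {
            this.readTimeoutMillis = timeUnit.toMillis(timeOut);
            return this;
        }

        Updater enableWireLogging() {
            this.wireLogging = true;
            return this;
        }

        // Hypothetical terminal method: builds the single resulting request.
        RequestSketch update() {
            return new RequestSketch(readTimeoutMillis, wireLogging);
        }
    }
}
```

In this sketch, chaining two direct mutators allocates two requests, while the updater path allocates one; the resulting state is the same either way.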

Examples

Infinite stream read

    RxNetty.newTcpClient("localhost", 8080)
           .createConnectionRequest()
           .flatMap(ObservableConnection::getInput)
           .toBlocking()
           .forEach(byteBuf -> System.out.println(byteBuf.toString(Charset.defaultCharset())));

Write and Read infinite stream

    RxNetty.newTcpClient("localhost", 8080)
           .createConnectionRequest()
           .flatMap(connection -> connection.writeStringAndFlush("Hello")
                                            .cast(ByteBuf.class)
                                            .concatWith(connection.getInput()))
           .toBlocking()
           .forEach(byteBuf -> System.out.println(byteBuf.toString(Charset.defaultCharset())));

Add ChannelHandler

    RxNetty.newTcpClient("localhost", 8080)
           .createConnectionRequest()
           .<ByteBuf, String>addChannelHandlerFirst("string-decoder", new SimpleChannelInboundHandler<ByteBuf>() {
               @Override
               protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                   ctx.fireChannelRead(msg.toString(Charset.defaultCharset()));
               }
           })
           .flatMap(ObservableConnection::getInput)
           .toBlocking()
           .forEach(System.out::println);
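
For readers less familiar with how the first/last/before/after variants position a handler, the ordering rules can be sketched with a plain list of handler names. `PipelineOrderSketch` is a hypothetical model, not Netty's ChannelPipeline: it tracks names only, is mutable for brevity (unlike the proposed client and request), and its choice to throw when a base name is missing is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical model of named-handler ordering; tracks names only, not real handlers.
final class PipelineOrderSketch {
    private final List<String> names = new ArrayList<>();

    PipelineOrderSketch addFirst(String name) {
        names.add(0, name);
        return this;
    }

    PipelineOrderSketch addLast(String name) {
        names.add(name);
        return this;
    }

    // Inserts the new name immediately before the existing baseName.
    PipelineOrderSketch addBefore(String baseName, String name) {
        names.add(indexOf(baseName), name);
        return this;
    }

    // Inserts the new name immediately after the existing baseName.
    PipelineOrderSketch addAfter(String baseName, String name) {
        names.add(indexOf(baseName) + 1, name);
        return this;
    }

    PipelineOrderSketch remove(String name) {
        names.remove(indexOf(name));
        return this;
    }

    // Returns a copy so callers cannot modify the internal order.
    List<String> names() {
        return new ArrayList<>(names);
    }

    private int indexOf(String name) {
        int index = names.indexOf(name);
        if (index < 0) {
            throw new NoSuchElementException(name);
        }
        return index;
    }
}
```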

@NiteshKant commented:

This is available on the 0.5.x branch.
