Skip to content

Commit

Permalink
fix: single-connection mode connection reestablishment (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zane authored Sep 10, 2024
1 parent b3d3022 commit 00d975a
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 13 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ jobs:
name: Cargo build and test
runs-on: ubuntu-latest

timeout-minutes: 10

strategy:
matrix:
features: [std, no_std]
Expand Down
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

(no changes yet)
### tacacs-plus

#### Fixed

- `Client` connections are now properly reestablished when closed at the other end ([#39](https://github.com/cPacketNetworks/tacacs-plus-rs/pull/39))

## [0.3.0] - 2024-08-29

Expand Down
8 changes: 7 additions & 1 deletion tacacs-plus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@ md-5 = "0.10.6"
uuid = { version = "1.10.0", features = ["v4"] }

[dev-dependencies]
tokio = { version = "1.39.1", features = ["rt", "net", "time", "macros"] }
tokio = { version = "1.39.1", features = [
"rt",
"net",
"time",
"macros",
"process",
] }
tokio-util = { version = "0.7.11", features = ["compat"] }
async-net = "2.0.0"
async-std = { version = "1.12.0", features = ["attributes"] }
71 changes: 69 additions & 2 deletions tacacs-plus/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ use std::fmt;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::Poll;

use byteorder::{ByteOrder, NetworkEndian};
use futures::poll;
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tacacs_plus_protocol::{Deserialize, PacketBody, Serialize};
use tacacs_plus_protocol::{HeaderInfo, Packet, PacketFlags};

use super::ClientError;

#[cfg(test)]
mod tests;

/// A (pinned, boxed) future that returns a client connection or an error, as returned from a [`ConnectionFactory`].
///
/// This is roughly equivalent to the [`BoxFuture`](futures::future::BoxFuture) type in the `futures` crate, but without
Expand Down Expand Up @@ -119,11 +124,28 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ClientInner<S> {
Ok(conn)
}

/// Writes a packet to the underlying connection.
/// Writes a packet to the underlying connection, reconnecting if necessary.
pub(super) async fn send_packet<B: PacketBody + Serialize>(
&mut self,
packet: Packet<B>,
secret_key: Option<&[u8]>,
) -> Result<(), ClientError> {
// check if other end closed our connection, and reopen it accordingly
let connection = self.connection().await?;
if !is_connection_open(connection).await? {
self.post_session_cleanup(true).await?;
}

// send the packet after ensuring the connection is valid (or dropping
// it if it's invalid)
self._send_packet(packet, secret_key).await
}

/// Writes a packet to the underlying connection.
async fn _send_packet<B: PacketBody + Serialize>(
&mut self,
packet: Packet<B>,
secret_key: Option<&[u8]>,
) -> Result<(), ClientError> {
// allocate zero-filled buffer large enough to hold packet
let mut packet_buffer = vec![0; packet.wire_size()];
Expand Down Expand Up @@ -195,7 +217,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ClientInner<S> {
pub(super) async fn post_session_cleanup(&mut self, status_is_error: bool) -> io::Result<()> {
// close session if server doesn't agree to SINGLE_CONNECTION negotiation, or if an error occurred (since a mutex guarantees only one session is going at a time)
if !self.single_connection_established || status_is_error {
// SAFETY: ensure_connection should be called before this function, and guarantees inner.connection is non-None
// SAFETY: connection() should be called before this function, and guarantees inner.connection is non-None
let mut connection = self.connection.take().unwrap();
connection.close().await?;

Expand All @@ -212,3 +234,48 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ClientInner<S> {
Ok(())
}
}

/// Checks if the provided connection is still open on both sides.
///
/// This is accomplished by attempting to read a single byte from the connection
/// and checking for an EOF condition or specific errors (broken pipe/connection reset).
///
/// This might be overkill, but during testing I encountered a case where a write succeeded
/// and a subsequent read hung due to the connection being closed on the other side, so
/// avoiding that is preferable.
async fn is_connection_open<C>(connection: &mut C) -> io::Result<bool>
where
C: AsyncRead + Unpin,
{
// read into a 1-byte buffer, since a 0-byte buffer might return 0 besides just on EOF
let mut buffer = [0];

// poll the read future exactly once to see if anything is ready immediately
match poll!(connection.read(&mut buffer)) {
// something ready on first poll likely indicates something wrong, since we aren't
// expecting any data to actually be ready
Poll::Ready(ready) => match ready {
// read of length 0 indicates an EOF, which happens when the other side closes a TCP connection
Ok(0) => Ok(false),

Err(e) => match e.kind() {
// these errors indicate that the connection is closed, which is the exact
// situation we're trying to recover from
//
// BrokenPipe seems to be Linux-specific (?), ConnectionReset is more general though
// (checked TCP & read(2) man pages for MacOS/FreeBSD/Linux)
io::ErrorKind::BrokenPipe | io::ErrorKind::ConnectionReset => Ok(false),

// bubble up any other errors to the caller
_ => Err(e),
},

// if there's data still available, the connection is still open, although
// this shouldn't happen in the context of TACACS+
Ok(1..) => Ok(true),
},

// nothing ready to read -> connection is still open
Poll::Pending => Ok(true),
}
}
94 changes: 94 additions & 0 deletions tacacs-plus/src/inner/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::sync::Arc;
use std::time::Duration;

use futures::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Notify;
use tokio_util::compat::TokioAsyncReadCompatExt;

use super::is_connection_open;

async fn bind_to_port(port: u16) -> TcpListener {
TcpListener::bind(("localhost", port))
.await
.unwrap_or_else(|err| panic!("failed to bind to address localhost:{port}: {err:?}"))
}

#[tokio::test]
async fn connection_open_check() {
let notify = Arc::new(Notify::new());
let listener_notify = notify.clone();

tokio::spawn(async move {
let listener = bind_to_port(9999).await;
listener_notify.notify_one();

let (_stream, _) = listener
.accept()
.await
.expect("failed to accept connection");

// this is done to keep the stream open for the rest of the test
listener_notify.notified().await;
});

// wait for server to bind to address
notify.notified().await;

let client = TcpStream::connect(("localhost", 9999))
.await
.expect("couldn't connect to test listener");
let mut client = client.compat();

let is_open = is_connection_open(&mut client)
.await
.expect("couldn't check if connection was open");
assert!(is_open);

notify.notify_one();
}

#[tokio::test]
async fn connection_closed_check() {
let notify = Arc::new(Notify::new());
let listener_notify = notify.clone();

tokio::spawn(async move {
let listener = bind_to_port(9998).await;
listener_notify.notify_one();

let (stream, _) = listener
.accept()
.await
.expect("failed to accept connection");

let mut stream = stream.compat();

// close connection & notify main test task
stream.close().await.unwrap();

// wait for a bit before notifying main task; this caused some sporadic failures
// during testing when omitted
tokio::time::sleep(Duration::from_millis(250)).await;

// notify main task that stream is closed
listener_notify.notify_one();
});

// wait for server to bind to address
notify.notified().await;

let client = TcpStream::connect(("localhost", 9998))
.await
.expect("couldn't connect to test listener");
let mut client = client.compat();

// let server close connection
notify.notified().await;

// ensure connection is detected as closed
let is_open = is_connection_open(&mut client)
.await
.expect("couldn't check if connection was open");
assert!(!is_open);
}
60 changes: 55 additions & 5 deletions tacacs-plus/tests/pap_login.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::time::Duration;

use futures::{FutureExt, TryFutureExt};
use tokio::net::TcpStream;
use tokio_util::compat::TokioAsyncWriteCompatExt;
use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt};

use tacacs_plus::Client;
use tacacs_plus::Client as TacacsClient;
use tacacs_plus::{AuthenticationType, ContextBuilder, ResponseStatus};

mod common;

type Client = TacacsClient<Compat<TcpStream>>;

#[tokio::test]
async fn pap_success() {
// NOTE: this assumes you have a TACACS+ server running already
Expand All @@ -24,10 +28,43 @@ async fn pap_success() {
Some(common::SECRET_KEY),
);

let context = ContextBuilder::new("someuser".to_owned()).build();
attempt_pap_login(&tac_client, "someuser".to_owned(), "hunter2").await;
}

let response = tac_client
.authenticate(context, "hunter2", AuthenticationType::Pap)
// this test is ignored since it takes a bit to run & requires specific actions to run alongside the test (restarting server)
#[tokio::test]
#[ignore]
async fn connection_reestablishment() {
let address = common::get_server_address();
let client = Client::new(
Box::new(move || {
TcpStream::connect(address.clone())
.map_ok(TokioAsyncWriteCompatExt::compat_write)
.boxed()
}),
Some(common::SECRET_KEY),
);

let user = String::from("paponly");
let password = "pass-word";
attempt_pap_login(&client, user.clone(), password).await;

// restart server container
if let Ok(container_name) = std::env::var("SERVER_CONTAINER") {
restart_server_container(container_name).await;
}

// sleep for a bit to allow server time to start back up
tokio::time::sleep(Duration::from_millis(500)).await;

// try logging in after server restart to ensure connection is reestablished
attempt_pap_login(&client, user, password).await;
}

async fn attempt_pap_login(client: &Client, user: String, password: &str) {
let context = ContextBuilder::new(user).build();
let response = client
.authenticate(context, password, AuthenticationType::Pap)
.await
.expect("error completing authentication session");

Expand All @@ -37,3 +74,16 @@ async fn pap_success() {
"authentication failed, full response: {response:?}"
);
}

async fn restart_server_container(name: String) {
let docker_command = std::env::var("docker").unwrap_or_else(|_| "docker".to_owned());

let status = tokio::process::Command::new(docker_command)
.args(["restart", &name])
.stdout(std::process::Stdio::null())
.status()
.await
.expect("couldn't get exit status of server container restart command");

assert!(status.success(), "bad exit status: {status}");
}
1 change: 1 addition & 0 deletions test-assets/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ id = tac_plus-ng {
device everything {
key = "very secure key that is super secret"
address = 0.0.0.0/0
single-connection = yes

script { rewrite user = emptyGuest }
}
Expand Down
12 changes: 8 additions & 4 deletions test-assets/run-client-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ set -euo pipefail

REPO_ROOT=$(git rev-parse --show-toplevel)
TMPDIR=$(mktemp -d)
docker=${docker:-docker}
export docker=${docker:-docker}
export SERVER_CONTAINER=tacacs-server

if [ ! -v CI ]; then
# build server image
Expand All @@ -30,21 +31,24 @@ test_against_server_image() {
echo "Testing against image: $image"

echo "Running server container in background"
$docker run --rm --detach --publish 5555:5555 --name tacacs-server "$image" >/dev/null
$docker run --rm --detach --publish 5555:5555 --name $SERVER_CONTAINER "$image" >/dev/null

# run all integration tests against server
# run integration tests against server
echo "Running tests..."
cargo test --package tacacs-plus --test '*' --no-fail-fast

# copy accounting file out of container
$docker cp tacacs-server:/tmp/accounting.log $TMPDIR/accounting.log
$docker cp $SERVER_CONTAINER:/tmp/accounting.log $TMPDIR/accounting.log

# verify contents of accounting file, printing if invalid
if ! $REPO_ROOT/test-assets/validate_accounting_file.py $TMPDIR/accounting.log; then
echo 'accounting file:'
cat $TMPDIR/accounting.log
return 1
fi

# test reconnection by restarting server mid test-run
cargo test --package tacacs-plus --test pap_login connection_reestablishment -- --ignored
}

trap "stop_running_containers" EXIT
Expand Down

0 comments on commit 00d975a

Please sign in to comment.