Skip to content

Commit

Permalink
Merge pull request bpfman#817 from dave-tucker/tc-from-image
Browse files Browse the repository at this point in the history
bpfd: Use tc dispatcher from container image
  • Loading branch information
mergify[bot] authored Nov 15, 2023
2 parents 6d8fca3 + f5378e0 commit f9c86e9
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 28 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/image-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,16 @@ jobs:
type=sha,format=long
type=raw,value=v2,enable=true
- registry: quay.io
bpf_build_wrapper: rust
repository: bpfd
image: tc-dispatcher
context: .
dockerfile: ./Containerfile.tc_dispatcher
tags: |
type=sha,format=long
type=raw,value=v1,enable=true
name: Build Image (${{ matrix.image.image }})
environment: image-repositories
steps:
Expand Down
7 changes: 7 additions & 0 deletions Containerfile.tc_dispatcher
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM scratch

COPY .output/tc_dispatcher.bpf.o dispatcher.o
LABEL io.ebpf.program_type tc
LABEL io.ebpf.filename dispatcher.o
LABEL io.ebpf.program_name tc_dispatcher
LABEL io.ebpf.bpf_function_name tc_dispatcher
59 changes: 43 additions & 16 deletions bpfd/src/multiprog/tc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
use std::{fs, io::BufReader, mem};

use aya::{
include_bytes_aligned,
programs::{
links::FdLink,
tc::{self, SchedClassifierLink, TcOptions},
Extension, Link, SchedClassifier, TcAttachType,
},
Bpf, BpfLoader,
};
use bpfd_api::util::directories::*;
use bpfd_api::{util::directories::*, ImagePullPolicy};
use futures::stream::TryStreamExt;
use log::debug;
use netlink_packet_route::tc::Nla;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc::Sender, oneshot};

use super::Dispatcher;
use crate::{
Expand All @@ -29,15 +28,12 @@ use crate::{
},
dispatcher_config::TcDispatcherConfig,
errors::BpfdError,
oci_utils::image_manager::Command as ImageManagerCommand,
oci_utils::image_manager::{BytecodeImage, Command as ImageManagerCommand},
utils::should_map_be_pinned,
};

const DEFAULT_PRIORITY: u32 = 50; // Default priority for user programs in the dispatcher
const TC_DISPATCHER_PRIORITY: u16 = 50; // Default TC priority for TC Dispatcher
const DISPATCHER_PROGRAM_NAME: &str = "tc_dispatcher";

static DISPATCHER_BYTES: &[u8] = include_bytes_aligned!("../../../.output/tc_dispatcher.bpf.o");

#[derive(Debug, Serialize, Deserialize)]
pub struct TcDispatcher {
Expand All @@ -50,6 +46,7 @@ pub struct TcDispatcher {
num_extensions: usize,
#[serde(skip)]
loader: Option<Bpf>,
program_name: Option<String>,
}

impl TcDispatcher {
Expand All @@ -60,8 +57,7 @@ impl TcDispatcher {
programs: &mut [&mut Program],
revision: u32,
old_dispatcher: Option<Dispatcher>,
// TODO(astoycos) tc dispatcher should be pulled from an image
_image_manager: Sender<ImageManagerCommand>,
image_manager: Sender<ImageManagerCommand>,
) -> Result<TcDispatcher, BpfdError> {
debug!("TcDispatcher::new() for if_index {if_index}, revision {revision}");
let mut extensions: Vec<&mut TcProgram> = programs
Expand All @@ -83,15 +79,45 @@ impl TcDispatcher {
};

debug!("tc dispatcher config: {:?}", config);
let image = BytecodeImage::new(
"quay.io/bpfd/tc-dispatcher:v1".to_string(),
ImagePullPolicy::IfNotPresent as i32,
None,
None,
);
let (tx, rx) = oneshot::channel();
image_manager
.send(ImageManagerCommand::Pull {
image: image.image_url.clone(),
pull_policy: image.image_pull_policy.clone(),
username: image.username.clone(),
password: image.password.clone(),
resp: tx,
})
.await
.map_err(|e| BpfdError::BpfBytecodeError(e.into()))?;

let (path, bpf_function_name) = rx
.await
.map_err(|e| BpfdError::BpfBytecodeError(e.into()))?
.map_err(|e| BpfdError::BpfBytecodeError(e.into()))?;

let (tx, rx) = oneshot::channel();
image_manager
.send(ImageManagerCommand::GetBytecode { path, resp: tx })
.await
.map_err(|e| BpfdError::BpfBytecodeError(e.into()))?;
let program_bytes = rx
.await
.map_err(|e| BpfdError::BpfBytecodeError(e.into()))?
.map_err(BpfdError::BpfBytecodeError)?;

let mut loader = BpfLoader::new()
.set_global("CONFIG", &config, true)
.load(DISPATCHER_BYTES)?;
.load(&program_bytes)?;

let dispatcher: &mut SchedClassifier = loader
.program_mut(DISPATCHER_PROGRAM_NAME)
.unwrap()
.try_into()?;
let dispatcher: &mut SchedClassifier =
loader.program_mut(&bpf_function_name).unwrap().try_into()?;

dispatcher.load()?;

Expand All @@ -111,6 +137,7 @@ impl TcDispatcher {
priority: TC_DISPATCHER_PRIORITY,
handle: None,
loader: Some(loader),
program_name: Some(bpf_function_name),
};
dispatcher.attach_extensions(&mut extensions).await?;
dispatcher.attach(old_dispatcher).await?;
Expand Down Expand Up @@ -173,7 +200,7 @@ impl TcDispatcher {
.loader
.as_mut()
.ok_or(BpfdError::NotLoaded)?
.program_mut(DISPATCHER_PROGRAM_NAME)
.program_mut(self.program_name.clone().unwrap().as_str())
.unwrap()
.try_into()?;

Expand Down Expand Up @@ -223,7 +250,7 @@ impl TcDispatcher {
.loader
.as_mut()
.ok_or(BpfdError::NotLoaded)?
.program_mut(DISPATCHER_PROGRAM_NAME)
.program_mut(self.program_name.clone().unwrap().as_str())
.unwrap()
.try_into()?;

Expand Down
29 changes: 21 additions & 8 deletions bpfd/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ pub async fn serve(

let allow_unsigned = config.signing.as_ref().map_or(true, |s| s.allow_unsigned);
let (itx, irx) = mpsc::channel(32);

let mut image_manager =
ImageManager::new(BYTECODE_IMAGE_CONTENT_STORE, allow_unsigned, irx).await?;
let image_manager_handle = tokio::spawn(async move {
image_manager.run().await;
});

let mut bpf_manager = BpfManager::new(config, rx, itx);
bpf_manager.rebuild_state().await?;
Expand All @@ -101,19 +105,28 @@ pub async fn serve(
};
if csi_support {
let storage_manager = StorageManager::new(tx);

join!(
let storage_manager_handle = tokio::spawn(storage_manager.run());
let (_, res_image, res_storage, _) = join!(
join_listeners(listeners),
bpf_manager.process_commands(),
image_manager.run(),
storage_manager.run()
image_manager_handle,
storage_manager_handle,
bpf_manager.process_commands()
);
if let Some(e) = res_storage.err() {
return Err(e.into());
}
if let Some(e) = res_image.err() {
return Err(e.into());
}
} else {
join!(
let (_, res_image, _) = join!(
join_listeners(listeners),
bpf_manager.process_commands(),
image_manager.run(),
image_manager_handle,
bpf_manager.process_commands()
);
if let Some(e) = res_image.err() {
return Err(e.into());
}
}

Ok(())
Expand Down
12 changes: 8 additions & 4 deletions tests/integration-test/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,14 @@ impl Drop for ChildGuard {
/// Spawn a bpfd process
pub fn start_bpfd() -> Result<ChildGuard> {
debug!("Starting bpfd");
let bpfd_process = Command::cargo_bin("bpfd")?.spawn().map(|c| ChildGuard {
name: "bpfd",
child: c,
})?;

let bpfd_process = Command::cargo_bin("bpfd")?
.env("RUST_LOG", "bpfd=debug")
.spawn()
.map(|c| ChildGuard {
name: "bpfd",
child: c,
})?;

// Wait for up to 5 seconds for bpfd to be ready
sleep(Duration::from_millis(100));
Expand Down

0 comments on commit f9c86e9

Please sign in to comment.