Skip to content

Commit

Permalink
Process external_configs after synchronization
Browse files Browse the repository at this point in the history
Summary:
If there are file changes in between two identical commands, external config values were recorded in the event log twice. This is because  events were fired in dice updater which is called by the concurrency manager in a loop in order to get the updates in cases that files on disk have changed between commands.

This diff pulls out code to outside of this loop  so that these events are fired once we acquire access to DICE transaction. This way we only record them once, not each time we check whether the state is changed in between.

This diff also resolves another issue with picking up the correct external configs if `--reuse-current-config` is set. In that case, we reuse the external config data from the previous command, so we should record that.

Reviewed By: JakobDegen

Differential Revision: D68828945

fbshipit-source-id: f3059d08af09dc3562d786d6e3ad38ef664d7adb
  • Loading branch information
ezgicicek authored and facebook-github-bot committed Jan 31, 2025
1 parent 5cecb81 commit 291001e
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 25 deletions.
7 changes: 7 additions & 0 deletions app/buck2_common/src/legacy_configs/dice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ pub fn inject_legacy_config_for_test(
Ok(())
}

pub fn inject_external_config_for_test(
dice: &mut DiceTransactionUpdater,
) -> buck2_error::Result<()> {
dice.changed_to([(LegacyExternalBuckConfigDataKey, None)])?;
Ok(())
}

#[async_trait]
impl HasLegacyConfigs for DiceComputations<'_> {
async fn get_legacy_config_on_dice(
Expand Down
9 changes: 0 additions & 9 deletions app/buck2_server/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ use buck2_server_ctx::concurrency::DiceUpdater;
use buck2_server_ctx::ctx::DiceAccessor;
use buck2_server_ctx::ctx::PrivateStruct;
use buck2_server_ctx::ctx::ServerCommandContextTrait;
use buck2_server_ctx::experiment_util::get_experiment_tags;
use buck2_server_ctx::stderr_output_guard::StderrOutputGuard;
use buck2_server_ctx::stderr_output_guard::StderrOutputWriter;
use buck2_server_starlark_debug::create_debugger_handle;
Expand Down Expand Up @@ -441,14 +440,6 @@ impl ServerCommandContext<'_> {
.await?;

self.report_traced_config_paths(&new_configs.config_paths)?;
// Normally, this code should execute only once (hence we should fire only one BuckconfigInputValues event) but there might be an additional call once concurrent command is detected.
// Even if there is no concurrent command, we sometimes end up having two events due to a bug where concurrency manager treats many more commands as being concurrent than it's supposed to.
let components = new_configs.external_data.get_buckconfig_components();
let tags = get_experiment_tags(&components);
self.events().instant_event(buck2_data::TagEvent { tags });
self.events()
.instant_event(buck2_data::BuckconfigInputValues { components });

if self.reuse_current_config {
if dice_ctx
.is_injected_external_buckconfig_data_key_set()
Expand Down
46 changes: 33 additions & 13 deletions app/buck2_server_ctx/src/concurrency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use allocative::Allocative;
use async_condvar_fair::Condvar;
use async_trait::async_trait;
use buck2_cli_proto::client_context::PreemptibleWhen;
use buck2_common::legacy_configs::dice::HasInjectedLegacyConfigs;
use buck2_core::soft_error;
use buck2_data::DiceBlockConcurrentCommandEnd;
use buck2_data::DiceBlockConcurrentCommandStart;
Expand Down Expand Up @@ -58,6 +59,8 @@ use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Mutex;
use tokio::sync::MutexGuard;

use crate::experiment_util::get_experiment_tags;

#[derive(buck2_error::Error, Debug)]
#[buck2(tag = Input)]
enum ConcurrencyHandlerError {
Expand Down Expand Up @@ -439,7 +442,7 @@ impl ConcurrencyHandler {
preempt: Some(preempt_sender),
};

let (transaction, tainted) = loop {
let (mut transaction, tainted) = loop {
match &data.dice_status {
DiceStatus::Cleanup { future, epoch } => {
tracing::debug!("ActiveDice is in cleanup");
Expand Down Expand Up @@ -587,6 +590,17 @@ impl ConcurrencyHandler {
data.previously_tainted = true;
}

if transaction
.is_injected_external_buckconfig_data_key_set()
.await?
{
let external_configs = transaction.get_injected_external_buckconfig_data().await?;
let components = external_configs.get_buckconfig_components();
event_dispatcher.instant_event(buck2_data::TagEvent {
tags: get_experiment_tags(&components),
});
event_dispatcher.instant_event(buck2_data::BuckconfigInputValues { components });
}
// create the on exit drop handler, which will take care of notifying tasks.
let drop_guard = OnExecExit::new(self.dupe(), command_id, command_data, data)?;
// This adds the task to the list of all tasks (see ::new impl)
Expand Down Expand Up @@ -787,15 +801,21 @@ mod tests {
}
}

async fn make_default_dice() -> Arc<Dice> {
let dice = Dice::builder().build(DetectCycles::Enabled);
let mut updater = dice.updater();
drop(buck2_common::legacy_configs::dice::inject_external_config_for_test(&mut updater));
updater.commit().await;
dice
}

#[tokio::test]
async fn nested_invocation_same_transaction() {
// FIXME: This times out on open source, and we don't know why
if is_open_source() {
return;
}

let dice = Dice::builder().build(DetectCycles::Enabled);

let dice = make_default_dice().await;
let concurrency = ConcurrencyHandler::new(dice);

let traces1 = TraceId::new();
Expand Down Expand Up @@ -861,7 +881,7 @@ mod tests {

#[tokio::test]
async fn nested_invocation_should_error() {
let dice = Dice::builder().build(DetectCycles::Enabled);
let dice = make_default_dice().await;

let concurrency = ConcurrencyHandler::new(dice);

Expand Down Expand Up @@ -914,7 +934,7 @@ mod tests {

#[tokio::test]
async fn parallel_invocation_same_transaction() {
let dice = Dice::builder().build(DetectCycles::Enabled);
let dice = make_default_dice().await;

let concurrency = ConcurrencyHandler::new(dice);

Expand Down Expand Up @@ -981,7 +1001,7 @@ mod tests {

#[tokio::test]
async fn parallel_invocation_different_traceid_blocks() -> buck2_error::Result<()> {
let dice = Dice::builder().build(DetectCycles::Enabled);
let dice = make_default_dice().await;

let concurrency = ConcurrencyHandler::new(dice.dupe());

Expand Down Expand Up @@ -1098,7 +1118,7 @@ mod tests {

#[tokio::test]
async fn parallel_invocation_exit_when_different_state() -> buck2_error::Result<()> {
let dice = Dice::builder().build(DetectCycles::Enabled);
let dice = make_default_dice().await;

let concurrency = ConcurrencyHandler::new(dice.dupe());

Expand Down Expand Up @@ -1220,7 +1240,7 @@ mod tests {

#[tokio::test]
async fn parallel_invocation_exit_when_preemptible() -> buck2_error::Result<()> {
let dice = Dice::builder().build(DetectCycles::Enabled);
let dice = make_default_dice().await;

let concurrency = ConcurrencyHandler::new(dice.dupe());

Expand Down Expand Up @@ -1377,7 +1397,7 @@ mod tests {

let key = &key;

let dice = Dice::builder().build(DetectCycles::Enabled);
let dice = make_default_dice().await;

let concurrency = ConcurrencyHandler::new(dice.dupe());

Expand Down Expand Up @@ -1527,7 +1547,7 @@ mod tests {

#[tokio::test]
async fn exclusive_command_lock() -> buck2_error::Result<()> {
let dice = Dice::builder().build(DetectCycles::Enabled);
let dice = make_default_dice().await;
let concurrency = ConcurrencyHandler::new(dice.dupe());
let (mut source, sink) = create_source_sink_pair();
let dispatcher = EventDispatcher::new(TraceId::new(), sink);
Expand Down Expand Up @@ -1612,7 +1632,7 @@ mod tests {

#[tokio::test]
async fn test_thundering_herd() -> buck2_error::Result<()> {
let dice = Dice::builder().build(DetectCycles::Enabled);
let dice = make_default_dice().await;

let concurrency = ConcurrencyHandler::new(dice.dupe());

Expand Down Expand Up @@ -1653,7 +1673,7 @@ mod tests {
}
}

let dice = Dice::builder().build(DetectCycles::Enabled);
let dice = make_default_dice().await;

let concurrency = ConcurrencyHandler::new(dice.dupe());

Expand Down
4 changes: 1 addition & 3 deletions tests/core/build/test_external_buckconfigs.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ async def test_external_buckconfigs(buck: Buck) -> None:
buckconfig_input_values = await filter_events(
buck, "Event", "data", "Instant", "data", "BuckconfigInputValues", "components"
)
# Currently, when there are file changes in between, we end up having two BuckconfigInputValues events.
# Will be fixed in the next diff.
assert len(buckconfig_input_values) == 2
assert len(buckconfig_input_values) == 1
external_configs = buckconfig_input_values[0]

assert len(external_configs) == 4
Expand Down

0 comments on commit 291001e

Please sign in to comment.