Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ExecutionContext and related conf to support multi-tenancy configurations #1924

Closed
wants to merge 1 commit into from

Conversation

mingmwang
Copy link
Contributor

Which issue does this PR close?

Closes #1862.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added ballista datafusion Changes in the datafusion crate labels Mar 4, 2022
@alamb alamb added the api change Changes the API exposed to users of the crate label Mar 6, 2022
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @mingmwang -- I really like where this PR is headed 🏅

I read the datafusion parts, and skimmed the ballista parts, though realistically this PR is too large for me to carefully review. I also think the size of this PR is likely to gather significant conflicts as other parts of the code changes

To make review easier, what would you think about breaking it into several parts. Here are some ideas of chunks I thinkcould be done individually:

  1. Configuration changes for ballista (e.g. scheduler URls)
  2. Rename from ExecutionContext to SessionContext and ExecutionContextState SessionContextState
  3. Add TaskContext / TaskContextRegistry to RuntimeEnv
  4. Add SessionContextRegistry
  5. Add a Global RuntimeEnv and wire it all together

I know that this will require more work on your part but I think it will make for better code and a more thorough review over the long term.

I have a concern about adding a global singleton RuntimeEnv to DataFusion, however, the more I think about it a global default runtime makes sense but I would like to hear other opinions.

As I think this is a major structural change to some core DataFusion concepts, I think some some other maintainers and contributors should weigh in as well but won't tag them here in case you are willing to break this PR down to make it easier to review

Finally This likely conflicts with #1911 #1912 and #1913 from @yahoNanJing

😅 keep up the good work!

is_local: false,
memory_manager: MemoryManager::new(memory_manager),
disk_manager: DiskManager::try_new(disk_manager)?,
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of adding a object_store_registry, task_context_registry and session_conctext_registry to the RuntimeEnvironment -- I think having a way to track those items is a good one. 👍

pub fn batch_size(&self) -> usize {
self.batch_size
/// Return the global singleton RuntimeEnv
pub fn global() -> &'static Arc<RuntimeEnv> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am concerned that if we have this function, it will become too easy for DataFusion code to accidentally use the global RuntimeEnv rather than the one that is passed through

I would prefer that the global runtime is put in ballista (or a separate module in DataFusion)

/// Execution runtime environment. This structure is passed to the
/// physical plans when they are run.
/// Global singleton RuntimeEnv
pub static RUNTIME_ENV: OnceCell<Arc<RuntimeEnv>> = OnceCell::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a single static runtime environment makes sense for Ballista but not for DataFusion (which gets used in a variety of usecases that a single runtime might not be applicable for)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a single static runtime environment makes sense for Ballista but not for DataFusion (which gets used in a variety of usecases that a single runtime might not be applicable for)

Hi, Alamb

Thanks a lot for taking a look. One reason that I have to use the global static runtime is make the ExecutionPlans session/configuration aware.

 pub trait ExecutionPlan: Debug + Send + Sync {

    ...................
    
    /// Return the Session id associated with the execution plan.
    fn session_id(&self) -> String;

    /// Return the Session configuration associated with the execution plan.
    fn session_config(&self) -> SessionConfig {
        let session_id = self.session_id();
        let runtime = RuntimeEnv::global();
        runtime.lookup_session_config(session_id.as_str())
    }


impl RuntimeEnv {

    ...................
    
    /// Retrieves a copied version of `SessionConfig` by session_id
    pub fn lookup_session_config(&self, session_id: &str) -> SessionConfig {
        if self.is_local() {
            // It is possible that in a local env such as in unit tests there is no
            // SessionContext created, in this case we have to return a default SessionConfig.
            self.lookup_config(session_id)
                .map_or(SessionConfig::new(), |c| c.lock().clone())
        } else if self.is_scheduler() {
            self.lookup_session(session_id)
                .expect("SessionContext doesn't exist")
                .copied_config()
        } else {
            self.config_from_task_context(session_id)
        }
 }

I think even we just use DataFusion as a lib, it is still a valid requirement to make the ExecutionPlan configuration aware. We might add more and more configuration options to enable/disable some feature during plan time and execution runtime(System like SparkSQL/Presto they have more than hundreds of configuration options). We need an extendable way to achieve this. With the single static runtime environment, I could make the ExecutionPlan configuration aware and runtime aware(it knows it runs in DataFusion local as a lib, or in the Ballista executor/scheduler) without pass-though anything.

Some of the systems might leverage the ThreadLocal facility in the program language to get the current context/configuration to avoid pass-through configurations. But In Rust we can not leverage ThreadLocal, they are so many async parts in the code paths.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree the alternative is having to pass through some sort of context (like the ExecutionContext) explicitly everywhere, which may make the code harder to read. However the upside of explicitly passing through any shared state is that there are no surprising behaviors

I think even we just use DataFusion as a lib, it is still a valid requirement to make the ExecutionPlan configuration aware. We might add more and more configuration options to enable/disable some feature during plan time and execution runtime(System like SparkSQL/Presto they have more than hundreds of configuration options).

Yes I agree having ExecutionPlan be configuration aware is important.

The idea up to this point has been to explicitly copy / plumb through this state (via Arc<ExecutionContextState>)
https://github.com/apache/arrow-datafusion/blob/7a2cbd5500c1e9447c7d71599eeccfdd5833cd4e/datafusion%2Fsrc%2Fexecution%2Fcontext.rs#L148

And the RuntimeEnv on the call to ExecutionPlan::execute

https://github.com/apache/arrow-datafusion/blob/33b9357139ad918da0f45a92db37f00ffa64b0ba/datafusion/src/physical_plan/mod.rs#L226-L230

What would you think about adding configuration options to RuntimeEnv (or something else more general)?
https://github.com/apache/arrow-datafusion/blob/641338f726549c10c5bafee34537dc1e56cdec04/datafusion/src/execution/runtime_env.rs#L35-L42

.with_target_partitions(1)
.with_batch_size(10);
let mut ctx = ExecutionContext::with_config(config);
let ctx = SessionContext::with_config(config, RuntimeEnv::global());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that the RuntimeEnv is explicitly passed here

@@ -1307,7 +1314,7 @@ mod tests {
// load expected answers from tpch-dbgen
// read csv as all strings, trim and cast to expected type as the csv string
// to value parser does not handle data with leading/trailing spaces
let mut ctx = ExecutionContext::new();
let ctx = SessionContext::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what you think about calling this function SessionContext::create_default() or something to give a hint that it will be connected to the Global runtime environment?

@yahoNanJing
Copy link
Contributor

As we regard the DataFusion as library and the Ballista as a standalone system, putting the global singleton into Ballista should be the right choice.

@mingmwang
Copy link
Contributor Author

I think the debate is whether we should have a global RuntimeEnv for DataFusion. Originally the RuntimeEnv was not a singleton. But I would say the definition of RuntimeEnv was not clear . It stood for an
execution runtime environment and the structure was passed to the physical plans when they are run. Looks like It was the task/physical plan's runtime. But the original ExecutionContextState also had a member of type RuntimeEnv(not a reference), so the RuntimeEnv also stood for DataFusion/Ballista context's runtime. So does it stand for a task level runtime or session/execution context level runtime is not clear. RuntimeEnv was originally introduced for MemoryManager and DiskManager which I think should be managed globally. After this refactoring, I think the meaning of RuntimeEnv is more clear.

@mingmwang
Copy link
Contributor Author

@yjshen @thinkharderdev

@yjshen
Copy link
Member

yjshen commented Mar 7, 2022

Sorry to join the party late! Thanks for @ me.

Initially, I made RuntimeEnv a global one for minor interface plumbing while prototyping MemoryManager. And I even have had the formal proposal before introducing the ObjectStore interface to make ObjectStore itself global. In my humble opinion, whether the Env or Store static or not is more like a programming flavor. When implementing a lib, explicitly plumbing is favored. While implementing a framework or a standalone program, you are free to use global.

However, when it comes to the multi-tenancy world, the issue changed significantly in my mind. We should prompt Runtime Isolation to among our first-class considerations. Therefore, I'm against making RuntimeEnv, especially MemoryManager be global among multiple tenants in DataFusion or Ballista. We should never have tasks from different clients with different priorities or shares treated equally in one manager. We may even need to consider physical isolation for the most crucial jobs or queries and assign exclusive executors to guarantee that no bad things happen.

I'm OK with whatever configurations are passed down because of the simple usage pattern.

Therefore, please provide another abstraction layer for resource-related or performance-critical components. We can afford to use resources less optimally, but never in a way that interferes with or even hinders each other.

@yjshen
Copy link
Member

yjshen commented Mar 7, 2022

I might be asking too much about this great initiative. Another plausible and low-hanging fruit in my mind is: multi-tenancy only inside the scheduler. By making executors one query entrance at a time, a "Mutex". We could have an easier route to achieve multi-tenancy.

Another question, just out of curiosity, is the scheduler a single point of failure in the current design? If it is, have you considered a "multi" scheduler architecture?

Thanks again for broadening Ballista's landscape!

@thinkharderdev
Copy link
Contributor

I wonder if we are missing a layer here. My intuition is that a global RuntimeEnv might be a problem and not really support real multi-tenancy but a session-level RuntimeEnv may be too fine-grained. We already have the concept of a namespace in the state configurations so does it make sense to namespace the RuntimeEnv. This could be away to support proper resource-level multi-tenancy. That is each namespace is a proper "slice" of the total cluster resources and there is some level of isolation so that one resource-hungry query by one client (in theory at least) can not use all cluster resources. It would probably be a little complicated to do dynamic registration of namespaces but should be possible in principle (and we probably don't have to tackle this right away).

I have the same question as @yjshen with respect to mutli-scheduler environments. I think that will be a key consideration for Ballista in general.

@mingmwang
Copy link
Contributor Author

I might be asking too much about this great initiative. Another plausible and low-hanging fruit in my mind is: multi-tenancy only inside the scheduler. By making executors one query entrance at a time, a "Mutex". We could have an easier route to achieve multi-tenancy.

Another question, just out of curiosity, is the scheduler a single point of failure in the current design? If it is, have you considered a "multi" scheduler architecture?

Thanks again for broadening Ballista's landscape!

Actually the PR does not touch the existing scheduler part too much. The major purpose is to refactoring the configuration and make execution plan session/configuration aware. Of cause the Ballista scheduler server need to maintain a list of running sessions and their session state currently in a global RuntimeEnv. But since Ballista Client also maintains its own local session context and session state for optimization and planning, I think if we have multiple Ballista scheduler's in the system, the current design should not make the scheduler a single point of failure.

@mingmwang
Copy link
Contributor Author

I wonder if we are missing a layer here. My intuition is that a global RuntimeEnv might be a problem and not really support real multi-tenancy but a session-level RuntimeEnv may be too fine-grained. We already have the concept of a namespace in the state configurations so does it make sense to namespace the RuntimeEnv. This could be away to support proper resource-level multi-tenancy. That is each namespace is a proper "slice" of the total cluster resources and there is some level of isolation so that one resource-hungry query by one client (in theory at least) can not use all cluster resources. It would probably be a little complicated to do dynamic registration of namespaces but should be possible in principle (and we probably don't have to tackle this right away).

I have the same question as @yjshen with respect to mutli-scheduler environments. I think that will be a key consideration for Ballista in general.

For cluster level resource isolations, I think it's the responsibility of the query scheduler and task scheduler. We should not put too much burden to the RunTimeEnv. Just make the RunTimeEnv as simple as possible.
If future, we can add user level query profile or resource profile to the session configuration, if the plan is session configuration aware, the query scheduler and task scheduler can take the resource isolations and resource requirements into consideration, no need to leverage RuntimeEnv to make decision.

@alamb
Copy link
Contributor

alamb commented Mar 7, 2022

I think another consideration is

I agree with @yjshen (and I think @thinkharderdev ) that for the datafusion (use as a library) feature, a single static RuntimeEnv is a challenge because of how many different ways it may be deployed / embedded. I can imagine people using DataFusion to build systems that attempt to build in isolation (by using separate RuntimeEnv for different subsets of users / queries) as well as systems that use other technologies (e.g. kubernetes) to implement isolation

I think passing around the RuntimeEnv is the most flexible, if annoying, approach.

I also wanted to reiterate that I liked pretty much all of the rest of this PR (like adding TaskContext, SessionContext, cleanups, etc) so perhaps we can proceed with adding in those pieces incremental while we work out the "global" vs "non global" context thing

@mingmwang
Copy link
Contributor Author

I think another consideration is

I agree with @yjshen (and I think @thinkharderdev ) that for the datafusion (use as a library) feature, a single static RuntimeEnv is a challenge because of how many different ways it may be deployed / embedded. I can imagine people using DataFusion to build systems that attempt to build in isolation (by using separate RuntimeEnv for different subsets of users / queries) as well as systems that use other technologies (e.g. kubernetes) to implement isolation

I think passing around the RuntimeEnv is the most flexible, if annoying, approach.

I also wanted to reiterate that I liked pretty much all of the rest of this PR (like adding TaskContext, SessionContext, cleanups, etc) so perhaps we can proceed with adding in those pieces incremental while we work out the "global" vs "non global" context thing

Ok, I will remove the the static singleton RuntimeEnv in DataFusion, and go with the approach to pass through the required structure in the ExecutionPlan execute() method. It can still achieve the current goal and make the plan's major execution path configuration aware. But in the same time it will lose some capabilities for example in other methods like display() or stats(), if we want to add some session configuration to control the plan display(short plan/long plan displaying etc) or stats collection(columnar stats, support histogram etc).

@yjshen
Copy link
Member

yjshen commented Mar 9, 2022

Hi @mingmwang, would you like to split this effort into several parts as @alamb suggested? This may make it easier for us to merge this huge feature gradually.

@mingmwang
Copy link
Contributor Author

@alamb @yjshen @thinkharderdev
Please help to take a look. I do not use static singleton RuntimeEnv anymore in both DataFusion and Ballista.
For Ballista, the SessionContextRegistry is managed by the Ballista SchedulerServer to support long running sessions.

I had verified that the Ballista client can correctly set the target partitions, batch size to Scheduler and propagate to Executor/Tasks. Issue #1848 was also fixed.

@yahoNanJing
Please help to take a look at the Ballista part changes.

@mingmwang
Copy link
Contributor Author

Hi @mingmwang, would you like to split this effort into several parts as @alamb suggested? This may make it easier for us to merge this huge feature gradually.

Hi @yjshen
The major changes are in the file context.rs. Most of other changed files are trivial UT fixes.

Copy link
Contributor

@thinkharderdev thinkharderdev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is great, but I think we are losing some of the extensibility recently added to Ballista.

Per other discussions I agree that we want Ballista to be able to be a standalone system that can be run "out of the box" but I think we also need to allow extensibility.

Comment on lines 121 to 124
ctx: Arc<RwLock<ExecutionContext>>,
codec: BallistaCodec<T, U>,
/// DataFusion session contexts that are registered within the SchedulerServer
session_context_registry: Arc<SessionContextRegistry>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think we need to preserve some mechanism to register common extensions in the Context. The original intention of adding ExecutionContext to the SchedulerServer was to allow creating a SchedulerServer which had a customized ExecutionContext. Looks like you've dealt with custom ObjectStore implementations by moving that to RuntimeEnv but there are serveral other extension points which may be useful such as custom planners, optimizers, udf/udaf

Maybe we can add a "default" SessionContext to SessionContextRegistry so we can create new contexts from a template? That way the template context can be initialized with custom extensions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To add on to that slightly, previous work allows the serialization of extension plans in Ballista, but in order to actually use them an ExtensionPlanner needs to be registered in the DefaultQueryPlanner.

// executor should be registered
assert_eq!(state.get_executors_metadata().await.unwrap().len(), 1);
Ok(())
}
}

/// A Registry holds all the datafusion session contexts
pub struct SessionContextRegistry {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here maybe we can add base_context: Option<Arc<SessionContext>>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @thinkharderdev

Yes, this is very good point. In the current implementation, the SessionContext is alway created using configurations from query settings/Ballista configurations, there is no way to configure different optimizers/planners. Maybe I can add a new struct like Base SessionContext Builder and make that an extension point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thinkharderdev Done the change, please help to take a look.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Thanks!

Copy link
Contributor

@thinkharderdev thinkharderdev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome work!

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mingmwang -- thank you for taking feedback and removing the global singleton.

Sadly, realistically I am not going to be able to find enough contiguous time to carefully review this PR as written. It changes too many fundamental structures to DataFusion (like ExecutionContext) in non trivial ways.

From what I can see the basic structure now looks good, but I don't feel I can evaluate the implications of this PR as a whole.

If other maintainers feel differently, I will defer to their judgement and don't oppose merging this and sorting out any fallout afterwards. However, I stand with my original suggestion to break this into smaller parts that can be evaluated separately.

…onfigurations

Do not use static singleton RuntimeEnv in both DataFusion and Ballista

Add SessionBuilder fn type to SchedulerServer to allow customized SessionContext creation

fix client BallistaContext
@mingmwang
Copy link
Contributor Author

From what I can see the basic structure now looks good, but I don't feel I ca

Hi @mingmwang -- thank you for taking feedback and removing the global singleton.

Sadly, realistically I am not going to be able to find enough contiguous time to carefully review this PR as written. It changes too many fundamental structures to DataFusion (like ExecutionContext) in non trivial ways.

From what I can see the basic structure now looks good, but I don't feel I can evaluate the implications of this PR as a whole.

If other maintainers feel differently, I will defer to their judgement and don't oppose merging this and sorting out any fallout afterwards. However, I stand with my original suggestion to break this into smaller parts that can be evaluated separately.

Hi, @alamb

Could you please guide me that who else can review this PR ?
Sorry for pushing this hard, I need to close this the early the better so that I can continue the working on other parts.

@alamb
Copy link
Contributor

alamb commented Mar 10, 2022

Could you please guide me that who else can review this PR ?

The current list of maintainers is listed at https://arrow.apache.org/committers/ -- ones who may be interested in this PR include @andygrove @Dandandan @rdettai @jimexist @yjshen and @houqp

Sorry for pushing this hard, I need to close this the early the better so that I can continue the working on other parts.

No problem -- I want to support you and you work as I think it is valuable.

If you are able to break this PR into smaller pieces, as suggested on #1924 (review) I think you'll find sufficient review capacity to get it through.

@mingmwang
Copy link
Contributor Author

mingmwang commented Mar 11, 2022

Could you please guide me that who else can review this PR ?

The current list of maintainers is listed at https://arrow.apache.org/committers/ -- ones who may be interested in this PR include @andygrove @Dandandan @rdettai @jimexist @yjshen and @houqp

Sorry for pushing this hard, I need to close this the early the better so that I can continue the working on other parts.

No problem -- I want to support you and you work as I think it is valuable.

If you are able to break this PR into smaller pieces, as suggested on #1924 (review) I think you'll find sufficient review capacity to get it through.

Hi, @alamb

Ok. I will break this PR into three parts.

  1. Rename from ExecutionContext to SessionContext and ExecutionContextState to SessionState, Add TaskContext
    and wrap the RuntimeEnv into TaskContext and pass down TaskContext into ExecutionPlan's execute() method, fix all the trivial UTs.

  2. Changes related to the SessionContext and SessionContextState themselves, move the planner/optimizer from SessionConfig to SessionState.

  3. Ballista related changes, add SessionContextRegistry, make SessionContext long running. SessionContext can created from BallistaConfig and can propagate to Executor.

@yjshen
Copy link
Member

yjshen commented Mar 16, 2022

I'm closing this since we decided to separate this monolithic PR into steps and get it merged gradually. And we've already merged the first part #1987.

@yjshen yjshen closed this Mar 16, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate datafusion Changes in the datafusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Refactor ExecutionContext and related conf to support multi-tenancy configurations.
5 participants