diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs
index 290010ea8585..069b1d2dfd61 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/operators/dummy.rs
@@ -17,4 +17,8 @@ impl Operator for Dummy {
     fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
         Box::new(Self {})
     }
+
+    fn fmt(&self) -> &str {
+        "dummy"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs
index bbdf36f7d9f9..f4a409521deb 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/operators/filter.rs
@@ -33,4 +33,7 @@ impl Operator for FilterOperator {
     fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
         Box::new(self.clone())
     }
+    fn fmt(&self) -> &str {
+        "filter"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs b/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs
index 2a0fa6d4036f..2486e5a70605 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/operators/projection.rs
@@ -24,6 +24,9 @@ impl Operator for FastProjectionOperator {
     fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
         Box::new(self.clone())
     }
+    fn fmt(&self) -> &str {
+        "fast_join_projection"
+    }
 }
 
 #[derive(Clone)]
@@ -49,6 +52,9 @@ impl Operator for ProjectionOperator {
     fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
         Box::new(self.clone())
     }
+    fn fmt(&self) -> &str {
+        "projection"
+    }
 }
 
 #[derive(Clone)]
@@ -79,4 +85,7 @@ impl Operator for HstackOperator {
     fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
         Box::new(self.clone())
     }
+    fn fmt(&self) -> &str {
+        "hstack"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs
index 5976d9baf418..0684921a02f1 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/aggregates/convert.rs
@@ -5,6 +5,7 @@ use polars_core::datatypes::Field;
 use polars_core::error::PolarsResult;
 use polars_core::prelude::{DataType, SchemaRef, Series, IDX_DTYPE};
 use polars_core::schema::Schema;
+use polars_plan::dsl::Expr;
 use polars_plan::logical_plan::{ArenaExprIter, Context};
 use polars_plan::prelude::{AAggExpr, AExpr};
 use polars_utils::arena::{Arena, Node};
@@ -30,6 +31,10 @@ impl PhysicalPipedExpr for Count {
     fn field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
         todo!()
     }
+
+    fn expression(&self) -> Expr {
+        Expr::Count
+    }
 }
 
 pub fn can_convert_to_hash_agg(
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs
index dabe8c4edc24..94c31dc77fb2 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/generic.rs
@@ -428,4 +428,7 @@ impl Sink for GenericGroupbySink {
     fn as_any(&mut self) -> &mut dyn Any {
         self
     }
+    fn fmt(&self) -> &str {
+        "generic_groupby"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs
index a7c3bed68388..ab629b69428f 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/primitive.rs
@@ -360,6 +360,9 @@ where
     fn as_any(&mut self) -> &mut dyn Any {
         self
     }
+    fn fmt(&self) -> &str {
+        "primitive_groupby"
+    }
 }
 
 fn insert_and_get<T>(
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs
index 0a93f8764302..4e9f0bd0caf1 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/groupby/string.rs
@@ -358,6 +358,9 @@ impl Sink for Utf8GroupbySink {
     fn as_any(&mut self) -> &mut dyn Any {
         self
     }
+    fn fmt(&self) -> &str {
+        "utf8_groupby"
+    }
 }
 
 // write agg_idx to the hashes buffer.
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs
index 016a06804f03..3cb68da5946e 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/cross.rs
@@ -62,6 +62,10 @@ impl Sink for CrossJoin {
     fn as_any(&mut self) -> &mut dyn Any {
         self
     }
+
+    fn fmt(&self) -> &str {
+        "cross_join_sink"
+    }
 }
 
 #[derive(Clone)]
@@ -152,4 +156,8 @@ impl Operator for CrossJoinProbe {
     fn split(&self, _thread_no: usize) -> Box<dyn Operator> {
         Box::new(self.clone())
     }
+
+    fn fmt(&self) -> &str {
+        "cross_join_probe"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/generic_build.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/generic_build.rs
index db3ba812409a..e62de3438ed0 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/generic_build.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/generic_build.rs
@@ -151,6 +151,7 @@ impl GenericBuild {
         chunk: &DataChunk,
     ) -> PolarsResult<&[Series]> {
         self.join_series.clear();
+
         for phys_e in self.join_columns_left.iter() {
             let s = phys_e.evaluate(chunk, context.execution_state.as_any())?;
             let s = s.to_physical_repr();
@@ -337,7 +338,7 @@ impl Sink for GenericBuild {
                 let suffix = self.suffix.clone();
                 let hb = self.hb.clone();
                 let hash_tables = Arc::new(std::mem::take(&mut self.hash_tables));
-                let join_columns_left = self.join_columns_right.clone();
+                let join_columns_left = self.join_columns_left.clone();
                 let join_columns_right = self.join_columns_right.clone();
 
                 // take the buffers, this saves one allocation
@@ -369,6 +370,9 @@ impl Sink for GenericBuild {
     fn as_any(&mut self) -> &mut dyn Any {
         self
     }
+    fn fmt(&self) -> &str {
+        "generic_join_build"
+    }
 }
 
 pub(super) struct KeysIter<'a> {
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/inner_left.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/inner_left.rs
index 9d2cc049f97d..767360dbe957 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/inner_left.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/joins/inner_left.rs
@@ -66,15 +66,12 @@ impl GenericJoinProbe {
         hash_tables: Arc<Vec<PlIdHashMap<Key, Vec<ChunkId>>>>,
         join_columns_left: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
         join_columns_right: Arc<Vec<Arc<dyn PhysicalPipedExpr>>>,
-        mut swapped_or_left: bool,
+        swapped_or_left: bool,
         join_series: Vec<Series>,
         hashes: Vec<u64>,
         context: &PExecutionContext,
         how: JoinType,
     ) -> Self {
-        if matches!(how, JoinType::Left) {
-            swapped_or_left = true
-        }
         if swapped_or_left {
             let tmp = DataChunk {
                 data: df_a.slice(0, 1),
@@ -329,4 +326,7 @@ impl Operator for GenericJoinProbe {
         let new = self.clone();
         Box::new(new)
     }
+    fn fmt(&self) -> &str {
+        "generic_join_probe"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs
index fa3aad8cb084..211d84e79a37 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/ordered.rs
@@ -48,4 +48,7 @@ impl Sink for OrderedSink {
     fn as_any(&mut self) -> &mut dyn Any {
         self
     }
+    fn fmt(&self) -> &str {
+        "ordered_sink"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/parquet_sink.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/parquet_sink.rs
index ef05b547a7ff..bdae5fcb75dc 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/parquet_sink.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/parquet_sink.rs
@@ -108,4 +108,7 @@ impl Sink for ParquetSink {
     fn as_any(&mut self) -> &mut dyn Any {
         self
     }
+    fn fmt(&self) -> &str {
+        "parquet_sink"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sinks/slice.rs b/polars/polars-lazy/polars-pipe/src/executors/sinks/slice.rs
index 862f484250d0..0153ea222831 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sinks/slice.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sinks/slice.rs
@@ -102,4 +102,7 @@ impl Sink for SliceSink {
     fn as_any(&mut self) -> &mut dyn Any {
         self
     }
+    fn fmt(&self) -> &str {
+        "slice_sink"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs
index 2776181b5ff2..c45473d97dfb 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs
@@ -110,4 +110,7 @@ impl Source for CsvSource {
             ),
         })
     }
+    fn fmt(&self) -> &str {
+        "csv"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs
index 6628922a9d76..b8a4ff4bae20 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sources/frame.rs
@@ -39,4 +39,7 @@ impl Source for DataFrameSource {
             Ok(SourceResult::GotMoreData(chunks))
         }
     }
+    fn fmt(&self) -> &str {
+        "df"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs
index 665d6f667ac4..bb3b38adcc91 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs
@@ -84,4 +84,7 @@ impl Source for ParquetSource {
             ),
         })
     }
+    fn fmt(&self) -> &str {
+        "parquet"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/union.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/union.rs
index 929deb92f95d..771c21f27c04 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sources/union.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sources/union.rs
@@ -29,4 +29,7 @@ impl Source for UnionSource {
         }
         Ok(SourceResult::Finished)
     }
+    fn fmt(&self) -> &str {
+        "union"
+    }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/expressions.rs b/polars/polars-lazy/polars-pipe/src/expressions.rs
index c30e2d477481..5c4a78cc896f 100644
--- a/polars/polars-lazy/polars-pipe/src/expressions.rs
+++ b/polars/polars-lazy/polars-pipe/src/expressions.rs
@@ -1,6 +1,7 @@
 use std::any::Any;
 
 use polars_core::prelude::*;
+use polars_plan::dsl::Expr;
 
 use crate::operators::DataChunk;
 
@@ -10,4 +11,6 @@ pub trait PhysicalPipedExpr: Send + Sync {
     fn evaluate(&self, chunk: &DataChunk, lazy_state: &dyn Any) -> PolarsResult<Series>;
 
     fn field(&self, input_schema: &Schema) -> PolarsResult<Field>;
+
+    fn expression(&self) -> Expr;
 }
diff --git a/polars/polars-lazy/polars-pipe/src/operators/operator.rs b/polars/polars-lazy/polars-pipe/src/operators/operator.rs
index d834e28d408c..66e5fe009a31 100644
--- a/polars/polars-lazy/polars-pipe/src/operators/operator.rs
+++ b/polars/polars-lazy/polars-pipe/src/operators/operator.rs
@@ -15,4 +15,6 @@ pub trait Operator: Send + Sync {
     ) -> PolarsResult<OperatorResult>;
 
     fn split(&self, thread_no: usize) -> Box<dyn Operator>;
+
+    fn fmt(&self) -> &str;
 }
diff --git a/polars/polars-lazy/polars-pipe/src/operators/sink.rs b/polars/polars-lazy/polars-pipe/src/operators/sink.rs
index f2337e9a3c0f..80dd48a3ce5c 100644
--- a/polars/polars-lazy/polars-pipe/src/operators/sink.rs
+++ b/polars/polars-lazy/polars-pipe/src/operators/sink.rs
@@ -23,4 +23,6 @@ pub trait Sink: Send + Sync {
     fn finalize(&mut self, context: &PExecutionContext) -> PolarsResult<FinalizedSink>;
 
     fn as_any(&mut self) -> &mut dyn Any;
+
+    fn fmt(&self) -> &str;
 }
diff --git a/polars/polars-lazy/polars-pipe/src/operators/source.rs b/polars/polars-lazy/polars-pipe/src/operators/source.rs
index a4280ec71bfb..80006347b65b 100644
--- a/polars/polars-lazy/polars-pipe/src/operators/source.rs
+++ b/polars/polars-lazy/polars-pipe/src/operators/source.rs
@@ -7,4 +7,6 @@ pub enum SourceResult {
 
 pub trait Source: Send + Sync {
     fn get_batches(&mut self, context: &PExecutionContext) -> PolarsResult<SourceResult>;
+
+    fn fmt(&self) -> &str;
 }
diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
index 04c5010afcc5..8035afcf3df0 100644
--- a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
+++ b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
@@ -407,8 +407,9 @@ where
 }
 
 pub fn swap_join_order(options: &JoinOptions) -> bool {
-    match (options.rows_left, options.rows_right) {
-        ((Some(left), _), (Some(right), _)) => left > right,
-        ((_, left), (_, right)) => left > right,
-    }
+    matches!(options.how, JoinType::Left)
+        || match (options.rows_left, options.rows_right) {
+            ((Some(left), _), (Some(right), _)) => left > right,
+            ((_, left), (_, right)) => left > right,
+        }
 }
diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs
index 108268f882a2..deaa3c88a81c 100644
--- a/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs
+++ b/polars/polars-lazy/polars-pipe/src/pipeline/dispatcher.rs
@@ -1,9 +1,12 @@
+use std::collections::VecDeque;
+
 use polars_core::error::PolarsResult;
 use polars_core::frame::DataFrame;
 use polars_core::POOL;
 use polars_utils::arena::Node;
 use rayon::prelude::*;
 
+use crate::executors::operators::Dummy;
 use crate::executors::sources::DataFrameSource;
 use crate::operators::{
     DataChunk, FinalizedSink, Operator, OperatorResult, PExecutionContext, SExecutionContext, Sink,
@@ -66,12 +69,16 @@ impl PipeLine {
         self
     }
 
-    fn replace_operator(&mut self, op: &dyn Operator, node: Node) {
+    // returns if operator was successfully replaced
+    fn replace_operator(&mut self, op: &dyn Operator, node: Node) -> bool {
         if let Some(pos) = self.operator_nodes.iter().position(|n| *n == node) {
             let pos = pos + self.operator_offset;
             for (i, operator_pipe) in &mut self.operators.iter_mut().enumerate() {
                 operator_pipe[pos] = op.split(i)
             }
+            true
+        } else {
+            false
         }
     }
 
@@ -214,12 +221,46 @@ impl PipeLine {
         Ok(out.unwrap())
     }
 
+    /// print the branches of the pipeline
+    /// in the order they run.
+    #[cfg(test)]
+    fn show(&self) {
+        let mut fmt = String::new();
+        let mut start = 0usize;
+        fmt.push_str(self.sources[0].fmt());
+        fmt.push_str(" -> ");
+        for (offset_end, sink) in &self.sinks {
+            // take operators of a single thread
+            let ops = &self.operators[0];
+            // slice the pipeline
+            let ops = &ops[start..*offset_end];
+            for op in ops {
+                fmt.push_str(op.fmt());
+                fmt.push_str(" -> ")
+            }
+            start = *offset_end;
+            fmt.push_str(sink[0].fmt())
+        }
+        println!("{fmt}");
+        for pl in &self.rh_sides {
+            pl.show()
+        }
+    }
+
     pub fn execute(&mut self, state: Box<dyn SExecutionContext>) -> PolarsResult<DataFrame> {
         let ec = PExecutionContext::new(state);
         let mut sink_out = self.run_pipeline(&ec)?;
         let mut pipelines = self.rh_sides.iter_mut();
         let mut sink_nodes = std::mem::take(&mut self.sink_nodes);
 
+        // This is a stack of operators that should replace the sinks of join nodes
+        // If we don't reorder joins, the order we run the pipelines coincide with the
+        // order the sinks need to be replaced, however this is not always the case
+        // if we reorder joins.
+        // This stack ensures we still replace the dummy operators even if they are all in
+        // the most right branch
+        let mut operators_to_replace: VecDeque<(Box<dyn Operator>, Node)> = VecDeque::new();
+
         loop {
             match &mut sink_out {
                 FinalizedSink::Finished(df) => return Ok(std::mem::take(df)),
@@ -236,10 +277,33 @@ impl PipeLine {
                     // we unwrap, because the latest pipeline should not return an Operator
                     let pipeline = pipelines.next().unwrap();
 
+                    // First check the operators
+                    // keep a counter as we also push to the front of deque
+                    // otherwise we keep iterating
+                    let mut remaining = operators_to_replace.len();
+                    while let Some((op, sink_node)) = operators_to_replace.pop_back() {
+                        if !pipeline.replace_operator(op.as_ref(), sink_node) {
+                            operators_to_replace.push_front((op, sink_node))
+                        } else {
+                        }
+                        if remaining == 0 {
+                            break;
+                        }
+                        remaining -= 1;
+                    }
+
                     // latest sink_node will be the operator, as the left side of the join
                     // always finishes that branch.
                     if let Some(sink_node) = sink_nodes.pop() {
-                        pipeline.replace_operator(op.as_ref(), sink_node);
+                        // if dummy that should be replaces is not found in this branch
+                        // we push it to the operators stack that should be replaced
+                        // on the next branch of the pipeline we first check this stack.
+                        // this only happens if we reorder joins
+                        if !pipeline.replace_operator(op.as_ref(), sink_node) {
+                            let mut swap = Box::<Dummy>::default() as Box<dyn Operator>;
+                            std::mem::swap(op, &mut swap);
+                            operators_to_replace.push_back((swap, sink_node));
+                        }
                     }
                     sink_out = pipeline.run_pipeline(&ec)?;
                     sink_nodes = std::mem::take(&mut pipeline.sink_nodes);
diff --git a/polars/polars-lazy/polars-plan/src/dsl/options.rs b/polars/polars-lazy/polars-plan/src/dsl/options.rs
index f744b62705e2..07b44e3e966c 100644
--- a/polars/polars-lazy/polars-plan/src/dsl/options.rs
+++ b/polars/polars-lazy/polars-plan/src/dsl/options.rs
@@ -44,6 +44,8 @@ pub struct JoinOptions {
     pub how: JoinType,
     pub suffix: Cow<'static, str>,
     pub slice: Option<(i64, usize)>,
+    /// Proxy of the number of rows in both sides of the joins
+    /// Holds `(Option<known_size>, estimated_size)`
     pub rows_left: (Option<usize>, usize),
     pub rows_right: (Option<usize>, usize),
 }
diff --git a/polars/polars-lazy/src/physical_plan/expressions/column.rs b/polars/polars-lazy/src/physical_plan/expressions/column.rs
index de9908367eaa..39fc7cab6c33 100644
--- a/polars/polars-lazy/src/physical_plan/expressions/column.rs
+++ b/polars/polars-lazy/src/physical_plan/expressions/column.rs
@@ -61,7 +61,9 @@ impl ColumnExpr {
             // this path should not happen
             #[cfg(feature = "panic_on_schema")]
             {
-                if _state.ext_contexts.is_empty() {
+                if _state.ext_contexts.is_empty()
+                    && std::env::var("POLARS_NO_SCHEMA_CHECK").is_err()
+                {
                     panic!(
                         "got {} expected: {} from schema: {:?} and DataFrame: {:?}",
                         out.name(),
@@ -88,7 +90,10 @@ impl ColumnExpr {
     ) -> PolarsResult<Series> {
         #[cfg(feature = "panic_on_schema")]
         {
-            if _panic_during_test && _state.ext_contexts.is_empty() {
+            if _panic_during_test
+                && _state.ext_contexts.is_empty()
+                && std::env::var("POLARS_NO_SCHEMA_CHECK").is_err()
+            {
                 panic!("invalid schema")
             }
         }
diff --git a/polars/polars-lazy/src/physical_plan/streaming/convert.rs b/polars/polars-lazy/src/physical_plan/streaming/convert.rs
index 8da154dc0515..77d52c40c593 100644
--- a/polars/polars-lazy/src/physical_plan/streaming/convert.rs
+++ b/polars/polars-lazy/src/physical_plan/streaming/convert.rs
@@ -28,6 +28,10 @@ impl PhysicalPipedExpr for Wrap {
     fn field(&self, input_schema: &Schema) -> PolarsResult<Field> {
         self.0.to_field(input_schema)
     }
+
+    fn expression(&self) -> Expr {
+        self.0.as_expression().unwrap().clone()
+    }
 }
 
 fn to_physical_piped_expr(
@@ -206,12 +210,11 @@ pub(crate) fn insert_streaming_nodes(
                 // *except* for a left join. In a left join we use the right
                 // table as build table and we stream the left table. This way
                 // we maintain order in the left join.
-                let (input_left, input_right) =
-                    if swap_join_order(options) || matches!(options.how, JoinType::Left) {
-                        (input_right, input_left)
-                    } else {
-                        (input_left, input_right)
-                    };
+                let (input_left, input_right) = if swap_join_order(options) {
+                    (input_right, input_left)
+                } else {
+                    (input_left, input_right)
+                };
 
                 // rhs is second, so that is first on the stack
                 let mut state_right = state;
@@ -357,7 +360,7 @@ pub(crate) fn insert_streaming_nodes(
                                 ..
                             } = lp_arena.get(node)
                             {
-                                let slice_node = lp_arena.add(ALogicalPlan::Slice {
+                                let slice_node = lp_arena.add(Slice {
                                     input: Node::default(),
                                     offset: *offset,
                                     len: *len as IdxSize,
diff --git a/polars/polars-lazy/src/tests/streaming.rs b/polars/polars-lazy/src/tests/streaming.rs
index ee5a4df41c4d..192c21996075 100644
--- a/polars/polars-lazy/src/tests/streaming.rs
+++ b/polars/polars-lazy/src/tests/streaming.rs
@@ -230,7 +230,7 @@ fn test_streaming_left_join() -> PolarsResult<()> {
 
     let q = lf_left.left_join(lf_right, col("a"), col("a"));
 
-    let out1 = q.clone().with_streaming(false).collect()?;
+    let out1 = q.clone().with_streaming(true).collect()?;
     let out2 = q.clone().with_streaming(false).collect()?;
     assert!(out1.frame_equal_missing(&out2));
 
@@ -308,3 +308,48 @@ fn test_streaming_aggregate_join() -> PolarsResult<()> {
     assert_eq!(out_streaming.shape(), (3, 3));
     Ok(())
 }
+
+#[test]
+fn test_streaming_double_left_join() -> PolarsResult<()> {
+    // A left join swaps the tables, so that checks the swapping of the branches
+    let q1 = df![
+        "id" => ["1a"],
+        "p_id" => ["1b"],
+        "m_id" => ["1c"],
+    ]?
+    .lazy();
+
+    let q2 = df![
+        "p_id2" => ["2a"],
+        "p_code" => ["2b"],
+    ]?
+    .lazy();
+
+    let q3 = df![
+        "m_id3" => ["3a"],
+        "m_code" => ["3b"],
+    ]?
+    .lazy();
+
+    let q = q1
+        .clone()
+        .left_join(q2, col("p_id"), col("p_id2"))
+        .left_join(q3.clone(), col("m_id"), col("m_id3"));
+
+    let out = q.clone().with_streaming(true).collect()?;
+    let expected = q.clone().with_streaming(false).collect()?;
+
+    assert_eq!(out, expected);
+
+    // more joins
+    let q = q
+        .left_join(q1, col("p_id"), col("p_id"))
+        .left_join(q3, col("m_id"), col("m_id3"));
+
+    let out = q.clone().with_streaming(true).collect()?;
+    let expected = q.clone().with_streaming(false).collect()?;
+
+    assert_eq!(out, expected);
+
+    Ok(())
+}
diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock
index 07b777ae473d..a4fc0e626045 100644
--- a/py-polars/Cargo.lock
+++ b/py-polars/Cargo.lock
@@ -1718,7 +1718,7 @@ dependencies = [
 
 [[package]]
 name = "py-polars"
-version = "0.15.13"
+version = "0.15.14"
 dependencies = [
  "ahash",
  "bincode",