From a829d2477dbba65cf2ab42fcc9663079cc210205 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?=
 <jerome.tamarelle@mongodb.com>
Date: Thu, 12 Sep 2024 10:20:32 +0200
Subject: [PATCH] PHPLIB-1419 Encode Agg builder objects in Collection methods
 (#1383)

---
 psalm-baseline.xml                            | 34 ++++++++++
 src/Client.php                                |  7 +++
 src/Collection.php                            | 13 ++++
 src/Database.php                              | 13 ++++
 src/functions.php                             | 22 +++++++
 tests/ClientFunctionalTest.php                | 25 ++++++++
 .../BuilderCollectionFunctionalTest.php       | 34 +++++++++-
 .../BuilderDatabaseFunctionalTest.php         | 63 +++++++++++++++++++
 tests/FunctionsTest.php                       | 18 ++++++
 9 files changed, 227 insertions(+), 2 deletions(-)
 create mode 100644 tests/Database/BuilderDatabaseFunctionalTest.php

diff --git a/psalm-baseline.xml b/psalm-baseline.xml
index 732325e23..7e99f32d6 100644
--- a/psalm-baseline.xml
+++ b/psalm-baseline.xml
@@ -191,6 +191,7 @@
   <file src="src/Client.php">
     <MixedArgument>
       <code><![CDATA[$driverOptions['driver'] ?? []]]></code>
+      <code><![CDATA[$pipeline]]></code>
     </MixedArgument>
     <MixedAssignment>
       <code><![CDATA[$mergedDriver['platform']]]></code>
@@ -198,6 +199,12 @@
     <MixedPropertyTypeCoercion>
       <code><![CDATA[$driverOptions['builderEncoder'] ?? new BuilderEncoder()]]></code>
     </MixedPropertyTypeCoercion>
+    <NamedArgumentNotAllowed>
+      <code><![CDATA[$pipeline]]></code>
+    </NamedArgumentNotAllowed>
+    <PossiblyInvalidArgument>
+      <code><![CDATA[$pipeline]]></code>
+    </PossiblyInvalidArgument>
   </file>
   <file src="src/Codec/EncodeIfSupported.php">
     <ArgumentTypeCoercion>
@@ -220,9 +227,22 @@
     </MixedArgumentTypeCoercion>
   </file>
   <file src="src/Collection.php">
+    <MixedArgument>
+      <code><![CDATA[$pipeline]]></code>
+      <code><![CDATA[$pipeline]]></code>
+    </MixedArgument>
     <MixedPropertyTypeCoercion>
       <code><![CDATA[$options['builderEncoder'] ?? new BuilderEncoder()]]></code>
     </MixedPropertyTypeCoercion>
+    <NamedArgumentNotAllowed>
+      <code><![CDATA[$pipeline]]></code>
+      <code><![CDATA[$pipeline]]></code>
+    </NamedArgumentNotAllowed>
+    <PossiblyInvalidArgument>
+      <code><![CDATA[$pipeline]]></code>
+      <code><![CDATA[$pipeline]]></code>
+      <code><![CDATA[$pipeline]]></code>
+    </PossiblyInvalidArgument>
   </file>
   <file src="src/Command/ListCollections.php">
     <MixedAssignment>
@@ -237,9 +257,22 @@
     </MixedAssignment>
   </file>
   <file src="src/Database.php">
+    <MixedArgument>
+      <code><![CDATA[$pipeline]]></code>
+      <code><![CDATA[$pipeline]]></code>
+    </MixedArgument>
     <MixedPropertyTypeCoercion>
       <code><![CDATA[$options['builderEncoder'] ?? new BuilderEncoder()]]></code>
     </MixedPropertyTypeCoercion>
+    <NamedArgumentNotAllowed>
+      <code><![CDATA[$pipeline]]></code>
+      <code><![CDATA[$pipeline]]></code>
+    </NamedArgumentNotAllowed>
+    <PossiblyInvalidArgument>
+      <code><![CDATA[$pipeline]]></code>
+      <code><![CDATA[$pipeline]]></code>
+      <code><![CDATA[$pipeline]]></code>
+    </PossiblyInvalidArgument>
   </file>
   <file src="src/GridFS/Bucket.php">
     <MixedArgument>
@@ -854,6 +887,7 @@
     <MixedAssignment>
       <code><![CDATA[$element[$key]]]></code>
       <code><![CDATA[$stage]]></code>
+      <code><![CDATA[$stage]]></code>
       <code><![CDATA[$type]]></code>
       <code><![CDATA[$typeMap['fieldPaths'][$fieldPath . '.' . $existingFieldPath]]]></code>
       <code><![CDATA[$typeMap['fieldPaths'][$fieldPath]]]></code>
diff --git a/src/Client.php b/src/Client.php
index 5b02aa44c..2e161707b 100644
--- a/src/Client.php
+++ b/src/Client.php
@@ -22,6 +22,7 @@
 use MongoDB\BSON\Document;
 use MongoDB\BSON\PackedArray;
 use MongoDB\Builder\BuilderEncoder;
+use MongoDB\Builder\Pipeline;
 use MongoDB\Codec\Encoder;
 use MongoDB\Driver\ClientEncryption;
 use MongoDB\Driver\Exception\InvalidArgumentException as DriverInvalidArgumentException;
@@ -391,6 +392,12 @@ public function startSession(array $options = [])
      */
     public function watch(array $pipeline = [], array $options = [])
     {
+        if (is_builder_pipeline($pipeline)) {
+            $pipeline = new Pipeline(...$pipeline);
+        }
+
+        $pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
+
         if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
             $options['readPreference'] = $this->readPreference;
         }
diff --git a/src/Collection.php b/src/Collection.php
index 7769b9a5b..8ed460c4e 100644
--- a/src/Collection.php
+++ b/src/Collection.php
@@ -23,6 +23,7 @@
 use MongoDB\BSON\JavascriptInterface;
 use MongoDB\BSON\PackedArray;
 use MongoDB\Builder\BuilderEncoder;
+use MongoDB\Builder\Pipeline;
 use MongoDB\Codec\DocumentCodec;
 use MongoDB\Codec\Encoder;
 use MongoDB\Driver\CursorInterface;
@@ -223,6 +224,12 @@ public function __toString()
      */
     public function aggregate(array $pipeline, array $options = [])
     {
+        if (is_builder_pipeline($pipeline)) {
+            $pipeline = new Pipeline(...$pipeline);
+        }
+
+        $pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
+
         $hasWriteStage = is_last_pipeline_operator_write($pipeline);
 
         $options = $this->inheritReadPreference($options);
@@ -1098,6 +1105,12 @@ public function updateSearchIndex(string $name, array|object $definition, array
      */
     public function watch(array $pipeline = [], array $options = [])
     {
+        if (is_builder_pipeline($pipeline)) {
+            $pipeline = new Pipeline(...$pipeline);
+        }
+
+        $pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
+
         $options = $this->inheritReadOptions($options);
         $options = $this->inheritCodecOrTypeMap($options);
 
diff --git a/src/Database.php b/src/Database.php
index 8b7742b70..125012c81 100644
--- a/src/Database.php
+++ b/src/Database.php
@@ -21,6 +21,7 @@
 use MongoDB\BSON\Document;
 use MongoDB\BSON\PackedArray;
 use MongoDB\Builder\BuilderEncoder;
+use MongoDB\Builder\Pipeline;
 use MongoDB\Codec\Encoder;
 use MongoDB\Driver\ClientEncryption;
 use MongoDB\Driver\Cursor;
@@ -202,6 +203,12 @@ public function __toString()
      */
     public function aggregate(array $pipeline, array $options = [])
     {
+        if (is_builder_pipeline($pipeline)) {
+            $pipeline = new Pipeline(...$pipeline);
+        }
+
+        $pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
+
         $hasWriteStage = is_last_pipeline_operator_write($pipeline);
 
         if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
@@ -611,6 +618,12 @@ public function selectGridFSBucket(array $options = [])
      */
     public function watch(array $pipeline = [], array $options = [])
     {
+        if (is_builder_pipeline($pipeline)) {
+            $pipeline = new Pipeline(...$pipeline);
+        }
+
+        $pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
+
         if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
             $options['readPreference'] = $this->readPreference;
         }
diff --git a/src/functions.php b/src/functions.php
index ca30fdbc0..a445467ba 100644
--- a/src/functions.php
+++ b/src/functions.php
@@ -21,6 +21,7 @@
 use MongoDB\BSON\Document;
 use MongoDB\BSON\PackedArray;
 use MongoDB\BSON\Serializable;
+use MongoDB\Builder\Type\StageInterface;
 use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
 use MongoDB\Driver\Manager;
 use MongoDB\Driver\ReadPreference;
@@ -327,6 +328,27 @@ function is_pipeline(array|object $pipeline, bool $allowEmpty = false): bool
     return true;
 }
 
+/**
+ * Returns whether the argument is a list that contains at least one
+ * {@see StageInterface} object.
+ *
+ * @internal
+ */
+function is_builder_pipeline(array $pipeline): bool
+{
+    if (! $pipeline || ! array_is_list($pipeline)) {
+        return false;
+    }
+
+    foreach ($pipeline as $stage) {
+        if (is_object($stage) && $stage instanceof StageInterface) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 /**
  * Returns whether we are currently in a transaction.
  *
diff --git a/tests/ClientFunctionalTest.php b/tests/ClientFunctionalTest.php
index 3c04ecd67..ca0ea2722 100644
--- a/tests/ClientFunctionalTest.php
+++ b/tests/ClientFunctionalTest.php
@@ -2,6 +2,9 @@
 
 namespace MongoDB\Tests;
 
+use MongoDB\Builder\Pipeline;
+use MongoDB\Builder\Query;
+use MongoDB\Builder\Stage;
 use MongoDB\Client;
 use MongoDB\Driver\BulkWrite;
 use MongoDB\Driver\Command;
@@ -13,6 +16,7 @@
 
 use function call_user_func;
 use function is_callable;
+use function iterator_to_array;
 use function sprintf;
 
 /**
@@ -137,4 +141,25 @@ public function testAddAndRemoveSubscriber(): void
 
         $client->getManager()->executeCommand('admin', new Command(['ping' => 1]));
     }
+
+    public function testWatchWithBuilderPipeline(): void
+    {
+        $this->skipIfChangeStreamIsNotSupported();
+
+        if ($this->isShardedCluster()) {
+            $this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
+        }
+
+        $pipeline = new Pipeline(
+            Stage::match(operationType: Query::eq('insert')),
+        );
+        // Extract the list of stages for arg type restriction
+        $pipeline = iterator_to_array($pipeline);
+
+        $changeStream = $this->client->watch($pipeline);
+        $this->client->selectCollection($this->getDatabaseName(), $this->getCollectionName())->insertOne(['x' => 3]);
+        $changeStream->next();
+        $this->assertTrue($changeStream->valid());
+        $this->assertEquals('insert', $changeStream->current()->operationType);
+    }
 }
diff --git a/tests/Collection/BuilderCollectionFunctionalTest.php b/tests/Collection/BuilderCollectionFunctionalTest.php
index 15a89cda5..2ace15109 100644
--- a/tests/Collection/BuilderCollectionFunctionalTest.php
+++ b/tests/Collection/BuilderCollectionFunctionalTest.php
@@ -2,10 +2,13 @@
 
 namespace MongoDB\Tests\Collection;
 
+use MongoDB\Builder\Expression;
 use MongoDB\Builder\Pipeline;
 use MongoDB\Builder\Query;
 use MongoDB\Builder\Stage;
 
+use function iterator_to_array;
+
 class BuilderCollectionFunctionalTest extends FunctionalTestCase
 {
     public function setUp(): void
@@ -17,7 +20,18 @@ public function setUp(): void
 
     public function testAggregate(): void
     {
-        $this->markTestSkipped('Not supported yet');
+        $this->collection->insertMany([['x' => 10], ['x' => 10], ['x' => 10]]);
+        $pipeline = new Pipeline(
+            Stage::bucketAuto(
+                groupBy: Expression::intFieldPath('x'),
+                buckets: 2,
+            ),
+        );
+        // Extract the list of stages for arg type restriction
+        $pipeline = iterator_to_array($pipeline);
+
+        $results = $this->collection->aggregate($pipeline)->toArray();
+        $this->assertCount(2, $results);
     }
 
     public function testBulkWriteDeleteMany(): void
@@ -245,6 +259,22 @@ public function testUpdateManyWithPipeline(): void
 
     public function testWatch(): void
     {
-        $this->markTestSkipped('Not supported yet');
+        $this->skipIfChangeStreamIsNotSupported();
+
+        if ($this->isShardedCluster()) {
+            $this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
+        }
+
+        $pipeline = new Pipeline(
+            Stage::match(operationType: Query::eq('insert')),
+        );
+        // Extract the list of stages for arg type restriction
+        $pipeline = iterator_to_array($pipeline);
+
+        $changeStream = $this->collection->watch($pipeline);
+        $this->collection->insertOne(['x' => 3]);
+        $changeStream->next();
+        $this->assertTrue($changeStream->valid());
+        $this->assertEquals('insert', $changeStream->current()->operationType);
     }
 }
diff --git a/tests/Database/BuilderDatabaseFunctionalTest.php b/tests/Database/BuilderDatabaseFunctionalTest.php
new file mode 100644
index 000000000..9b89d87e1
--- /dev/null
+++ b/tests/Database/BuilderDatabaseFunctionalTest.php
@@ -0,0 +1,63 @@
+<?php
+
+namespace MongoDB\Tests\Database;
+
+use MongoDB\Builder\Expression;
+use MongoDB\Builder\Pipeline;
+use MongoDB\Builder\Query;
+use MongoDB\Builder\Stage;
+
+use function iterator_to_array;
+
+class BuilderDatabaseFunctionalTest extends FunctionalTestCase
+{
+    public function tearDown(): void
+    {
+        $this->dropCollection($this->getDatabaseName(), $this->getCollectionName());
+
+        parent::tearDown();
+    }
+
+    public function testAggregate(): void
+    {
+        $this->skipIfServerVersion('<', '6.0.0', '$documents stage is not supported');
+
+        $pipeline = new Pipeline(
+            Stage::documents([
+                ['x' => 1],
+                ['x' => 2],
+                ['x' => 3],
+            ]),
+            Stage::bucketAuto(
+                groupBy: Expression::intFieldPath('x'),
+                buckets: 2,
+            ),
+        );
+        // Extract the list of stages for arg type restriction
+        $pipeline = iterator_to_array($pipeline);
+
+        $results = $this->database->aggregate($pipeline)->toArray();
+        $this->assertCount(2, $results);
+    }
+
+    public function testWatch(): void
+    {
+        $this->skipIfChangeStreamIsNotSupported();
+
+        if ($this->isShardedCluster()) {
+            $this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
+        }
+
+        $pipeline = new Pipeline(
+            Stage::match(operationType: Query::eq('insert')),
+        );
+        // Extract the list of stages for arg type restriction
+        $pipeline = iterator_to_array($pipeline);
+
+        $changeStream = $this->database->watch($pipeline);
+        $this->database->selectCollection($this->getCollectionName())->insertOne(['x' => 3]);
+        $changeStream->next();
+        $this->assertTrue($changeStream->valid());
+        $this->assertEquals('insert', $changeStream->current()->operationType);
+    }
+}
diff --git a/tests/FunctionsTest.php b/tests/FunctionsTest.php
index 7098cb24a..a673ef719 100644
--- a/tests/FunctionsTest.php
+++ b/tests/FunctionsTest.php
@@ -4,6 +4,8 @@
 
 use MongoDB\BSON\Document;
 use MongoDB\BSON\PackedArray;
+use MongoDB\Builder\Stage\LimitStage;
+use MongoDB\Builder\Stage\MatchStage;
 use MongoDB\Driver\WriteConcern;
 use MongoDB\Model\BSONArray;
 use MongoDB\Model\BSONDocument;
@@ -12,6 +14,7 @@
 use function MongoDB\apply_type_map_to_document;
 use function MongoDB\create_field_path_type_map;
 use function MongoDB\document_to_array;
+use function MongoDB\is_builder_pipeline;
 use function MongoDB\is_first_key_operator;
 use function MongoDB\is_last_pipeline_operator_write;
 use function MongoDB\is_mapreduce_output_inline;
@@ -311,6 +314,21 @@ public function providePipelines(): array
         ];
     }
 
+    /** @dataProvider provideStagePipelines */
+    public function testIsBuilderPipeline($expected, $pipeline): void
+    {
+        $this->assertSame($expected, is_builder_pipeline($pipeline));
+    }
+
+    public function provideStagePipelines(): iterable
+    {
+        yield 'empty array' => [false, []];
+        yield 'array of arrays' => [false, [['$match' => ['x' => 1]]]];
+        yield 'map of stages' => [false, [1 => new MatchStage([])]];
+        yield 'stages' => [true, [new MatchStage([]), new LimitStage(1)]];
+        yield 'stages and operators' => [true, [new MatchStage([]), ['$limit' => 1]]];
+    }
+
     /** @dataProvider provideWriteConcerns */
     public function testIsWriteConcernAcknowledged($expected, WriteConcern $writeConcern): void
     {