diff --git a/src/EtlExecutor.php b/src/EtlExecutor.php index 7566a3d..86811c4 100644 --- a/src/EtlExecutor.php +++ b/src/EtlExecutor.php @@ -4,14 +4,11 @@ namespace BenTools\ETL; -use BenTools\ETL\EventDispatcher\Event\BeforeLoadEvent; use BenTools\ETL\EventDispatcher\Event\EndEvent; -use BenTools\ETL\EventDispatcher\Event\ExtractEvent; use BenTools\ETL\EventDispatcher\Event\FlushEvent; use BenTools\ETL\EventDispatcher\Event\InitEvent; use BenTools\ETL\EventDispatcher\Event\LoadEvent; use BenTools\ETL\EventDispatcher\Event\StartEvent; -use BenTools\ETL\EventDispatcher\Event\TransformEvent; use BenTools\ETL\EventDispatcher\EventDispatcher; use BenTools\ETL\EventDispatcher\PrioritizedListenerProvider; use BenTools\ETL\Exception\ExtractException; @@ -24,6 +21,7 @@ use BenTools\ETL\Extractor\IterableExtractor; use BenTools\ETL\Internal\ClonableTrait; use BenTools\ETL\Internal\ConditionalLoaderTrait; +use BenTools\ETL\Internal\DispatchEventsTrait; use BenTools\ETL\Internal\EtlBuilderTrait; use BenTools\ETL\Internal\TransformResult; use BenTools\ETL\Loader\InMemoryLoader; @@ -49,6 +47,11 @@ final class EtlExecutor implements EventDispatcherInterface */ use EtlBuilderTrait; + /** + * @use DispatchEventsTrait + */ + use DispatchEventsTrait; + use ConditionalLoaderTrait; private EventDispatcher $eventDispatcher; @@ -74,14 +77,14 @@ public function process(mixed $source = null, mixed $destination = null, array $ $state = new EtlState(options: $this->options, source: $source, destination: $destination, context: $context); try { - $this->dispatch(new InitEvent($state)); + $this->emit(InitEvent::class, $state); $items = $this->extractor->extract($state); $state = $state->getLastVersion(); if (is_countable($items)) { $state = $state->update($state->withNbTotalItems(count($items))); } - $this->dispatch(new StartEvent($state)); + $this->emit(StartEvent::class, $state); if (!$this->processor->supports($items)) { throw new ExtractException(sprintf('Current processor %s cannot process data of type: %s.', $this->processor::class, get_debug_type($items))); @@ -100,8 +103,7 @@ public function processItem(mixed $item, mixed $key, EtlState $state): void if ($state->currentItemIndex > 0) { $this->consumeNextTick($state); } - $event = $this->dispatch(new ExtractEvent($state, $item)); - $item = $event->item; + $item = $this->emitExtractEvent($state, $item); $itemsToLoad = $this->transform($item, $state); $this->load($itemsToLoad, $state); } @@ -123,10 +125,10 @@ private function consumeNextTick(EtlState $state): void private function transform(mixed $item, EtlState $state): array { try { - $transformResult = TransformResult::create($this->transformer->transform($item, $state)); - - $event = $this->dispatch(new TransformEvent($state, $transformResult)); - $transformResult = TransformResult::create($event->transformResult); + $transformResult = $this->emitTransformEvent( + $state, + TransformResult::create($this->transformer->transform($item, $state)), + ); return [...$transformResult]; } catch (SkipRequest|StopRequest $e) { @@ -151,7 +153,7 @@ private function load(array $items, EtlState $state): void continue; } try { - $item = $this->dispatch(new BeforeLoadEvent($state, $item))->item; + $item = $this->emitBeforeLoadEvent($state, $item); } catch (SkipRequest) { continue; } catch (StopRequest) { @@ -159,7 +161,7 @@ private function load(array $items, EtlState $state): void } $this->loader->load($item, $state); $state = $state->update($state->getLastVersion()->withIncrementedNbLoadedItems()); - $this->dispatch(new LoadEvent($state, $item)); + $this->emit(LoadEvent::class, $state, $item); } } catch (SkipRequest|StopRequest $e) { throw $e; @@ -173,9 +175,9 @@ private function load(array $items, EtlState $state): void /** * @internal */ - private function flush(EtlState $state, bool $isPartial): mixed + private function flush(EtlState $state, bool $early): mixed { - if ($isPartial && !$state->shouldFlush()) { + if ($early && !$state->shouldFlush()) { return null; } @@ -186,11 +188,11 @@ private function flush(EtlState $state, bool $isPartial): mixed $output = null; $state->flush(); try { - $output = $this->loader->flush($isPartial, $state); + $output = $this->loader->flush($early, $state); } catch (Throwable $e) { FlushException::emit($this->eventDispatcher, $e, $state); } - $this->dispatch(new FlushEvent($state, $isPartial, $output)); + $this->emit(FlushEvent::class, $state, $early, $output); $state->update($state->withClearedFlush()); return $output; @@ -210,26 +212,10 @@ private function terminate(EtlState $state): EtlState } $state = $state->update($state->withOutput($output)); - $this->dispatch(new EndEvent($state)); + $this->emit(EndEvent::class, $state); gc_collect_cycles(); return $state; } - - /** - * @internal - * - * @param T $event - * - * @return T - * - * @template T of object - */ - public function dispatch(object $event): object - { - $this->eventDispatcher->dispatch($event); - - return $event; - } } diff --git a/src/EventDispatcher/Event/FlushEvent.php b/src/EventDispatcher/Event/FlushEvent.php index d60b583..c622715 100644 --- a/src/EventDispatcher/Event/FlushEvent.php +++ b/src/EventDispatcher/Event/FlushEvent.php @@ -15,7 +15,7 @@ final class FlushEvent extends Event implements StoppableEventInterface public function __construct( public readonly EtlState $state, public readonly bool $early, - public mixed $output, + public readonly mixed $output, ) { } } diff --git a/src/EventDispatcher/PrioritizedListenerProvider.php b/src/EventDispatcher/PrioritizedListenerProvider.php index 9dafaef..3470a8c 100644 --- a/src/EventDispatcher/PrioritizedListenerProvider.php +++ b/src/EventDispatcher/PrioritizedListenerProvider.php @@ -28,6 +28,11 @@ public function listenTo(string $eventClass, callable $callback, int $priority = $this->flattenedListeners[$eventClass] = array_merge(...$this->prioritizedListeners[$eventClass]); } + public function hasListeners(string $eventClass): bool + { + return isset($this->flattenedListeners[$eventClass]); + } + /** * @return iterable */ diff --git a/src/Internal/ConditionalLoaderTrait.php b/src/Internal/ConditionalLoaderTrait.php index 04bd5a1..4bd8395 100644 --- a/src/Internal/ConditionalLoaderTrait.php +++ b/src/Internal/ConditionalLoaderTrait.php @@ -8,6 +8,9 @@ use BenTools\ETL\Loader\ConditionalLoaderInterface; use BenTools\ETL\Loader\LoaderInterface; +/** + * @internal + */ trait ConditionalLoaderTrait { private static function shouldLoad(LoaderInterface $loader, mixed $item, EtlState $state): bool diff --git a/src/Internal/DispatchEventsTrait.php b/src/Internal/DispatchEventsTrait.php new file mode 100644 index 0000000..a547f7b --- /dev/null +++ b/src/Internal/DispatchEventsTrait.php @@ -0,0 +1,72 @@ +eventDispatcher->dispatch($event); + + return $event; + } + + /** + * @template E of Event + * + * @param class-string $eventClass + * + * @return E|null + */ + private function emit(string $eventClass, EtlState $state, mixed ...$args): ?Event + { + if (!$this->listenerProvider->hasListeners($eventClass)) { + return null; + } + + return $this->dispatch(new $eventClass($state, ...$args)); + } + + private function emitExtractEvent(EtlState $state, mixed $item): mixed + { + $event = $this->emit(ExtractEvent::class, $state, $item); + + return $event?->item ?? $item; + } + + private function emitTransformEvent(EtlState $state, TransformResult $transformResult): TransformResult + { + $event = $this->emit(TransformEvent::class, $state, $transformResult); + + return TransformResult::create($event?->transformResult ?? $transformResult); + } + + private function emitBeforeLoadEvent(EtlState $state, mixed $item): mixed + { + $event = $this->emit(BeforeLoadEvent::class, $state, $item); + + return $event?->item ?? $item; + } +} diff --git a/src/Internal/EtlBuilderTrait.php b/src/Internal/EtlBuilderTrait.php index af50b16..31d98a1 100644 --- a/src/Internal/EtlBuilderTrait.php +++ b/src/Internal/EtlBuilderTrait.php @@ -5,6 +5,7 @@ namespace BenTools\ETL\Internal; use BenTools\ETL\EtlConfiguration; +use BenTools\ETL\EtlExecutor; use BenTools\ETL\Extractor\CallableExtractor; use BenTools\ETL\Extractor\ChainExtractor; use BenTools\ETL\Extractor\ExtractorInterface; @@ -22,12 +23,12 @@ /** * @internal * - * @template T + * @template EtlExecutor */ trait EtlBuilderTrait { /** - * @use EtlEventListenersTrait + * @use EtlEventListenersTrait */ use EtlEventListenersTrait; diff --git a/src/Internal/EtlEventListenersTrait.php b/src/Internal/EtlEventListenersTrait.php index 312b470..a82c182 100644 --- a/src/Internal/EtlEventListenersTrait.php +++ b/src/Internal/EtlEventListenersTrait.php @@ -21,7 +21,7 @@ /** * @internal * - * @template T + * @template EtlExecutor */ trait EtlEventListenersTrait { diff --git a/src/Internal/StateHolder.php b/src/Internal/StateHolder.php index ebae2af..5849f6c 100644 --- a/src/Internal/StateHolder.php +++ b/src/Internal/StateHolder.php @@ -6,6 +6,9 @@ use BenTools\ETL\EtlState; +/** + * @internal + */ final class StateHolder { public function __construct( diff --git a/src/Internal/TransformResult.php b/src/Internal/TransformResult.php index 5d2eded..c85f937 100644 --- a/src/Internal/TransformResult.php +++ b/src/Internal/TransformResult.php @@ -36,8 +36,12 @@ public static function create(mixed $value): self static $prototype; $prototype ??= new self(); + if ($value instanceof self) { + return $value; + } + $that = clone $prototype; - if ($value instanceof Generator || $value instanceof self) { + if ($value instanceof Generator) { $that->value = [...$value]; $that->iterable = true; } else {