Skip to content

Commit

Permalink
Merge pull request #377 from paillave/Scheduler
Browse files Browse the repository at this point in the history
update readers
  • Loading branch information
paillave authored Aug 19, 2022
2 parents 9860dbe + c7a85c7 commit b6e6be7
Show file tree
Hide file tree
Showing 31 changed files with 340 additions and 141 deletions.
4 changes: 3 additions & 1 deletion src/.vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
"request": "launch",
"preLaunchTask": "build",
"program": "${workspaceFolder}/Tutorials/Paillave.Etl.Samples/bin/Debug/net6.0/Paillave.Etl.Samples.dll",
"args": [ ],
"args": [
"/home/stephane/Desktop/"
],
"cwd": "${workspaceFolder}/Tutorials/Paillave.Etl.Samples",
"stopAtEntry": false,
"console": "externalTerminal"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Paillave.EntityFrameworkCoreExtension</PackageId>
<Version>2.0.46</Version>
<Version>2.0.47</Version>
<Authors>Stéphane Royer</Authors>
<Company></Company>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
Expand Down
2 changes: 1 addition & 1 deletion src/Paillave.Etl.Autofac/Paillave.Etl.Autofac.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Paillave.EtlNet.Autofac</PackageId>
<Version>2.0.46</Version>
<Version>2.0.47</Version>
<Authors>Stéphane Royer</Authors>
<Company></Company>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
Expand Down
2 changes: 1 addition & 1 deletion src/Paillave.Etl.Bloomberg/Paillave.Etl.Bloomberg.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Paillave.EtlNet.Bloomberg</PackageId>
<Version>2.0.46</Version>
<Version>2.0.47</Version>
<Authors>Stéphane Royer</Authors>
<Company></Company>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
Expand Down
2 changes: 1 addition & 1 deletion src/Paillave.Etl.Dropbox/Paillave.Etl.Dropbox.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Paillave.EtlNet.Dropbox</PackageId>
<Version>2.0.46</Version>
<Version>2.0.47</Version>
<Authors>Stéphane Royer</Authors>
<Company></Company>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Paillave.EtlNet.EntityFrameworkCore</PackageId>
<Version>2.0.46</Version>
<Version>2.0.47</Version>
<Authors>Stéphane Royer</Authors>
<Company></Company>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
Expand Down
6 changes: 3 additions & 3 deletions src/Paillave.Etl.ExcelFile/Core/ExcelFileDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public ExcelFileDefinition<T> SetDefaultMapping(bool withColumnHeader = true, Cu
}
return this;
}
public ExcelFileReader GetExcelReader(ExcelWorksheet excelWorksheet = null)
internal ExcelFileReader GetExcelReader(ExcelWorksheet excelWorksheet = null)
{
if ((_fieldDefinitions?.Count ?? 0) == 0) SetDefaultMapping();
if (_dataRange == null)
Expand Down Expand Up @@ -161,7 +161,7 @@ public ExcelFileDefinition<T> WithCultureInfo(string name)
this._cultureInfo = CultureInfo.GetCultureInfo(name);
return this;
}
public ExcelFileDefinition<T> MapColumnToProperty<TField>(int index, Expression<Func<T, TField>> memberLambda, CultureInfo cultureInfo = null)
internal ExcelFileDefinition<T> MapColumnToProperty<TField>(int index, Expression<Func<T, TField>> memberLambda, CultureInfo cultureInfo = null)
{
SetFieldDefinition(new ExcelFileFieldDefinition
{
Expand All @@ -171,7 +171,7 @@ public ExcelFileDefinition<T> MapColumnToProperty<TField>(int index, Expression<
});
return this;
}
public ExcelFileDefinition<T> MapColumnToProperty<TField>(int index, Expression<Func<T, TField>> memberLambda, string cultureInfo)
internal ExcelFileDefinition<T> MapColumnToProperty<TField>(int index, Expression<Func<T, TField>> memberLambda, string cultureInfo)
{
SetFieldDefinition(new ExcelFileFieldDefinition
{
Expand Down
20 changes: 18 additions & 2 deletions src/Paillave.Etl.ExcelFile/ExcelDatasetsValuesProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
using System.Threading;
using ExcelDataReader;
using System.Data;
using System.Collections.Generic;

namespace Paillave.Etl.ExcelFile
{
/// <summary>
/// Arguments for <see cref="ExcelDatasetsValuesProvider{TOut}"/>: carries the <c>GetOutput</c> callback,
/// which takes a <see cref="DataTable"/> and an <see cref="IFileValue"/>.
/// </summary>
/// <typeparam name="TOut">Type of the values produced through <c>GetOutput</c>.</typeparam>
public class ExcelDatasetsValuesProviderArgs<TOut>
{
public Func<DataTable, IFileValue, TOut> GetOutput { get; set; }
public Func<DataTable, IFileValue, IEnumerable<TOut>> GetOutput { get; set; }
}
public class ExcelDatasetsValuesProvider<TOut> : ValuesProviderBase<IFileValue, TOut>
{
Expand All @@ -24,7 +25,22 @@ public override void PushValues(IFileValue input, Action<TOut> push, Cancellatio
var dataset = reader.AsDataSet();
dataset.DataSetName = input.Name;
foreach (var item in dataset.Tables.Cast<DataTable>())
push(_args.GetOutput(item, input));
_args.GetOutput(item, input).ToList().ForEach(push);
}
}
}

/// <summary>
/// Values provider that opens the content of an <see cref="IFileValue"/> with
/// <c>ExcelReaderFactory.CreateReader</c>, converts it to a <see cref="DataSet"/> and pushes
/// each of its <see cref="DataTable"/>s.
/// </summary>
public class ExcelDataTablesValuesProvider : ValuesProviderBase<IFileValue, DataTable>
{
    /// <summary>Processing impact reported for this provider.</summary>
    public override ProcessImpact PerformanceImpact => ProcessImpact.Average;

    /// <summary>Memory impact reported for this provider.</summary>
    public override ProcessImpact MemoryFootPrint => ProcessImpact.Average;

    /// <summary>
    /// Reads <paramref name="input"/> as an Excel workbook and calls <paramref name="push"/> once per table.
    /// </summary>
    /// <param name="input">File whose content is read; its <c>Name</c> becomes the <c>DataSetName</c>.</param>
    /// <param name="push">Callback receiving each <see cref="DataTable"/> of the dataset.</param>
    /// <param name="cancellationToken">When cancellation is requested, no further table is pushed.</param>
    /// <param name="resolver">Not used by this provider.</param>
    /// <param name="invoker">Not used by this provider.</param>
    public override void PushValues(IFileValue input, Action<DataTable> push, CancellationToken cancellationToken, IDependencyResolver resolver, IInvoker invoker)
    {
        using (var reader = ExcelReaderFactory.CreateReader(input.GetContent()))
        {
            var dataset = reader.AsDataSet();
            dataset.DataSetName = input.Name;

            // Snapshot the tables first (as the original did) so that the iteration is not
            // affected if a consumer alters the dataset's table collection while being pushed to.
            var tables = dataset.Tables.Cast<DataTable>().ToList();
            foreach (var table in tables)
            {
                // Stop emitting as soon as cancellation is requested instead of ignoring the token.
                if (cancellationToken.IsCancellationRequested) break;
                push(table);
            }
        }
    }
}
Expand Down
167 changes: 132 additions & 35 deletions src/Paillave.Etl.ExcelFile/ExcelFile.Stream.ex.cs
Original file line number Diff line number Diff line change
@@ -1,73 +1,161 @@
using Paillave.Etl.Core;
using Paillave.Etl.Core.Mapping;
using Paillave.Etl.ExcelFile.Core;
using System;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Linq.Expressions;

namespace Paillave.Etl.ExcelFile
{
/// <summary>
/// Small factory exposing the different ways of obtaining an <see cref="ExcelFileDefinition{T}"/>.
/// </summary>
public class ExcelFileArgBuilder
{
    /// <summary>
    /// Builds a definition from a mapping expression by delegating to <c>ExcelFileDefinition.Create</c>.
    /// </summary>
    /// <typeparam name="T">Row type described by the mapping expression.</typeparam>
    /// <param name="expression">Mapping expression handed over to <c>ExcelFileDefinition.Create</c>.</param>
    /// <returns>The definition returned by <c>ExcelFileDefinition.Create</c>.</returns>
    public ExcelFileDefinition<T> UseMap<T>(Expression<Func<IFieldMapper, T>> expression)
    {
        return ExcelFileDefinition.Create(expression);
    }

    /// <summary>
    /// Creates a new, unconfigured definition for <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Row type of the definition.</typeparam>
    /// <returns>A new <see cref="ExcelFileDefinition{T}"/>.</returns>
    public ExcelFileDefinition<T> UseType<T>()
    {
        return new ExcelFileDefinition<T>();
    }

    /// <summary>
    /// Creates a new, unconfigured definition for <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Row type of the definition.</typeparam>
    /// <param name="prototype">
    /// Never read by this method; passing a value only lets the compiler infer <typeparamref name="T"/>.
    /// </param>
    /// <returns>A new <see cref="ExcelFileDefinition{T}"/>.</returns>
    public ExcelFileDefinition<T> UseType<T>(T prototype)
    {
        return new ExcelFileDefinition<T>();
    }
}

public static class ExcelFileEx
{
#region CrossApplyExcelSheets
public static IStream<ExcelSheetSelection> CrossApplyExcelSheets(this IStream<IFileValue> stream, string name, bool noParallelisation = false)
/// <summary>
/// Cross-applies an <see cref="ExcelSheetsValuesProvider{TOut}"/> over a stream of files and emits
/// each <see cref="ExcelSheetSelection"/> it produces as-is.
/// </summary>
/// <param name="stream">Stream of files the provider is applied to.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<ExcelSheetSelection> CrossApplyExcelSheets(
this IStream<IFileValue> stream,
string name,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelSheetsValuesProvider<ExcelSheetSelection>(new ExcelSheetsValuesProviderArgs<ExcelSheetSelection>
{
GetOutput = (i, j) => i
}), noParallelisation);
public static IStream<TOut> CrossApplyExcelSheets<TOut>(this IStream<IFileValue> stream, string name, Func<ExcelSheetSelection, TOut> selector, bool noParallelisation = false)
/// <summary>
/// Cross-applies an <see cref="ExcelSheetsValuesProvider{TOut}"/> over a stream of files and emits
/// the result of <paramref name="selector"/> for each sheet selection.
/// </summary>
/// <param name="stream">Stream of files the provider is applied to.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="selector">Projection applied to each sheet selection; the second callback argument is ignored.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<TOut> CrossApplyExcelSheets<TOut>(
this IStream<IFileValue> stream,
string name,
Func<ExcelSheetSelection, TOut> selector,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelSheetsValuesProvider<TOut>(new ExcelSheetsValuesProviderArgs<TOut>
{
GetOutput = (i, j) => selector(i)
}), noParallelisation);
public static IStream<TOut> CrossApplyExcelDatasets<TOut>(this IStream<IFileValue> stream, string name, Func<DataTable, IFileValue, TOut> selector, bool noParallelisation = false)
/// <summary>
/// Cross-applies an <see cref="ExcelDatasetsValuesProvider{TOut}"/> over a stream of files, using
/// <paramref name="selector"/> directly as the provider's <c>GetOutput</c> callback.
/// </summary>
/// <param name="stream">Stream of files the provider is applied to.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="selector">Callback receiving a <see cref="DataTable"/> and an <see cref="IFileValue"/> and returning the values to emit.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<TOut> CrossApplyExcelDatasets<TOut>(
this IStream<IFileValue> stream,
string name,
Func<DataTable, IFileValue, IEnumerable<TOut>> selector,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelDatasetsValuesProvider<TOut>(new ExcelDatasetsValuesProviderArgs<TOut>
{
GetOutput = selector
}), noParallelisation);
#endregion


#region CrossApplyExcelDatasets
/// <summary>
/// Obsolete alias: forwards <paramref name="name"/> and <paramref name="noParallelisation"/> to
/// <c>CrossApplyExcelDatatables</c>.
/// </summary>
[Obsolete("use CrossApplyExcelDatatables instead")]
public static IStream<DataTable> CrossApplyExcelDatasets(this IStream<IFileValue> stream, string name, bool noParallelisation = false)
=> stream.CrossApplyExcelDatatables(name, noParallelisation);
/// <summary>
/// Obsolete alias: forwards <paramref name="name"/>, <paramref name="selector"/> and
/// <paramref name="noParallelisation"/> to <c>CrossApplyExcelDatatables</c>.
/// </summary>
[Obsolete("use CrossApplyExcelDatatables instead")]
public static IStream<TOut> CrossApplyExcelDatasets<TOut>(this IStream<IFileValue> stream, string name, Func<DataTable, TOut> selector, bool noParallelisation = false)
=> stream.CrossApplyExcelDatatables(name, selector, noParallelisation);
/// <summary>
/// Cross-applies an <see cref="ExcelDatasetsValuesProvider{TOut}"/> over a stream of files with a
/// <c>GetOutput</c> callback that returns the <see cref="DataTable"/> it receives unchanged.
/// </summary>
/// <param name="stream">Stream of files the provider is applied to.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<DataTable> CrossApplyExcelDatatables(this IStream<IFileValue> stream, string name, bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelDatasetsValuesProvider<DataTable>(new ExcelDatasetsValuesProviderArgs<DataTable>
{
GetOutput = (i, j) => i
}), noParallelisation);
public static IStream<TOut> CrossApplyExcelDatatables<TOut>(this IStream<IFileValue> stream, string name, Func<DataTable, TOut> selector, bool noParallelisation = false)
/// <summary>
/// Cross-applies a new <c>ExcelDataTablesValuesProvider</c> over a stream of files, producing a
/// stream of <see cref="DataTable"/>.
/// </summary>
/// <param name="stream">Stream of files the provider is applied to.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<DataTable> CrossApplyExcelDataTables(
this IStream<IFileValue> stream,
string name,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelDataTablesValuesProvider(), noParallelisation);
/// <summary>
/// Cross-applies an <see cref="ExcelDatasetsValuesProvider{TOut}"/> over a stream of files, calling
/// <paramref name="selector"/> with each <see cref="DataTable"/> only (the second callback argument is ignored).
/// </summary>
/// <param name="stream">Stream of files the provider is applied to.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="selector">Projection from a <see cref="DataTable"/> to the values to emit.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<TOut> CrossApplyExcelDataTables<TOut>(
this IStream<IFileValue> stream,
string name,
Func<DataTable, IEnumerable<TOut>> selector,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelDatasetsValuesProvider<TOut>(new ExcelDatasetsValuesProviderArgs<TOut>
{
GetOutput = (i, j) => selector(i)
}), noParallelisation);
#endregion

#region CrossApplyExcelRows
public static IStream<TOut> CrossApplyExcelRows<TParsed, TOut>(this IStream<ExcelSheetSelection> stream, string name, ExcelFileDefinition<TParsed> mapping, Func<TParsed, ExcelSheetSelection, TOut> selector, bool noParallelisation = false)
/// <summary>
/// Cross-applies an <c>ExcelRowsValuesProvider</c> over a stream of sheet selections; the mapping is
/// obtained by calling <paramref name="mapBuilder"/> once, at call time, with a new <see cref="ExcelFileArgBuilder"/>.
/// </summary>
/// <param name="stream">Stream of sheet selections; each element is used directly as the sheet selection.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="mapBuilder">Factory producing the <see cref="ExcelFileDefinition{T}"/> used as <c>Mapping</c>.</param>
/// <param name="selector">Used as the provider's <c>GetOutput</c>: combines a parsed row with its sheet selection.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<TOut> CrossApplyExcelRows<TParsed, TOut>(
this IStream<ExcelSheetSelection> stream,
string name,
Func<ExcelFileArgBuilder, ExcelFileDefinition<TParsed>> mapBuilder,
Func<TParsed, ExcelSheetSelection, TOut> selector,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelRowsValuesProvider<ExcelSheetSelection, TParsed, TOut>(new ExcelRowsValuesProviderArgs<ExcelSheetSelection, TParsed, TOut>
{
Mapping = mapBuilder(new()),
GetSheetSelection = i => i,
GetOutput = selector
}), noParallelisation);
/// <summary>
/// Cross-applies an <c>ExcelRowsValuesProvider</c> over a stream of sheet selections using the given
/// <paramref name="mapping"/>.
/// </summary>
/// <param name="stream">Stream of sheet selections; each element is used directly as the sheet selection.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="mapping">Definition assigned to the provider's <c>Mapping</c>.</param>
/// <param name="selector">Used as the provider's <c>GetOutput</c>: combines a parsed row with its sheet selection.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<TOut> CrossApplyExcelRows<TParsed, TOut>(
this IStream<ExcelSheetSelection> stream,
string name,
ExcelFileDefinition<TParsed> mapping,
Func<TParsed, ExcelSheetSelection, TOut> selector,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelRowsValuesProvider<ExcelSheetSelection, TParsed, TOut>(new ExcelRowsValuesProviderArgs<ExcelSheetSelection, TParsed, TOut>
{
Mapping = mapping,
GetSheetSelection = i => i,
GetOutput = selector
}), noParallelisation);
public static IStream<TOut> CrossApplyExcelRows<TIn, TParsed, TOut>(this IStream<TIn> stream, string name, ExcelFileDefinition<TParsed> mapping, Func<TIn, ExcelSheetSelection> sheetSelection, Func<TParsed, TIn, TOut> selector, bool noParallelisation = false)
/// <summary>
/// Cross-applies an <c>ExcelRowsValuesProvider</c> over a stream of arbitrary elements; the mapping is
/// obtained by calling <paramref name="mapBuilder"/> once, at call time, with a new <see cref="ExcelFileArgBuilder"/>.
/// </summary>
/// <param name="stream">Input stream.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="mapBuilder">Factory producing the <see cref="ExcelFileDefinition{T}"/> used as <c>Mapping</c>.</param>
/// <param name="sheetSelection">Used as <c>GetSheetSelection</c>: extracts the sheet selection from an input element.</param>
/// <param name="selector">Used as <c>GetOutput</c>: combines a parsed row with the input element.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<TOut> CrossApplyExcelRows<TIn, TParsed, TOut>(
this IStream<TIn> stream,
string name,
Func<ExcelFileArgBuilder, ExcelFileDefinition<TParsed>> mapBuilder,
Func<TIn, ExcelSheetSelection> sheetSelection,
Func<TParsed, TIn, TOut> selector,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelRowsValuesProvider<TIn, TParsed, TOut>(new ExcelRowsValuesProviderArgs<TIn, TParsed, TOut>
{
Mapping = mapBuilder(new()),
GetSheetSelection = sheetSelection,
GetOutput = selector
}), noParallelisation);
/// <summary>
/// Cross-applies an <c>ExcelRowsValuesProvider</c> over a stream of arbitrary elements using the given
/// <paramref name="mapping"/>.
/// </summary>
/// <param name="stream">Input stream.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="mapping">Definition assigned to the provider's <c>Mapping</c>.</param>
/// <param name="sheetSelection">Used as <c>GetSheetSelection</c>: extracts the sheet selection from an input element.</param>
/// <param name="selector">Used as <c>GetOutput</c>: combines a parsed row with the input element.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<TOut> CrossApplyExcelRows<TIn, TParsed, TOut>(
this IStream<TIn> stream,
string name,
ExcelFileDefinition<TParsed> mapping,
Func<TIn, ExcelSheetSelection> sheetSelection,
Func<TParsed, TIn, TOut> selector,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelRowsValuesProvider<TIn, TParsed, TOut>(new ExcelRowsValuesProviderArgs<TIn, TParsed, TOut>
{
Mapping = mapping,
GetSheetSelection = sheetSelection,
GetOutput = selector
}), noParallelisation);
public static IStream<TParsed> CrossApplyExcelRows<TIn, TParsed>(this IStream<TIn> stream, string name, ExcelFileDefinition<TParsed> mapping, Func<TIn, ExcelSheetSelection> sheetSelection, bool noParallelisation = false)
/// <summary>
/// Cross-applies an <c>ExcelRowsValuesProvider</c> over a stream of arbitrary elements and emits the
/// parsed rows themselves; the mapping is obtained by calling <paramref name="mapBuilder"/> once, at call time,
/// with a new <see cref="ExcelFileArgBuilder"/>.
/// </summary>
/// <param name="stream">Input stream.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="mapBuilder">Factory producing the <see cref="ExcelFileDefinition{T}"/> used as <c>Mapping</c>.</param>
/// <param name="sheetSelection">Used as <c>GetSheetSelection</c>: extracts the sheet selection from an input element.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<TParsed> CrossApplyExcelRows<TIn, TParsed>(
this IStream<TIn> stream,
string name,
Func<ExcelFileArgBuilder, ExcelFileDefinition<TParsed>> mapBuilder,
Func<TIn, ExcelSheetSelection> sheetSelection,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelRowsValuesProvider<TIn, TParsed, TParsed>(new ExcelRowsValuesProviderArgs<TIn, TParsed, TParsed>
{
Mapping = mapBuilder(new()),
GetSheetSelection = sheetSelection,
GetOutput = (i, j) => i
}), noParallelisation);
/// <summary>
/// Cross-applies an <c>ExcelRowsValuesProvider</c> over a stream of arbitrary elements using the given
/// <paramref name="mapping"/> and emits the parsed rows themselves.
/// </summary>
/// <param name="stream">Input stream.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="mapping">Definition assigned to the provider's <c>Mapping</c>.</param>
/// <param name="sheetSelection">Used as <c>GetSheetSelection</c>: extracts the sheet selection from an input element.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<TParsed> CrossApplyExcelRows<TIn, TParsed>(
this IStream<TIn> stream,
string name,
ExcelFileDefinition<TParsed> mapping,
Func<TIn, ExcelSheetSelection> sheetSelection,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelRowsValuesProvider<TIn, TParsed, TParsed>(new ExcelRowsValuesProviderArgs<TIn, TParsed, TParsed>
{
Mapping = mapping,
GetSheetSelection = sheetSelection,
GetOutput = (i, j) => i
}), noParallelisation);
public static IStream<TParsed> CrossApplyExcelRows<TParsed>(this IStream<ExcelSheetSelection> stream, string name, ExcelFileDefinition<TParsed> mapping, bool noParallelisation = false)
/// <summary>
/// Cross-applies an <c>ExcelRowsValuesProvider</c> over a stream of sheet selections and emits the parsed
/// rows themselves; the mapping is obtained by calling <paramref name="mapBuilder"/> once, at call time,
/// with a new <see cref="ExcelFileArgBuilder"/>.
/// </summary>
/// <param name="stream">Stream of sheet selections; each element is used directly as the sheet selection.</param>
/// <param name="name">Name given to <c>CrossApply</c>.</param>
/// <param name="mapBuilder">Factory producing the <see cref="ExcelFileDefinition{T}"/> used as <c>Mapping</c>.</param>
/// <param name="noParallelisation">Forwarded unchanged to <c>CrossApply</c>.</param>
/// <returns>The stream returned by <c>CrossApply</c>.</returns>
public static IStream<TParsed> CrossApplyExcelRows<TParsed>(
this IStream<ExcelSheetSelection> stream,
string name,
Func<ExcelFileArgBuilder, ExcelFileDefinition<TParsed>> mapBuilder,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelRowsValuesProvider<ExcelSheetSelection, TParsed, TParsed>(new ExcelRowsValuesProviderArgs<ExcelSheetSelection, TParsed, TParsed>
{
Mapping = mapBuilder(new()),
GetSheetSelection = i => i,
GetOutput = (i, j) => i
}), noParallelisation);
public static IStream<TParsed> CrossApplyExcelRows<TParsed>(
this IStream<ExcelSheetSelection> stream,
string name,
ExcelFileDefinition<TParsed> mapping,
bool noParallelisation = false)
=> stream.CrossApply(name, new ExcelRowsValuesProvider<ExcelSheetSelection, TParsed, TParsed>(new ExcelRowsValuesProviderArgs<ExcelSheetSelection, TParsed, TParsed>
{
Mapping = mapping,
Expand All @@ -77,45 +165,54 @@ public static IStream<TParsed> CrossApplyExcelRows<TParsed>(this IStream<ExcelSh
#endregion

#region ThroughExcelFile
public static IStream<TIn> ToExcelFile<TIn>(this IStream<TIn> stream, string name, ISingleStream<Stream> resourceStream, ExcelFileDefinition<TIn> mapping = null)
{
return new ToExcelFileStreamNode<TIn, IStream<TIn>>(name, new ToExcelFileArgs<TIn, IStream<TIn>>
/// <summary>
/// Creates a <c>ToExcelFileStreamNode</c> fed by <paramref name="stream"/> and targeting
/// <paramref name="resourceStream"/>, and returns the node's <c>Output</c>.
/// </summary>
/// <param name="stream">Assigned to the node's <c>MainStream</c>.</param>
/// <param name="name">Name given to the created node.</param>
/// <param name="resourceStream">Assigned to the node's <c>TargetStream</c>.</param>
/// <param name="mapping">Optional definition assigned to <c>Mapping</c>; <c>null</c> is passed through when omitted.</param>
/// <returns>The <c>Output</c> of the created node.</returns>
public static IStream<TIn> ToExcelFile<TIn>(
this IStream<TIn> stream,
string name,
ISingleStream<Stream> resourceStream,
ExcelFileDefinition<TIn> mapping = null)
=> new ToExcelFileStreamNode<TIn, IStream<TIn>>(name, new ToExcelFileArgs<TIn, IStream<TIn>>
{
MainStream = stream,
TargetStream = resourceStream,
Mapping = mapping
}).Output;
}
public static ISortedStream<TIn, TKey> ToExcelFile<TIn, TKey>(this ISortedStream<TIn, TKey> stream, string name, ISingleStream<Stream> resourceStream, ExcelFileDefinition<TIn> mapping = null)
{
return new ToExcelFileStreamNode<TIn, ISortedStream<TIn, TKey>>(name, new ToExcelFileArgs<TIn, ISortedStream<TIn, TKey>>
/// <summary>
/// Sorted-stream overload: creates a <c>ToExcelFileStreamNode</c> fed by <paramref name="stream"/> and
/// targeting <paramref name="resourceStream"/>, and returns the node's <c>Output</c>.
/// </summary>
/// <param name="stream">Assigned to the node's <c>MainStream</c>.</param>
/// <param name="name">Name given to the created node.</param>
/// <param name="resourceStream">Assigned to the node's <c>TargetStream</c>.</param>
/// <param name="mapping">Optional definition assigned to <c>Mapping</c>; <c>null</c> is passed through when omitted.</param>
/// <returns>The <c>Output</c> of the created node.</returns>
public static ISortedStream<TIn, TKey> ToExcelFile<TIn, TKey>(
this ISortedStream<TIn, TKey> stream,
string name,
ISingleStream<Stream> resourceStream,
ExcelFileDefinition<TIn> mapping = null)
=> new ToExcelFileStreamNode<TIn, ISortedStream<TIn, TKey>>(name, new ToExcelFileArgs<TIn, ISortedStream<TIn, TKey>>
{
MainStream = stream,
TargetStream = resourceStream,
Mapping = mapping
}).Output;
}
public static IKeyedStream<TIn, TKey> ToExcelFile<TIn, TKey>(this IKeyedStream<TIn, TKey> stream, string name, ISingleStream<Stream> resourceStream, ExcelFileDefinition<TIn> mapping = null)
{
return new ToExcelFileStreamNode<TIn, IKeyedStream<TIn, TKey>>(name, new ToExcelFileArgs<TIn, IKeyedStream<TIn, TKey>>
/// <summary>
/// Keyed-stream overload: creates a <c>ToExcelFileStreamNode</c> fed by <paramref name="stream"/> and
/// targeting <paramref name="resourceStream"/>, and returns the node's <c>Output</c>.
/// </summary>
/// <param name="stream">Assigned to the node's <c>MainStream</c>.</param>
/// <param name="name">Name given to the created node.</param>
/// <param name="resourceStream">Assigned to the node's <c>TargetStream</c>.</param>
/// <param name="mapping">Optional definition assigned to <c>Mapping</c>; <c>null</c> is passed through when omitted.</param>
/// <returns>The <c>Output</c> of the created node.</returns>
public static IKeyedStream<TIn, TKey> ToExcelFile<TIn, TKey>(
this IKeyedStream<TIn, TKey> stream,
string name,
ISingleStream<Stream> resourceStream,
ExcelFileDefinition<TIn> mapping = null)
=> new ToExcelFileStreamNode<TIn, IKeyedStream<TIn, TKey>>(name, new ToExcelFileArgs<TIn, IKeyedStream<TIn, TKey>>
{
MainStream = stream,
TargetStream = resourceStream,
Mapping = mapping
}).Output;
}

#endregion

#region ToExcelFile
public static IStream<IFileValue> ToExcelFile<TIn>(this IStream<TIn> stream, string name, string fileName, ExcelFileDefinition<TIn> mapping = null)
{
return new ToExcelFileStreamNode<TIn>(name, new ToExcelFileArgs<TIn>
/// <summary>
/// Creates a <c>ToExcelFileStreamNode</c> fed by <paramref name="stream"/> with the given file name and
/// returns the node's <c>Output</c>, a stream of <see cref="IFileValue"/>.
/// </summary>
/// <param name="stream">Assigned to the node's <c>MainStream</c>.</param>
/// <param name="name">Name given to the created node.</param>
/// <param name="fileName">Assigned to the node's <c>FileName</c>.</param>
/// <param name="mapping">Optional definition assigned to <c>Mapping</c>; <c>null</c> is passed through when omitted.</param>
/// <returns>The <c>Output</c> of the created node.</returns>
public static IStream<IFileValue> ToExcelFile<TIn>(
this IStream<TIn> stream,
string name,
string fileName,
ExcelFileDefinition<TIn> mapping = null)
=> new ToExcelFileStreamNode<TIn>(name, new ToExcelFileArgs<TIn>
{
MainStream = stream,
Mapping = mapping,
FileName = fileName
}).Output;
}
#endregion
}
}
2 changes: 1 addition & 1 deletion src/Paillave.Etl.ExcelFile/Paillave.Etl.ExcelFile.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Paillave.EtlNet.ExcelFile</PackageId>
<Version>2.0.46</Version>
<Version>2.0.47</Version>
<Authors>Stéphane Royer</Authors>
<Company></Company>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Paillave.EtlNet.ExecutionToolkit</PackageId>
<Version>2.0.46</Version>
<Version>2.0.47</Version>
<Authors>Stéphane Royer</Authors>
<Company></Company>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
Expand Down
Loading

0 comments on commit b6e6be7

Please sign in to comment.