Skip to content

Commit

Permalink
Apply formatter.
Browse files Browse the repository at this point in the history
  • Loading branch information
alxmrs committed Jan 29, 2025
1 parent 048f8e3 commit aa85894
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 17 deletions.
5 changes: 3 additions & 2 deletions sdks/python/apache_beam/runners/dask/dask_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,15 @@ def _add_argparse_args(cls, parser: argparse.ArgumentParser) -> None:
type=int,
default=None,
help='The desired number of `dask.Bag` partitions. When unspecified, '
'an educated guess is made.')
'an educated guess is made.')
partitions_parser.add_argument(
'--dask_partition_size',
dest='partition_size',
type=int,
default=None,
help='The length of each `dask.Bag` partition. When unspecified, '
'an educated guess is made.')
'an educated guess is made.')


@dataclasses.dataclass
class DaskRunnerResult(PipelineResult):
Expand Down
23 changes: 8 additions & 15 deletions sdks/python/apache_beam/runners/dask/transform_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,26 +160,19 @@ def apply(self, input_bag: OpInput, side_inputs: OpSide = None) -> db.Bag:
npartitions = self.bag_kwargs.get('npartitions')
partition_size = self.bag_kwargs.get('partition_size')
if npartitions and partition_size:
raise ValueError(
raise ValueError(
f'Please specify either `dask_npartitions` or '
f'`dask_parition_size` but not both: '
f'{npartitions=}, {partition_size=}.'
)
f'{npartitions=}, {partition_size=}.')
if not npartitions and not partition_size:
# partition_size is inversely related to `npartitions`.
# Ideal "chunk sizes" in dask are around 10-100 MBs.
# Let's hope ~128 items per partition is around this
# memory overhead.
partition_size = max(
128,
math.ceil(math.sqrt(len(items)) / 10)
)
# partition_size is inversely related to `npartitions`.
# Ideal "chunk sizes" in dask are around 10-100 MBs.
# Let's hope ~128 items per partition is around this
# memory overhead.
partition_size = max(128, math.ceil(math.sqrt(len(items)) / 10))

return db.from_sequence(
items,
npartitions=npartitions,
partition_size=partition_size
)
items, npartitions=npartitions, partition_size=partition_size)


def apply_dofn_to_bundle(
Expand Down

0 comments on commit aa85894

Please sign in to comment.