map by partition: dask and pyspark inconsistency #470
pavlis started this conversation in Design & Development
Replies: 1 comment 3 replies
-
hmmm... this is tricky, and I was not aware of the currying concept before. I wonder if it is possible to create another function wrapper like this:
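Something along these lines might do it: the wrapper captures the extra arguments in a closure and hands mapPartitions the one-argument function it expects. This is only a sketch; myfunction, db, and the keyword argument are placeholders:

```python
def wrap_for_partitions(func, *args, **kwargs):
    # Capture the extra arguments in a closure so the returned callable
    # takes only the partition iterator, which is all mapPartitions accepts.
    def run_on_partition(iterator):
        return func(iterator, *args, **kwargs)
    return run_on_partition

# usage sketch:
myrdd = myrdd.mapPartitions(wrap_for_partitions(myfunction, db, collection="wf_TimeSeries"))
```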
-
This topic comes up from a frustrating experience I've had working on a revision to the write_distributed_data function. It was frustrating because of a fundamental difficulty in debugging the parallel code involved in any lazy/delayed computation, and the problem was hidden for a long time by misleading errors from pyspark. What I uncovered is a disconnect in pyspark that is a legacy of the same issue we found long ago requires a different usage of map operator calls in dask and spark. Consider the example of how we run Database.save_data in a map operator: in dask you pass the function to map directly, while in pyspark you need to use a lambda function.
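As a reminder, the two idioms look roughly like this (the container and database handle names are placeholders, and the collection argument is shown only for illustration):

```python
# dask: the bag map method accepts the function plus its extra arguments.
mybag = mybag.map(db.save_data, collection="wf_TimeSeries")

# pyspark: the RDD map method takes only a function object, so a lambda is
# used to bind the Database handle and any optional arguments.
myrdd = myrdd.map(lambda d: db.save_data(d, collection="wf_TimeSeries"))
```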
The reason the lambda is necessary is that pyspark is the python translation of spark, whose native language is scala. I don't know scala, but it is a "functional programming" language, which is why the map method of an RDD, used above, takes a function object as its only required argument. We use the lambda as a convenient way to adapt a regular python function whose arg0 is expected to handle a single datum from the container.
That was a review for all MsPASS developers, but it is needed to understand a related operator found in both dask and pyspark. In dask it is called map_partitions and in pyspark it is called mapPartitions. The documentation for both dask and spark states that one should use the partitioned form when the function carries an expensive overhead, because the cost can then be paid once per partition instead of once per datum. I realized, correctly I am certain, that this was the right approach to use in write_distributed_data: it allows the MongoDB insert_one transactions to be replaced with far fewer calls to insert_many, where the "many" is defined by the number of data in each partition. You can look at the current implementation in the (presently almost complete) branch with the (bad) name "cleanup_database". A key point is that the approach works fine in dask; I have a working version of it there. The dask implementation was straightforward because the api for map_partitions is pythonic and simply allows regular and keyword (kwargs) arguments. The only oddity I had to get around is that arg0 of the function received by dask's map_partitions is not a single datum, as it is for a map operator, but an "iterator" best thought of as a pointer into the bag container. The only real complexity is that the iterator can be traversed only once inside the function passed to map_partitions.
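Reduced to a sketch, the dask pattern looks roughly like the following. This is not the actual write_distributed_data code; the helper and variable names are hypothetical, but it shows the partition iterator arriving as arg0, the extra arguments passing through normally, and one insert_many call per partition:

```python
def save_partition(iterator, db, collection="wf_TimeSeries"):
    # The iterator can be traversed only once, so build the document list
    # in a single pass over the partition (doc_from_datum is hypothetical).
    doclist = [doc_from_datum(d) for d in iterator]
    if doclist:
        db[collection].insert_many(doclist)  # one insert_many per partition
    return doclist

# dask passes the extra arguments through to the function directly:
mybag = mybag.map_partitions(save_partition, db, collection="wf_TimeSeries")
```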
Now to the problem I need help with, which we should preserve for the record. The documentation for spark's mapPartitions method of RDD can be found here. The key point is that, like map, mapPartitions requires only one argument: the function to be run "by partition". That is consistent with pyspark being derived from a scala api, where that kind of construct is the norm; in python it is an oddity that clashes with normal use of the language. The problem this causes is that, as far as I can tell, a lambda function cannot be used to get around it. That is, a lambda construct like the one used with map above does not work, and I see no way to do something like handing the lambda argument d to myfunction as arg0. The reason is that "myfunction" needs an "iterable" for arg0, and what you get from a normal lambda like the one above is not an iterable. The second form might be possible if there were a way to make d be an iterable derived from the partitioning and the container, but I haven't seen anything on the web about how to do that.
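To be concrete about the constraint, the only thing mapPartitions will accept is a single callable that consumes the partition iterator and returns an iterable, roughly like this sketch (function name hypothetical):

```python
def process_partition(iterator):
    # iterator yields the data in one partition and can be traversed once;
    # the return value must itself be an iterable.
    return [d for d in iterator]

# mapPartitions takes this one-argument callable and nothing else:
myrdd2 = myrdd.mapPartitions(process_partition)
```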
The main solution I've seen described on the web is to use "currying", which was a new term to me. Here is one of many pages you can find on this topic for python programming. Currying is probably the solution to this problem, but I haven't worked it out yet; I just have to find the right incantation for this particular example. If I get it done I'll post it to this page. It does, however, define a more general problem: I'm not sure there is a generic way to make a dask map_partitions call equivalent to a spark mapPartitions call sequence.
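One candidate incantation, which I have not tested yet, would use functools.partial to bind everything except the partition iterator (myfunction, db, and the collection kwarg are placeholders):

```python
from functools import partial

# partial binds db and collection, leaving the partition iterator as the one
# remaining positional argument that mapPartitions will supply.
writer = partial(myfunction, db=db, collection="wf_TimeSeries")
myrdd = myrdd.mapPartitions(writer)
```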
Do any of you have any experience or insight on this problem?