Recently I was debugging this simple PySpark code:
someDataframe = ...
someDict = someDataframe.collectAsMap()
someOtherDataframe.filter(lambda x: x in someDict).take(1)
First, we get some DataFrame. Next, we collect it into a dictionary. It doesn't actually matter how you create the dictionary; it could just as well be a set or a list. Finally, we do some filtering with a lambda that uses the in operator.
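For reference, here is a minimal self-contained sketch of the same pattern. The data, variable names and local SparkContext are made up for illustration only; note that collectAsMap and filter with a lambda are RDD operations, so the sketch uses RDDs rather than DataFrames.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Pair RDD collected into a plain dict on the driver
some_rdd = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
some_dict = some_rdd.collectAsMap()

# Another RDD filtered with a lambda that uses the `in` operator
other_rdd = sc.parallelize(["a", "x", "b", "y"])
print(other_rdd.filter(lambda x: x in some_dict).take(1))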
I was running this in Python 3.7.3 and was getting this exception:
[Stage 8:> (0 + 1) / 1]20/02/20 20:55:37 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 343, ip-10-0-0-2.ec2.internal, executor 20): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1371, in takeUpToNumLeft
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1/container_1/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1/container_1/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1/container_1/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration
It looks like this is related to the in operator, as replacing the lambda with something like list(someDict.keys()).count(x) >= 0 made the exceptions stop.
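Using the toy RDDs from the sketch above, the substitution looks roughly like this. Note that count(x) > 0 is the form that is actually equivalent to the original in check; the >= 0 variant from the text matches every row, but was enough to show that the exception went away.

# Same filter expressed without `in`; some_dict and other_rdd come from
# the illustrative sketch above.
other_rdd.filter(lambda x: list(some_dict.keys()).count(x) > 0).take(1)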
This is due to the new generator behavior defined by PEP 479: starting with Python 3.7, a StopIteration raised inside a generator is converted into a RuntimeError ("generator raised StopIteration"), which is exactly what the traceback shows.
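A plain-Python sketch of the behavior change, loosely mirroring the takeUpToNumLeft generator the traceback points at (the helper name and data here are made up):

def take_one(iterator):
    # next() raises StopIteration when the iterator is exhausted;
    # under PEP 479 that escapes the generator as a RuntimeError.
    yield next(iterator)

gen = take_one(iter([]))
next(gen)  # Python 3.6: StopIteration; Python 3.7+: RuntimeError: generator raised StopIteration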
Similar issues were reported here, here and here.