Spark – Random IT Utensils https://blog.adamfurmanek.pl IT, operating systems, maths, and more. Thu, 19 Mar 2020 00:51:49 +0000 en-US hourly 1 https://wordpress.org/?v=6.5.2 RuntimeError: generator raised StopIteration in PySpark in Python 3.7.3 https://blog.adamfurmanek.pl/2020/08/15/runtimeerror-generator-raised-stopiteration-in-pyspark-in-python-3-7-3/ https://blog.adamfurmanek.pl/2020/08/15/runtimeerror-generator-raised-stopiteration-in-pyspark-in-python-3-7-3/#respond Sat, 15 Aug 2020 08:00:28 +0000 https://blog.adamfurmanek.pl/?p=3426 Continue reading RuntimeError: generator raised StopIteration in PySpark in Python 3.7.3]]> Recently I was debugging this simple PySpark code:

someDataframe = ...
someDict = someDataframe.collectAsMap()
someOtherDataframe.filter(lambda x: x in someDict).take(1)

First, we get some DataFrame. Next, we collect it into a dictionary. It doesn’t matter how you create the dictionary; it could be a set or a list as well. Finally, we do some filtering with a lambda using the in operator.

I was running this in Python 3.7.3 and was getting this exception:

[Stage 8:>                                                          (0 + 1) / 1]20/02/20 20:55:37 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 343, ip-10-0-0-2.ec2.internal, executor 20): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1371, in takeUpToNumLeft
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1/container_1/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1/container_1/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1/container_1/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration

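It looks like this is related to the in operator, because replacing the lambda with something like list(someDict.keys()).count(x) > 0 stopped the exceptions.
This is due to the new generator behavior: since Python 3.7 (PEP 479), a StopIteration that escapes from inside a generator body is converted into a RuntimeError instead of silently ending the iteration.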
Similar issue was here, here and here.
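
The generator behavior mentioned above can be reproduced without Spark. The snippet below is only a minimal sketch of the mechanism; the names first_or_stop, broken_generator and consume are made up for illustration, and it does not claim to show where exactly the StopIteration originates in the PySpark code from the traceback.

```python
def first_or_stop(values):
    # next() on an exhausted iterator raises StopIteration
    return next(iter(values))


def broken_generator():
    yield 1
    # The StopIteration raised here escapes from inside the generator body
    first_or_stop([])
    yield 2


def consume():
    try:
        return list(broken_generator())
    except RuntimeError as error:
        # Python 3.7+ converts the leaked StopIteration into RuntimeError
        return "RuntimeError: %s" % error


result = consume()
print(result)
```

Before Python 3.7 (without the generator_stop future import) the same code would quietly return [1], which is why code that relied on the old behavior started failing after the upgrade.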

]]>
https://blog.adamfurmanek.pl/2020/08/15/runtimeerror-generator-raised-stopiteration-in-pyspark-in-python-3-7-3/feed/ 0
Data encryption in s3 in Spark in EMR with multiple encryption schemes https://blog.adamfurmanek.pl/2020/04/04/data-encryption-in-s3-in-spark-in-emr-with-multiple-encryption-schemes/ https://blog.adamfurmanek.pl/2020/04/04/data-encryption-in-s3-in-spark-in-emr-with-multiple-encryption-schemes/#respond Sat, 04 Apr 2020 08:00:59 +0000 https://blog.adamfurmanek.pl/?p=3269 Continue reading Data encryption in s3 in Spark in EMR with multiple encryption schemes]]> Spark supports multiple encryption schemes. You can use client-side encryption, server-side encryption, etc. What didn’t work for me for a long time was reading encrypted data and writing it back as plain text. Configuring encryption before reading worked fine. However, writing as plain text didn’t work (the data was still encrypted), even though I was disabling encryption before the write.

I was told that this is because encryption settings are cached, so my changes were not honored. However, what works for me now is using different access protocols (URI prefixes) to read and write s3 files.

So, for configuration do this:

// Enable CSE for s3:// prefix  
spark.conf.set("fs.s3.enableServerSideEncryption", "false")
spark.conf.set("fs.s3.cse.enabled", "true")
spark.conf.set("fs.s3.cse.encryptionMaterialsProvider", "com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider")
spark.conf.set("fs.s3.cse.kms.keyId", "KMS ID") // KMS key to encrypt the data with
spark.conf.set("fs.s3.cse.kms.region", "us-east-1") // the region for the KMS key

// Disable CSE for s3a:// prefix to not encrypt 
spark.conf.set("fs.s3a.enableServerSideEncryption", "false")
spark.conf.set("fs.s3a.cse.enabled", "false")
spark.conf.set("fs.s3a.canned.acl","BucketOwnerFullControl")
spark.conf.set("fs.s3a.acl.default","BucketOwnerFullControl")
spark.conf.set("fs.s3a.acl","bucket-owner-full-control")

or in Python do this:

# Enable CSE for s3:// prefix 
spark._jsc.hadoopConfiguration().set("fs.s3.enableServerSideEncryption", "false")
spark._jsc.hadoopConfiguration().set("fs.s3.cse.enabled", "true")
spark._jsc.hadoopConfiguration().set("fs.s3.cse.encryptionMaterialsProvider", "com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3.cse.kms.keyId", "KMS ID") # KMS key to encrypt the data with
spark._jsc.hadoopConfiguration().set("fs.s3.cse.kms.region", "us-east-1") # the region for the KMS key

# Disable CSE for s3a:// prefix to not encrypt 
spark._jsc.hadoopConfiguration().set("fs.s3a.enableServerSideEncryption", "false")
spark._jsc.hadoopConfiguration().set("fs.s3a.cse.enabled", "false")
spark._jsc.hadoopConfiguration().set("fs.s3a.canned.acl","BucketOwnerFullControl")
spark._jsc.hadoopConfiguration().set("fs.s3a.acl.default","BucketOwnerFullControl")
spark._jsc.hadoopConfiguration().set("fs.s3a.acl","bucket-owner-full-control")

Now, when you read or write a file using the s3 prefix, it is encrypted with the KMS key. However, if you read or write using s3a, it isn’t encrypted. You can use the s3n prefix to configure yet another encryption scheme. If you want to do more, you need to dig into protocol handlers.
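
With the configuration above in place, decrypting a dataset boils down to reading through one prefix and writing through the other. The helper below is only a sketch under a few assumptions: the function name copy_as_plain_text and the bucket paths are hypothetical, the data is assumed to be stored as Parquet, and spark is an already configured SparkSession.

```python
def copy_as_plain_text(spark, encrypted_path, plain_path):
    """Read through the CSE-enabled s3:// prefix and write through s3a://."""
    # Guard against mixing up the prefixes, since the prefix selects the scheme
    if not encrypted_path.startswith("s3://"):
        raise ValueError("expected an s3:// path for the encrypted input")
    if not plain_path.startswith("s3a://"):
        raise ValueError("expected an s3a:// path for the unencrypted output")

    # s3:// uses the fs.s3.cse.* settings, so the data is decrypted on read
    df = spark.read.parquet(encrypted_path)
    # s3a:// has CSE disabled, so the data is written without encryption
    df.write.parquet(plain_path)
    return df


# Example call (hypothetical bucket and paths):
# copy_as_plain_text(spark, "s3://example-bucket/encrypted/", "s3a://example-bucket/plain/")
```

The prefix checks are there only because the whole approach depends on which prefix is used for which side.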

]]>
https://blog.adamfurmanek.pl/2020/04/04/data-encryption-in-s3-in-spark-in-emr-with-multiple-encryption-schemes/feed/ 0
Running Anaconda with DGL and mxnet on CUDA GPU in Spark running in EMR https://blog.adamfurmanek.pl/2020/03/28/running-anaconda-with-dgl-and-mxnet-on-cuda-gpu-in-spark-running-in-emr/ https://blog.adamfurmanek.pl/2020/03/28/running-anaconda-with-dgl-and-mxnet-on-cuda-gpu-in-spark-running-in-emr/#respond Sat, 28 Mar 2020 09:00:14 +0000 https://blog.adamfurmanek.pl/?p=3266 Continue reading Running Anaconda with DGL and mxnet on CUDA GPU in Spark running in EMR]]> Today I’m going to share my configuration for running custom Anaconda Python with DGL (Deep Graph Library) and mxnet library, with GPU support via CUDA, running in Spark hosted in EMR. Actually, I have Redshift configuration as well, with support for gensim, tensorflow, keras, theano, pygpu, and cloudpickle. You can also install more libraries if needed. All this for Google to index keywords. Let’s begin.

My configuration uses EMR 5.17.2 and CUDA 9.2. At the time of writing, EMR 5.27 is available, but it comes with the same CUDA version, so I presume it should work as well. I’m also using Python 3.7.

First, create a cluster. Do not select mxnet as a provided library in EMR; we will install it later. For the master node use the p3.8xlarge instance type: this instance must have a GPU, and this is where we will run DGL and mxnet. For slaves you can use anything; I’m going with 19 r3.4xlarge nodes (they don’t have a GPU).

We need to install some custom libraries. I am using a bootstrap script for that, but you can just SSH into the host manually and run this code:

sudo mkdir /mnt/usr-moved
sudo mv /usr/local /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/local /usr/
sudo mv /usr/share /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/share /usr/

sudo mkdir /mnt/home
sudo chown hadoop.hadoop /mnt/home

wget https://repo.anaconda.com/archive/Anaconda3-2019.03-Linux-x86_64.sh -O ~/anaconda.sh
bash ~/anaconda.sh -b -p /mnt/home/hadoop/anaconda
echo -e '\nexport PATH=/mnt/home/hadoop/anaconda/bin:$PATH' >> $HOME/.bashrc && source $HOME/.bashrc
echo -e '\nexport PYSPARK_PYTHON=/mnt/home/hadoop/anaconda/bin/python' >> $HOME/.bashrc && source $HOME/.bashrc

/mnt/home/hadoop/anaconda/bin/conda install -y gensim
/mnt/home/hadoop/anaconda/bin/conda install -y tensorflow
/mnt/home/hadoop/anaconda/bin/conda install -y keras
/mnt/home/hadoop/anaconda/bin/conda install -y theano
/mnt/home/hadoop/anaconda/bin/conda install -y pygpu
/mnt/home/hadoop/anaconda/bin/conda upgrade -y cloudpickle
yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet-cu92mkl
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl-cuda9.2
/mnt/home/hadoop/anaconda/bin/conda install -y s3fs

First, I move /usr/local and /usr/share to /mnt and symlink them back so that installing packages does not fill the root disk. Then in line 10 I download Anaconda. Finally, lines 15-23 install some additional libraries. Notice that in line 21 I install mxnet compiled for CUDA 9.2, and in line 22 the same for DGL. Also, s3fs is required for convenient reading from s3.

When this is done and the cluster is created, I point the Python setting of the Zeppelin interpreter to /mnt/home/hadoop/anaconda/bin/python and add the Redshift configuration. I do this with the following commands (you need to run them manually after the cluster is created):

sudo /usr/lib/zeppelin/bin/install-interpreter.sh --name jdbc
cd /usr/lib/zeppelin/interpreter/jdbc/
sudo wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

sudo tee /etc/zeppelin/conf/interpreter.json <<'endmsg'
{
  "interpreterSettings": {
    "2ANGGHHMQ": {
      "id": "2ANGGHHMQ",
      "name": "spark",
      "group": "spark",
      "properties": {
        "zeppelin.spark.printREPLOutput": "true",
        "spark.yarn.jar": "",
        "master": "yarn-client",
        "zeppelin.spark.maxResult": "1000",
        "zeppelin.dep.localrepo": "/usr/lib/zeppelin/local-repo",
        "spark.app.name": "Zeppelin",
        "zeppelin.spark.importImplicit": "true",
        "zeppelin.spark.useHiveContext": "true",
        "args": "",
        "spark.home": "/usr/lib/spark",
        "zeppelin.spark.concurrentSQL": "false",
        "zeppelin.pyspark.python": "/mnt/home/hadoop/anaconda/bin/python"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "spark",
          "class": "org.apache.zeppelin.spark.SparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "scala",
            "editOnDblClick": false
          }
        },
        {
          "name": "pyspark",
          "class": "org.apache.zeppelin.spark.PySparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        },
        {
          "name": "sql",
          "class": "org.apache.zeppelin.spark.SparkSqlInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sql",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [
        {
          "groupArtifactVersion": "/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar",
          "local": false
        }
      ],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "users": [],
        "isUserImpersonate": false
      }
    },
    "2AM1YV5CU": {
      "id": "2AM1YV5CU",
      "name": "angular",
      "group": "angular",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "angular",
          "class": "org.apache.zeppelin.angular.AngularInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2BRWU4WXC": {
      "id": "2BRWU4WXC",
      "name": "python",
      "group": "python",
      "properties": {
        "zeppelin.python": "/mnt/home/hadoop/anaconda/bin/python",
        "zeppelin.python.maxResult": "1000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "python",
          "class": "org.apache.zeppelin.python.PythonInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "users": [],
        "isUserImpersonate": false
      }
    },
    "2AJXGMUUJ": {
      "id": "2AJXGMUUJ",
      "name": "md",
      "group": "md",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "md",
          "class": "org.apache.zeppelin.markdown.Markdown",
          "defaultInterpreter": false,
          "editor": {
            "language": "markdown",
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2AKK3QQXU": {
      "id": "2AKK3QQXU",
      "name": "sh",
      "group": "sh",
      "properties": {
        "shell.command.timeout.millisecs": "60000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "sh",
          "class": "org.apache.zeppelin.shell.ShellInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sh",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    }
  },
  "interpreterBindings": {
    "2EMW16J14": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ],
    "2A94M5J1Z": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ]
  },
  "interpreterRepositories": [
    {
      "id": "central",
      "type": "default",
      "url": "http://repo1.maven.org/maven2/",
      "releasePolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    },
    {
      "id": "local",
      "type": "default",
      "url": "file:///var/lib/zeppelin/.m2/repository",
      "releasePolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    }
  ]
}
endmsg

Now, I need to tune default spark submit options:

sudo tee /etc/zeppelin/conf.dist/zeppelin-env.sh <<'endmsg'
export ZEPPELIN_PORT=8890
export ZEPPELIN_CONF_DIR=/etc/zeppelin/conf
export ZEPPELIN_LOG_DIR=/var/log/zeppelin
export ZEPPELIN_PID_DIR=/var/run/zeppelin
export ZEPPELIN_PID=$ZEPPELIN_PID_DIR/zeppelin.pid
export ZEPPELIN_WAR_TEMPDIR=/var/run/zeppelin/webapps
export ZEPPELIN_NOTEBOOK_DIR=/var/lib/zeppelin/notebook
export MASTER=yarn-client
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LD_LIBRARY_PATH=/usr/local/cuda/lib64/
export CLASSPATH=":/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar"


export SPARK_SUBMIT_OPTIONS="--jars=YOUR_JARS_HERE --conf spark.executor.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf spark.driver.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf 'spark.executorEnv.PYTHONPATH=/usr/lib/spark/python/lib/py4j-src.zip:/usr/lib/spark/python/:<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-src.zip' --conf spark.yarn.isPython=true --conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false' --conf 'spark.hadoop.fs.s3.canned.acl=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl.default=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3n.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3.cse.enabled=false'"
export ZEPPELIN_NOTEBOOK_STORAGE=org.apache.zeppelin.notebook.repo.S3NotebookRepo
endmsg

This is not the full content! I omit some of my internal settings so generally don’t copy it blindly, just extend the zeppelin-env.sh file as needed. Important things are:
export LD_LIBRARY_PATH=/usr/local/cuda/lib64/ — this points to CUDA libraries
--conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false' — this configures executors and memory. You need to tune it for your cluster size.
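
The post does not say how the executor numbers were derived, so the following is only one possible way to get a starting point. It is a sketch of a common sizing heuristic (keep one core per node for the system, pack fixed-size executors into the rest, leave one executor slot for the YARN application master). The function name and its parameters are hypothetical; the only facts taken from elsewhere are that the cluster has 19 r3.4xlarge nodes and that an r3.4xlarge has 16 vCPUs.

```python
def executors_for_cluster(nodes, vcpus_per_node, cores_per_executor,
                          reserved_cores_per_node=1, reserved_executors=1):
    """Estimate --num-executors for a cluster of identical nodes."""
    # Leave some cores on every node for the OS and Hadoop daemons
    usable_cores = vcpus_per_node - reserved_cores_per_node
    # Whole executors that fit on a single node
    executors_per_node = usable_cores // cores_per_executor
    # Keep one slot free for the YARN application master
    return nodes * executors_per_node - reserved_executors


# 19 r3.4xlarge nodes with 16 vCPUs each, 5 cores per executor
num_executors = executors_for_cluster(nodes=19, vcpus_per_node=16, cores_per_executor=5)
print(num_executors)
```

For this cluster the heuristic happens to give 56, the same value as in the options above. Executor and driver memory still have to be tuned separately against what YARN offers on each node.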

Now, restart Zeppelin. You should now be able to run:

%spark.pyspark
import mxnet as mx
import numpy as np
print(mx.__version__)
print(np.__version__)

1.6.0
1.14.6

Now you can create GPU context:

ctx = mx.gpu(0)

and it should work like a charm.

So now you have the power of Spark: you can easily distribute a job and use all slaves. You also have a GPU at hand, so whenever you use an ndarray from mxnet, it can use the GPU.

If you don’t want to use GPU, then just install these libraries instead:

yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl

and use the mx.cpu() context. This works as well but is, obviously, much slower. For my use case GPU calculations were 80 times faster than running on CPU.

]]>
https://blog.adamfurmanek.pl/2020/03/28/running-anaconda-with-dgl-and-mxnet-on-cuda-gpu-in-spark-running-in-emr/feed/ 0
Running any query in Redshift or JDBC from Spark in EMR https://blog.adamfurmanek.pl/2020/03/21/running-any-query-in-redshift-or-jdbc-from-spark-in-emr/ https://blog.adamfurmanek.pl/2020/03/21/running-any-query-in-redshift-or-jdbc-from-spark-in-emr/#respond Sat, 21 Mar 2020 09:00:20 +0000 https://blog.adamfurmanek.pl/?p=3264 Continue reading Running any query in Redshift or JDBC from Spark in EMR]]> Last time we saw how to connect to Redshift from Spark running in EMR. Provided solution was nice but allowed for reading data only. Sometimes we might want to run any DDL or DML query, not only simple read statements.

To do that, we need to connect to Redshift directly over JDBC. I assume you configured your cluster the same way as in the previous part. Now use this code:

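def executeNonQuery(query: String): Int = {
    import java.sql._
    import java.util._

    // Connection properties can stay empty here
    val connectionProps = new Properties()

    val connection = DriverManager.getConnection(getConnectionString(), connectionProps)
    try {
        // createStatement() takes no arguments; the SQL text is passed to executeUpdate
        val statement = connection.createStatement()
        statement.executeUpdate(query)
    } finally {
        // Do not leak the connection when the query fails
        connection.close()
    }
}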

We first import the JDBC packages. Next, we create new properties for the connection, which can be empty. Then we open the connection using the JDBC infrastructure, create a statement and execute the query with executeUpdate. Because executeUpdate returns a row count, the query should be a DDL or DML statement, not a SELECT.

We can use it like this:

%spark
executeNonQuery(s"""DROP TABLE IF EXISTS table""")

The query can be pretty much anything you can run in SQL Workbench. It works with temporary tables as well (unlike prepareStatement).

]]>
https://blog.adamfurmanek.pl/2020/03/21/running-any-query-in-redshift-or-jdbc-from-spark-in-emr/feed/ 0
Connecting to Redshift from Spark running in EMR https://blog.adamfurmanek.pl/2020/03/14/connecting-to-redshift-from-spark-running-in-emr/ https://blog.adamfurmanek.pl/2020/03/14/connecting-to-redshift-from-spark-running-in-emr/#respond Sat, 14 Mar 2020 09:00:22 +0000 https://blog.adamfurmanek.pl/?p=3261 Continue reading Connecting to Redshift from Spark running in EMR]]> Today I’ll share my configuration for Spark running in EMR to connect to Redshift cluster. First, I assume the cluster is accessible (so configure virtual subnet, allowed IPs and all network stuff before running this).

I’m using Zeppelin so I’ll show two interpreters configured for the connection, but the same thing should work with standalone job (as long as it has the same libraries configured). I tested things with EMR 5.17.2 but it should work with other versions as well.

Redshift interpreter

First, let’s configure separate interpreter to use in Zeppelin. SSH into the master node of the cluster and install JDBC interpreter:

sudo /usr/lib/zeppelin/bin/install-interpreter.sh --name jdbc

Next, download the driver:

cd /usr/lib/zeppelin/interpreter/jdbc/
sudo wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

Restart zeppelin:

sudo stop zeppelin
sudo start zeppelin

Go to the interpreter configuration in Zeppelin and add a new JDBC interpreter named redshift. Use the following settings:

default.driver	com.amazon.redshift.jdbc42.Driver
default.url	jdbc:redshift://your-redshift-instance-address.redshift.amazonaws.com:5439/your-database
default.user	redshift_user
default.password	redshift_password

Now create new paragraph like below:

%redshift
SELECT * FROM table

And it should work.

Spark interpreter

Download driver the same way as before. Now, go to interpreter settings and add dependency to Spark interpreter:

/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

Now you can start reading data like this:

%spark
import org.apache.spark.sql._

def getConnectionString() = {
    val url = "cluster url"
    val port = 8192
    val database = "database name"
    val user = "user"
    val password = "password"
    
    s"jdbc:redshift://${url}:$port/$database?user=$user&password=$password"
}

def runQuery(query: String) = {
    val df: DataFrame = sqlContext.read
      .format("jdbc")
      .option("driver", "com.amazon.redshift.jdbc42.Driver")
      .option("url", getConnectionString())
      .option("dbtable", s"($query) tmp")
      .load()
    
    df
}

var table = runQuery(s"""
SELECT * FROM Table AS t
""")

This is even nicer because you can use string interpolation to provide parameters for queries.

]]>
https://blog.adamfurmanek.pl/2020/03/14/connecting-to-redshift-from-spark-running-in-emr/feed/ 0
Spark and NegativeArraySizeException https://blog.adamfurmanek.pl/2019/06/22/spark-and-negativearraysizeexception/ https://blog.adamfurmanek.pl/2019/06/22/spark-and-negativearraysizeexception/#respond Sat, 22 Jun 2019 08:00:44 +0000 https://blog.adamfurmanek.pl/?p=2905 Recently I was debugging the following crash in Spark:

java.lang.NegativeArraySizeException
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:447)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:245)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:246)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:41)
	at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:658)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:623)
	at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
	at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
	at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
	at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
	at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
	at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:207)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:269)
	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411)

Disabling Kryo solves the issue. To do that, just set spark.serializer to org.apache.spark.serializer.JavaSerializer.
Another workaround is to disable Kryo’s reference tracking, as explained on GitHub:

Kryo kryo = new Kryo();
kryo.setReferences(false);
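
In a Spark job you usually don’t create the Kryo instance yourself, so both workarounds are easier to apply as configuration. The sketch below only builds spark-submit --conf arguments from a dictionary; the helper to_submit_flags and the two dictionary names are made up for illustration. The keys spark.serializer and spark.kryo.referenceTracking are regular Spark properties, and the assumption here is that turning reference tracking off has the same effect as the setReferences(false) call above.

```python
def to_submit_flags(conf):
    """Turn a dict of Spark properties into spark-submit arguments."""
    flags = []
    for key in sorted(conf):
        flags.extend(["--conf", "%s=%s" % (key, conf[key])])
    return flags


# Workaround 1: do not use Kryo at all
DISABLE_KRYO = {
    "spark.serializer": "org.apache.spark.serializer.JavaSerializer",
}

# Workaround 2: keep Kryo, but switch off reference tracking
KRYO_WITHOUT_REFERENCES = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryo.referenceTracking": "false",
}

print(" ".join(to_submit_flags(DISABLE_KRYO)))
print(" ".join(to_submit_flags(KRYO_WITHOUT_REFERENCES)))
```

Keep in mind that disabling reference tracking is only safe when the serialized object graphs contain no cycles.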

]]>
https://blog.adamfurmanek.pl/2019/06/22/spark-and-negativearraysizeexception/feed/ 0
Spark and NullPointerException in UTF8String.contains https://blog.adamfurmanek.pl/2019/06/15/spark-and-nullpointerexception-in-utf8string-contains/ https://blog.adamfurmanek.pl/2019/06/15/spark-and-nullpointerexception-in-utf8string-contains/#respond Sat, 15 Jun 2019 08:00:51 +0000 https://blog.adamfurmanek.pl/?p=2903 Continue reading Spark and NullPointerException in UTF8String.contains]]> Recently I was debugging a NullPointerException in Spark. The stacktrace was indicating this:

java.lang.NullPointerException
	at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

After some digging I found out that the following query causes the problem:

df1
  .join(df2,
	df1("id1") === df2("id2")
	  && !isnull(df1("ref"))
	  && !isnull(df2("ref"))
	  && df2("ref").contains(df1("ref")) // <--- this is the problem
	, "left_outer"
  )
  .drop("id2")

If I commented out the marked line, the NPE was no longer there. Also, when I replaced either df2("ref") or df1("ref") with lit("ref"), it did not crash either, so there was something wrong with contains running on columns from two dataframes.

In my case removing the cache helped: I was caching df2 with the cache() method before running the join. When I removed the caching, the problem disappeared. This was Spark 2.1.0 on EMR 5.5.3.

]]>
https://blog.adamfurmanek.pl/2019/06/15/spark-and-nullpointerexception-in-utf8string-contains/feed/ 0
Machine Learning Part 1 — Linear regression in MXNet https://blog.adamfurmanek.pl/2018/10/20/machine-learning-part-1/ https://blog.adamfurmanek.pl/2018/10/20/machine-learning-part-1/#comments Sat, 20 Oct 2018 08:00:13 +0000 https://blog.adamfurmanek.pl/?p=2629 Continue reading Machine Learning Part 1 — Linear regression in MXNet]]>

This is the first part of the Machine Learning series. For your convenience you can find other parts using the links below (or by guessing the address):
Part 1 — Linear regression in MXNet
Part 2 — Linear regression in SQL
Part 3 — Linear regression in SQL revisited
Part 4 — Linear regression in T-SQL
Part 5 — Linear regression
Part 6 — Matrix multiplication in SQL
Part 7 — Forward propagation in neural net in SQL
Part 8 — Backpropagation in neural net in SQL

In this series I assume you know the basics of machine learning. I will provide some source code for different use cases but no extensive explanation. Let’s go.

Today we will take a look at linear regression in MXNet. We will predict sepal length in the well-known iris dataset.

I assume you have the dataset uploaded to s3. Let’s go with loading the dataset:

from mxnet import nd, autograd
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

local_file="/tmp/Iris.csv"
df = pd.read_csv("/blabla/" + local_file, delimiter=',', header = 0)

print(df.shape)

We can see some records with print(df.head(3)) or check the different iris categories with df.iris.unique().

We have one target variable and four features. Let’s create two additional ones:

df['i_setosa'] = 0
df.loc[(df['iris']=='setosa'), 'i_setosa']= 1
df['i_versicolor'] = 0
df.loc[(df['iris']=='versicolor'), 'i_versicolor']= 1

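These two features are similar to a one-hot encoding of the categorical feature: each is an indicator for one category, and the remaining category is represented by both indicators being 0.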

Time to prepare training and test datasets with: df_train, df_test = train_test_split( df, test_size=0.3, random_state=1)

Let’s get down to training. We start with defining the training variables and the target one:

independent_var = ['sepal_width','petal_length','petal_width','i_setosa','i_versicolor']
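# Reshape the targets to column vectors so they match the (n, 1) shape of nd.dot(X, w)
y_train = nd.array(df_train['sepal_length']).reshape((-1, 1))
X_train = nd.array(df_train[independent_var])
y_test = nd.array(df_test['sepal_length']).reshape((-1, 1))
X_test = nd.array(df_test[independent_var])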

Let’s prepare class representing data instance:

class data:
    def __init__(self,X,y):
        self.X = nd.array(X) 
        self.y = nd.array(y)
        cols = X.shape[1]
        self.initialize_parameter(cols)

    def initialize_parameter(self,cols):
        self.w = nd.random.normal(shape = [cols, 1])
        self.b = nd.random.normal(shape = 1)
        self.params = [self.w, self.b]

        for x in self.params:
            x.attach_grad()

We initialize the parameters and attach gradient calculation to them. This is a very nice feature: we don’t need to take care of derivatives, everything is taken care of for us.
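For reference, this is roughly the work that autograd saves us. The sketch below uses plain Python instead of MXNet: it computes the gradient of the mean squared error of a one-feature model by hand and compares it with a finite-difference estimate. The data points and function names are made up for illustration.

```python
def mse(w, b, xs, ys):
    """Mean squared error of the model w * x + b."""
    return sum((w * x + b - y) ** 2 for x, y in zip(xs, ys)) / len(xs)


def mse_gradient(w, b, xs, ys):
    """Analytic gradient of mse with respect to w and b."""
    n = len(xs)
    residuals = [w * x + b - y for x, y in zip(xs, ys)]
    grad_w = 2.0 / n * sum(r * x for r, x in zip(residuals, xs))
    grad_b = 2.0 / n * sum(residuals)
    return grad_w, grad_b


def numeric_gradient(w, b, xs, ys, eps=1e-6):
    """Central finite differences, used only to check the formulas above."""
    grad_w = (mse(w + eps, b, xs, ys) - mse(w - eps, b, xs, ys)) / (2 * eps)
    grad_b = (mse(w, b + eps, xs, ys) - mse(w, b - eps, xs, ys)) / (2 * eps)
    return grad_w, grad_b


# Hypothetical data points.
xs = [0.0, 1.0, 2.0, 3.0]
ys = [1.0, 3.1, 4.9, 7.2]

analytic = mse_gradient(0.5, -0.2, xs, ys)
numeric = numeric_gradient(0.5, -0.2, xs, ys)
print(analytic, numeric)
```

Both pairs should agree up to rounding. With attach_grad() and autograd.record() we get the same numbers without deriving the formulas ourselves.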

Let’s now carry on with a single step of gradient descent:

class optimizer:
    def __init__(self):
        pass
    
    def GD(self,data_instance,lr):
        for x in data_instance.params:
            x[:] = x - x.grad * lr

We just subtract the gradient multiplied by the learning rate. Note that we use x[:] = ... instead of x = ... so that the update is written into the existing array and its attached gradient is kept. If we go with the latter, we will see the following error:

Check failed: !AGInfo::IsNone(*i) Cannot differentiate node because it is not in a computational graph. You need to set is_recording to true or use autograd.record() to save computational graphs for backward. If you want to differentiate the same graph twice, you need to pass retain_graph=True to backward.
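The difference between the two forms of assignment can be shown without MXNet. Below is a minimal plain-Python analogy, not MXNet code: Param is a made-up list subclass standing in for an NDArray with an attached gradient buffer.

```python
class Param(list):
    """A list that can carry an extra attribute, standing in for an NDArray
    with an attached gradient buffer. Purely illustrative."""


def update_in_place(param, lr):
    # Slice assignment writes into the existing object, so callers that
    # hold a reference to it (and its .grad attribute) see the new values.
    param[:] = [v - g * lr for v, g in zip(param, param.grad)]


def update_by_rebinding(param, lr):
    # Plain assignment only rebinds the local name to a brand new list;
    # the caller's object is unchanged and the new list has no .grad.
    param = [v - g * lr for v, g in zip(param, param.grad)]
    return param


w = Param([1.0, 2.0])
w.grad = [0.5, 0.5]

update_in_place(w, lr=0.1)
print(w, hasattr(w, "grad"))

rebound = update_by_rebinding(w, lr=0.1)
print(rebound, hasattr(rebound, "grad"))
```

The in-place version keeps the same object together with its grad attribute. The rebinding version produces a new object that carries no gradient, which corresponds to the situation MXNet complains about.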

Now, let’s train our model:

def main():
    # Modeling parameters
    learning_rate = 1e-2
    num_iters = 100

    data_instance = data(X_train, y_train)
    opt = optimizer()

    # Column vector, so that subtracting it from the (n, 1) predictions
    # does not broadcast to an (n, n) matrix
    targets = y_train.reshape((-1, 1))

    loss_sequence = []

    for iteration in range(num_iters):
        # Record the forward pass so that autograd can differentiate it
        with autograd.record():
            predictions = nd.dot(X_train, data_instance.w) + data_instance.b
            loss = nd.mean((predictions - targets)**2)

        loss.backward()
        opt.GD(data_instance, learning_rate)

        current_loss = loss.asscalar()
        print("iteration %s, Mean loss: %s" % (iteration, current_loss))
        loss_sequence.append(current_loss)

    plt.figure(num=None, figsize=(8, 6))
    plt.plot(loss_sequence)
    plt.xlabel('iteration', fontsize=14)
    plt.ylabel('Mean loss', fontsize=14)

We should get the following:

[Figure: Training - mean]

Note that our loss uses the mean, whereas we could calculate just a sum. However, due to overflow problems, with the sum we would get the following:

[Figure: Training - sum]
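One plausible way to see why the sum misbehaves is to repeat the training in plain Python on a toy problem. This is a sketch on made-up data, not a reproduction of the run above; the train function and its reduce_loss parameter are hypothetical names.

```python
def train(xs, ys, lr, num_iters, reduce_loss):
    """Gradient descent on w * x + b; reduce_loss is 'mean' or 'sum'."""
    n = len(xs)
    scale = 1.0 / n if reduce_loss == "mean" else 1.0
    w, b = 0.0, 0.0
    losses = []
    for _ in range(num_iters):
        residuals = [w * x + b - y for x, y in zip(xs, ys)]
        losses.append(scale * sum(r * r for r in residuals))
        grad_w = 2.0 * scale * sum(r * x for r, x in zip(residuals, xs))
        grad_b = 2.0 * scale * sum(residuals)
        w -= lr * grad_w
        b -= lr * grad_b
    return losses


# Hypothetical data: 100 points on the line y = 2x + 1.
xs = [i / 100.0 for i in range(100)]
ys = [2.0 * x + 1.0 for x in xs]

mean_losses = train(xs, ys, lr=1e-2, num_iters=100, reduce_loss="mean")
sum_losses = train(xs, ys, lr=1e-2, num_iters=100, reduce_loss="sum")

print("mean loss: first %.3g, last %.3g" % (mean_losses[0], mean_losses[-1]))
print("sum loss:  first %.3g, last %.3g" % (sum_losses[0], sum_losses[-1]))
```

With 100 points the summed loss has a gradient 100 times larger, so the same learning rate of 1e-2 acts like 1.0 on the mean loss. That is too large for this data and the loss grows at every step; in 32-bit floats such growth can eventually end in overflow.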

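Finally, let’s check the performance of the trained model. The snippet needs data_instance, so run it where that variable is in scope, for example at the end of main:

MSE = nd.mean(((nd.dot(X_test, data_instance.w) + data_instance.b) - y_test.reshape((-1, 1)))**2)
print("Mean Squared Error on Test Set: %s" % MSE.asscalar())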

Done.

Summary

We can see that linear regression is pretty concise and easy. However, this uses Python and Spark, which we might want to avoid. In the next parts we will take a look at different solutions.

Random notes from crashing and hanging EMR Spark job
https://blog.adamfurmanek.pl/2018/09/01/random-notes-from-crashing-and-hanging-emr-spark-job/
Sat, 01 Sep 2018 08:00:41 +0000

It sometimes happens that your EMR job crashes or hangs indefinitely with no meaningful log. You can try to capture a memory dump, but it is not very useful when your cluster machines have hundreds of gigabytes of memory each. Below are “fixes” which worked for me.

  • If it just crashes with a lost slave or a lost task, make sure that you are not running out of memory, especially when you broadcast variables
  • Disable Kryo. In my case it caused application crashes. No clue what was wrong, but the default Java serializer didn’t have this problem. What’s more, Kryo was slower
  • Use one core per executor by changing spark.executor.cores. It helps when the job hangs in the middle
  • Execute System.exit(0) at the end when your job is done. Sometimes the step doesn’t terminate even though everything is done
  • Do not use cache or persist
  • If you overwrite files in S3, make sure that you remove them early. It looks like sometimes you can get an error that the file already exists even though you removed it
  • Catch Throwable at the top of your job. I know that it is generally a bad idea, but otherwise you may not get any logs when you get OutOfMemoryError
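Two of the notes above are plain configuration changes. As a minimal sketch, the Python snippet below assembles a spark-submit command line that sets them. The helper build_spark_submit and the jar name are hypothetical, and other options a real job needs (such as --class) are left out.

```python
def build_spark_submit(app, settings):
    """Return a spark-submit argument list with one --conf per setting."""
    command = ["spark-submit"]
    for key, value in sorted(settings.items()):
        command += ["--conf", "%s=%s" % (key, value)]
    command.append(app)
    return command


settings = {
    # One core per executor, as suggested in the list above.
    "spark.executor.cores": "1",
    # Fall back from Kryo to the default Java serializer.
    "spark.serializer": "org.apache.spark.serializer.JavaSerializer",
}

# "my-job.jar" is a placeholder for your own application.
command = build_spark_submit("my-job.jar", settings)
print(" ".join(command))
```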
Investigating AWS SDK conflicts in EMR
https://blog.adamfurmanek.pl/2018/08/25/investigating-aws-sdk-conflicts-in-emr/
Sat, 25 Aug 2018 08:00:03 +0000

When you deploy your package to Amazon Elastic MapReduce (EMR), you can access the AWS SDK provided by the platform. This gets tricky if you compile your code against a different version of the SDK, because then you may get very cryptic bugs at runtime, like a class not being found or a method not existing. You should always check the EMR changelog to see if the SDK version has changed.

But what can you do if you hit this problem? How do you debug it?

First, make sure that your jars are on the classpath. You can add them using the spark.driver.extraClassPath (and similarly spark.executor.extraClassPath) and spark.driver.userClassPathFirst parameters.

Next, verify that your classes are actually loaded. Just add the -verbose:class JVM option when submitting the job, for example through spark.driver.extraJavaOptions. Then look at the logs and check whether the class you want to use is loaded and where it comes from.
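Verbose class loading logs get long, so a small filter helps. The sketch below assumes the Java 8 line format "[Loaded <class> from <source>]"; newer JVMs print class loading through unified logging in a different format. The log excerpt and jar paths are made up for illustration.

```python
import re

# Java 8 style line: [Loaded <class name> from <source>]
LOADED = re.compile(r"^\[Loaded (?P<cls>\S+) from (?P<source>.+)\]$")


def class_sources(log_lines, prefix):
    """Map every loaded class starting with prefix to where it came from."""
    sources = {}
    for line in log_lines:
        match = LOADED.match(line.strip())
        if match and match.group("cls").startswith(prefix):
            sources[match.group("cls")] = match.group("source")
    return sources


# Hypothetical log excerpt; the jar paths are made up.
log = [
    "[Loaded java.lang.Object from /usr/lib/jvm/jre/lib/rt.jar]",
    "[Loaded com.amazonaws.services.s3.AmazonS3Client from file:/opt/libs/aws-java-sdk-s3.jar]",
    "some unrelated log line",
]

found = class_sources(log, "com.amazonaws.")
for name, source in sorted(found.items()):
    print(name, "<-", source)
```

If the source is a jar shipped by the platform rather than your own, you are most likely running against a different SDK version than the one you compiled with.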

Now you should probably see the reason why a method is not found. Typically it is not there (because of an SDK version change) or it has a slightly different signature. If you still have no clue what’s going on, make sure that you are using the right classloader.
