Today I’m going to share my configuration for running custom Anaconda Python with DGL (Deep Graph Library) and mxnet, with GPU support via CUDA, on Spark hosted in EMR. I also include a Redshift configuration, plus support for gensim, tensorflow, keras, theano, pygpu, and cloudpickle, and you can install more libraries if needed. All this for Google to index keywords. Let’s begin.
My configuration uses EMR 5.17.2 and CUDA 9.2. As I write this, EMR 5.27 is available, but it ships with the same CUDA version, so I presume it should work as well. I’m also using Python 3.7.
First, create a cluster. Do not select mxnet as a provided library in EMR; we will install it later. As the master node, use a p3.8xlarge instance type: this instance has GPUs, and it is where we will run DGL and mxnet. For the slaves you can use anything; I’m going with 19 r3.4xlarge nodes (they don’t have GPUs).
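For reference, a cluster like this can also be created from the AWS CLI. This is only a sketch: the cluster name, key pair, S3 path of the bootstrap script described below, and region are placeholders you need to adjust.

aws emr create-cluster \
  --name "dgl-mxnet-gpu" \
  --release-label emr-5.17.2 \
  --applications Name=Hadoop Name=Spark Name=Zeppelin \
  --instance-groups \
    InstanceGroupType=MASTER,InstanceCount=1,InstanceType=p3.8xlarge \
    InstanceGroupType=CORE,InstanceCount=19,InstanceType=r3.4xlarge \
  --bootstrap-actions Path=s3://your-bucket/bootstrap.sh \
  --ec2-attributes KeyName=your-key-pair \
  --use-default-roles \
  --region us-east-1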
We need to install some custom libraries. I am using a bootstrap script for that, but you can also just SSH into the host and run this code manually:
sudo mkdir /mnt/usr-moved
sudo mv /usr/local /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/local /usr/
sudo mv /usr/share /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/share /usr/

sudo mkdir /mnt/home
sudo chown hadoop.hadoop /mnt/home

wget https://repo.anaconda.com/archive/Anaconda3-2019.03-Linux-x86_64.sh -O ~/anaconda.sh
bash ~/anaconda.sh -b -p /mnt/home/hadoop/anaconda
echo -e '\nexport PATH=/mnt/home/hadoop/anaconda/bin:$PATH' >> $HOME/.bashrc && source $HOME/.bashrc
echo -e '\nexport PYSPARK_PYTHON=/mnt/home/hadoop/anaconda/bin/python' >> $HOME/.bashrc && source $HOME/.bashrc

/mnt/home/hadoop/anaconda/bin/conda install -y gensim
/mnt/home/hadoop/anaconda/bin/conda install -y tensorflow
/mnt/home/hadoop/anaconda/bin/conda install -y keras
/mnt/home/hadoop/anaconda/bin/conda install -y theano
/mnt/home/hadoop/anaconda/bin/conda install -y pygpu
/mnt/home/hadoop/anaconda/bin/conda upgrade -y cloudpickle
yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet-cu92mkl
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl-cuda9.2
/mnt/home/hadoop/anaconda/bin/conda install -y s3fs
First, I move /usr/local and /usr/share onto /mnt and symlink them back, so that installing packages doesn’t fill up the root disk. Then I download and install Anaconda, put it on the PATH, and point PYSPARK_PYTHON at it. The remaining commands install the additional libraries. Notice that mxnet is installed as mxnet-cu92mkl, i.e. compiled for CUDA 9.2, and the same goes for DGL (dgl-cuda9.2). Also, s3fs is required for nice reading from S3.
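At this point it is worth a quick sanity check that the CUDA build of mxnet actually sees the GPUs on the master. A minimal check (run on the master node) could look like this:

nvidia-smi
/mnt/home/hadoop/anaconda/bin/python -c "import mxnet as mx; print(mx.nd.ones((2, 2), ctx=mx.gpu(0)))"

If the second command prints a small ndarray allocated on gpu(0), the CUDA setup is fine.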
When this is done and the cluster is created, I point the Zeppelin Python interpreter at /mnt/home/hadoop/anaconda/bin/python and add the Redshift configuration. I do this with the following commands (these you need to run manually after the cluster is created):
sudo /usr/lib/zeppelin/bin/install-interpreter.sh --name jdbc
cd /usr/lib/zeppelin/interpreter/jdbc/
sudo wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar
sudo cat | sudo tee /etc/zeppelin/conf/interpreter.json <<'endmsg'
{
  "interpreterSettings": {
    "2ANGGHHMQ": {
      "id": "2ANGGHHMQ",
      "name": "spark",
      "group": "spark",
      "properties": {
        "zeppelin.spark.printREPLOutput": "true",
        "spark.yarn.jar": "",
        "master": "yarn-client",
        "zeppelin.spark.maxResult": "1000",
        "zeppelin.dep.localrepo": "/usr/lib/zeppelin/local-repo",
        "spark.app.name": "Zeppelin",
        "zeppelin.spark.importImplicit": "true",
        "zeppelin.spark.useHiveContext": "true",
        "args": "",
        "spark.home": "/usr/lib/spark",
        "zeppelin.spark.concurrentSQL": "false",
        "zeppelin.pyspark.python": "/mnt/home/hadoop/anaconda/bin/python"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "spark",
          "class": "org.apache.zeppelin.spark.SparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "scala",
            "editOnDblClick": false
          }
        },
        {
          "name": "pyspark",
          "class": "org.apache.zeppelin.spark.PySparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        },
        {
          "name": "sql",
          "class": "org.apache.zeppelin.spark.SparkSqlInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sql",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [
        {
          "groupArtifactVersion": "/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar",
          "local": false
        }
      ],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "users": [],
        "isUserImpersonate": false
      }
    },
    "2AM1YV5CU": {
      "id": "2AM1YV5CU",
      "name": "angular",
      "group": "angular",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "angular",
          "class": "org.apache.zeppelin.angular.AngularInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2BRWU4WXC": {
      "id": "2BRWU4WXC",
      "name": "python",
      "group": "python",
      "properties": {
        "zeppelin.python": "/mnt/home/hadoop/anaconda/bin/python",
        "zeppelin.python.maxResult": "1000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "python",
          "class": "org.apache.zeppelin.python.PythonInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "users": [],
        "isUserImpersonate": false
      }
    },
    "2AJXGMUUJ": {
      "id": "2AJXGMUUJ",
      "name": "md",
      "group": "md",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "md",
          "class": "org.apache.zeppelin.markdown.Markdown",
          "defaultInterpreter": false,
          "editor": {
            "language": "markdown",
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2AKK3QQXU": {
      "id": "2AKK3QQXU",
      "name": "sh",
      "group": "sh",
      "properties": {
        "shell.command.timeout.millisecs": "60000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "sh",
          "class": "org.apache.zeppelin.shell.ShellInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sh",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    }
  },
  "interpreterBindings": {
    "2EMW16J14": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ],
    "2A94M5J1Z": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ]
  },
  "interpreterRepositories": [
    {
      "id": "central",
      "type": "default",
      "url": "http://repo1.maven.org/maven2/",
      "releasePolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    },
    {
      "id": "local",
      "type": "default",
      "url": "file:///var/lib/zeppelin/.m2/repository",
      "releasePolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    }
  ]
}
endmsg
Now I need to tune the default spark-submit options:
sudo cat | sudo tee /etc/zeppelin/conf.dist/zeppelin-env.sh <<'endmsg'
export ZEPPELIN_PORT=8890
export ZEPPELIN_CONF_DIR=/etc/zeppelin/conf
export ZEPPELIN_LOG_DIR=/var/log/zeppelin
export ZEPPELIN_PID_DIR=/var/run/zeppelin
export ZEPPELIN_PID=$ZEPPELIN_PID_DIR/zeppelin.pid
export ZEPPELIN_WAR_TEMPDIR=/var/run/zeppelin/webapps
export ZEPPELIN_NOTEBOOK_DIR=/var/lib/zeppelin/notebook
export MASTER=yarn-client
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf

export LD_LIBRARY_PATH=/usr/local/cuda/lib64/
export CLASSPATH=":/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar"

export SPARK_SUBMIT_OPTIONS="--jars=YOUR_JARS_HERE --conf spark.executor.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf spark.driver.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf 'spark.executorEnv.PYTHONPATH=/usr/lib/spark/python/lib/py4j-src.zip:/usr/lib/spark/python/:<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-src.zip' --conf spark.yarn.isPython=true --conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false' --conf 'spark.hadoop.fs.s3.canned.acl=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl.default=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3n.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3.cse.enabled=false'"
export ZEPPELIN_NOTEBOOK_STORAGE=org.apache.zeppelin.notebook.repo.S3NotebookRepo
endmsg
This is not the full content! I omit some of my internal settings, so don’t copy it blindly; just extend the zeppelin-env.sh file as needed. The important parts are:
export LD_LIBRARY_PATH=/usr/local/cuda/lib64/
— this points to the CUDA libraries
--conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false'
— this configures executors and memory. You need to tune it for your cluster size.
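If it helps, here is a back-of-the-envelope sketch of where my numbers come from, assuming r3.4xlarge slaves with 16 vCPUs and 122 GiB of RAM each (and leaving headroom for the OS, the YARN daemons, and spark.yarn.executor.memoryOverhead):

# rough executor sizing for 19 r3.4xlarge slaves
nodes = 19
executors_per_node = 3                           # 3 x 5 cores, one core left for OS/daemons
executor_cores = 5
num_executors = nodes * executors_per_node - 1   # minus one slot for the YARN application master
executor_memory_gb = 38                          # roughly the usable RAM per node / executors_per_node

print(num_executors, executor_cores, executor_memory_gb)   # 56 5 38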
Now, restart Zeppelin so it picks up the new configuration.
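On EMR 5.x, Zeppelin runs as a service on the master node, so (assuming the default service setup) restarting it should be a matter of:

sudo stop zeppelin
sudo start zeppelin

You should now be able to run: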
%spark.pyspark
import mxnet as mx
import numpy as np

print(mx.__version__)
print(np.__version__)

1.6.0
1.14.6
Now you can create a GPU context:
ctx = mx.gpu(0)
and it should work like a charm.
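As a quick check that computations really land on the GPU, a minimal sketch like this should do:

%spark.pyspark
import mxnet as mx

ctx = mx.gpu(0)
a = mx.nd.random.uniform(shape=(1000, 1000), ctx=ctx)   # allocated directly in GPU memory
b = mx.nd.dot(a, a)                                      # computed on the GPU
print(b.context)                                         # should print gpu(0)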
So now you have the power of Spark: you can easily distribute jobs and use all the slaves. You also have a GPU at hand, so whenever you use an ndarray from mxnet, it can use the GPU power.
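Since only the master has GPUs in this setup, one natural pattern is to do the heavy, distributed data preparation on the slaves, then pull the already reduced result to the driver and hand it to mxnet on the GPU. A hypothetical sketch (the S3 path and the row limit are placeholders):

%spark.pyspark
import mxnet as mx

# Prepare and reduce the data distributed across the slaves (CPU only).
df = spark.read.parquet("s3://your-bucket/features/")
pdf = df.limit(100000).toPandas()               # pull a manageable chunk to the driver

# Hand it to mxnet on the master's GPU.
features = mx.nd.array(pdf.values, ctx=mx.gpu(0))
similarities = mx.nd.dot(features, features.T)  # runs on the GPU
print(similarities.shape)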
If you don’t want to use the GPU, just install these libraries instead:
yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl
and use the mx.cpu() context. This works as well, but obviously much slower: for my use case, GPU calculations were 80 times faster than running on the CPU.
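Your mileage will vary, of course. If you want to measure the difference for your own workload on the GPU setup, a rough micro-benchmark sketch (mxnet executes asynchronously, hence the waitall calls) could look like this:

%spark.pyspark
import time
import mxnet as mx

def bench(ctx, n=4096, repeat=10):
    x = mx.nd.random.uniform(shape=(n, n), ctx=ctx)
    mx.nd.waitall()                      # finish setup before timing
    start = time.time()
    for _ in range(repeat):
        y = mx.nd.dot(x, x)
    mx.nd.waitall()                      # wait for the asynchronous work to complete
    return (time.time() - start) / repeat

print("cpu:", bench(mx.cpu()))
print("gpu:", bench(mx.gpu(0)))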