Redshift – Random IT Utensils
https://blog.adamfurmanek.pl
IT, operating systems, maths, and more.

Running Anaconda with DGL and mxnet on CUDA GPU in Spark running in EMR
https://blog.adamfurmanek.pl/2020/03/28/running-anaconda-with-dgl-and-mxnet-on-cuda-gpu-in-spark-running-in-emr/
Sat, 28 Mar 2020 09:00:14 +0000

Today I’m going to share my configuration for running a custom Anaconda Python with DGL (Deep Graph Library) and the mxnet library, with GPU support via CUDA, in Spark hosted in EMR. The setup also includes Redshift configuration and support for gensim, tensorflow, keras, theano, pygpu, and cloudpickle. You can install more libraries if needed. All this for Google to index keywords. Let’s begin.

My configuration uses EMR 5.17.2 and CUDA 9.2. At the time of writing, EMR 5.27 is available, but it ships with the same CUDA version, so I presume it should work as well. I’m also using Python 3.7.

First, create a cluster. Do not select mxnet as a provided library in EMR; we will install it later. For the master node, use the p3.8xlarge instance type. This instance must have a GPU because this is where we will run DGL and mxnet. For the worker nodes you can use anything; I’m going with 19 r3.4xlarge nodes (they don’t have GPUs).

We need to install some custom libraries. I use a bootstrap script for that, but you can also SSH into the host and run this code manually:

sudo mkdir /mnt/usr-moved
sudo mv /usr/local /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/local /usr/
sudo mv /usr/share /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/share /usr/

sudo mkdir /mnt/home
sudo chown hadoop:hadoop /mnt/home

wget https://repo.anaconda.com/archive/Anaconda3-2019.03-Linux-x86_64.sh -O ~/anaconda.sh
bash ~/anaconda.sh -b -p /mnt/home/hadoop/anaconda
echo -e '\nexport PATH=/mnt/home/hadoop/anaconda/bin:$PATH' >> $HOME/.bashrc && source $HOME/.bashrc
echo -e '\nexport PYSPARK_PYTHON=/mnt/home/hadoop/anaconda/bin/python' >> $HOME/.bashrc && source $HOME/.bashrc

/mnt/home/hadoop/anaconda/bin/conda install -y gensim
/mnt/home/hadoop/anaconda/bin/conda install -y tensorflow
/mnt/home/hadoop/anaconda/bin/conda install -y keras
/mnt/home/hadoop/anaconda/bin/conda install -y theano
/mnt/home/hadoop/anaconda/bin/conda install -y pygpu
/mnt/home/hadoop/anaconda/bin/conda upgrade -y cloudpickle
yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet-cu92mkl
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl-cuda9.2
/mnt/home/hadoop/anaconda/bin/conda install -y s3fs

First, I move /usr/local and /usr/share to /mnt and symlink them back so that installing packages doesn’t fill the root disk. Then I download Anaconda and install it under /mnt/home/hadoop/anaconda. Finally, the conda and pip commands install the additional libraries. Notice that mxnet-cu92mkl is mxnet compiled for CUDA 9.2, and dgl-cuda9.2 is the matching DGL build. Also, s3fs is required for convenient reading from S3.
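
Once the script has finished, it can be worth confirming that the Anaconda interpreter really sees every library. The snippet below is only a minimal sketch and not part of the original setup: the helper name missing_packages is made up for illustration, and the list simply mirrors the install commands above. It is meant to be run with /mnt/home/hadoop/anaconda/bin/python.

```python
import importlib.util

# Top-level module names matching the packages installed by the script above.
EXPECTED = ["gensim", "tensorflow", "keras", "theano", "pygpu",
            "cloudpickle", "mxnet", "dgl", "s3fs"]

def missing_packages(names=EXPECTED):
    """Return the names that the current interpreter cannot locate."""
    return [name for name in names if importlib.util.find_spec(name) is None]

if __name__ == "__main__":
    missing = missing_packages()
    print("missing:", missing if missing else "none")
```

The check only locates the modules; it does not import them, so it will not catch a CUDA mismatch that shows up at import time.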

When this is done and the cluster is created, I point the Zeppelin Python interpreters to /mnt/home/hadoop/anaconda/bin/python and add the Redshift configuration. I do this with the following commands (you need to run them manually after the cluster is created):

sudo /usr/lib/zeppelin/bin/install-interpreter.sh --name jdbc
cd /usr/lib/zeppelin/interpreter/jdbc/
sudo wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

sudo tee /etc/zeppelin/conf/interpreter.json > /dev/null <<'endmsg'
{
  "interpreterSettings": {
    "2ANGGHHMQ": {
      "id": "2ANGGHHMQ",
      "name": "spark",
      "group": "spark",
      "properties": {
        "zeppelin.spark.printREPLOutput": "true",
        "spark.yarn.jar": "",
        "master": "yarn-client",
        "zeppelin.spark.maxResult": "1000",
        "zeppelin.dep.localrepo": "/usr/lib/zeppelin/local-repo",
        "spark.app.name": "Zeppelin",
        "zeppelin.spark.importImplicit": "true",
        "zeppelin.spark.useHiveContext": "true",
        "args": "",
        "spark.home": "/usr/lib/spark",
        "zeppelin.spark.concurrentSQL": "false",
        "zeppelin.pyspark.python": "/mnt/home/hadoop/anaconda/bin/python"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "spark",
          "class": "org.apache.zeppelin.spark.SparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "scala",
            "editOnDblClick": false
          }
        },
        {
          "name": "pyspark",
          "class": "org.apache.zeppelin.spark.PySparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        },
        {
          "name": "sql",
          "class": "org.apache.zeppelin.spark.SparkSqlInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sql",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [
        {
          "groupArtifactVersion": "/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar",
          "local": false
        }
      ],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "users": [],
        "isUserImpersonate": false
      }
    },
    "2AM1YV5CU": {
      "id": "2AM1YV5CU",
      "name": "angular",
      "group": "angular",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "angular",
          "class": "org.apache.zeppelin.angular.AngularInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2BRWU4WXC": {
      "id": "2BRWU4WXC",
      "name": "python",
      "group": "python",
      "properties": {
        "zeppelin.python": "/mnt/home/hadoop/anaconda/bin/python",
        "zeppelin.python.maxResult": "1000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "python",
          "class": "org.apache.zeppelin.python.PythonInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "users": [],
        "isUserImpersonate": false
      }
    },
    "2AJXGMUUJ": {
      "id": "2AJXGMUUJ",
      "name": "md",
      "group": "md",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "md",
          "class": "org.apache.zeppelin.markdown.Markdown",
          "defaultInterpreter": false,
          "editor": {
            "language": "markdown",
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2AKK3QQXU": {
      "id": "2AKK3QQXU",
      "name": "sh",
      "group": "sh",
      "properties": {
        "shell.command.timeout.millisecs": "60000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "sh",
          "class": "org.apache.zeppelin.shell.ShellInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sh",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    }
  },
  "interpreterBindings": {
    "2EMW16J14": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ],
    "2A94M5J1Z": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ]
  },
  "interpreterRepositories": [
    {
      "id": "central",
      "type": "default",
      "url": "http://repo1.maven.org/maven2/",
      "releasePolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    },
    {
      "id": "local",
      "type": "default",
      "url": "file:///var/lib/zeppelin/.m2/repository",
      "releasePolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    }
  ]
}
endmsg
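
Overwriting the whole interpreter.json is the simplest route, but the same two changes (the Anaconda interpreter path and the Redshift driver dependency) could also be applied to whatever file is already on the cluster. The following is a rough sketch under stated assumptions, not what I actually ran: the function names are hypothetical, and it assumes the properties are plain strings as in the file above (other Zeppelin versions may store them as objects).

```python
import json

ANACONDA_PYTHON = "/mnt/home/hadoop/anaconda/bin/python"
REDSHIFT_JAR = "/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar"

def patch_interpreter_settings(config):
    """Point the spark and python interpreters at Anaconda and add the Redshift jar."""
    for setting in config["interpreterSettings"].values():
        props = setting.setdefault("properties", {})
        if setting.get("group") == "spark":
            props["zeppelin.pyspark.python"] = ANACONDA_PYTHON
            deps = setting.setdefault("dependencies", [])
            # Add the driver only once, so the patch can be applied repeatedly.
            if not any(d.get("groupArtifactVersion") == REDSHIFT_JAR for d in deps):
                deps.append({"groupArtifactVersion": REDSHIFT_JAR, "local": False})
        elif setting.get("group") == "python":
            props["zeppelin.python"] = ANACONDA_PYTHON
    return config

def patch_file(path="/etc/zeppelin/conf/interpreter.json"):
    """Read the existing file, patch it in memory, and write it back."""
    with open(path) as f:
        config = json.load(f)
    patch_interpreter_settings(config)
    with open(path, "w") as f:
        json.dump(config, f, indent=2)
```

patch_file() needs write access to /etc/zeppelin/conf, and Zeppelin has to be restarted afterwards for the change to take effect.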

Now, I need to tune default spark submit options:

sudo tee /etc/zeppelin/conf.dist/zeppelin-env.sh > /dev/null <<'endmsg'
export ZEPPELIN_PORT=8890
export ZEPPELIN_CONF_DIR=/etc/zeppelin/conf
export ZEPPELIN_LOG_DIR=/var/log/zeppelin
export ZEPPELIN_PID_DIR=/var/run/zeppelin
export ZEPPELIN_PID=$ZEPPELIN_PID_DIR/zeppelin.pid
export ZEPPELIN_WAR_TEMPDIR=/var/run/zeppelin/webapps
export ZEPPELIN_NOTEBOOK_DIR=/var/lib/zeppelin/notebook
export MASTER=yarn-client
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LD_LIBRARY_PATH=/usr/local/cuda/lib64/
export CLASSPATH=":/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar"


export SPARK_SUBMIT_OPTIONS="--jars=YOUR_JARS_HERE --conf spark.executor.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf spark.driver.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf 'spark.executorEnv.PYTHONPATH=/usr/lib/spark/python/lib/py4j-src.zip:/usr/lib/spark/python/:<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-src.zip' --conf spark.yarn.isPython=true --conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false' --conf 'spark.hadoop.fs.s3.canned.acl=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl.default=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3n.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3.cse.enabled=false'"
export ZEPPELIN_NOTEBOOK_STORAGE=org.apache.zeppelin.notebook.repo.S3NotebookRepo
endmsg

This is not the full content! I omitted some of my internal settings, so don’t copy it blindly; just extend your zeppelin-env.sh file as needed. The important things are:
export LD_LIBRARY_PATH=/usr/local/cuda/lib64/ — this points to CUDA libraries
--conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false' — this configures executors and memory. You need to tune it for your cluster size.
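
To give a feeling for how such numbers can be derived, here is a small sketch of one common rule of thumb. It is an assumption on my side that this matches any particular cluster: the function name and the 10% overhead fraction are illustrative, the hardware figures (16 vCPUs and 122 GiB per r3.4xlarge) are quoted from memory, and YARN usually exposes less memory than the instance physically has.

```python
def size_executors(nodes, vcpus_per_node, mem_gib_per_node,
                   cores_per_executor=5, overhead_fraction=0.10):
    """Estimate --num-executors and --executor-memory (in GiB) for a cluster."""
    # Leave one core per node for the OS and the Hadoop daemons.
    executors_per_node = (vcpus_per_node - 1) // cores_per_executor
    # Keep one executor slot free for the YARN application master.
    num_executors = nodes * executors_per_node - 1
    # Split node memory between executors, then leave room for off-heap overhead.
    mem_per_slot = mem_gib_per_node / executors_per_node
    executor_memory_gib = int(mem_per_slot / (1 + overhead_fraction))
    return num_executors, executor_memory_gib

# 19 worker nodes of type r3.4xlarge, as in the cluster described above.
print(size_executors(nodes=19, vcpus_per_node=16, mem_gib_per_node=122))
```

For the 19 worker nodes this prints (56, 36): the executor count agrees with the --num-executors 56 used above, while the memory estimate is more conservative than 38G, so treat it as a starting point only.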

Now, restart Zeppelin. You should now be able to run:

%spark.pyspark
import mxnet as mx
import numpy as np
print(mx.__version__)
print(np.__version__)

1.6.0
1.14.6

Now you can create GPU context:

ctx = mx.gpu(0)

and it should work like a charm.

So now you have the power of Spark: you can easily distribute a job across all the worker nodes. You also have a GPU at hand, so whenever you use an ndarray from mxnet, it can use the GPU.

If you don’t want to use GPU, then just install these libraries instead:

yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl

and use the mx.cpu() context. This works as well but is, obviously, much slower. For my use case, GPU calculations were 80 times faster than on the CPU.
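
If the same notebook has to run on both kinds of clusters, the context can be chosen at runtime. This is a minimal sketch rather than something from my setup: pick_context is a hypothetical helper, and it assumes an mxnet version that provides mx.context.num_gpus(). The optional mx parameter exists only so that the function can be exercised without mxnet installed.

```python
def pick_context(mx=None):
    """Return mx.gpu(0) when a GPU is visible, otherwise mx.cpu()."""
    if mx is None:
        # Needs one of the mxnet installs shown above.
        import mxnet as mx
    if mx.context.num_gpus() > 0:
        return mx.gpu(0)
    return mx.cpu()
```

In a %spark.pyspark paragraph this would be used as ctx = pick_context(), and ctx then goes wherever mx.gpu(0) was passed before.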

Running any query in Redshift or JDBC from Spark in EMR
https://blog.adamfurmanek.pl/2020/03/21/running-any-query-in-redshift-or-jdbc-from-spark-in-emr/
Sat, 21 Mar 2020 09:00:20 +0000

Last time we saw how to connect to Redshift from Spark running in EMR. The solution provided there was nice but only allowed reading data. Sometimes we want to run an arbitrary DDL or DML query, not only simple read statements.

To do that, we need to connect to Redshift directly over JDBC. I assume you configured your cluster the same way as in the previous part. Now use this code:

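def executeNonQuery(query: String): Int = {
    import java.sql.DriverManager
    import java.util.Properties

    // Credentials are already part of the connection string, so the properties can stay empty
    val connectionProps = new Properties()

    val connection = DriverManager.getConnection(getConnectionString(), connectionProps)
    try {
        val statement = connection.createStatement()
        try {
            // executeUpdate returns the number of affected rows (0 for DDL statements)
            statement.executeUpdate(query)
        } finally {
            statement.close()
        }
    } finally {
        connection.close()
    }
}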

We first import the JDBC classes. Next, we create properties for the connection, which can be empty. Then we open the connection through the JDBC infrastructure, create a statement, and execute the query. Please remember that executeUpdate returns a row count, so the query shouldn’t be a SELECT.

We can use it like this:

%spark
executeNonQuery(s"""DROP TABLE IF EXISTS table""")

The query can be pretty much anything you can run in SQL Workbench. It works with temporary tables as well (unlike prepareStatement).

Connecting to Redshift from Spark running in EMR
https://blog.adamfurmanek.pl/2020/03/14/connecting-to-redshift-from-spark-running-in-emr/
Sat, 14 Mar 2020 09:00:22 +0000

Today I’ll share my configuration for Spark running in EMR to connect to a Redshift cluster. First, I assume the cluster is accessible (so configure the virtual subnet, allowed IPs, and the rest of the networking before running this).

I’m using Zeppelin, so I’ll show two interpreters configured for the connection, but the same thing should work with a standalone job (as long as it has the same libraries configured). I tested things with EMR 5.17.2, but it should work with other versions as well.

Redshift interpreter

First, let’s configure a separate interpreter to use in Zeppelin. SSH into the master node of the cluster and install the JDBC interpreter:

sudo /usr/lib/zeppelin/bin/install-interpreter.sh --name jdbc

Next, download the driver:

cd /usr/lib/zeppelin/interpreter/jdbc/
sudo wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

Restart zeppelin:

sudo stop zeppelin
sudo start zeppelin

Go to the interpreter configuration in Zeppelin and add a new JDBC interpreter named redshift. Use the following settings:

default.driver	com.amazon.redshift.jdbc42.Driver
default.url	jdbc:redshift://your-redshift-instance-address.redshift.amazonaws.com:5439/your-database
default.user	redshift_user
default.password	redshift_password

Now create a new paragraph like the one below:

%redshift
SELECT * FROM table

And it should work.

Spark interpreter

Download the driver the same way as before. Now go to the interpreter settings and add the dependency to the Spark interpreter:

/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

Now you can start reading data like this:

%spark
import org.apache.spark.sql._

def getConnectionString() = {
    val url = "cluster url"
    val port = 8192 // use the port your cluster listens on; the Redshift default is 5439
    val database = "database name"
    val user = "user"
    val password = "password"
    
    s"jdbc:redshift://${url}:$port/$database?user=$user&password=$password"
}

def runQuery(query: String) = {
    val df: DataFrame = sqlContext.read
      .format("jdbc")
      .option("driver", "com.amazon.redshift.jdbc42.Driver")
      .option("url", getConnectionString())
      .option("dbtable", s"($query) tmp")
      .load()
    
    df
}

var table = runQuery(s"""
SELECT * FROM Table AS t
""")

This is even nicer because you can use string interpolation to provide parameters for queries.
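
The same read can be done from a %spark.pyspark paragraph. The sketch below only builds the options; jdbc_options is a hypothetical helper, and passing user and password as separate options (instead of embedding them in the URL as above) is a feature of the Spark JDBC data source that I am assuming here rather than something used in my configuration.

```python
def jdbc_options(host, port, database, user, password, query):
    """Build the options for spark.read.format("jdbc") against Redshift."""
    # Drop surrounding whitespace and a trailing semicolon so the query
    # can be wrapped as a subquery.
    subquery = query.strip().rstrip(";")
    return {
        "driver": "com.amazon.redshift.jdbc42.Driver",
        "url": f"jdbc:redshift://{host}:{port}/{database}",
        "user": user,
        "password": password,
        "dbtable": f"({subquery}) tmp",
    }

# In a %spark.pyspark paragraph, where `spark` is provided by Zeppelin:
# df = spark.read.format("jdbc").options(**jdbc_options(
#     "cluster url", 5439, "database name", "user", "password",
#     "SELECT * FROM my_table AS t")).load()
```

Keeping the credentials out of the URL also avoids problems with passwords that contain characters such as & or ?.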

Aborted transactions in Redshift due to Serializable isolation level
https://blog.adamfurmanek.pl/2020/02/22/aborted-transactions-in-redshift-due-to-serializable-isolation-level/
Sat, 22 Feb 2020 09:00:27 +0000

I was working on a seemingly simple load job in Redshift. I was generating some data and wanted to store the output in a separate table, just to persist it for debugging purposes. However, since the job could be rerun if needed, I wanted to remove the rows generated previously and add new ones. So I tried the following:

CREATE TABLE locks_tests (
  market INT,
  value INT
);

INSERT INTO locks_tests(market, value) VALUES
  (1, 1),
  (1, 2),
  (2, 1),
  (2, 2)
;

This is our table. Let’s assume that the job can generate data for market 1 or market 2. So the job for market 1 does this:

DELETE FROM locks_tests WHERE market = 1;
INSERT INTO locks_tests VALUES (1, 1), (1,2);

Similarly, the job for market 2 does this:

DELETE FROM locks_tests WHERE market = 2;
INSERT INTO locks_tests VALUES (2, 1), (2,2);

As you can see, both jobs use the same table but access different rows.

Now the thing is, if you try running both transactions in parallel, most likely you will get an error that transactions collide and one of them had to be aborted.

I didn’t understand why it works this way because, as far as I could tell, nothing is wrong. Either it should take a table lock and make one transaction wait for the other, or it should use some range locks and figure out that the transactions don’t conflict.

Unfortunately, it is not that simple. Since Redshift is based on PostgreSQL, we can reproduce the problem over there as well. For instance, see dba.stackexchange.

Okay, we know what the problem is; how do we solve it? I considered locking the table explicitly with a LOCK statement, but I never find that nice. I checked whether it is possible to control partitions of the table, but that doesn’t seem to be allowed in Redshift. I thought about doing so-called “poor man’s clustering” with multiple tables joined in a single view, but I didn’t want to increase the complexity.

Finally, I decided to go with a simple MVCC-like approach. Add a new column to the table, indicating the timestamp of the inserted row:

CREATE TABLE locks_tests (
  market INT,
  value INT,
  insert_time TIMESTAMP
);

Now, when inserting data to the table, use this:

INSERT INTO locks_tests(market, value, insert_time) VALUES
  (1, 1, getdate())
;

Finally, prepare a view extracting only the latest data:

CREATE OR REPLACE VIEW locks_tests_view AS (
  WITH max_timestamps AS (
    SELECT market, MAX(insert_time) AS max_insert_time
    FROM locks_tests
    GROUP BY market
  )
  SELECT C.*
  FROM locks_tests AS C
  JOIN max_timestamps AS T ON T.market = C.market
  WHERE C.insert_time = T.max_insert_time
);

And now query the view instead of the original table. You can also run a deletion job periodically if needed.
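
To see the idea end to end without a Redshift cluster, the logic can be tried locally. The sketch below uses an in-memory SQLite database purely as a stand-in, so it says nothing about Redshift locking; the timestamps are passed in explicitly instead of calling getdate(), and the load_batch helper and the cleanup statement are illustrative assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE locks_tests (market INT, value INT, insert_time TEXT);
CREATE VIEW locks_tests_view AS
  WITH max_timestamps AS (
    SELECT market, MAX(insert_time) AS max_insert_time
    FROM locks_tests
    GROUP BY market
  )
  SELECT C.market, C.value, C.insert_time
  FROM locks_tests AS C
  JOIN max_timestamps AS T ON T.market = C.market
  WHERE C.insert_time = T.max_insert_time;
""")

def load_batch(market, values, insert_time):
    """Append one batch; every row of the batch shares the same timestamp."""
    conn.executemany(
        "INSERT INTO locks_tests(market, value, insert_time) VALUES (?, ?, ?)",
        [(market, value, insert_time) for value in values],
    )

load_batch(1, [1, 2], "2020-02-22 09:00:00")
load_batch(2, [1, 2], "2020-02-22 09:00:05")
load_batch(1, [10, 20], "2020-02-22 10:00:00")  # the job for market 1 is rerun

# The view exposes only the newest batch of each market.
latest = conn.execute(
    "SELECT market, value FROM locks_tests_view ORDER BY market, value"
).fetchall()
print(latest)

# Periodic cleanup: drop rows older than the newest batch of their market.
conn.execute("""
DELETE FROM locks_tests
WHERE insert_time < (SELECT MAX(L.insert_time)
                     FROM locks_tests AS L
                     WHERE L.market = locks_tests.market)
""")
remaining = conn.execute("SELECT COUNT(*) FROM locks_tests").fetchone()[0]
print(remaining)
```

The first print shows [(1, 10), (1, 20), (2, 1), (2, 2)], so the older batch for market 1 is hidden by the view even before the cleanup, and the table holds 4 rows afterwards. Note that the approach relies on all rows of one batch sharing the same timestamp.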
