JVM – Random IT Utensils https://blog.adamfurmanek.pl IT, operating systems, maths, and more. Thu, 19 Mar 2020 00:51:49 +0000 en-US hourly 1 https://wordpress.org/?v=6.5.2 Running Anaconda with DGL and mxnet on CUDA GPU in Spark running in EMR https://blog.adamfurmanek.pl/2020/03/28/running-anaconda-with-dgl-and-mxnet-on-cuda-gpu-in-spark-running-in-emr/ https://blog.adamfurmanek.pl/2020/03/28/running-anaconda-with-dgl-and-mxnet-on-cuda-gpu-in-spark-running-in-emr/#respond Sat, 28 Mar 2020 09:00:14 +0000 https://blog.adamfurmanek.pl/?p=3266 Continue reading Running Anaconda with DGL and mxnet on CUDA GPU in Spark running in EMR]]> Today I’m going to share my configuration for running custom Anaconda Python with DGL (Deep Graph Library) and mxnet library, with GPU support via CUDA, running in Spark hosted in EMR. Actually, I have Redshift configuration as well, with support for gensim, tensorflow, keras, theano, pygpu, and cloudpickle. You can also install more libraries if needed. All this for Google to index keywords. Let’s begin.

My configuration uses EMR 5.17.2 and CUDA 9.2. When I’m writing it, there is EMR 5.27 available but it comes with the same CUDA version so I presume it should work as well. I’m also using Python 3.7.

First, create a cluster. Do not select mxnet as a provided library in EMR, we will install it later. As a master node use p3.8xlarge instance type — this instance must have GPU and this is where we will run DGL and mxnet. For slaves you can use anything, I’m going with 19 r3.4xlarge nodes (they don’t have GPU).

We need to install some custom libraries. I am using bootstrap script for that but you can just SSH into the host manually and run this code:

sudo mkdir /mnt/usr-moved
sudo mv /usr/local /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/local /usr/
sudo mv /usr/share /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/share /usr/

sudo mkdir /mnt/home
sudo chown hadoop.hadoop /mnt/home

wget https://repo.anaconda.com/archive/Anaconda3-2019.03-Linux-x86_64.sh -O ~/anaconda.sh
bash ~/anaconda.sh -b -p /mnt/home/hadoop/anaconda
echo -e '\nexport PATH=/mnt/home/hadoop/anaconda/bin:$PATH' >> $HOME/.bashrc && source $HOME/.bashrc
echo -e '\nexport PYSPARK_PYTHON=/mnt/home/hadoop/anaconda/bin/python' >> $HOME/.bashrc && source $HOME/.bashrc

/mnt/home/hadoop/anaconda/bin/conda install -y gensim
/mnt/home/hadoop/anaconda/bin/conda install -y tensorflow
/mnt/home/hadoop/anaconda/bin/conda install -y keras
/mnt/home/hadoop/anaconda/bin/conda install -y theano
/mnt/home/hadoop/anaconda/bin/conda install -y pygpu
/mnt/home/hadoop/anaconda/bin/conda upgrade -y cloudpickle
yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet-cu92mkl
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl-cuda9.2
/mnt/home/hadoop/anaconda/bin/conda install -y s3fs

First, I’m making a symlink to not fill the disk while installing packages. Then in line 10 I download Anaconda. Finally, lines 15-23 install some additional libraries. Notice that in line 21 I install mxnet compiled for CUDA 9.2, and in line 22 the same for DGL. Also, s3fs is required for nice reading from s3.

When this is done and cluster is created, I replace Python for Zeppelin interpreter to point to /mnt/home/hadoop/anaconda/bin/python and add Redshift configuration. I do this with the following command line (this you need to run manually after the cluster is created):

sudo /usr/lib/zeppelin/bin/install-interpreter.sh --name jdbc
cd /usr/lib/zeppelin/interpreter/jdbc/
sudo wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

sudo cat | sudo tee /etc/zeppelin/conf/interpreter.json <<'endmsg'
{
  "interpreterSettings": {
    "2ANGGHHMQ": {
      "id": "2ANGGHHMQ",
      "name": "spark",
      "group": "spark",
      "properties": {
        "zeppelin.spark.printREPLOutput": "true",
        "spark.yarn.jar": "",
        "master": "yarn-client",
        "zeppelin.spark.maxResult": "1000",
        "zeppelin.dep.localrepo": "/usr/lib/zeppelin/local-repo",
        "spark.app.name": "Zeppelin",
        "zeppelin.spark.importImplicit": "true",
        "zeppelin.spark.useHiveContext": "true",
        "args": "",
        "spark.home": "/usr/lib/spark",
        "zeppelin.spark.concurrentSQL": "false",
        "zeppelin.pyspark.python": "/mnt/home/hadoop/anaconda/bin/python"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "spark",
          "class": "org.apache.zeppelin.spark.SparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "scala",
            "editOnDblClick": false
          }
        },
        {
          "name": "pyspark",
          "class": "org.apache.zeppelin.spark.PySparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        },
        {
          "name": "sql",
          "class": "org.apache.zeppelin.spark.SparkSqlInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sql",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [
        {
          "groupArtifactVersion": "/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar",
          "local": false
        }
      ],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
                "setPermission": false,
        "users": [],
        "isUserImpersonate": false
      }
    },
    "2AM1YV5CU": {
      "id": "2AM1YV5CU",
      "name": "angular",
      "group": "angular",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "angular",
          "class": "org.apache.zeppelin.angular.AngularInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2BRWU4WXC": {
      "id": "2BRWU4WXC",
      "name": "python",
      "group": "python",
      "properties": {
        "zeppelin.python": "/mnt/home/hadoop/anaconda/bin/python",
        "zeppelin.python.maxResult": "1000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "python",
          "class": "org.apache.zeppelin.python.PythonInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "users": [],
        "isUserImpersonate": false
              }
    },
    "2AJXGMUUJ": {
      "id": "2AJXGMUUJ",
      "name": "md",
      "group": "md",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "md",
          "class": "org.apache.zeppelin.markdown.Markdown",
          "defaultInterpreter": false,
          "editor": {
            "language": "markdown",
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2AKK3QQXU": {
      "id": "2AKK3QQXU",
      "name": "sh",
      "group": "sh",
      "properties": {
        "shell.command.timeout.millisecs": "60000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "sh",
          "class": "org.apache.zeppelin.shell.ShellInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sh",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    }
  },
  "interpreterBindings": {
    "2EMW16J14": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ],
    "2A94M5J1Z": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ]
  },
  "interpreterRepositories": [
    {
      "id": "central",
      "type": "default",
      "url": "http://repo1.maven.org/maven2/",
      "releasePolicy": {
      "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    },
    {
      "id": "local",
      "type": "default",
      "url": "file:///var/lib/zeppelin/.m2/repository",
      "releasePolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    }
  ]
}
endmsg

Now, I need to tune default spark submit options:

sudo cat | sudo tee /etc/zeppelin/conf.dist/zeppelin-env.sh <<'endmsg'
export ZEPPELIN_PORT=8890
export ZEPPELIN_CONF_DIR=/etc/zeppelin/conf
export ZEPPELIN_LOG_DIR=/var/log/zeppelin
export ZEPPELIN_PID_DIR=/var/run/zeppelin
export ZEPPELIN_PID=$ZEPPELIN_PID_DIR/zeppelin.pid
export ZEPPELIN_WAR_TEMPDIR=/var/run/zeppelin/webapps
export ZEPPELIN_NOTEBOOK_DIR=/var/lib/zeppelin/notebook
export MASTER=yarn-client
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LD_LIBRARY_PATH=/usr/local/cuda/lib64/
export CLASSPATH=":/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar"


export SPARK_SUBMIT_OPTIONS="--jars=YOUR_JARS_HERE --conf spark.executor.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf spark.driver.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf 'spark.executorEnv.PYTHONPATH=/usr/lib/spark/python/lib/py4j-src.zip:/usr/lib/spark/python/:<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-src.zip' --conf spark.yarn.isPython=true --conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false' --conf 'spark.hadoop.fs.s3.canned.acl=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl.default=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3n.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3.cse.enabled=false'"
export ZEPPELIN_NOTEBOOK_STORAGE=org.apache.zeppelin.notebook.repo.S3NotebookRepo
endmsg

This is not the full content! I omit some of my internal settings so generally don’t copy it blindly, just extend the zeppelin-env.sh file as needed. Important things are:
export LD_LIBRARY_PATH=/usr/local/cuda/lib64/ — this points to CUDA libraries
--conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false' — this configures executors and memory. You need to tune it for your cluster size.

Now, restart Zeppelin. You should now be able to run:

%spark.pyspark
print(mx.__version__)
print(np.__version__)

1.6.0
1.14.6

Now you can create GPU context:

ctx = mx.gpu(0)

and it should work as a charm.

So now you have power of Spark — you can easily distribute job and use all slaves. And also, you have GPU at your hand, so whenever you use ndarray from mxnet, it can use the GPU power.

If you don’t want to use GPU, then just install these libraries instead:

yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl

and use mx.cpu() context. This works as well, obviously, much slower. For my use case GPU calculations were 80 times faster than when running on CPU.

]]>
https://blog.adamfurmanek.pl/2020/03/28/running-anaconda-with-dgl-and-mxnet-on-cuda-gpu-in-spark-running-in-emr/feed/ 0
Running any query in Redshift or JDBC from Spark in EMR https://blog.adamfurmanek.pl/2020/03/21/running-any-query-in-redshift-or-jdbc-from-spark-in-emr/ https://blog.adamfurmanek.pl/2020/03/21/running-any-query-in-redshift-or-jdbc-from-spark-in-emr/#respond Sat, 21 Mar 2020 09:00:20 +0000 https://blog.adamfurmanek.pl/?p=3264 Continue reading Running any query in Redshift or JDBC from Spark in EMR]]> Last time we saw how to connect to Redshift from Spark running in EMR. Provided solution was nice but allowed for reading data only. Sometimes we might want to run any DDL or DML query, not only simple read statements.

To do that, we need to connect to Redshift directly over JDBC. I assume you configured your cluster the same way as in the previous part. Now use this code:

def executeNonQuery(query: String) = {
    import java.sql._
    import java.util._
    
    var connectionProps = new Properties();

    var connection = DriverManager.getConnection(getConnectionString(), connectionProps);
    var statement = connection.createStatement(query);
    statement.executeUpdate();
}

We first import packages for JDBC. Next, we create new properties for the connection which can be empty. Then, we open the connection using JDBC infrastructure, prepare the query and execute it. Please remember that this query must return row count so it shouldn’t be SELECT query.

We can use it like this:

%spark
executeNonQuery(s"""DROP TABLE IF EXISTS table""")

Query is pretty much anything you can run in SQL Workbench. It works with temporary tables as well (unlike prepareStatement).

]]>
https://blog.adamfurmanek.pl/2020/03/21/running-any-query-in-redshift-or-jdbc-from-spark-in-emr/feed/ 0
Connecting to Redshift from Spark running in EMR https://blog.adamfurmanek.pl/2020/03/14/connecting-to-redshift-from-spark-running-in-emr/ https://blog.adamfurmanek.pl/2020/03/14/connecting-to-redshift-from-spark-running-in-emr/#respond Sat, 14 Mar 2020 09:00:22 +0000 https://blog.adamfurmanek.pl/?p=3261 Continue reading Connecting to Redshift from Spark running in EMR]]> Today I’ll share my configuration for Spark running in EMR to connect to Redshift cluster. First, I assume the cluster is accessible (so configure virtual subnet, allowed IPs and all network stuff before running this).

I’m using Zeppelin so I’ll show two interpreters configured for the connection, but the same thing should work with standalone job (as long as it has the same libraries configured). I tested things with EMR 5.17.2 but it should work with other versions as well.

Redshift interpreter

First, let’s configure separate interpreter to use in Zeppelin. SSH into the master node of the cluster and install JDBC interpreter:

sudo /usr/lib/zeppelin/bin/install-interpreter.sh --name jdbc

Next, download the driver:

cd /usr/lib/zeppelin/interpreter/jdbc/
sudo wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

Restart zeppelin:

sudo stop zeppelin
sudo start zeppelin

Go to interpreters configuration in Zeppelin and add new JDBC named redshift. Use the following settings:

default.driver	com.amazon.redshift.jdbc42.Driver
default.url	jdbc:redshift://your-redshift-instance-address.redshift.amazonaws.com:5439/your-database
default.user	redshift_user
default.password	redshift_password

Now create new paragraph like below:

%redshift
SELECT * FROM table

And it should work.

Spark interpreter

Download driver the same way as before. Now, go to interpreter settings and add dependency to Spark interpreter:

/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

Now you can start reading data like this:

%spark
import org.apache.spark.sql._

def getConnectionString() = {
    val url = "cluster url"
    val port = 8192
    val database = "database name"
    val user = "user"
    val password = "password"
    
    s"jdbc:redshift://${url}:$port/$database?user=$user&password=$password"
}

def runQuery(query: String) = {
    val df: DataFrame = sqlContext.read
      .format("jdbc")
      .option("driver", "com.amazon.redshift.jdbc42.Driver")
      .option("url", getConnectionString())
      .option("dbtable", s"($query) tmp")
      .load()
    
    df
}

var table = runQuery(s"""
SELECT * FROM Table AS t
""")

This is even nicer because you can use string interpolation to provide parameters for queries.

]]>
https://blog.adamfurmanek.pl/2020/03/14/connecting-to-redshift-from-spark-running-in-emr/feed/ 0
Erasure type inference issue in Java https://blog.adamfurmanek.pl/2020/02/15/erasure-type-inference-issue-in-java/ https://blog.adamfurmanek.pl/2020/02/15/erasure-type-inference-issue-in-java/#respond Sat, 15 Feb 2020 09:00:13 +0000 https://blog.adamfurmanek.pl/?p=3243 Continue reading Erasure type inference issue in Java]]> Recently I was working with the following code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Test {
    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Item> items = strings.stream().map(item -> new Item(new HashMap())).collect(Collectors.toList());
    }
}

class Item {
    public Map<String, String> metadata;

    Item(Map<String, String> metadata) {
        this.metadata = metadata;
    }
}

I was compiling it with

java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)

on Windows 10 x64. It wasn’t working because of the following:

/tmp/java_fR1LWz/Test.java:10: warning: [unchecked] unchecked method invocation: constructor <init> in class Item is applied to given types
        List<Item> items = strings.stream().map(item -> new Item(new HashMap())).collect(Collectors.toList());
                                                            ^
  required: Map<String,String>
  found: HashMap
/tmp/java_fR1LWz/Test.java:10: warning: [unchecked] unchecked conversion
        List<Item> items = strings.stream().map(item -> new Item(new HashMap())).collect(Collectors.toList());
                                                                     ^
  required: Map<String,String>
  found:    HashMap
/tmp/java_fR1LWz/Test.java:10: warning: [unchecked] unchecked method invocation: method map in interface Stream is applied to given types
        List<Item> items = strings.stream().map(item -> new Item(new HashMap())).collect(Collectors.toList());
                                               ^
  required: Function<? super T,? extends R>
  found: Function<String,Item>
  where T,R are type-variables:
    T extends Object declared in interface Stream
    R extends Object declared in method <R>map(Function<? super T,? extends R>)
/tmp/java_fR1LWz/Test.java:10: warning: [unchecked] unchecked call to <R,A>collect(Collector<? super T,A,R>) as a member of the raw type Stream
        List<Item> items = strings.stream().map(item -> new Item(new HashMap())).collect(Collectors.toList());
                                                                                            ^
  where R,A,T are type-variables:
    R extends Object declared in method <R,A>collect(Collector<? super T,A,R>)
    A extends Object declared in method <R,A>collect(Collector<? super T,A,R>)
    T extends Object declared in interface Stream
/tmp/java_fR1LWz/Test.java:10: error: incompatible types: Object cannot be converted to List<Item>
        List<Item> items = strings.stream().map(item -> new Item(new HashMap())).collect(Collectors.toList());
                                                                                            ^
1 error
4 warnings

You can try reproducing the issue at compilejava.net, it throws the error at the moment.

I was almost sure that it was a bug in javac, especially that the code was working fine in Java 12 as indicated by Ideone.

Fortunately, with some help from 4programmers.net community I was finally pointed out in the right direction. It is a bug and Oracle knows about that. You can see details at Oracle page.

Takeaway is: most of the time the problem is on the user side, compiler/platform/OS/library/CPU works well. However, sometimes one just hits a bug.

]]>
https://blog.adamfurmanek.pl/2020/02/15/erasure-type-inference-issue-in-java/feed/ 0
JVM Inside Out Part 4 — Locks and out of band exceptions https://blog.adamfurmanek.pl/2020/02/01/jvm-inside-out-part-4/ https://blog.adamfurmanek.pl/2020/02/01/jvm-inside-out-part-4/#respond Sat, 01 Feb 2020 09:00:18 +0000 https://blog.adamfurmanek.pl/?p=3229 Continue reading JVM Inside Out Part 4 — Locks and out of band exceptions]]>

This is the fourth part of the JVM Inside Out series. For your convenience you can find other parts in the table of contents in Part 1 — Getting object address

Typical locking pattern in Java (and other languages, even outside them JVM ecosystem) looks like this:

lock.lock();
try{
   ...
}finally{
    lock.unlock();
}

Simple enough, nothing should break here. However, there is a catch.

Our code is optimized a lot. Compiler (javac) does that, JIT does that, even CPU does that. It tries to preserve semantic of our application but if we don’t obey the rules (i.e. we don’t use barriers when accessing variables modified in other threads) we may get unexpected results.

try block in JVM is implemented using metadata. There is a piece of information saying that try is between instructions X and Y. If we don’t get to those lines then the try is not respected (and finally is not called). Under the hood it is very „basic” approach — operating system mechanisms are used (SEH, SJLJ, signals etc) to catch interrupt (whether hardware or software) and ultimately to compare addresses. Details may differ but general concept is similar across platforms.

Now, what happens if JIT decides to compile the code like this:

1: call lock.lock();
2: nop
3: anything from try

We finish taking lock and we end up in instruction 2 but we are not in try block yet. Now, if some out of band exception appears we never release the lock. Out of band exception like ThreadDeath or OutOfMemory.

Typically we would like to kill JVM when any of these out of band situations happen. But nothing stops us from catching them and stop the thread from being killed.

Let’s take this code:

import java.sql.Date;
import java.util.concurrent.locks.ReentrantLock;

public class Play{
    public static void main(String[] args) throws InterruptedException {
        final ReentrantLock lock = new ReentrantLock();
        Thread t = new Thread(){
            @Override
            public void run(){
                try {
                    lock.lock();
                    while (new Date(2019, 9, 19).getTime() > 0) {} // This emulates nop instruction (and infinite loop which isn't clearly infinite so the compiler accepts the code)
                    try{
                        System.out.println("Try: Never gonna get here");
                    }finally{
                        System.out.println("Finally: Never gonna get here");
                        lock.unlock();
                    }
                }catch(Throwable e){
                    System.out.println(e);
                }
                System.out.println("We caught the exception and can 'safely' carry on");
            }
        };
        t.start();

        Thread.sleep(1000);
        t.stop();

        System.out.println("Checking deadlock");
        lock.lock();
        System.out.println("Done, no deadlock");
        lock.unlock();
    }
}

Output is:

Checking deadlock
java.lang.ThreadDeath
We caught the exception and can 'safely' carry on

and the application hangs forever.

So what happened? We emulated the nop instruction inserted just before the try block and exception thrown right in that place. We can see that background thread handles the exception and continues execution but the lock is never released so the main thread is blocked forever.

Now let’s see what happens if we try taking the lock in the try block (warning: this code is not correct! it is just to show the idea):

import java.sql.Date;
import java.util.concurrent.locks.ReentrantLock;

public class Play{
    public static void main(String[] args) throws InterruptedException {
        final ReentrantLock lock = new ReentrantLock();
        Thread t = new Thread(){
            @Override
            public void run(){
                try {
                    try{
                        lock.lock();
                        while (new Date(2019, 9, 19).getTime() > 0) {} // This emulates nop instruction (and infinite loop which isn't clearly infinite so the compiler accepts the code)
                        System.out.println("Try: Never gonna get here");
                    }finally{
                        System.out.println("Finally: Never gonna get here");
                        lock.unlock();
                    }
                }catch(Throwable e){
                    System.out.println(e);
                }
                System.out.println("We caught the exception and can 'safely' carry on");
            }
        };
        t.start();

        Thread.sleep(1000);
        t.stop();

        System.out.println("Checking deadlock");
        lock.lock();
        System.out.println("Done, no deadlock");
        lock.unlock();
    }
}

Output:

Checking deadlock
Finally: Never gonna get here
Done, no deadlock
java.lang.ThreadDeath
We caught the exception and can 'safely' carry on

Application finishes successfully. Why is this code wrong? It’s because we try to release the lock in finally but we don’t know if we locked it. If someone else locked it then we may release it incorrectly or get exception. We may also break it in case of recursive situation.

Now the question is: is this just a theory or did it actually happen? I don’t know of any example in JVM world but this happened in .NET and was fixed in .NET 4.0. On the other hand I am not aware of any guarantee that this will not happen in JVM.

How to solve it? Avoid Thread.stop() as stopping threads is bad. But remember that it doesn’t solve the „problem” — what if you have distributed lock (whether it is OS lock across processes or something across machines)? You have exactly the same issue and saying „avoid Process.kill()” or „avoid getting your machine broken” is not an answer. This problem can always appear so think about it whenever you take the lock. And as a rule of thumb, track the owner and always take the lock with timeout.

]]>
https://blog.adamfurmanek.pl/2020/02/01/jvm-inside-out-part-4/feed/ 0
JVM Inside Out Part 3 — Java raw type trickery https://blog.adamfurmanek.pl/2020/01/25/jvm-inside-out-part-3/ https://blog.adamfurmanek.pl/2020/01/25/jvm-inside-out-part-3/#comments Sat, 25 Jan 2020 09:00:51 +0000 https://blog.adamfurmanek.pl/?p=3221 Continue reading JVM Inside Out Part 3 — Java raw type trickery]]>

This is the third part of the JVM Inside Out series. For your convenience you can find other parts in the table of contents in Part 1 — Getting object address

Erasure in Java seems pretty easy but sometimes it has unexpected consequences. One of them is erasure of whole class content, not only the generic type. According to JLS 4.6 we have

Type erasure also maps the signature (§8.4.2) of a constructor or method to a signature that has no parameterized types or type variables. The erasure of a constructor or method signature s is a signature consisting of the same name as s and the erasures of all the formal parameter types given in s.

Let’s take this code:

import java.util.*;
import java.lang.*;
import java.io.*;
 
class Ideone
{
	public static List<? extends Object> produce(){
		return null; // Whatever
	}
 
	public static void main (String[] args) throws java.lang.Exception
	{
	}
 
	public static void a(NoGeneric noGeneric){
		noGeneric.call(produce());
	}
 
	public static <T> void b(Generic<T> generic){
		generic.call(produce());
	}
 
	public static <T, U extends Generic<T>> void d(U generic){
		generic.call(produce());
	}
 
	public static <T extends Generic> void c(T raw){
		raw.call(produce());
	}
}
 
class NoGeneric{
	public void call(List<Object> objects){	}
}
 
class Generic<T> {
	public void call(List<Object> objects){}
}

Compiler signals this:

Main.java:16: error: incompatible types: List<CAP#1> cannot be converted to List<Object>
		noGeneric.call(produce());
		                      ^
  where CAP#1 is a fresh type-variable:
    CAP#1 extends Object from capture of ? extends Object
Main.java:20: error: incompatible types: List<CAP#1> cannot be converted to List<Object>
		generic.call(produce());
		                    ^
  where CAP#1 is a fresh type-variable:
    CAP#1 extends Object from capture of ? extends Object
Main.java:24: error: incompatible types: List<CAP#1> cannot be converted to List<Object>
		generic.call(produce());
		                    ^
  where CAP#1 is a fresh type-variable:
    CAP#1 extends Object from capture of ? extends Object
Note: Main.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
Note: Some messages have been simplified; recompile with -Xdiags:verbose to get full output
3 errors

Line 16 doesn’t work because we try to pass list of ? extends Object to list of Object. Java doesn’t allow this for generic types (it would work for arrays, though).

Line 20 doesn’t work because generic of T is not a raw type so we do the same as in line 16.

Line 24 doesn’t work because of the same reasoning.

However, line 28 works. It is because T extends Generic uses a raw type. According to JLS we remove generic parameters not only related to T in Generic but to other things as well. So method call(List< Object > objects) becomes call(List objects).

]]>
https://blog.adamfurmanek.pl/2020/01/25/jvm-inside-out-part-3/feed/ 1
JVM Inside Out Part 2 — Reading object content https://blog.adamfurmanek.pl/2020/01/04/jvm-inside-out-part-2/ https://blog.adamfurmanek.pl/2020/01/04/jvm-inside-out-part-2/#comments Sat, 04 Jan 2020 09:00:31 +0000 https://blog.adamfurmanek.pl/?p=3200 Continue reading JVM Inside Out Part 2 — Reading object content]]>

This is the second part of the JVM Inside Out series. For your convenience you can find other parts in the table of contents in Part 1 — Getting object address

Last time we saw how to read object address. We can use similar trick to read object contents as integers. Let’s see this:

package unsafe;

import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Field;

public class Play {
    public static void main (String[] args) throws java.lang.Exception
    {
        Dummy foo = new Dummy();
        foo.value = 0xBADF00D;
        foo.value2 = 0xDEADBEEF;
        foo.value3 = 0xFEDCFEDC;

        AddressExtractor addressExtractor = new AddressExtractor();
        Class<? extends AddressExtractor> clazz = addressExtractor.getClass();
        Field field = clazz.getDeclaredField("pointerValue");
        Field type = Field.class.getDeclaredField("type");
        AccessibleObject.setAccessible(new AccessibleObject[]{field, type}, true);
        type.set(field, Object.class);

        IntsExtractor intsExtractor = new IntsExtractor();
        Class<? extends IntsExtractor> clazz2 = intsExtractor.getClass();
        Field field2 = clazz2.getDeclaredField("ints");
        AccessibleObject.setAccessible(new AccessibleObject[]{field2, type}, true);
        type.set(field2, long.class);

        field.set(addressExtractor, foo);
        long trickyAddress = addressExtractor.pointerValue - 8;
        field2.setLong(intsExtractor, trickyAddress);

        System.out.println("Length: " + intsExtractor.ints.length);
        for(int i=0;i<3;++i){
            System.out.println(Integer.toHexString(intsExtractor.ints[i]));
        }
    }
}

class AddressExtractor {
    public long pointerValue;
}

class IntsExtractor {
    public int[] ints;
}

class Dummy{
    public int value;
    public int value2;
    public int value3;
}

First, we create new object and set some dummy values. Next, we create helper instances for reflection.

In line 27 we do the same trick as last time. We assign object to a long field which in turn assigns reference. So we have an address.

Now, we would like to create an array of integers which would contain the object. This is a common trick, since array can be used to read the values, we can effectively use array as a pointer. Very similar to base pointer or segment address.

So we could assign the foo object directly to that array but then we wouldn’t be able to read first field. That’s because first field would be internally storing array size. We need to move back by one long value, so in line 28 we calculate address of the fake array.

Next, in line 29 we just assign this fake object to int[] field.

Finally, we can read all values using loop.

Obviously, this is very hacky approach and cannot be considered reliable. It highly depends on the architecture, JVM parameters (whether OOP are compressed or not) and multiple other scenarios.

]]>
https://blog.adamfurmanek.pl/2020/01/04/jvm-inside-out-part-2/feed/ 1
JVM Inside Out Part 1 — Getting object address https://blog.adamfurmanek.pl/2019/12/28/jvm-inside-out-part-1/ https://blog.adamfurmanek.pl/2019/12/28/jvm-inside-out-part-1/#comments Sat, 28 Dec 2019 09:00:25 +0000 https://blog.adamfurmanek.pl/?p=3196 Continue reading JVM Inside Out Part 1 — Getting object address]]>

This is the first part of the JVM Inside Out series. For your convenience you can find other parts using the links below:
Part 1 — Getting object address
Part 2 — Reading object content
Part 3 — Java raw type trickery
Part 4 — Locks and out of band exceptions

How to get object address in JVM? There is actually one easy trick to do it via reflection. See this code:

package unsafe;

import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Field;

public class Play {
    public static void main (String[] args) throws java.lang.Exception
    {
        AddressExtractor addressExtractor = new AddressExtractor();
        Class<? extends AddressExtractor> clazz = addressExtractor.getClass();
        Field field = clazz.getDeclaredField("pointerValue");
        Field type = Field.class.getDeclaredField("type");
        AccessibleObject.setAccessible(new AccessibleObject[]{field, type}, true);
        type.set(field, Object.class);

        Dummy foo = new Dummy();
        foo.value = 0xBADF00D;
        field.set(addressExtractor, foo);

        System.out.println(Long.toHexString(addressExtractor.pointerValue));
        System.out.println(field.get(addressExtractor) == foo);

        System.in.read();
    }
}

class AddressExtractor {
    public long pointerValue;
}

class Dummy{
    public long value;
}

We create an object of type AddressExtractor which has one long field for storing the address. Next, we use reflection to get field named pointerValue. Trick is that we can use reflection to examine Field instance pointing to the pointerValue field. Since this instance must know the type, it stores it in a field called type. We can now use reflection to modify it and trick reflection to think that pointerValue is of type Object.

Next, in lines 16-18 we just create some dummy object to see if we are right. We initialize its field to some readable value and then assign this object to pointerValue long field. JVM now needs to assign Ordinary Object Pointer to the long value. We can then print it out.

I’m using 64-bit JVM and disable OOP compression using -XX:-UseCompressedOops. I get the following output:

12999a630
true

Let’s attach WinDBG and see what happens:

0:019> dd 0x12999a630
00000001`2999a630  00000001 00000000 1b5a3bf8 00000000
00000001`2999a640  0badf00d 00000000

We can see first two integers being a mark-word, next two integers for klass word (remember, we disabled OOP compression). Finally, we get one long field (two integers) with expected value.

Of course, this address is valid as long as GC doesn’t move the object. Most likely, if you add System.gc() at the end then the object will be moved.

]]>
https://blog.adamfurmanek.pl/2019/12/28/jvm-inside-out-part-1/feed/ 3
Dynamically loading JAR file in Zeppelin https://blog.adamfurmanek.pl/2018/07/14/dynamically-loading-jar-file-in-zeppelin/ https://blog.adamfurmanek.pl/2018/07/14/dynamically-loading-jar-file-in-zeppelin/#respond Sat, 14 Jul 2018 08:00:24 +0000 https://blog.adamfurmanek.pl/?p=2535 Continue reading Dynamically loading JAR file in Zeppelin]]> Imagine that you need to load JAR file dynamically in Zeppelin working on your EMR cluster. One easy way is to deploy the file to the instance and load it from there, however, what can you do if you have almost no access to the cluster and the filesystem? You can load the JAR from S3 and load it dynamically via custom classloader.

First, load the file:

val jarBinary = sc.binaryFiles("s3://bucket/file.jar").map(_._2.toArray).collect.head

Next, implement the classloader:

class RemoteClassLoader(jarBytes: Array[Byte]) extends ClassLoader{
  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    var clazz = findLoadedClass(name)
    if(clazz != null){
      return clazz
    }
    try{
      val in = getResourceAsStream(name.replace(".", "/") + ".class")
      val out = new java.io.ByteArrayOutputStream()
      copy(in, out)
      val bytes = out.toByteArray
      clazz = defineClass(name, bytes, 0, bytes.length)
      if(resolve){
        resolveClass(clazz)
      }
    }catch{
      case e: Exception => clazz = super.loadClass(name, resolve)
    }
    return clazz
  }
  override def getResource(name: String) = null
  override def getResourceAsStream(name: String): java.io.InputStream = {
    try{
      val jis = new java.util.jar.JarInputStream(new java.io.ByteArrayInputStream(jarBytes))
      var entry = jis.getNextJarEntry
      while(entry != null){
        if(entry.getName().equals(name)){
          return jis;
        }
        entry = jis.getNextJarEntry
      }
    }catch{
      case e: Exception => return null
    }
    return null
  }
  def copy(from: java.io.InputStream, to: java.io.OutputStream): Long = {
    val buf = new Array[Byte](8192)
    var total = 0
    while (true) {
      val r = from.read(buf)
      if (r == -1) return total
      to.write(buf, 0, r)
      total += r
    }
    total
  }
}

It extracts JAR from byte array and goes through the resources. Finally, just create the class:

val loader = new RemoteClassLoader(jarBinary);
val classToLoad = Class.forName("pl.adamfurmanek.blog.SampleClass", true, loader);
val instance = classToLoad.newInstance();

Of course, using this instance will be harder as it is loaded in different classloader so you will probably need a lot of reflection.

]]>
https://blog.adamfurmanek.pl/2018/07/14/dynamically-loading-jar-file-in-zeppelin/feed/ 0
Generating class in Zeppelin https://blog.adamfurmanek.pl/2018/07/07/generating-class-in-zeppelin/ https://blog.adamfurmanek.pl/2018/07/07/generating-class-in-zeppelin/#respond Sat, 07 Jul 2018 08:00:45 +0000 https://blog.adamfurmanek.pl/?p=2533 Continue reading Generating class in Zeppelin]]> If you want to declare a class in Zeppelin and create instance of it, you might be surprised:

class K

defined class K

classOf[K].newInstance()

java.lang.InstantiationException: K
at java.lang.Class.newInstance(Class.java:427)
... 52 elided
Caused by: java.lang.NoSuchMethodException: K.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 52 more

What happened? Whatever you declare in Zeppelin notebook is a part of some internal class so the newly declared class K doesn’t have parameterless constructor because it expects the instance of enclosing type. There are two simple ways to handle this.

Quasiquotes

Just generate a class with scala’s quaisquotes:

import reflect.runtime._
import universe._
import tools.reflect.ToolBox

val tb = currentMirro.mkToolBox()
val l = tb.compile(q"""class L; classOf[L].newInstance()""")()

l: Any = __wrapper$8$bb3239e978f24dc98e740075eecad313.__wrapper$8$bb3239e978f24dc98e740075eeacad313$L$1@7e2b9e13

Javax.tools

Use the following method to dynamically compile java code:

def generateClass(className: String, source: String): (Class[_], Any) = {
    val byteArrayOutputStream = new java.io.ByteArrayOutputStream()
    val simpleJavaFileObject = new javax.tools.SimpleJavaFileObject(java.net.URI.create(s"$className.java"), javax.tools.JavaFileObject.Kind.SOURCE) {
        override def getCharContent(ignoreEncodingErrors: Boolean):CharSequence = {
            return source;
        }
        override def openOutputStream(): java.io.OutputStream = {
            return byteArrayOutputStream;
        }
    };
    val standardManager = javax.tools.ToolProvider.getSystemJavaCompiler().getStandardFileManager(null, null, null);
    val customForwardingManager = new javax.tools.JavaFileManager {
        override def close() = standardManager.close()
        override def flush() = standardManager.flush()
        override def getClassLoader(location: javax.tools.JavaFileManager.Location) = standardManager.getClassLoader(location)
        override def getFileForInput(location: javax.tools.JavaFileManager.Location, packageName: String, relativeName: String) = standardManager.getFileForInput(location, packageName, relativeName)
        override def getFileForOutput(location: javax.tools.JavaFileManager.Location, packageName: String, relativeName: String, sibling: javax.tools.FileObject) = standardManager.getFileForOutput(location, packageName, relativeName, sibling)
        override def getJavaFileForInput(location: javax.tools.JavaFileManager.Location, className: String, kind: javax.tools.JavaFileObject.Kind) = standardManager.getJavaFileForInput(location, className, kind)
        override def getJavaFileForOutput(location: javax.tools.JavaFileManager.Location,
                                                   className: String,
                                                   kind: javax.tools.JavaFileObject.Kind,
                                                   sibling: javax.tools.FileObject): javax.tools.JavaFileObject = {
            return simpleJavaFileObject;
        }
        override def handleOption(current: String, remaining: java.util.Iterator[String]) = standardManager.handleOption(current, remaining)
        override def hasLocation(location: javax.tools.JavaFileManager.Location) = standardManager.hasLocation(location)
        override def inferBinaryName(location: javax.tools.JavaFileManager.Location, file: javax.tools.JavaFileObject) = standardManager.inferBinaryName(location, file)
        override def isSameFile(a: javax.tools.FileObject, b: javax.tools.FileObject) = standardManager.isSameFile(a, b)
        override def isSupportedOption(option: String) = standardManager.isSupportedOption(option)
        override def list(location: javax.tools.JavaFileManager.Location, packageName: String, kinds: java.util.Set[javax.tools.JavaFileObject.Kind], recurse: Boolean) = standardManager.list(location, packageName, kinds, recurse)
    }
    val list = new java.util.ArrayList[javax.tools.JavaFileObject]()
    list.add(simpleJavaFileObject)
    javax.tools.ToolProvider.getSystemJavaCompiler().getTask(null, customForwardingManager, null, null, null, list).call();
    val bytes = byteArrayOutputStream.toByteArray();
    val f = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe");
    f.setAccessible(true);
    val unsafe: sun.misc.Unsafe = f.get(null).asInstanceOf[sun.misc.Unsafe];
    val aClass = unsafe.defineClass(className, bytes, 0, bytes.length, null, null);
    val o = aClass.newInstance();
    (aClass, o)
}

Invoke it like this:

val (kClass, kInstance) = generateClass("K", """
public class K{
    public K(){}
}
""")

kClass: Class[_] = class K
kInstance: Any = K@adfd330

And you are done.

]]>
https://blog.adamfurmanek.pl/2018/07/07/generating-class-in-zeppelin/feed/ 0