Types and Programming Languages Part 4 – Diamond problem

This is the fourth part of the Types and Programming Languages series. For your convenience you can find other parts in the table of contents in Part 1 — Do not return in finally

The Diamond Problem, sometimes called the Deadly Diamond of Death, is a problem in which we inherit the same thing through multiple base entities. If you think of the diamond problem as “the one in C++ in which multiple instances are created” or “the one that Java doesn’t have but C++ does”, then you focus too much on the technical part. In this post I’ll show why there is a diamond problem in Java and that the issue has been there since day one.

Inheritance

We typically say that Java has single inheritance and multiple interface implementation. This is true, but it hides a much bigger picture.

Inheritance allows us to inherit characteristics and features from the base entity (most of the time a class or an object). There are many things we can inherit, or in other words many levels of inheritance:

  • Signature inheritance
  • Implementation inheritance
  • State inheritance
  • Identity inheritance

I’m using slightly different wording than you may be used to because I want to redefine a couple of things. Also, I’m not going much deeper into things like the Hindley-Milner type system or the theory of objects in this part; we’ll cover that some other day.

Signature inheritance

Signature inheritance can be considered interface implementation in Java. Some method is declared in the interface, we inherit it and provide an implementation. Signature here indicates that it’s only a “header” of the method, with no body or anything else. It’s important to understand that this “inheritance signature” does not need to be the same as the “calling signature” used for a method call. For instance, you cannot change the return type when implementing an interface in C#, but the return type is not a part of the “calling signature” (although there is an edge case where it is, but that’s a side note). Java allows for that (C# is also considering this feature) via bridge methods, but it’s an implementation detail. What we mean by “signature inheritance” is just the method header we get from the base entity.

Implementation inheritance

In this type of inheritance we get not only the signature but also the whole method body. It used to be disallowed in both Java and C# via interfaces, but it’s now allowed via default interface implementations. We’ll cover the implications of that a little later.

We can think of this as traits. Even though there are some differences between implementation inheritance and traits, they are pretty close to each other. Also, “trait” here is not the “trait” from Scala, even though they are similar to some extent. A minimal Scala sketch of the idea is below.
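
The following is my own minimal Scala sketch (not tied to any particular library) of implementation inheritance: the trait carries a full method body that the class gets for free.

trait Greeter {
  // the whole implementation is inherited, not just the signature
  def greet(name: String): String = s"Hello, $name"
}

class ConsoleGreeter extends Greeter

println(new ConsoleGreeter().greet("world")) // prints "Hello, world"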

State inheritance

This is an inheritance of fields. You can emulate state inheritance with implementation inheritance alone, but most of the time it’s considered separate. In state inheritance we get a field from the base entity which we can use in the subentity (subobject or subclass).

This is similar to mixins to some extent. It’s also worth noting that we may have state inheritance without implementation inheritance, but most of the time these two come together. A small sketch follows.
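
Here is a minimal Scala sketch (my own example) of state coming along with implementation: the trait contributes a field, and every subentity gets its own independent copy of it.

trait Counted {
  // state inherited by whatever mixes this trait in
  protected var hits: Int = 0
  def hit(): Int = { hits += 1; hits }
}

class Endpoint extends Counted
class Job extends Counted

val e = new Endpoint
e.hit(); e.hit()
println(e.hit())         // 3 – Endpoint has its own copy of the field
println((new Job).hit()) // 1 – Job's state is independent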

Identity inheritance

This can be considered “constructor inheritance” (without going much into the type theory). When you think about what differentiates mixing in a mixin from inheriting from a class, it comes down to the constructor. You can create an instance and have a new identity.

Typically, we get the identity by constructing the base entity and “holding” it inside the subentity. It’s good to keep in mind that on a technical level we don’t need to hold the parent object as a part of the child object (the way it is implemented in the JVM or CLR); these two could be linked via pointers, but that’s not a popular way of implementing it. It is somewhat similar to prototype inheritance in JavaScript, but the latter uses one “base instance” which is reused across all subobjects. Also, when inheriting from multiple base classes we may end up with multiple base instances held in a single object (which can be controlled with virtual inheritance etc.).
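
To make the constructor point concrete, here is a small Scala sketch (the names are mine): extending a class runs its constructor and embeds the base part in the new object, while a bare trait contributes no identity of its own.

class Base(val id: Int) {
  println(s"constructing Base($id)") // the base entity is constructed as part of the child
}
trait Tag // a trait has no constructor parameters and no identity of its own

class Child(id: Int) extends Base(id) with Tag

val c = new Child(42) // prints "constructing Base(42)"; Child holds the Base part inside itself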

Inheritance in Java

C++ had multiple inheritance and didn’t differentiate between a class and an interface. Java was so scared of multiple inheritance (because of the diamond problem) that it decided to ban everything but signature inheritance. It also introduced different terminology for signature inheritance, added separate keywords, and made this difference clear and visible.

However, it is important to understand that saying “there is no multiple inheritance in Java” is not true. There is multiple inheritance for signatures and single inheritance for everything else (at least until Java 7).

So Java removed multiple inheritance and C# did the same. However, we later realized that it may not have been the best idea, and so Java added default interface implementations, which are basically “implementation inheritance” (to some extent, as they don’t support full-blown polymorphism). Because of that we have the diamond problem “back”. As we’ll see later in this post, it was there from the very beginning.

Diamond problem

Wikipedia defines the diamond problem as a situation when two classes B and C inherit from class A, override something, and then class D inherits from classes B and C without overriding the thing from A. When we now want to use the thing from A in class D, we don’t know which one to use (the one from B or the one from C).

It’s important to understand that this has nothing to do with the technical implementation of virtual inheritance in C++ or anything like that. It’s a logical problem, not a technical one. C++ only provides a way of controlling the internals a little better, but it’s not the only approach we can take.

Before focusing on the problem itself, let’s talk about the Diamond Situation. When we say “problem” we typically think of something which is not obvious to tackle. However, the Diamond Situation can be trivially solved in some cases. For instance, with signature inheritance:

interface A{
	void foo();
}
 
interface B{
	void foo();
}
 
class C implements A, B{
	public void foo(){
		System.out.println("FOO");
	}
}
 
class Ideone
{
	public static void main (String[] args) throws java.lang.Exception
	{
		C c = new C();
		c.foo();
	}
}

Interfaces A and B declare the method void foo. Class C implements both interfaces. We call foo in main and it works — there is no issue here. Why is there no issue? Because it doesn’t matter which interface we use; the signatures are the same. However, if we change the return type:

interface A{
	Object foo();
}
 
interface B{
	String foo();
}
 
class C implements A, B{
	public String foo(){
		return "Foo";
	}
}
 
class Ideone
{
	public static void main (String[] args) throws java.lang.Exception
	{
		C c = new C();
		System.out.println(c.foo());
	}
}

it works correctly in Java but doesn’t work in C# (Compilation error (line 11, col 11): ‘C’ does not implement interface member ‘A.foo()’. ‘C.foo()’ cannot implement ‘A.foo()’ because it does not have the matching return type of ‘object’.).

So we can see that the Diamond Situation in Java is actually a problem in C# because C# doesn’t use bridge methods.

Coming back to the problem. I mentioned that with default interface implementations the Diamond Problem is back. Let’s see the code:

interface A{
	default void foo(){
		System.out.println("A");
	}
}
 
interface B{
	default void foo(){
		System.out.println("B");
	}
}
 
class C implements A, B{
}
 
class Ideone
{
	public static void main (String[] args) throws java.lang.Exception
	{
		C c = new C();
		c.foo();
	}
}

It shows the compilation error

Main.java:13: error: types A and B are incompatible;
class C implements A, B{
^
  class C inherits unrelated defaults for foo() from types A and B
1 error

You may argue that we don’t have a diamond situation here, but the code is written this way on purpose to show that it’s not about some base type but about deciding which thing to use. How do we solve it? In Java we can add the following method to C:

public void foo(){
	A.super.foo();
}

And it works. No problem anymore, no virtual inheritance like in C++ etc.

So what is the Diamond Problem about? It’s not about inheriting incompatible things. It’s about deciding which one to use.

The Diamond Situation is almost uninteresting when we’re dealing with methods only. It gets trickier when we introduce state in the base class. We need to decide whether we want to have independent state for each subclass (regular inheritance) or share it between subclasses (virtual inheritance). If we share it, then it may get broken easily (as two different implementations use the same variables). If we don’t share it, then we need to specify which variables we’re referring to in the lowest subclass.

How did Java solve the problem? It gives a compile-time error. But other languages do not stop there; for instance, Scala relies on the linearization of traits and chooses “the rightmost one” first. It’s important to understand that the problem is not about getting two things but about how we decide which one wins. A compilation error is just one of the solutions.
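
Here is a minimal Scala sketch of that linearization rule (my own example, reusing the A/B/C names from the Java snippets): both traits override foo, and the rightmost one in the extends clause wins with no compilation error at all.

trait A { def foo(): String = "A" }
trait B extends A { override def foo(): String = "B" }
trait C extends A { override def foo(): String = "C" }

class D extends B with C

println((new D).foo()) // prints "C" – the last trait in the linearization wins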

So we can see the problem is back in Java and has a nice solution. There was no need to ban multiple inheritance, just show a clear compilation error. But it’s not the end of the story.

Diamond Problem in Java since day one

There is one more thing we need to consider with the Diamond Problem — compatibility. It may happen that your perfectly valid code works today but stops working tomorrow. How? Imagine that you implement two interfaces and only one of them provides default for method foo while the other interface doesn’t have foo at all. Your code works correctly. Then someone comes and adds default foo method to the second interface. When you recompile your code — it breaks.

That’s a big issue (just like every time we break compatibility) but it’s not something new. The Diamond Problem wasn’t present in Java until version 8, but the essence of the problem was there since the beginning. Like we said in the previous section, the problem is about deciding which thing wins when we have two of them. Let’s take this code:

class A {
	public void foo(long l){
		System.out.println("Long");
	}
 
	public void foo(double d){
		System.out.println("Double");
	}
}
 
class Ideone
{
	public static void main (String[] args) throws java.lang.Exception
	{
		A a = new A();
		a.foo(123);
	}
}

There are two foo methods, one accepting a long, the other accepting a double. Can you easily tell which one is going to be used? The answer is: the former, accepting the long parameter.

But let’s stop here and see what’s happening. We have two methods with different signatures. We want to call a method and we pass a value of a different type. However, Java is “clever” and just converts the value to the type it likes more (here: long).

It’s exactly the same diamond problem as before when it comes to the essence. We have two things and we cannot decide which one to use. In the Diamond Problem with default interface implementations Java shows a compilation error, but with method overloading it just chooses one method over another. Also, it has the same implications when it comes to breaking compatibility — imagine that someone comes and adds another foo(int i) method. What’s going to happen with your code? Previously Java was converting int to long, but after the new method is added no conversion is required — you’ll call the new method. It breaks compatibility.
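
The trap is not specific to Java either. A minimal Scala sketch (my own) behaves the same way: the Int argument is silently widened to Long, and adding an Int overload later changes which method existing call sites hit once they are recompiled.

class Overloads {
  def foo(l: Long): Unit = println("Long")
  def foo(d: Double): Unit = println("Double")
  // someone adds this later and recompiles:
  // def foo(i: Int): Unit = println("Int")
}

new Overloads().foo(123) // prints "Long" today; "Int" once the new overload is added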

While accepting different numbers is a plausible situation, there is actually a much more serious place where you may hit this issue. The Source compatibility issue with Google Guava library post shows what happened when the Guava library added a new overload accepting a params array versus explicit parameters.

Summary

While it’s correct to say that there is no multiple inheritance in Java, it’s better to keep in mind that there are many levels of inheritance and we should be specific. Actually, we can inherit implementations since Java 8 — is that multiple inheritance or not?
While it’s correct to say that there was no Diamond Problem in Java before version 8, the essence of the problem is there in method overloading. And it has the same implications.
It’s also worth seeing how seemingly distant language elements lead to similar challenges. We’re all “afraid” of the Diamond Problem but we are not afraid of method overloading. Even better — we think it’s a feature until one day we break compatibility.

Running Anaconda with DGL and mxnet on CUDA GPU in Spark running in EMR

Today I’m going to share my configuration for running custom Anaconda Python with DGL (Deep Graph Library) and mxnet library, with GPU support via CUDA, running in Spark hosted in EMR. Actually, I have Redshift configuration as well, with support for gensim, tensorflow, keras, theano, pygpu, and cloudpickle. You can also install more libraries if needed. Let’s begin.

My configuration uses EMR 5.17.2 and CUDA 9.2. At the time of writing, EMR 5.27 is available, but it comes with the same CUDA version, so I presume it should work as well. I’m also using Python 3.7.

First, create a cluster. Do not select mxnet as a provided library in EMR; we will install it later. As the master node use the p3.8xlarge instance type — this instance must have a GPU and this is where we will run DGL and mxnet. For slaves you can use anything; I’m going with 19 r3.4xlarge nodes (they don’t have GPUs).

We need to install some custom libraries. I am using a bootstrap script for that, but you can also just SSH into the host manually and run this code:

sudo mkdir /mnt/usr-moved
sudo mv /usr/local /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/local /usr/
sudo mv /usr/share /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/share /usr/

sudo mkdir /mnt/home
sudo chown hadoop.hadoop /mnt/home

wget https://repo.anaconda.com/archive/Anaconda3-2019.03-Linux-x86_64.sh -O ~/anaconda.sh
bash ~/anaconda.sh -b -p /mnt/home/hadoop/anaconda
echo -e '\nexport PATH=/mnt/home/hadoop/anaconda/bin:$PATH' >> $HOME/.bashrc && source $HOME/.bashrc
echo -e '\nexport PYSPARK_PYTHON=/mnt/home/hadoop/anaconda/bin/python' >> $HOME/.bashrc && source $HOME/.bashrc

/mnt/home/hadoop/anaconda/bin/conda install -y gensim
/mnt/home/hadoop/anaconda/bin/conda install -y tensorflow
/mnt/home/hadoop/anaconda/bin/conda install -y keras
/mnt/home/hadoop/anaconda/bin/conda install -y theano
/mnt/home/hadoop/anaconda/bin/conda install -y pygpu
/mnt/home/hadoop/anaconda/bin/conda upgrade -y cloudpickle
yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet-cu92mkl
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl-cuda9.2
/mnt/home/hadoop/anaconda/bin/conda install -y s3fs

First, I’m moving directories and creating symlinks so I don’t fill the root disk while installing packages. Then I download Anaconda. Finally, I install some additional libraries. Notice that I install mxnet compiled for CUDA 9.2 (mxnet-cu92mkl) and the matching dgl-cuda9.2 build of DGL. Also, s3fs is required for nice reading from S3.

When this is done and the cluster is created, I repoint the Zeppelin Python interpreter to /mnt/home/hadoop/anaconda/bin/python and add the Redshift configuration. I do this with the following commands (these you need to run manually after the cluster is created):

sudo /usr/lib/zeppelin/bin/install-interpreter.sh --name jdbc
cd /usr/lib/zeppelin/interpreter/jdbc/
sudo wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

sudo cat | sudo tee /etc/zeppelin/conf/interpreter.json <<'endmsg'
{
  "interpreterSettings": {
    "2ANGGHHMQ": {
      "id": "2ANGGHHMQ",
      "name": "spark",
      "group": "spark",
      "properties": {
        "zeppelin.spark.printREPLOutput": "true",
        "spark.yarn.jar": "",
        "master": "yarn-client",
        "zeppelin.spark.maxResult": "1000",
        "zeppelin.dep.localrepo": "/usr/lib/zeppelin/local-repo",
        "spark.app.name": "Zeppelin",
        "zeppelin.spark.importImplicit": "true",
        "zeppelin.spark.useHiveContext": "true",
        "args": "",
        "spark.home": "/usr/lib/spark",
        "zeppelin.spark.concurrentSQL": "false",
        "zeppelin.pyspark.python": "/mnt/home/hadoop/anaconda/bin/python"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "spark",
          "class": "org.apache.zeppelin.spark.SparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "scala",
            "editOnDblClick": false
          }
        },
        {
          "name": "pyspark",
          "class": "org.apache.zeppelin.spark.PySparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        },
        {
          "name": "sql",
          "class": "org.apache.zeppelin.spark.SparkSqlInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sql",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [
        {
          "groupArtifactVersion": "/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar",
          "local": false
        }
      ],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
                "setPermission": false,
        "users": [],
        "isUserImpersonate": false
      }
    },
    "2AM1YV5CU": {
      "id": "2AM1YV5CU",
      "name": "angular",
      "group": "angular",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "angular",
          "class": "org.apache.zeppelin.angular.AngularInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2BRWU4WXC": {
      "id": "2BRWU4WXC",
      "name": "python",
      "group": "python",
      "properties": {
        "zeppelin.python": "/mnt/home/hadoop/anaconda/bin/python",
        "zeppelin.python.maxResult": "1000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "python",
          "class": "org.apache.zeppelin.python.PythonInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "users": [],
        "isUserImpersonate": false
      }
    },
    "2AJXGMUUJ": {
      "id": "2AJXGMUUJ",
      "name": "md",
      "group": "md",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "md",
          "class": "org.apache.zeppelin.markdown.Markdown",
          "defaultInterpreter": false,
          "editor": {
            "language": "markdown",
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2AKK3QQXU": {
      "id": "2AKK3QQXU",
      "name": "sh",
      "group": "sh",
      "properties": {
        "shell.command.timeout.millisecs": "60000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "sh",
          "class": "org.apache.zeppelin.shell.ShellInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sh",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    }
  },
  "interpreterBindings": {
    "2EMW16J14": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ],
    "2A94M5J1Z": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ]
  },
  "interpreterRepositories": [
    {
      "id": "central",
      "type": "default",
      "url": "http://repo1.maven.org/maven2/",
      "releasePolicy": {
      "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    },
    {
      "id": "local",
      "type": "default",
      "url": "file:///var/lib/zeppelin/.m2/repository",
      "releasePolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    }
  ]
}
endmsg

Now, I need to tune default spark submit options:

sudo cat | sudo tee /etc/zeppelin/conf.dist/zeppelin-env.sh <<'endmsg'
export ZEPPELIN_PORT=8890
export ZEPPELIN_CONF_DIR=/etc/zeppelin/conf
export ZEPPELIN_LOG_DIR=/var/log/zeppelin
export ZEPPELIN_PID_DIR=/var/run/zeppelin
export ZEPPELIN_PID=$ZEPPELIN_PID_DIR/zeppelin.pid
export ZEPPELIN_WAR_TEMPDIR=/var/run/zeppelin/webapps
export ZEPPELIN_NOTEBOOK_DIR=/var/lib/zeppelin/notebook
export MASTER=yarn-client
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LD_LIBRARY_PATH=/usr/local/cuda/lib64/
export CLASSPATH=":/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar"


export SPARK_SUBMIT_OPTIONS="--jars=YOUR_JARS_HERE --conf spark.executor.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf spark.driver.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf 'spark.executorEnv.PYTHONPATH=/usr/lib/spark/python/lib/py4j-src.zip:/usr/lib/spark/python/:<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-src.zip' --conf spark.yarn.isPython=true --conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false' --conf 'spark.hadoop.fs.s3.canned.acl=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl.default=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3n.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3.cse.enabled=false'"
export ZEPPELIN_NOTEBOOK_STORAGE=org.apache.zeppelin.notebook.repo.S3NotebookRepo
endmsg

This is not the full content! I omit some of my internal settings, so don’t copy it blindly; just extend the zeppelin-env.sh file as needed. The important parts are:

  • export LD_LIBRARY_PATH=/usr/local/cuda/lib64/ — this points to the CUDA libraries
  • --conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false' — this configures executors and memory. You need to tune it for your cluster size.

Now, restart Zeppelin. You should now be able to run:

%spark.pyspark
import mxnet as mx
import numpy as np

print(mx.__version__)
print(np.__version__)

1.6.0
1.14.6

Now you can create GPU context:

ctx = mx.gpu(0)

and it should work like a charm.

So now you have the power of Spark — you can easily distribute jobs and use all the slaves. You also have a GPU at hand, so whenever you use ndarray from mxnet, it can use the GPU.

If you don’t want to use GPU, then just install these libraries instead:

yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl

and use the mx.cpu() context. This works as well, just obviously much slower. For my use case GPU calculations were 80 times faster than running on the CPU.

Connecting to Redshift from Spark running in EMR

Today I’ll share my configuration for Spark running in EMR to connect to a Redshift cluster. First, I assume the cluster is accessible (so configure the virtual subnet, allowed IPs, and all the network stuff before running this).

I’m using Zeppelin, so I’ll show two interpreters configured for the connection, but the same thing should work with a standalone job (as long as it has the same libraries configured). I tested things with EMR 5.17.2, but it should work with other versions as well.

Redshift interpreter

First, let’s configure a separate interpreter to use in Zeppelin. SSH into the master node of the cluster and install the JDBC interpreter:

sudo /usr/lib/zeppelin/bin/install-interpreter.sh --name jdbc

Next, download the driver:

cd /usr/lib/zeppelin/interpreter/jdbc/
sudo wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

Restart Zeppelin:

sudo stop zeppelin
sudo start zeppelin

Go to the interpreter configuration in Zeppelin and add a new JDBC interpreter named redshift. Use the following settings:

default.driver	com.amazon.redshift.jdbc42.Driver
default.url	jdbc:redshift://your-redshift-instance-address.redshift.amazonaws.com:5439/your-database
default.user	redshift_user
default.password	redshift_password

Now create a new paragraph like the one below:

%redshift
SELECT * FROM table

And it should work.

Spark interpreter

Download the driver the same way as before. Now go to the interpreter settings and add a dependency to the Spark interpreter:

/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

Now you can start reading data like this:

%spark
import org.apache.spark.sql._

def getConnectionString() = {
    val url = "cluster url"
    val port = 8192
    val database = "database name"
    val user = "user"
    val password = "password"
    
    s"jdbc:redshift://${url}:$port/$database?user=$user&password=$password"
}

def runQuery(query: String) = {
    val df: DataFrame = sqlContext.read
      .format("jdbc")
      .option("driver", "com.amazon.redshift.jdbc42.Driver")
      .option("url", getConnectionString())
      .option("dbtable", s"($query) tmp")
      .load()
    
    df
}

var table = runQuery(s"""
SELECT * FROM Table AS t
""")

This is even nicer because you can use string interpolation to provide parameters for queries.
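
For example (the table and column names below are made up), you can inject parameters straight into the query. Keep in mind this is plain string interpolation, so only use it with values you trust.

%spark
val startDate = "2020-01-01"
val country = "PL"

val orders = runQuery(s"""
SELECT o.id, o.total
FROM orders AS o
WHERE o.created_at >= '$startDate' AND o.country = '$country'
""")

orders.show(10)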

Spark and NegativeArraySizeException

Recently I was debugging the following crash in Spark:

java.lang.NegativeArraySizeException
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:447)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:245)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:246)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239)
	at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:41)
	at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:658)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:623)
	at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
	at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
	at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
	at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
	at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
	at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:207)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:269)
	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411)

Disabling Kryo solves the issue. To do that, just set spark.serializer to org.apache.spark.serializer.JavaSerializer.
Another workaround is to change Kryo’s reference management, as explained on GitHub:

Kryo kryo = new Kryo();
kryo.setReferences(false);
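
In Spark itself you would normally apply either workaround through configuration rather than by constructing Kryo by hand. A minimal sketch, assuming you control how the session is built, could look like this; spark.kryo.referenceTracking is the Spark-level switch corresponding to setReferences(false).

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kryo-workaround")
  // Option 1: fall back to Java serialization entirely
  .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  // Option 2: keep Kryo but disable reference tracking
  // .config("spark.kryo.referenceTracking", "false")
  .getOrCreate()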

Spark and NullPointerException in UTF8String.contains

Recently I was debugging a NullPointerException in Spark. The stack trace indicated this:

java.lang.NullPointerException
	at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

After some digging I found out that the following query causes the problem:

df1
  .join(df2,
	df1("id1") === df2("id2")
	  && !isnull(df1("ref"))
	  && !isnull(df2("ref"))
	  && df2("ref").contains(df1("ref")) // <--- this is the problem
	, "left_outer"
  )
  .drop("id2")

If I commented out the marked line, the NPE was gone. Also, when I replaced either df2("ref") or df1("ref") with lit("ref") it didn’t crash either, so there was something wrong with contains running on columns from two dataframes.

In my case removing the cache helped — I was caching df2 with the cache() method before running the join. When I removed the caching, the problem disappeared. This was Spark 2.1.0 on EMR 5.5.3.
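
A minimal sketch of that workaround (df1 and df2 as in the snippet above):

// drop the cached copy of df2 before joining; alternatively, skip the earlier df2.cache() call entirely
df2.unpersist()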

Phantom types to encode state in C#

You cannot add or remove a method from a generic interface based on the type. Also, the CLR doesn’t support higher-kinded types (as of now). Let’s say that we want to encode the state of an object in its type and disallow some operations depending on the state.

In our example we will use an issue. We can assign an issue if it is unassigned; we cannot assign it a second time. We can move an issue from the backlog to a sprint (so it becomes a todo item), but we cannot do that a second time either. Of course, these requirements are just an example.

Let’s go with the following code:

public interface IAssignedState {
}

public class Assigned : IAssignedState {
	Assigned() {}
}

public class Unassigned : IAssignedState{
	public Unassigned() {}
}

public interface IAssignable<T, U> where T : IAssignedState {
	U assign(T t);
}

public interface IBacklogState {
}

public class InBacklog : IBacklogState {
	public InBacklog() {}
}

public class Todo : IBacklogState {
	Todo() {}
}

public interface ITodoable<T, U> where T : IBacklogState {
	U addToSprint(T t);
}


public class Issue<TAssignableState, TBacklogState> : IAssignable<TAssignableState, Issue<Assigned, TBacklogState>>, ITodoable<TBacklogState, Issue<TAssignableState, Todo>>
	where TAssignableState : IAssignedState 
	where TBacklogState : IBacklogState {
	
	public Issue<Assigned, TBacklogState> assign(TAssignableState t) {
		return new Issue<Assigned, TBacklogState>();
	}
	
	public Issue<TAssignableState, Todo> addToSprint(TBacklogState t){
		return new Issue<TAssignableState, Todo>();
	}
}

The trick here is to use phantom types, types which we cannot create. In this example those are Assigned and Todo. So we can do this:

public class Program
{
    public static void onlyUnassignedIssues<TBacklogState>(Issue<Unassigned, TBacklogState> a) where TBacklogState : IBacklogState{
    }
	
    public static void Main()
    {
        Issue<Unassigned, InBacklog> unassigned = new Issue<Unassigned, InBacklog>();
        onlyUnassignedIssues(unassigned);
        Issue<Assigned, InBacklog> assigned = unassigned.assign(new Unassigned());
    }
}

But we cannot do this:

onlyUnassignedIssues(assigned); // I cannot pass assigned issue
Issue<Assigned, InBacklog> assigned2 = assigned.assign(new Assigned()); // I cannot assign issue again

Issue<Unassigned, Todo> todo = unassigned.addToSprint(new InBacklog()); // I can first move it to sprint
Issue<Assigned, Todo> assignedTodo = todo.assign(new Unassigned()); // And then assign

Issue<Assigned, Todo> movedToSprintAgain = assignedTodo.addToSprint(new Todo()); // But I cannot move it to sprint again

If we didn’t use generics, we would need to use a lot of inheritance: AssignableTodoableIssue, TodoableAssignedIssue, AssignableTodoIssue, AssignedTodoIssue. Imagine now adding more state.

You can find a similar solution in Scala in Action; look for phantom types.
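
For reference, here is a minimal Scala sketch of the same trick (my own code, not the one from the book, and covering only the assignment dimension to keep it short): the state lives purely in the type parameter, and the compiler demands evidence that the issue is still unassigned.

sealed trait AssignedState
sealed trait Assigned extends AssignedState
sealed trait Unassigned extends AssignedState

case class Issue[A <: AssignedState]() {
  // callable only when the compiler can prove that A is Unassigned
  def assign(implicit ev: A =:= Unassigned): Issue[Assigned] = Issue[Assigned]()
}

val fresh = Issue[Unassigned]()
val assigned = fresh.assign // compiles
// assigned.assign          // does not compile: no evidence that Assigned =:= Unassigned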

Dynamically loading JAR file in Zeppelin

Imagine that you need to load a JAR file dynamically in Zeppelin running on your EMR cluster. One easy way is to deploy the file to the instance and load it from there. However, what can you do if you have almost no access to the cluster and the filesystem? You can fetch the JAR from S3 and load it dynamically via a custom classloader.

First, load the file:

val jarBinary = sc.binaryFiles("s3://bucket/file.jar").map(_._2.toArray).collect.head

Next, implement the classloader:

class RemoteClassLoader(jarBytes: Array[Byte]) extends ClassLoader{
  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    var clazz = findLoadedClass(name)
    if(clazz != null){
      return clazz
    }
    try{
      val in = getResourceAsStream(name.replace(".", "/") + ".class")
      val out = new java.io.ByteArrayOutputStream()
      copy(in, out)
      val bytes = out.toByteArray
      clazz = defineClass(name, bytes, 0, bytes.length)
      if(resolve){
        resolveClass(clazz)
      }
    }catch{
      case e: Exception => clazz = super.loadClass(name, resolve)
    }
    return clazz
  }
  override def getResource(name: String) = null
  override def getResourceAsStream(name: String): java.io.InputStream = {
    try{
      val jis = new java.util.jar.JarInputStream(new java.io.ByteArrayInputStream(jarBytes))
      var entry = jis.getNextJarEntry
      while(entry != null){
        if(entry.getName().equals(name)){
          return jis;
        }
        entry = jis.getNextJarEntry
      }
    }catch{
      case e: Exception => return null
    }
    return null
  }
  def copy(from: java.io.InputStream, to: java.io.OutputStream): Long = {
    val buf = new Array[Byte](8192)
    var total = 0
    while (true) {
      val r = from.read(buf)
      if (r == -1) return total
      to.write(buf, 0, r)
      total += r
    }
    total
  }
}

It extracts the JAR from the byte array and scans its entries for the requested resource. Finally, just create the class:

val loader = new RemoteClassLoader(jarBinary);
val classToLoad = Class.forName("pl.adamfurmanek.blog.SampleClass", true, loader);
val instance = classToLoad.newInstance();

Of course, using this instance will be harder as it is loaded in a different classloader, so you will probably need a lot of reflection.
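
For example, a minimal sketch of calling into it via reflection (the method name here is made up; adjust it to whatever your class actually exposes):

// look up and invoke a no-argument method on the dynamically loaded instance
val method = classToLoad.getMethod("run")
val result = method.invoke(instance)
println(result)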

Generating class in Zeppelin

If you want to declare a class in Zeppelin and create an instance of it, you might be surprised:

class K

defined class K

classOf[K].newInstance()

java.lang.InstantiationException: K
at java.lang.Class.newInstance(Class.java:427)
... 52 elided
Caused by: java.lang.NoSuchMethodException: K.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 52 more

What happened? Whatever you declare in a Zeppelin notebook is a part of some internal class, so the newly declared class K doesn’t have a parameterless constructor because it expects an instance of the enclosing type. There are two simple ways to handle this.

Quasiquotes

Just generate a class with Scala’s quasiquotes:

import reflect.runtime._
import universe._
import tools.reflect.ToolBox

val tb = currentMirror.mkToolBox()
val l = tb.compile(q"""class L; classOf[L].newInstance()""")()

l: Any = __wrapper$8$bb3239e978f24dc98e740075eecad313.__wrapper$8$bb3239e978f24dc98e740075eeacad313$L$1@7e2b9e13
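
If you want to actually call the generated object without reflection, one trick (a sketch of mine, not from the original post) is to make the generated class implement a type the notebook already knows and cast the compiled result:

val callable = tb.compile(q"""
  class L extends java.util.concurrent.Callable[String] {
    def call(): String = "hello from L"
  }
  (new L): java.util.concurrent.Callable[String]
""")().asInstanceOf[java.util.concurrent.Callable[String]]

println(callable.call()) // hello from L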

Javax.tools

Use the following method to dynamically compile Java code:

def generateClass(className: String, source: String): (Class[_], Any) = {
    val byteArrayOutputStream = new java.io.ByteArrayOutputStream()
    val simpleJavaFileObject = new javax.tools.SimpleJavaFileObject(java.net.URI.create(s"$className.java"), javax.tools.JavaFileObject.Kind.SOURCE) {
        override def getCharContent(ignoreEncodingErrors: Boolean):CharSequence = {
            return source;
        }
        override def openOutputStream(): java.io.OutputStream = {
            return byteArrayOutputStream;
        }
    };
    val standardManager = javax.tools.ToolProvider.getSystemJavaCompiler().getStandardFileManager(null, null, null);
    val customForwardingManager = new javax.tools.JavaFileManager {
        override def close() = standardManager.close()
        override def flush() = standardManager.flush()
        override def getClassLoader(location: javax.tools.JavaFileManager.Location) = standardManager.getClassLoader(location)
        override def getFileForInput(location: javax.tools.JavaFileManager.Location, packageName: String, relativeName: String) = standardManager.getFileForInput(location, packageName, relativeName)
        override def getFileForOutput(location: javax.tools.JavaFileManager.Location, packageName: String, relativeName: String, sibling: javax.tools.FileObject) = standardManager.getFileForOutput(location, packageName, relativeName, sibling)
        override def getJavaFileForInput(location: javax.tools.JavaFileManager.Location, className: String, kind: javax.tools.JavaFileObject.Kind) = standardManager.getJavaFileForInput(location, className, kind)
        override def getJavaFileForOutput(location: javax.tools.JavaFileManager.Location,
                                                   className: String,
                                                   kind: javax.tools.JavaFileObject.Kind,
                                                   sibling: javax.tools.FileObject): javax.tools.JavaFileObject = {
            return simpleJavaFileObject;
        }
        override def handleOption(current: String, remaining: java.util.Iterator[String]) = standardManager.handleOption(current, remaining)
        override def hasLocation(location: javax.tools.JavaFileManager.Location) = standardManager.hasLocation(location)
        override def inferBinaryName(location: javax.tools.JavaFileManager.Location, file: javax.tools.JavaFileObject) = standardManager.inferBinaryName(location, file)
        override def isSameFile(a: javax.tools.FileObject, b: javax.tools.FileObject) = standardManager.isSameFile(a, b)
        override def isSupportedOption(option: String) = standardManager.isSupportedOption(option)
        override def list(location: javax.tools.JavaFileManager.Location, packageName: String, kinds: java.util.Set[javax.tools.JavaFileObject.Kind], recurse: Boolean) = standardManager.list(location, packageName, kinds, recurse)
    }
    val list = new java.util.ArrayList[javax.tools.JavaFileObject]()
    list.add(simpleJavaFileObject)
    javax.tools.ToolProvider.getSystemJavaCompiler().getTask(null, customForwardingManager, null, null, null, list).call();
    val bytes = byteArrayOutputStream.toByteArray();
    val f = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe");
    f.setAccessible(true);
    val unsafe: sun.misc.Unsafe = f.get(null).asInstanceOf[sun.misc.Unsafe];
    val aClass = unsafe.defineClass(className, bytes, 0, bytes.length, null, null);
    val o = aClass.newInstance();
    (aClass, o)
}

Invoke it like this:

val (kClass, kInstance) = generateClass("K", """
public class K{
    public K(){}
}
""")

kClass: Class[_] = class K
kInstance: Any = K@adfd330

And you are done.
