Databases – Random IT Utensils
https://blog.adamfurmanek.pl
IT, operating systems, maths, and more.
Thu, 19 Mar 2020 00:51:49 +0000

Data encryption in s3 in Spark in EMR with multiple encryption schemes
https://blog.adamfurmanek.pl/2020/04/04/data-encryption-in-s3-in-spark-in-emr-with-multiple-encryption-schemes/
Sat, 04 Apr 2020 08:00:59 +0000

Spark supports multiple encryption schemes: client-side encryption, server-side encryption, and so on. What didn’t work for me for a long time was reading encrypted data and writing it back as plain text. I configured encryption before reading and that part worked fine. However, the output was still encrypted, even though I disabled encryption before writing.

I was told that this is because encryption settings are cached and my changes are not honored. However, what works for me now is using different access protocols to read and write s3 files.

So, for configuration do this:

// Enable CSE for s3:// prefix  
spark.conf.set("fs.s3.enableServerSideEncryption", "false")
spark.conf.set("fs.s3.cse.enabled", "true")
spark.conf.set("fs.s3.cse.encryptionMaterialsProvider", "com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider")
spark.conf.set("fs.s3.cse.kms.keyId", "KMS ID") // KMS key to encrypt the data with
spark.conf.set("fs.s3.cse.kms.region", "us-east-1") // the region for the KMS key

// Disable CSE for s3a:// prefix to not encrypt 
spark.conf.set("fs.s3a.enableServerSideEncryption", "false")
spark.conf.set("fs.s3a.cse.enabled", "false")
spark.conf.set("fs.s3a.canned.acl","BucketOwnerFullControl")
spark.conf.set("fs.s3a.acl.default","BucketOwnerFullControl")
spark.conf.set("fs.s3a.acl","bucket-owner-full-control")

or in Python do this:

# Enable CSE for s3:// prefix 
spark._jsc.hadoopConfiguration().set("fs.s3.enableServerSideEncryption", "false")
spark._jsc.hadoopConfiguration().set("fs.s3.cse.enabled", "true")
spark._jsc.hadoopConfiguration().set("fs.s3.cse.encryptionMaterialsProvider", "com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3.cse.kms.keyId", "KMS ID") # KMS key to encrypt the data with
spark._jsc.hadoopConfiguration().set("fs.s3.cse.kms.region", "us-east-1") # the region for the KMS key

# Disable CSE for s3a:// prefix to not encrypt 
spark._jsc.hadoopConfiguration().set("fs.s3a.enableServerSideEncryption", "false")
spark._jsc.hadoopConfiguration().set("fs.s3a.cse.enabled", "false")
spark._jsc.hadoopConfiguration().set("fs.s3a.canned.acl","BucketOwnerFullControl")
spark._jsc.hadoopConfiguration().set("fs.s3a.acl.default","BucketOwnerFullControl")
spark._jsc.hadoopConfiguration().set("fs.s3a.acl","bucket-owner-full-control")

Now, when you read or write a file using the s3 prefix, it is encrypted with the KMS key. However, if you read or write using s3a, nothing is encrypted. You can use the s3n prefix to configure yet another encryption scheme. If you need more than that, you have to dig into the protocol handlers.
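The per-prefix setup can be wrapped in two small helpers. This is only a sketch under assumptions: `cse_settings` and `apply_settings` are hypothetical names of mine, the property names are the ones from the configuration above, and nothing here talks to AWS.

```python
def cse_settings(scheme, kms_key_id=None, region=None):
    """Build the settings that switch client-side encryption on or off for one URI scheme.

    Hypothetical helper: only the property names come from the configuration above.
    """
    if kms_key_id and not region:
        raise ValueError("a KMS key needs a region")
    prefix = "fs.{}".format(scheme)
    settings = {
        prefix + ".enableServerSideEncryption": "false",
        prefix + ".cse.enabled": "true" if kms_key_id else "false",
    }
    if kms_key_id:
        settings[prefix + ".cse.encryptionMaterialsProvider"] = (
            "com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider"
        )
        settings[prefix + ".cse.kms.keyId"] = kms_key_id
        settings[prefix + ".cse.kms.region"] = region
    return settings


def apply_settings(hadoop_conf, settings):
    """Copy the settings into any object exposing set(key, value)."""
    for key, value in sorted(settings.items()):
        hadoop_conf.set(key, value)
```

In a notebook this would be used roughly as `apply_settings(spark._jsc.hadoopConfiguration(), cse_settings("s3", "KMS ID", "us-east-1"))` and `apply_settings(spark._jsc.hadoopConfiguration(), cse_settings("s3a"))`, followed by `spark.read.parquet("s3://encrypted-bucket/input")` and `df.write.parquet("s3a://plain-bucket/output")`. Both bucket paths are placeholders.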

Running Anaconda with DGL and mxnet on CUDA GPU in Spark running in EMR
https://blog.adamfurmanek.pl/2020/03/28/running-anaconda-with-dgl-and-mxnet-on-cuda-gpu-in-spark-running-in-emr/
Sat, 28 Mar 2020 09:00:14 +0000

Today I’m going to share my configuration for running a custom Anaconda Python with DGL (Deep Graph Library) and the mxnet library, with GPU support via CUDA, in Spark hosted in EMR. Actually, I have the Redshift configuration as well, with support for gensim, tensorflow, keras, theano, pygpu, and cloudpickle. You can also install more libraries if needed. All this for Google to index keywords. Let’s begin.

My configuration uses EMR 5.17.2 and CUDA 9.2. At the time of writing EMR 5.27 is available, but it comes with the same CUDA version, so I presume it should work as well. I’m also using Python 3.7.

First, create a cluster. Do not select mxnet as a provided library in EMR; we will install it later. For the master node use the p3.8xlarge instance type. This instance must have a GPU because this is where we will run DGL and mxnet. For slaves you can use anything; I’m going with 19 r3.4xlarge nodes (they don’t have a GPU).

We need to install some custom libraries. I use a bootstrap script for that, but you can also SSH into the host and run this code manually:

sudo mkdir /mnt/usr-moved
sudo mv /usr/local /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/local /usr/
sudo mv /usr/share /mnt/usr-moved/
sudo ln -s /mnt/usr-moved/share /usr/

sudo mkdir /mnt/home
sudo chown hadoop:hadoop /mnt/home

wget https://repo.anaconda.com/archive/Anaconda3-2019.03-Linux-x86_64.sh -O ~/anaconda.sh
bash ~/anaconda.sh -b -p /mnt/home/hadoop/anaconda
echo -e '\nexport PATH=/mnt/home/hadoop/anaconda/bin:$PATH' >> $HOME/.bashrc && source $HOME/.bashrc
echo -e '\nexport PYSPARK_PYTHON=/mnt/home/hadoop/anaconda/bin/python' >> $HOME/.bashrc && source $HOME/.bashrc

/mnt/home/hadoop/anaconda/bin/conda install -y gensim
/mnt/home/hadoop/anaconda/bin/conda install -y tensorflow
/mnt/home/hadoop/anaconda/bin/conda install -y keras
/mnt/home/hadoop/anaconda/bin/conda install -y theano
/mnt/home/hadoop/anaconda/bin/conda install -y pygpu
/mnt/home/hadoop/anaconda/bin/conda upgrade -y cloudpickle
yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet-cu92mkl
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl-cuda9.2
/mnt/home/hadoop/anaconda/bin/conda install -y s3fs

First, I move /usr/local and /usr/share to /mnt and symlink them back so that installing packages doesn’t fill the disk. Then the wget command downloads Anaconda and the installer puts it under /mnt/home/hadoop/anaconda. Finally, the conda and pip commands install the additional libraries. Notice that mxnet-cu92mkl is mxnet compiled for CUDA 9.2, and dgl-cuda9.2 is the same for DGL. Also, s3fs is required for convenient reading from s3.

When this is done and the cluster is created, I point the Python of the Zeppelin interpreter to /mnt/home/hadoop/anaconda/bin/python and add the Redshift configuration. I do this with the following commands (you need to run them manually after the cluster is created):

sudo /usr/lib/zeppelin/bin/install-interpreter.sh --name jdbc
cd /usr/lib/zeppelin/interpreter/jdbc/
sudo wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

sudo tee /etc/zeppelin/conf/interpreter.json > /dev/null <<'endmsg'
{
  "interpreterSettings": {
    "2ANGGHHMQ": {
      "id": "2ANGGHHMQ",
      "name": "spark",
      "group": "spark",
      "properties": {
        "zeppelin.spark.printREPLOutput": "true",
        "spark.yarn.jar": "",
        "master": "yarn-client",
        "zeppelin.spark.maxResult": "1000",
        "zeppelin.dep.localrepo": "/usr/lib/zeppelin/local-repo",
        "spark.app.name": "Zeppelin",
        "zeppelin.spark.importImplicit": "true",
        "zeppelin.spark.useHiveContext": "true",
        "args": "",
        "spark.home": "/usr/lib/spark",
        "zeppelin.spark.concurrentSQL": "false",
        "zeppelin.pyspark.python": "/mnt/home/hadoop/anaconda/bin/python"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "spark",
          "class": "org.apache.zeppelin.spark.SparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "scala",
            "editOnDblClick": false
          }
        },
        {
          "name": "pyspark",
          "class": "org.apache.zeppelin.spark.PySparkInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        },
        {
          "name": "sql",
          "class": "org.apache.zeppelin.spark.SparkSqlInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sql",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [
        {
          "groupArtifactVersion": "/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar",
          "local": false
        }
      ],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "users": [],
        "isUserImpersonate": false
      }
    },
    "2AM1YV5CU": {
      "id": "2AM1YV5CU",
      "name": "angular",
      "group": "angular",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "angular",
          "class": "org.apache.zeppelin.angular.AngularInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2BRWU4WXC": {
      "id": "2BRWU4WXC",
      "name": "python",
      "group": "python",
      "properties": {
        "zeppelin.python": "/mnt/home/hadoop/anaconda/bin/python",
        "zeppelin.python.maxResult": "1000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "python",
          "class": "org.apache.zeppelin.python.PythonInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "python",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "users": [],
        "isUserImpersonate": false
      }
    },
    "2AJXGMUUJ": {
      "id": "2AJXGMUUJ",
      "name": "md",
      "group": "md",
      "properties": {},
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "md",
          "class": "org.apache.zeppelin.markdown.Markdown",
          "defaultInterpreter": false,
          "editor": {
            "language": "markdown",
            "editOnDblClick": true
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    },
    "2AKK3QQXU": {
      "id": "2AKK3QQXU",
      "name": "sh",
      "group": "sh",
      "properties": {
        "shell.command.timeout.millisecs": "60000"
      },
      "status": "READY",
      "interpreterGroup": [
        {
          "name": "sh",
          "class": "org.apache.zeppelin.shell.ShellInterpreter",
          "defaultInterpreter": false,
          "editor": {
            "language": "sh",
            "editOnDblClick": false
          }
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "port": -1,
        "perNote": "shared",
        "perUser": "shared",
        "isExistingProcess": false,
        "setPermission": false,
        "isUserImpersonate": false
      }
    }
  },
  "interpreterBindings": {
    "2EMW16J14": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ],
    "2A94M5J1Z": [
      "2ANGGHHMQ",
      "2AJXGMUUJ",
      "2AM1YV5CU",
      "2AKK3QQXU",
      "2BRWU4WXC"
    ]
  },
  "interpreterRepositories": [
    {
      "id": "central",
      "type": "default",
      "url": "http://repo1.maven.org/maven2/",
      "releasePolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    },
    {
      "id": "local",
      "type": "default",
      "url": "file:///var/lib/zeppelin/.m2/repository",
      "releasePolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "snapshotPolicy": {
        "enabled": true,
        "updatePolicy": "daily",
        "checksumPolicy": "warn"
      },
      "mirroredRepositories": [],
      "repositoryManager": false
    }
  ]
}
endmsg

Now, I need to tune default spark submit options:

sudo tee /etc/zeppelin/conf.dist/zeppelin-env.sh > /dev/null <<'endmsg'
export ZEPPELIN_PORT=8890
export ZEPPELIN_CONF_DIR=/etc/zeppelin/conf
export ZEPPELIN_LOG_DIR=/var/log/zeppelin
export ZEPPELIN_PID_DIR=/var/run/zeppelin
export ZEPPELIN_PID=$ZEPPELIN_PID_DIR/zeppelin.pid
export ZEPPELIN_WAR_TEMPDIR=/var/run/zeppelin/webapps
export ZEPPELIN_NOTEBOOK_DIR=/var/lib/zeppelin/notebook
export MASTER=yarn-client
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
export LD_LIBRARY_PATH=/usr/local/cuda/lib64/
export CLASSPATH=":/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar"


export SPARK_SUBMIT_OPTIONS="--jars=YOUR_JARS_HERE --conf spark.executor.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf spark.driver.extraClassPath=/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/* --conf 'spark.executorEnv.PYTHONPATH=/usr/lib/spark/python/lib/py4j-src.zip:/usr/lib/spark/python/:<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-src.zip' --conf spark.yarn.isPython=true --conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false' --conf 'spark.hadoop.fs.s3.canned.acl=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl.default=BucketOwnerFullControl' --conf 'spark.hadoop.fs.s3.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3n.acl=bucket-owner-full-control' --conf 'spark.hadoop.fs.s3.cse.enabled=false'"
export ZEPPELIN_NOTEBOOK_STORAGE=org.apache.zeppelin.notebook.repo.S3NotebookRepo
endmsg

This is not the full content! I omit some of my internal settings, so don’t copy it blindly; just extend the zeppelin-env.sh file as needed. The important things are:
export LD_LIBRARY_PATH=/usr/local/cuda/lib64/ — this points to CUDA libraries
--conf spark.driver.maxResultSize=80G --num-executors 56 --executor-cores 5 --executor-memory 38G --driver-memory 90G --conf 'spark.dynamicAllocation.enabled=false' — this configures executors and memory. You need to tune it for your cluster size.
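The executor count above matches a common rule of thumb, sketched below. This is my assumption about where the number comes from, not something stated here: an r3.4xlarge has 16 vCPUs, one core per node is left for the OS and Hadoop daemons, and one executor slot is given up for the YARN application master. `executor_count` is a hypothetical helper, and memory still has to be tuned separately.

```python
def executor_count(nodes, vcpus_per_node, cores_per_executor=5,
                   reserved_cores_per_node=1, reserved_executors=1):
    """Estimate --num-executors for a cluster of identical worker nodes."""
    per_node = (vcpus_per_node - reserved_cores_per_node) // cores_per_executor
    return nodes * per_node - reserved_executors


# 19 r3.4xlarge nodes: (16 - 1) // 5 = 3 executors per node, 19 * 3 - 1 in total.
print(executor_count(nodes=19, vcpus_per_node=16))  # prints 56
```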

Now, restart Zeppelin. You should now be able to run:

%spark.pyspark
import mxnet as mx
import numpy as np

print(mx.__version__)
print(np.__version__)

1.6.0
1.14.6

Now you can create GPU context:

ctx = mx.gpu(0)

and it should work like a charm.

So now you have the power of Spark: you can easily distribute a job and use all the slaves. You also have a GPU at hand, so whenever you use an ndarray from mxnet, it can use the GPU.

If you don’t want to use GPU, then just install these libraries instead:

yes | sudo /mnt/home/hadoop/anaconda/bin/pip install --pre mxnet
/mnt/home/hadoop/anaconda/bin/conda install -y -c dglteam dgl

and use the mx.cpu() context. This works as well, though obviously much slower. For my use case, GPU calculations were 80 times faster than running on CPU.
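If the same notebook has to run on both kinds of clusters, the choice between the two contexts can be made at runtime. A minimal sketch: `pick_context` is a hypothetical helper, and it takes the imported mxnet module as a parameter only so that it can be exercised on a machine without mxnet or a GPU.

```python
def pick_context(mx):
    """Return mx.gpu(0) when MXNet reports at least one GPU, otherwise mx.cpu()."""
    if mx.context.num_gpus() > 0:
        return mx.gpu(0)
    return mx.cpu()
```

Usage in a paragraph would look like `import mxnet as mx`, then `ctx = pick_context(mx)` and `x = mx.nd.ones((2, 3), ctx=ctx)`.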

Running any query in Redshift or JDBC from Spark in EMR
https://blog.adamfurmanek.pl/2020/03/21/running-any-query-in-redshift-or-jdbc-from-spark-in-emr/
Sat, 21 Mar 2020 09:00:20 +0000

Last time we saw how to connect to Redshift from Spark running in EMR. That solution was nice but allowed only for reading data. Sometimes we want to run an arbitrary DDL or DML query, not only simple read statements.

To do that, we need to connect to Redshift directly over JDBC. I assume you configured your cluster the same way as in the previous part. Now use this code:

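def executeNonQuery(query: String): Int = {
    import java.sql.DriverManager
    import java.util.Properties

    // Connection properties can stay empty: credentials are part of the connection string.
    val connectionProps = new Properties()

    val connection = DriverManager.getConnection(getConnectionString(), connectionProps)
    try {
        // createStatement takes no arguments; the SQL text goes to executeUpdate.
        val statement = connection.createStatement()
        try statement.executeUpdate(query) // returns the affected row count
        finally statement.close()
    } finally {
        connection.close()
    }
}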

We first import the JDBC classes. Next, we create the connection properties, which can be empty. Then we open the connection through the JDBC DriverManager, create a statement, and execute the query. Remember that executeUpdate returns a row count, so the query shouldn’t be a SELECT.

We can use it like this:

%spark
executeNonQuery(s"""DROP TABLE IF EXISTS table""")

The query can be pretty much anything you can run in SQL Workbench. It works with temporary tables as well (unlike prepareStatement).

Connecting to Redshift from Spark running in EMR
https://blog.adamfurmanek.pl/2020/03/14/connecting-to-redshift-from-spark-running-in-emr/
Sat, 14 Mar 2020 09:00:22 +0000

Today I’ll share my configuration for connecting Spark running in EMR to a Redshift cluster. First, I assume the cluster is accessible (so configure the virtual subnet, allowed IPs, and all the network stuff before running this).

I’m using Zeppelin, so I’ll show two interpreters configured for the connection, but the same thing should work with a standalone job (as long as it has the same libraries configured). I tested this with EMR 5.17.2 but it should work with other versions as well.

Redshift interpreter

First, let’s configure a separate interpreter to use in Zeppelin. SSH into the master node of the cluster and install the JDBC interpreter:

sudo /usr/lib/zeppelin/bin/install-interpreter.sh --name jdbc

Next, download the driver:

cd /usr/lib/zeppelin/interpreter/jdbc/
sudo wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

Restart zeppelin:

sudo stop zeppelin
sudo start zeppelin

Go to the interpreter configuration in Zeppelin and add a new JDBC interpreter named redshift. Use the following settings:

default.driver	com.amazon.redshift.jdbc42.Driver
default.url	jdbc:redshift://your-redshift-instance-address.redshift.amazonaws.com:5439/your-database
default.user	redshift_user
default.password	redshift_password

Now create new paragraph like below:

%redshift
SELECT * FROM table

And it should work.

Spark interpreter

Download the driver the same way as before. Now go to the interpreter settings and add a dependency to the Spark interpreter:

/usr/lib/zeppelin/interpreter/jdbc/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

Now you can start reading data like this:

%spark
import org.apache.spark.sql._

def getConnectionString() = {
    val url = "cluster url"
    val port = 8192
    val database = "database name"
    val user = "user"
    val password = "password"
    
    s"jdbc:redshift://${url}:$port/$database?user=$user&password=$password"
}

def runQuery(query: String) = {
    val df: DataFrame = sqlContext.read
      .format("jdbc")
      .option("driver", "com.amazon.redshift.jdbc42.Driver")
      .option("url", getConnectionString())
      .option("dbtable", s"($query) tmp")
      .load()
    
    df
}

var table = runQuery(s"""
SELECT * FROM Table AS t
""")

This is even nicer because you can use string interpolation to provide parameters for queries.
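The same read can be sketched for the %spark.pyspark interpreter. This is an illustration rather than a tested Redshift setup: `redshift_read_options` is a hypothetical helper and all connection values in the usage note are placeholders. The driver class and the "(query) tmp" wrapping are the ones used above, while passing `user` and `password` as separate options is my choice. Keep in mind that interpolated values are pasted into the SQL text as-is, so only interpolate values you trust.

```python
def redshift_read_options(host, port, database, user, password, query):
    """Build the options for a Spark JDBC read that wraps `query` as a subquery."""
    subquery = query.strip().rstrip(";")  # a trailing semicolon would break the wrapping
    return {
        "driver": "com.amazon.redshift.jdbc42.Driver",
        "url": "jdbc:redshift://{}:{}/{}".format(host, port, database),
        "user": user,
        "password": password,
        "dbtable": "({}) tmp".format(subquery),
    }
```

With a Spark session at hand, `spark.read.format("jdbc").options(**redshift_read_options("cluster url", 5439, "database name", "user", "password", "SELECT * FROM Table AS t")).load()` returns the DataFrame.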

Aborted transactions in Redshift due to Serializable isolation level
https://blog.adamfurmanek.pl/2020/02/22/aborted-transactions-in-redshift-due-to-serializable-isolation-level/
Sat, 22 Feb 2020 09:00:27 +0000

I was working with a seemingly simple load job in Redshift. I was generating some data and wanted to store the output in a separate table, just to persist it for debugging purposes. However, since the job could be rerun if needed, I wanted to remove the rows generated previously and add new ones. So I tried the following:

CREATE TABLE locks_tests (
  market INT,
  value INT
);

INSERT INTO locks_tests(market, value) VALUES
  (1, 1),
  (1, 2),
  (2, 1),
  (2, 2)
;

This is our table. Let’s assume that job can generate data for market 1 or market 2. So the job for market 1 does this:

DELETE FROM locks_tests WHERE market = 1;
INSERT INTO locks_tests VALUES (1, 1), (1,2);

Similarly, job for market 2 does this:

DELETE FROM locks_tests WHERE market = 2;
INSERT INTO locks_tests VALUES (2, 1), (2,2);

As you can see, both jobs use the same table but access different rows.

Now the thing is, if you try running both transactions in parallel, you will most likely get an error saying that the transactions collide and one of them had to be aborted.

I didn’t understand why it works this way because, as far as I could tell, nothing is wrong here. Either it should use a table lock and make one transaction wait for the other, or it should use some range locks and figure out that the transactions don’t conflict.

Unfortunately, it is not that simple. Since Redshift is based on PostgreSQL, we can reproduce the problem over there as well. For instance, see dba.stackexchange.

Okay, we know what the problem is, so how do we solve it? I considered locking tables explicitly with the LOCK statement, but I never find that nice. I checked whether it is possible to control the partitions of the table, but that doesn’t seem to be allowed in Redshift. I thought about doing so-called “poor man’s clustering” with multiple tables joined by a single view, but I didn’t want to increase the complexity.

Finally, I decided to go with simple MVCC-like approach. Add new column to the table, indicating timestamp of inserted row:

CREATE TABLE locks_tests (
  market INT,
  value INT,
  insert_time TIMESTAMP
);

Now, when inserting data to the table, use this:

INSERT INTO locks_tests(market, value, insert_time) VALUES
  (1, 1, getdate())
;

Finally, prepare one view extracting only latest data:

CREATE OR REPLACE VIEW locks_tests_view AS (
  WITH max_timestamps AS (
    SELECT market, MAX(insert_time) AS max_insert_time
    FROM locks_tests
    GROUP BY market
  )
  SELECT C.*
  FROM locks_tests AS C
  JOIN max_timestamps AS T ON T.market = C.market
  WHERE C.insert_time = T.max_insert_time
);

And now query the view instead of the original table. You can also run deletion job periodically if needed.
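To see the view at work without a Redshift cluster, here is a small sketch that replays the idea in SQLite through Python's sqlite3 module. SQLite is only a stand-in, the timestamps are made-up literals instead of getdate(), and the sketch assumes that all rows written by one run of a job carry the same insert_time.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE locks_tests (market INT, value INT, insert_time TIMESTAMP);

-- First run of both jobs.
INSERT INTO locks_tests VALUES (1, 1, '2020-02-22 09:00:00'), (1, 2, '2020-02-22 09:00:00');
INSERT INTO locks_tests VALUES (2, 1, '2020-02-22 09:00:00'), (2, 2, '2020-02-22 09:00:00');

-- Rerun of the job for market 1: it only appends, there is no DELETE.
INSERT INTO locks_tests VALUES (1, 10, '2020-02-22 10:00:00'), (1, 20, '2020-02-22 10:00:00');

CREATE VIEW locks_tests_view AS
  WITH max_timestamps AS (
    SELECT market, MAX(insert_time) AS max_insert_time
    FROM locks_tests
    GROUP BY market
  )
  SELECT C.*
  FROM locks_tests AS C
  JOIN max_timestamps AS T ON T.market = C.market
  WHERE C.insert_time = T.max_insert_time;
""")

rows = conn.execute(
    "SELECT market, value FROM locks_tests_view ORDER BY market, value"
).fetchall()
print(rows)  # prints [(1, 10), (1, 20), (2, 1), (2, 2)]
```

The old rows for market 1 are still in the table, but the view hides them, so the two jobs never have to delete anything concurrently.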

Machine Learning Part 8 — Backpropagation in neural net in SQL
https://blog.adamfurmanek.pl/2019/07/27/machine-learning-part-8/
Sat, 27 Jul 2019 08:00:18 +0000

This is the eighth part of the ML series. For your convenience you can find other parts in the table of contents in Part 1 – Linear regression in MXNet

Last time we saw forward propagation in neural net. Today we are going to extend the process to backpropagate the errors. Let’s begin.

We need to add some more definitions to calculate output:

CREATE TABLE outputs (
  outputNode NUMERIC,
  outputValue NUMERIC
);

INSERT INTO outputs VALUES
    (1, 290)
   ,(2, 399)
   ,(3, 505)
;

Before we see some SQL code, let’s do some math. We have three layers (input, hidden, output). The input and output layers use a linear activation function; the hidden layer uses ReLU.

We start with calculating the loss function. We use the ordinary squared error, one component per output node:

    \begin{gather*} Loss = \left[\begin{array}{c} \frac{\left(y^{out}_1 - target_1\right)^2 }{ 2 } \\ \frac{\left(y^{out}_2 - target_2\right)^2 }{ 2 } \\ \frac{\left(y^{out}_3 - target_3\right)^2 }{ 2 } \end{array}\right] \end{gather*}

Now let’s calculate partial derivatives to update weights between hidden layer and output layer:

    \begin{gather*} \left[\begin{array}{ccc} \frac{\partial Loss}{\partial W^2_{1,1}} & \frac{\partial Loss}{\partial W^2_{1,2}} & \frac{\partial Loss}{\partial W^2_{1,3}} \\ \frac{\partial Loss}{\partial W^2_{2,1}} & \frac{\partial Loss}{\partial W^2_{2,2}} & \frac{\partial Loss}{\partial W^2_{2,3}} \end{array}\right] =  \left[\begin{array}{ccc}  \frac{\partial Loss}{\partial y^{out}_1 } \frac{\partial y^{out}_1 }{\partial y^{in}_1} \frac{\partial y^{in}_1}{\partial W^2_{1,1}} & \frac{\partial Loss}{\partial y^{out}_2 } \frac{\partial y^{out}_2 }{\partial y^{in}_2} \frac{\partial y^{in}_2}{\partial W^2_{1,2}} & \frac{\partial Loss}{\partial y^{out}_3 } \frac{\partial y^{out}_3 }{\partial y^{in}_3} \frac{\partial y^{in}_3}{\partial W^2_{1,3}} \\ \frac{\partial Loss}{\partial y^{out}_1 } \frac{\partial y^{out}_1 }{\partial y^{in}_1} \frac{\partial y^{in}_1}{\partial W^2_{2,1}} & \frac{\partial Loss}{\partial y^{out}_2 } \frac{\partial y^{out}_2 }{\partial y^{in}_2} \frac{\partial y^{in}_2}{\partial W^2_{2,2}} & \frac{\partial Loss}{\partial y^{out}_3 } \frac{\partial y^{out}_3 }{\partial y^{in}_3} \frac{\partial y^{in}_3}{\partial W^2_{2,3}} \end{array}\right]  =\\ \left[\begin{array}{ccc}  (y^{out}_1 - target_1) \cdot 1 \cdot h^{out}_1 & (y^{out}_2 - target_2) \cdot 1 \cdot h^{out}_1 & (y^{out}_3 - target_3) \cdot 1 \cdot h^{out}_1 \\ (y^{out}_1 - target_1) \cdot 1 \cdot h^{out}_2 & (y^{out}_2 - target_2) \cdot 1 \cdot h^{out}_2 & (y^{out}_3 - target_3) \cdot 1 \cdot h^{out}_2 \\ \end{array}\right]  \end{gather*}

Now, the same for biases:

    \begin{gather*} \left[\begin{array}{ccc} \frac{\partial Loss}{\partial b^2_{1,1}} & \frac{\partial Loss}{\partial b^2_{1,2}} & \frac{\partial Loss}{\partial b^2_{1,3}} \\ \frac{\partial Loss}{\partial b^2_{2,1}} & \frac{\partial Loss}{\partial b^2_{2,2}} & \frac{\partial Loss}{\partial b^2_{2,3}} \end{array}\right] =  \left[\begin{array}{ccc}  \frac{\partial Loss}{\partial y^{out}_1 } \frac{\partial y^{out}_1 }{\partial y^{in}_1} \frac{\partial y^{in}_1}{\partial b^2_{1,1}} & \frac{\partial Loss}{\partial y^{out}_2 } \frac{\partial y^{out}_2 }{\partial y^{in}_2} \frac{\partial y^{in}_2}{\partial b^2_{1,2}} & \frac{\partial Loss}{\partial y^{out}_3 } \frac{\partial y^{out}_3 }{\partial y^{in}_3} \frac{\partial y^{in}_3}{\partial b^2_{1,3}} \\ \frac{\partial Loss}{\partial y^{out}_1 } \frac{\partial y^{out}_1 }{\partial y^{in}_1} \frac{\partial y^{in}_1}{\partial b^2_{2,1}} & \frac{\partial Loss}{\partial y^{out}_2 } \frac{\partial y^{out}_2 }{\partial y^{in}_2} \frac{\partial y^{in}_2}{\partial b^2_{2,2}} & \frac{\partial Loss}{\partial y^{out}_3 } \frac{\partial y^{out}_3 }{\partial y^{in}_3} \frac{\partial y^{in}_3}{\partial b^2_{2,3}} \end{array}\right]  =\\ \left[\begin{array}{ccc}  (y^{out}_1 - target_1) \cdot 1 \cdot 1 & (y^{out}_2 - target_2) \cdot 1 \cdot 1 & (y^{out}_3 - target_3) \cdot 1 \cdot 1 \\ (y^{out}_1 - target_1) \cdot 1 \cdot 1 & (y^{out}_2 - target_2) \cdot 1 \cdot 1 & (y^{out}_3 - target_3) \cdot 1 \cdot 1 \\ \end{array}\right]  \end{gather*}

That was easy. Now we use learning rate equal to 0.1 and we can update both weights and biases between hidden layer and output layer.
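To make the update concrete, here is a small Python sketch (not the SQL of this post) that applies the formulas above once. The hidden outputs 50 and 59 and the network outputs 291, 400, 509 are what the sample tables from Part 7 produce in the forward pass; the variable names are mine.

```python
from fractions import Fraction

# Forward-pass values that the sample tables from Part 7 produce.
hidden_out = [50, 59]         # h^out_1, h^out_2 (after ReLU)
y_out = [291, 400, 509]       # y^out_1..3 (linear activation)
target = [290, 399, 505]      # outputValue column of the outputs table

# weight2Value and weight2Bias, indexed [hidden node][output node].
w2 = [[1, 2, 3], [4, 5, 6]]
b2 = [[2, 2, 2], [3, 3, 3]]
rate = Fraction(1, 10)        # learning rate 0.1, kept exact

loss = [Fraction((y - t) ** 2, 2) for y, t in zip(y_out, target)]

# dLoss/dW2[j][k] = (y_k - target_k) * 1 * h_j and dLoss/db2[j][k] = (y_k - target_k) * 1 * 1
new_w2 = [[w2[j][k] - rate * (y_out[k] - target[k]) * hidden_out[j] for k in range(3)]
          for j in range(2)]
new_b2 = [[b2[j][k] - rate * (y_out[k] - target[k]) for k in range(3)]
          for j in range(2)]

print([float(v) for v in loss])
print([[float(v) for v in row] for row in new_w2])
print([[float(v) for v in row] for row in new_b2])
```

For example, the weight from hidden node 1 to output node 3 goes from 3 to 3 - 0.1 * 4 * 50 = -17, which shows how aggressive a learning rate of 0.1 is for inputs of this magnitude.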

Similar things go for other updates. If you are lost, you can find great explanation here.
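For the weights between the input layer and the hidden layer the chain rule gets two more factors, and every output node contributes. As a sketch, assuming the total loss is the sum of the per-output losses, and writing h^{in}_j for the value of hidden node j before ReLU, x^{out}_i for the output of input node i, and [h^{in}_j > 0] for the ReLU derivative (1 when the condition holds, 0 otherwise); these three names are mine and do not appear in the derivation above:

    \begin{gather*} \frac{\partial Loss}{\partial W^1_{i,j}} = \sum_{k=1}^{3} \frac{\partial Loss}{\partial y^{out}_k } \frac{\partial y^{out}_k }{\partial y^{in}_k} \frac{\partial y^{in}_k}{\partial h^{out}_j} \frac{\partial h^{out}_j}{\partial h^{in}_j} \frac{\partial h^{in}_j}{\partial W^1_{i,j}} = \sum_{k=1}^{3} (y^{out}_k - target_k) \cdot 1 \cdot W^2_{j,k} \cdot [h^{in}_j > 0] \cdot x^{out}_i \end{gather*}

The bias gradient is the same expression with the last factor replaced by 1.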

Let’s now see the code:

WITH RECURSIVE currentPhase AS(
	SELECT CAST(0 AS NUMERIC) AS phase
),
oneRow AS(
	SELECT CAST(NULL AS NUMERIC) AS rowValue
),
solution AS (
	SELECT I.*, O1.rowValue AS inputLayerOutput, W1.*, I2.rowValue AS hiddenLayerInput, O2.rowValue AS hiddenLayerOutput, W2.*, I3.rowValue AS outputLayerInput, O3.rowValue AS outputLayerOutput, O.*, E.rowValue AS errorValue, P.*
	FROM inputs AS I
	CROSS JOIN oneRow AS O1
	JOIN weights1 AS W1 ON W1.weight1InputNodeNumber = I.inputNode
	CROSS JOIN oneRow AS I2
	CROSS JOIN oneRow AS O2
	JOIN weights2 AS W2 ON W2.weight2InputNodeNumber = W1.weight1OutputNodeNumber
	CROSS JOIN oneRow AS I3
	CROSS JOIN oneRow AS O3
	JOIN outputs AS O ON O.outputNode = W2.weight2OutputNodeNumber
	CROSS JOIN oneRow AS E
	CROSS JOIN currentPhase AS P

	UNION ALL
	
    SELECT
		inputNode,
		inputValue,

		CASE
			WHEN phase = 0 THEN inputValue
			ELSE inputLayerOutput
		END AS inputLayerOutput,

		weight1InputNodeNumber,
		weight1OutputNodeNumber,
		
		CASE
			WHEN phase = 6 THEN weight1Value - 0.1 * (SUM((outputLayerOutput - outputValue) * weight2Value) OVER (PARTITION BY weight1InputNodeNumber, weight1OutputNodeNumber)) * 1 * (CASE WHEN hiddenLayerInput > 0 THEN 1 ELSE 0 END) * inputLayerOutput
			ELSE weight1Value
		END AS weight1Value,
		
		CASE
			WHEN phase = 6 THEN weight1Bias - 0.1 * (SUM((outputLayerOutput - outputValue) * weight2Value) OVER (PARTITION BY weight1InputNodeNumber, weight1OutputNodeNumber)) * 1 * (CASE WHEN hiddenLayerInput > 0 THEN 1 ELSE 0 END) * 1
			ELSE weight1Bias
		END AS weight1Bias,

		CASE
			WHEN phase = 1 THEN SUM(weight1Value * inputLayerOutput + weight1Bias) OVER (PARTITION BY weight1OutputNodeNumber, phase) / 3
			ELSE hiddenLayerInput
		END AS hiddenLayerInput,

		CASE
			WHEN phase = 2 THEN CASE WHEN hiddenLayerInput > 0 THEN hiddenLayerInput ELSE 0 END
			ELSE hiddenLayerOutput
		END AS hiddenLayerOutput,

		weight2InputNodeNumber,
		weight2OutputNodeNumber,
		
		CASE
			WHEN phase = 6 THEN weight2Value - 0.1 * (outputLayerOutput - outputValue) * 1 * hiddenLayerOutput
			ELSE weight2Value
		END AS weight2Value,
		
		CASE
			WHEN phase = 6 THEN weight2Bias - 0.1 * (outputLayerOutput - outputValue) * 1 * 1
			ELSE weight2Bias
		END AS weight2Bias,

		CASE
			WHEN phase = 3 THEN SUM(weight2Value * hiddenLayerOutput + weight2Bias) OVER (PARTITION BY weight2OutputNodeNumber, phase) / 3
			ELSE outputLayerInput
		END AS outputLayerInput,

		CASE
			WHEN phase = 4 THEN outputLayerInput
			ELSE outputLayerOutput
		END AS outputLayerOutput,
		
		outputNode,
		outputValue,
		
		CASE
			WHEN phase = 5 THEN (outputLayerOutput - outputValue) * (outputLayerOutput - outputValue) / 2
			ELSE errorValue
		END AS errorValue,

		phase + 1 AS phase

	FROM solution
	WHERE phase <= 6
)
SELECT DISTINCT *
FROM solution WHERE phase = 7
ORDER BY weight1InputNodeNumber, weight1OutputNodeNumber, weight2OutputNodeNumber

It is very similar to the solution from previous post. This time in phase 5 we calculate error, in phase 6 we update weights and biases. You can find results here.

Machine Learning Part 7 — Forward propagation in neural net in SQL
https://blog.adamfurmanek.pl/2019/07/20/machine-learning-part-7/
Sat, 20 Jul 2019 08:00:35 +0000

This is the seventh part of the ML series. For your convenience, you can find the other parts in the table of contents in Part 1 – Linear regression in MXNet.

Today we are going to create a neural net and calculate forward propagation using PostgreSQL. Let’s go.

We start with the definition of the network: it has an input layer, a hidden layer, and an output layer. The input layer has 3 nodes, the hidden layer has 2, and the output layer has 3. The input layer does not transform the input data, the hidden layer uses ReLU, and the output layer uses a linear activation function (so no transformation).

Let’s start with the following definitions:

DROP TABLE IF EXISTS inputs;
DROP TABLE IF EXISTS weights1;
DROP TABLE IF EXISTS weights2;
DROP TABLE IF EXISTS biases;

CREATE TABLE inputs (
  inputNode NUMERIC,
  inputValue NUMERIC
);

INSERT INTO inputs VALUES
    (1, 1)
   ,(2, 3)
   ,(3, 5)
;

CREATE TABLE weights1 (
  weight1InputNodeNumber NUMERIC,
  weight1OutputNodeNumber NUMERIC,
  weight1Value NUMERIC,
  weight1Bias NUMERIC
);

INSERT INTO weights1 VALUES
    (1, 1, 2, 1)
   ,(1, 2, 3, 1)
   ,(2, 1, 4, 2)
   ,(2, 2, 5, 2)
   ,(3, 1, 6, 3)
   ,(3, 2, 7, 3)
;

CREATE TABLE weights2 (
  weight2InputNodeNumber NUMERIC,
  weight2OutputNodeNumber NUMERIC,
  weight2Value NUMERIC,
  weight2Bias NUMERIC
);

INSERT INTO weights2 VALUES
    (1, 1, 1, 2)
   ,(1, 2, 2, 2)
   ,(1, 3, 3, 2)
   ,(2, 1, 4, 3)
   ,(2, 2, 5, 3)
   ,(2, 3, 6, 3)
;

We define some input values, weights, and biases. The values are completely made up; any other numbers would work just as well.

Before we write the SQL code, let’s calculate the result manually.

We have the following variables:

    \begin{gather*} input = \left[\begin{array}{c} 1 \\ 3 \\ 5 \end{array}\right] \\ W^1 = \left[\begin{array}{cc} 2 & 3 \\ 4 & 5 \\ 6 & 7 \end{array}\right] \\ b^1 = \left[\begin{array}{cc} 1 & 1 \\ 2 & 2 \\ 3 & 3 \end{array}\right] \\ W^2 = \left[\begin{array}{ccc} 1 & 2 & 3 \\ 4 & 5 & 6 \end{array}\right] \\ b^2 = \left[\begin{array}{ccc} 2 & 2 & 2 \\ 3 & 3 & 3 \end{array}\right] \\ \end{gather*}

Now, let’s calculate input for hidden layer:

    \begin{gather*} h^{in} = \left[\begin{array}{c} W^1_{1, 1} \cdot input_1 + b^1_{1, 1} + W^1_{2, 1} \cdot input_2 + b^1_{2, 1} + W^1_{3, 1} \cdot input_3 + b^1_{3, 1} \\ W^1_{1, 2} \cdot input_1 + b^1_{1, 2} + W^1_{2, 2} \cdot input_2 + b^1_{2, 2} + W^1_{3, 2} \cdot input_3 + b^1_{3, 2} \end{array}\right] \end{gather*}

Now, we use ReLU activation function for hidden layer:

    \begin{gather*} h^{out} = \left[\begin{array}{c} \max(h^{in}_1, 0) \\ \max(h^{in}_2, 0) \end{array}\right] \end{gather*}

We carry on with calculating input for output layer:

    \begin{gather*} y^{in} = \left[\begin{array}{c} W^2_{1, 1} \cdot h^{out}_1 + b^2_{1, 1} +  W^2_{2, 1} \cdot h^{out}_2 + b^2_{2, 1} \\ W^2_{1, 2} \cdot h^{out}_1 + b^2_{1, 2} +  W^2_{2, 2} \cdot h^{out}_2 + b^2_{2, 2} \\ W^2_{1, 3} \cdot h^{out}_1 + b^2_{1, 3} +  W^2_{2, 3} \cdot h^{out}_2 + b^2_{2, 3} \end{array}\right] \end{gather*}

Activation function for output layer is linear, so it is easy now:

    \begin{gather*} y^{out} = y^{in} \end{gather*}
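Plugging the sample values into the formulas above gives concrete numbers. These were worked out by hand from the tables defined earlier, so treat them as a cross-check for the query rather than as its recorded output:

```latex
\begin{gather*}
h^{in} = \left[\begin{array}{c} (2 \cdot 1 + 1) + (4 \cdot 3 + 2) + (6 \cdot 5 + 3) \\ (3 \cdot 1 + 1) + (5 \cdot 3 + 2) + (7 \cdot 5 + 3) \end{array}\right] = \left[\begin{array}{c} 50 \\ 59 \end{array}\right] \\
h^{out} = \left[\begin{array}{c} \max(50, 0) \\ \max(59, 0) \end{array}\right] = \left[\begin{array}{c} 50 \\ 59 \end{array}\right] \\
y^{out} = y^{in} = \left[\begin{array}{c} (1 \cdot 50 + 2) + (4 \cdot 59 + 3) \\ (2 \cdot 50 + 2) + (5 \cdot 59 + 3) \\ (3 \cdot 50 + 2) + (6 \cdot 59 + 3) \end{array}\right] = \left[\begin{array}{c} 291 \\ 400 \\ 509 \end{array}\right]
\end{gather*}
```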

We will calculate errors next time.

Now, let’s calculate the result:

WITH RECURSIVE currentPhase AS(
	SELECT CAST(0 AS NUMERIC) AS phase
),
oneRow AS(
	SELECT CAST(NULL AS NUMERIC) AS rowValue
),
solution AS (
	SELECT I.*, O1.rowValue AS inputLayerOutput, W1.*, I2.rowValue AS hiddenLayerInput, O2.rowValue AS hiddenLayerOutput, W2.*, I3.rowValue AS outputLayerInput, O3.rowValue AS outputLayerOutput, P.*
	FROM inputs AS I
	CROSS JOIN oneRow AS O1
	JOIN weights1 AS W1 ON W1.weight1InputNodeNumber = I.inputNode
	CROSS JOIN oneRow AS I2
	CROSS JOIN oneRow AS O2
	JOIN weights2 AS W2 ON W2.weight2InputNodeNumber = W1.weight1OutputNodeNumber
	CROSS JOIN oneRow AS I3
	CROSS JOIN oneRow AS O3
	CROSS JOIN currentPhase AS P

	UNION ALL
	
	SELECT
		inputNode,
		inputValue,

		CASE
			WHEN phase = 0 THEN inputValue
			ELSE inputLayerOutput
		END AS inputLayerOutput,

		weight1InputNodeNumber,
		weight1OutputNodeNumber,
		weight1Value,
		weight1Bias,

		CASE
			WHEN phase = 1 THEN SUM(weight1Value * inputLayerOutput + weight1Bias) OVER (PARTITION BY weight1OutputNodeNumber, phase) / 3
			ELSE hiddenLayerInput
		END AS hiddenLayerInput,

		CASE
			WHEN phase = 2 THEN CASE WHEN hiddenLayerInput > 0 THEN hiddenLayerInput ELSE 0 END
			ELSE hiddenLayerOutput
		END AS hiddenLayerOutput,

		weight2InputNodeNumber,
		weight2OutputNodeNumber,
		weight2Value,
		weight2Bias,

		CASE
			WHEN phase = 3 THEN SUM(weight2Value * hiddenLayerOutput + weight2Bias) OVER (PARTITION BY weight2OutputNodeNumber, phase) / 3
			ELSE outputLayerInput
		END AS outputLayerInput,

		CASE
			WHEN phase = 4 THEN outputLayerInput
			ELSE outputLayerOutput
		END AS outputLayerOutput,

		phase + 1 AS phase

	FROM solution
	WHERE phase <= 4
)
SELECT DISTINCT weight2OutputNodeNumber, outputLayerOutput
FROM solution WHERE phase = 5

This is actually very easy. We divide the process into multiple phases. Each row of the CTE represents one complete path from an input node through a hidden node to an output node. Initially a row carries only metadata and the input value; in each phase we fill in the next value using a different CASE expression.

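In phase 0 we take the input and turn it into the output of the input layer. Since the input layer has no logic, we just copy the value.
In phase 1 we calculate the inputs for the hidden layer by multiplying weights and values and adding biases. Each term appears in the joined rows once per output node, so the windowed sum counts it 3 times and we divide by 3.
In phase 2 we activate the hidden layer. Since we use ReLU, this is a simple comparison with zero.
In phase 3 we once again use weights and values to calculate the input for the next layer, this time with the second set of weights. Here each term is repeated once per input node, hence the division by 3 again.
In phase 4 we activate the output layer, which just copies the values (since we use a linear activation function).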

So in our query we start by defining a schema. We join all the tables and cross join a dummy one-row table, which we use to define the additional columns. We fill these columns later throughout the process.

In the recursive part of the CTE we either copy the values or apply some logic, depending on the phase number.
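As a cross-check for the phases described above, the same forward pass can be sketched in plain Python. This is not a translation of the query, only a minimal model of it under the assumption that every connection carries its own bias (as in the weights1 and weights2 tables); the names layer_input and forward are hypothetical.

```python
# (input node, output node) -> (weight, bias), copied from the INSERT statements
inputs = {1: 1, 2: 3, 3: 5}
weights1 = {(1, 1): (2, 1), (1, 2): (3, 1), (2, 1): (4, 2),
            (2, 2): (5, 2), (3, 1): (6, 3), (3, 2): (7, 3)}
weights2 = {(1, 1): (1, 2), (1, 2): (2, 2), (1, 3): (3, 2),
            (2, 1): (4, 3), (2, 2): (5, 3), (2, 3): (6, 3)}


def layer_input(values, weights):
    """Sum weight * value + bias over all connections entering each node."""
    result = {}
    for (source, target), (weight, bias) in weights.items():
        result[target] = result.get(target, 0) + weight * values[source] + bias
    return result


def forward(inputs, weights1, weights2):
    input_out = dict(inputs)                                    # phase 0: copy
    hidden_in = layer_input(input_out, weights1)                # phase 1
    hidden_out = {n: max(v, 0) for n, v in hidden_in.items()}   # phase 2: ReLU
    output_in = layer_input(hidden_out, weights2)               # phase 3
    return dict(output_in)                                      # phase 4: linear


print(forward(inputs, weights1, weights2))  # {1: 291, 2: 400, 3: 509}
```

Unlike the CTE, this version needs no division by 3, because each connection is visited exactly once instead of once per joined path.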

You can see results here.

Next time we will see how to backpropagate errors.

Windowing functions in recursive CTE
https://blog.adamfurmanek.pl/2019/07/13/windowing-functions-in-recursive-cte/
Sat, 13 Jul 2019 08:00:24 +0000

Today we will see an interesting case of incompatibility between MS SQL Server 2017 and PostgreSQL 9.6 (and other versions as well). Let’s start with this code:

WITH dummy AS(
    SELECT 1 AS rowValue, 0 AS phase
    UNION ALL
    SELECT 2 AS rowValue, 0 AS phase
),
solution AS (
    SELECT * FROM dummy
),
solution2 AS(
    SELECT
        SUM(rowValue) OVER (PARTITION BY phase) AS rowValue,
        phase + 1 AS phase
    FROM solution
    WHERE phase = 0
)
SELECT *
FROM solution2
WHERE phase = 1

We emulate a recursive CTE. The source dataset has two columns, and we want to sum the first column over rows partitioned by the second one. This gives the expected result:

rowValue    phase
----------- -----------
3           1
3           1

Now let’s use a recursive CTE in MS SQL:

WITH dummy AS(
    SELECT 1 AS rowValue, 0 AS phase
    UNION ALL
    SELECT 2 AS rowValue, 0 AS phase
),
solution AS (
    SELECT * FROM dummy
    UNION ALL
    SELECT
        SUM(rowValue) OVER (PARTITION BY phase) AS rowValue,
        phase + 1 AS phase
    FROM solution
    WHERE phase = 0
)
SELECT * FROM solution WHERE phase = 1;

And the result is:

rowValue    phase
----------- -----------
2           1
1           1

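However, PostgreSQL gives the correct values (there the query has to start with WITH RECURSIVE, and rowValue may need a cast so that the column types of both branches of the CTE match):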

rowValue    phase
----------- -----------
3           1
3           1

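Beware! SQL Server evaluates the recursive member row by row, so the window function sees only one row at a time, while PostgreSQL evaluates it over the whole set of rows produced by the previous step. Also, see this great post explaining the row-based approach and the set-based approach for implementing CTE.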
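The difference between the two results can be modelled in a few lines of Python. This is only a simplified sketch of the two evaluation strategies, not a description of how either engine is implemented; window_sum is a hypothetical helper that mimics SUM(rowValue) OVER (PARTITION BY phase) together with phase + 1.

```python
def window_sum(rows):
    """Apply the recursive member to `rows`, given as (rowValue, phase) pairs."""
    totals = {}
    for value, phase in rows:
        totals[phase] = totals.get(phase, 0) + value
    return [(totals[phase], phase + 1) for _, phase in rows]


anchor = [(1, 0), (2, 0)]

# Set-based: the recursive member sees all rows of the previous step at once.
set_based = window_sum(anchor)

# Row-based: the recursive member is evaluated separately for every row,
# so each window contains exactly one row.
row_based = [result for row in anchor for result in window_sum([row])]

print(set_based)  # [(3, 1), (3, 1)]
print(row_based)  # [(1, 1), (2, 1)]
```

The row-based variant yields the same values as the MS SQL output (the order of rows differs), and the set-based variant matches PostgreSQL.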

Machine Learning Part 6 — Matrix multiplication in SQL
https://blog.adamfurmanek.pl/2019/06/29/machine-learning-part-6/
Sat, 29 Jun 2019 08:00:47 +0000

This is the sixth part of the ML series. For your convenience, you can find the other parts in the table of contents in Part 1 – Linear regression in MXNet.

Today we are going to implement a matrix multiplication in Redshift. Let’s go.

First, let’s see what we want to calculate:

    \begin{gather*} \left[\begin{array}{cc}2&3\\4&5\end{array}\right] \left[\begin{array}{cc}5&3\\2&4\end{array}\right] = \left[\begin{array}{cc}16&18\\30&32\end{array}\right] \end{gather*}

Nothing fancy. We would like our algorithm to work for matrices of any size, including non-square ones.

Let’s start with matrix representation:

DROP TABLE IF EXISTS matrix1;

CREATE TEMP TABLE matrix1 (
  rowNumber INT,
  columnNumber INT,
  value INT
);

DROP TABLE IF EXISTS matrix2;

CREATE TEMP TABLE matrix2 (
  rowNumber INT,
  columnNumber INT,
  value INT
);

INSERT INTO matrix1 VALUES
   (1, 1, 2)
  ,(1, 2, 3)
  ,(2, 1, 4)
  ,(2, 2, 5)
;

INSERT INTO matrix2 VALUES
   (1, 1, 5)
  ,(1, 2, 3)
  ,(2, 1, 2)
  ,(2, 2, 4)
;

We store each matrix as a set of rows where each row holds one value for a given row and column. Rows and columns are one-based.

First, we need to calculate the size of the result:

WITH maxWidth AS(
  SELECT MAX(columnNumber) AS width FROM matrix2
),
maxHeight AS (
  SELECT MAX(rowNumber) AS height FROM matrix1
),
resultDimensions AS (
  SELECT width, height FROM maxWidth CROSS JOIN maxHeight
),

So we just get the maximum width and maximum height from the respective matrices. Now, we want to generate all the cells we need to fill:

rowNums AS (
  SELECT (row_number() OVER (ORDER BY 1)) AS rowNumber FROM matrix1 WHERE rowNumber <= (SELECT MAX(height) FROM resultDimensions)
),
columnNums AS (
  SELECT (row_number() OVER (ORDER BY 1)) AS columnNumber FROM matrix2 WHERE columnNumber <= (SELECT width FROM resultDimensions)
),
positions AS (
  SELECT rowNumber, columnNumber FROM rowNums CROSS JOIN columnNums
),

So we basically do the Cartesian product and we are done. Now, we would like to get correct pairs for each cell:

pairsForPositions AS (
  SELECT P.rowNumber, P.columnNumber, M1.value AS M1, M2.value AS M2
  FROM positions AS P
  JOIN matrix1 AS M1 ON M1.rowNumber = P.rowNumber
  JOIN matrix2 AS M2 ON M2.columnNumber = P.columnNumber AND M2.rowNumber = M1.columnNumber
),

This is what we get for our sample matrices:

row	column	m1	m2
1	1	2	5
1	1	3	2
1	2	2	3
1	2	3	4
2	1	4	5
2	1	5	2
2	2	4	3
2	2	5	4

Looks good. Now we just need to aggregate the pairs:

results AS (
  SELECT rowNumber, columnNumber, SUM(M1 * M2) AS value
  FROM pairsForPositions
  GROUP BY rowNumber, columnNumber
)
SELECT * FROM results ORDER BY rowNumber, columnNumber

And we are done. You can see the code here.
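For comparison, the same join-and-aggregate idea can be sketched in plain Python over the (rowNumber, columnNumber, value) triples. This is a minimal illustration, not the Redshift code; the function name multiply is hypothetical.

```python
from collections import defaultdict

# (rowNumber, columnNumber, value) triples, as in the matrix1 and matrix2 tables
matrix1 = [(1, 1, 2), (1, 2, 3), (2, 1, 4), (2, 2, 5)]
matrix2 = [(1, 1, 5), (1, 2, 3), (2, 1, 2), (2, 2, 4)]


def multiply(m1, m2):
    """Pair cells where m1's column equals m2's row, then sum per result cell."""
    result = defaultdict(int)
    for row1, column1, value1 in m1:
        for row2, column2, value2 in m2:
            if column1 == row2:                               # the join condition
                result[(row1, column2)] += value1 * value2    # SUM(M1 * M2)
    return dict(result)


print(multiply(matrix1, matrix2))
# {(1, 1): 16, (1, 2): 18, (2, 1): 30, (2, 2): 32}
```

Because only the inner dimension has to agree, the same function handles non-square matrices as well.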

Machine Learning Part 4 — Linear regression in T-SQL
https://blog.adamfurmanek.pl/2018/11/10/machine-learning-part-4/
Sat, 10 Nov 2018 09:00:03 +0000

This is the fourth part of the ML series. For your convenience, you can find the other parts in the table of contents in Part 1 – Linear regression in MXNet.

This time we are going to implement linear regression as a function. This gives us a little more flexibility in debugging the code and reading it later, and it lets us implement much more complex algorithms. Unfortunately, we can’t use this in Redshift at this time, as it doesn’t support such functions or stored procedures. So I will use T-SQL and test the code with MS SQL 2017. I assume you have a table named samples with the Iris dataset.

We start with declaring a type for the function parameter:

CREATE TYPE SamplesTable 
AS TABLE (id int, feature int, value float, target float)

Next, let’s prepare samples for training:

DECLARE @numbers TABLE (N int)

INSERT INTO @numbers SELECT TOP 5 row_number() OVER(ORDER BY t1.number) AS N FROM master..spt_values AS t1 CROSS JOIN master..spt_values AS t2

DECLARE @samples TABLE(
	sepal_length float
	,sepal_width float
	,petal_length float
	,petal_width float
	,iris varchar(255)
	,is_setosa float
	,is_virginica float
	,sample_id int
)

INSERT INTO @samples SELECT TOP 100 S.*,
CASE WHEN S.iris = 'setosa' THEN 1.0 ELSE 0.0 END AS is_setosa, 
CASE WHEN S.iris = 'virginica' THEN 1.0 ELSE 0.0 END AS is_virginica,
row_number() OVER(ORDER BY (SELECT NULL)) AS sample_id
FROM samples AS S ORDER BY (SELECT ABS(CHECKSUM(NewId()))) 

DECLARE @samplesPivoted SamplesTable

INSERT INTO @samplesPivoted 
SELECT
	S.sample_id,
	N.N,
	CASE
		WHEN N.N = 1 THEN S.sepal_width
		WHEN N.N = 2 THEN S.petal_length
		WHEN N.N = 3 THEN S.petal_width
		WHEN N.N = 4 THEN S.is_setosa
		ELSE S.is_virginica
	END,
	S.sepal_length
FROM @samples AS S CROSS JOIN @numbers AS N

We generate a table with numbers, add more features, and then pivot them just like in the last part.
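The pivot step may be easier to picture outside of SQL. Below is a minimal Python sketch of the same transformation, assuming each sample is a dictionary keyed by the column names of @samples; the function name pivot and the FEATURES list are hypothetical, while the feature numbering follows the CASE expression above.

```python
# Feature numbers 1..5 in the same order as the CASE expression
FEATURES = ["sepal_width", "petal_length", "petal_width",
            "is_setosa", "is_virginica"]


def pivot(samples):
    """Turn one wide row per sample into (id, feature, value, target) rows."""
    rows = []
    for sample in samples:
        for number, name in enumerate(FEATURES, start=1):
            rows.append((sample["sample_id"], number,
                         sample[name], sample["sepal_length"]))
    return rows
```

Each sample therefore produces five rows, one per feature, all sharing sepal_length as the target.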

Finally, our function:

CREATE FUNCTION Train(@samplesPivoted SamplesTable READONLY)
RETURNS @coefficients TABLE(feature int, w float, b float, mse float)
AS
BEGIN
	DECLARE @featureIds TABLE(feature int)
	INSERT INTO @featureIds SELECT DISTINCT feature FROM @samplesPivoted

	INSERT INTO @coefficients SELECT feature, 0.0, 0.0, -1.0 FROM @featureIds

	DECLARE @gradients TABLE(feature int, gw float, gb float)
	INSERT INTO @gradients SELECT feature, 0.0, 0.0 FROM @featureIds

	DECLARE @learningRate float
	SELECT @learningRate = 0.01

	DECLARE @iterations int
	SELECT @iterations = 500

	DECLARE @currentIteration int
	SELECT @currentIteration = 0

	DECLARE @newCoefficients TABLE(feature int, w float, b float)
	DECLARE @distances TABLE(id int, distance float)
	DECLARE @mse float

	WHILE @currentIteration < @iterations
	BEGIN
		DELETE FROM @newCoefficients
		INSERT INTO @newCoefficients SELECT C.feature, C.w - @learningRate * G.gw, C.b - @learningRate * G.gb FROM @coefficients AS C JOIN @gradients AS G ON C.feature = G.feature

		DELETE FROM @distances;

		INSERT INTO @distances SELECT 
			S.id, 
			SUM(N.w * S.value + N.b) - MAX(S.target)
		FROM 
			@samplesPivoted AS S
			JOIN @newCoefficients AS N ON S.feature = N.feature
		GROUP BY S.id

		SELECT @mse = AVG(D.distance * D.distance) FROM @distances AS D
		
		DELETE FROM @gradients;

		INSERT INTO @gradients SELECT
			S.feature,
			AVG(S.value * D.distance),
			AVG(D.distance)
		FROM 
			@samplesPivoted AS S
			JOIN @distances AS D ON S.id = D.id
		GROUP BY S.feature

		DELETE FROM @coefficients;

		INSERT INTO @coefficients SELECT *, @mse FROM @newCoefficients
		
		SELECT @currentIteration = @currentIteration + 1
	END

	RETURN
END

We extract featureIds so that we can pass basically any dataset for training and it should work. We initialize the coefficients and the gradients with default values and prepare some bookkeeping, such as the iteration count and the learning rate.

In every iteration we start by calculating new coefficients from the old coefficients and the old gradients. We clear the distances table and calculate the distance (the difference between the predicted value and the expected value) for each sample. Then we calculate the mean squared error.

Finally, we calculate new gradients: for each feature we compute the derivatives with respect to w and b. After that we just need to store the new coefficients and increase the counter.
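The loop described above can be sketched in Python to make the order of the steps explicit. This is a rough model of the T-SQL function under the assumption that the input rows have the same shape as SamplesTable; the name train and its default arguments are chosen for illustration only.

```python
from collections import defaultdict


def train(samples, learning_rate=0.01, iterations=500):
    """samples: (id, feature, value, target) rows, shaped like SamplesTable."""
    features = sorted({feature for _, feature, _, _ in samples})
    coefficients = {f: (0.0, 0.0) for f in features}  # feature -> (w, b)
    gradients = {f: (0.0, 0.0) for f in features}     # feature -> (gw, gb)
    mse = -1.0
    for _ in range(iterations):
        # New coefficients from the old coefficients and the old gradients.
        new = {f: (w - learning_rate * gradients[f][0],
                   b - learning_rate * gradients[f][1])
               for f, (w, b) in coefficients.items()}
        # Distance = predicted value - expected value, one per sample.
        predicted, target = defaultdict(float), {}
        for sample_id, feature, value, expected in samples:
            w, b = new[feature]
            predicted[sample_id] += w * value + b
            target[sample_id] = expected
        distances = {i: predicted[i] - target[i] for i in predicted}
        mse = sum(d * d for d in distances.values()) / len(distances)
        # New gradients: per-feature averages over all samples.
        sums = defaultdict(lambda: [0.0, 0.0, 0])
        for sample_id, feature, value, _ in samples:
            entry = sums[feature]
            entry[0] += value * distances[sample_id]
            entry[1] += distances[sample_id]
            entry[2] += 1
        gradients = {f: (s[0] / s[2], s[1] / s[2]) for f, s in sums.items()}
        coefficients = new
    return coefficients, mse
```

Note that, as in the SQL version, the first iteration works with zero gradients, so the coefficients start changing only from the second iteration on.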

Now we can execute the code:

SELECT * FROM Train(@samplesPivoted)

And the result is:

feature     w                      b                      mse
----------- ---------------------- ---------------------- ----------------------
1           0.746997439342549      0.282176586393152      0.098274347087078
2           0.563235001391582      0.282176586393152      0.098274347087078
3           0.0230764649956309     0.282176586393152      0.098274347087078
4           0.193704294614636      0.282176586393152      0.098274347087078
5           -0.110068224303597     0.282176586393152      0.098274347087078
