spark-submit a Python application in cluster mode

To launch a Python Spark application in cluster mode, the application code must be shipped to the cluster using the --py-files option of spark-submit. I concluded that the most convenient way to do this is to package the .py files into a fat egg and extract the entry-point Python file from it, so that it can be passed to spark-submit as the main script. The entry-point file then makes the packaged code importable by adding the egg to the module search path:

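import sys

# Make the modules packed in the egg importable; replace the placeholder
# with the file name of the egg passed via --py-files
sys.path.insert(0, "<name of egg file>")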

A simple working example can be found here:

https://github.com/sparkfireworks/spark-submit-cluster-python
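The sys.path line works because an egg is a zip archive, and Python can import modules directly from a zip file listed on sys.path (through zipimport). The sketch below is a hypothetical, self-contained illustration that does not involve Spark: the module name mylib and the file name deps.egg are made up. It packs one module into a zip-format .egg file in a temporary directory and then imports it the same way the entry-point file would.

```python
import os
import sys
import tempfile
import zipfile

# Hypothetical names, for illustration only
EGG_NAME = "deps.egg"
MODULE_SOURCE = "def greet(name):\n    return 'hello ' + name\n"


def build_egg(directory):
    """Pack a single module (mylib.py) into a zip-format .egg file."""
    egg_path = os.path.join(directory, EGG_NAME)
    with zipfile.ZipFile(egg_path, "w") as egg:
        egg.writestr("mylib.py", MODULE_SOURCE)
    return egg_path


workdir = tempfile.mkdtemp()
egg_path = build_egg(workdir)

# What the entry-point file does: put the egg first on the module search path
sys.path.insert(0, egg_path)

import mylib  # resolved from inside the egg by zipimport

print(mylib.greet("spark"))  # hello spark
```

In a real job the egg would arrive on the cluster through --py-files instead of being built at runtime.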

Changing GitHub project origin

You may need to change the URL of a GitHub remote (origin) on the client side. Some reasons to do this are:

  • the repo name changed;
  • you want to switch from HTTPS to SSH or vice-versa.

Imagine I want to switch my authentication method from HTTPS to SSH. The HTTPS URL I want to replace is:

https://github.com/sparkfireworks/spark-fireworks.git

and the SSH URL I want to start using is:

git@github.com:sparkfireworks/spark-fireworks.git

Start by listing the current origin:

$ git remote -v
origin  https://github.com/sparkfireworks/spark-fireworks.git (fetch)
origin  https://github.com/sparkfireworks/spark-fireworks.git (push)

Now remove the origin that is no longer in use:

$ git remote remove origin

Finally add the new SSH URL:

$ git remote add origin git@github.com:sparkfireworks/spark-fireworks.git

Now check that you successfully switched to SSH:

$ git remote -v
origin  git@github.com:sparkfireworks/spark-fireworks.git (fetch)
origin  git@github.com:sparkfireworks/spark-fireworks.git (push)
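The HTTPS and SSH forms of a GitHub URL differ only in their prefix, so the new URL can be derived mechanically from the old one. The helper below, https_to_ssh, is a hypothetical sketch (it is not part of Git) that assumes a plain https://github.com/owner/repo URL without credentials or extra path segments. As an aside, Git can also replace the URL in a single step with git remote set-url origin followed by the new URL.

```python
from urllib.parse import urlparse


def https_to_ssh(url):
    """Convert https://github.com/owner/repo(.git) into git@github.com:owner/repo.git.

    Hypothetical helper; assumes a plain GitHub HTTPS URL.
    """
    parsed = urlparse(url)
    if parsed.scheme != "https" or parsed.hostname != "github.com":
        raise ValueError("not a GitHub HTTPS URL: " + url)
    path = parsed.path.strip("/")
    # Expect exactly owner/repo in the path
    if len(path.split("/")) != 2:
        raise ValueError("expected owner/repo in the path: " + url)
    if not path.endswith(".git"):
        path += ".git"
    return "git@github.com:" + path


print(https_to_ssh("https://github.com/sparkfireworks/spark-fireworks.git"))
# git@github.com:sparkfireworks/spark-fireworks.git
```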

Verifying if a Hive database or table exists

A simple Python code sample that checks whether a Hive database or table exists.

from pyspark.sql import HiveContext


def database_exists(hc, db):
    """
    Function that checks the existence of a Hive database
    :param hc: Hive Context
    :param db: database name
    :return: bool, True if database exists
    """
    # SHOW DATABASES returns one row per database; in Spark 2.x the
    # column holding the name is called databaseName
    return any(x.databaseName == db for x in hc.sql("SHOW DATABASES").collect())


def table_exists(hc, db, table):
    """
    Function that checks the existence of a Hive table
    :param hc: Hive Context
    :param db: database name
    :param table: table name
    :return: bool, True if table exists
    """
    if database_exists(hc=hc, db=db):
        # tableNames(dbName) lists the tables of the given database
        # without changing the current database (no USE needed)
        return table in hc.tableNames(db)
    return False

Setting up SSH authentication for GitHub

To add an SSH key to GitHub you need to work both on your computer and on the GitHub web page. Let's start with the computer from which you want to push code to your GitHub repositories; we will deal with GitHub later.

@ your computer

Start by generating an SSH key. Go to the command line and type:

$ ssh-keygen 
Generating public/private rsa key pair.
Enter file in which to save the key (/Users/cadoado/.ssh/id_rsa): /Users/cadoado/.ssh/github_rsa
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in /Users/cadoado/.ssh/github_rsa.
Your public key has been saved in /Users/cadoado/.ssh/github_rsa.pub.
The key fingerprint is:
SHA256:5egVnuUSe4NKS4l8HU4mnioQk4sacmjI3upq9eQ9Bfk cadoado@Cados-MBP
The key's randomart image is:
+---[RSA 2048]----+
|                 |
|   .             |
|  +     o B .    |
|oo + . = & X     |
|*o+   o S @ +    |
|=o.o . * E o .   |
|....= o =        |
| ..  + o         |
|=o      .        |
+----[SHA256]-----+

The passphrase can be an empty string (just press Enter). Now you can check that the key pair was generated:

$ ls -la ~/.ssh/
total 48
drwx------   8 cadoado  staff   256  9 Jul 23:31 .
drwxr-xr-x+ 24 cadoado  staff   768  9 Jul 23:07 ..
-rw-r--r--   1 cadoado  staff    46  9 Jul 23:07 config
-rw-------   1 cadoado  staff  1823  9 Jul 23:31 github_rsa
-rw-r--r--   1 cadoado  staff   399  9 Jul 23:31 github_rsa.pub
-rw-------   1 cadoado  staff  1843  9 Jul 10:06 id_rsa
-rw-------   1 cadoado  staff   413  9 Jul 10:06 id_rsa.pub
-rw-r--r--   1 cadoado  staff   799  9 Jul 14:34 known_hosts

The file github_rsa is the private part, which you should never disclose. The file github_rsa.pub is the public part, which you can share whenever it is requested.

Assign the key to GitHub by editing (or creating) the SSH config file:

$ vi ~/.ssh/config

Add these lines to the config file (IdentityFile must point to the private part of the key; UseKeychain is only understood by the macOS build of OpenSSH):

Host github.com
  AddKeysToAgent yes
  UseKeychain yes
  IdentityFile ~/.ssh/github_rsa

It is essential to assign the correct permissions to these files:

$ chmod 644 ~/.ssh/config
$ chmod 600 ~/.ssh/github_rsa
$ chmod 644 ~/.ssh/github_rsa.pub
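If SSH later complains about permissions, the modes can be double-checked from Python. The sketch below is hypothetical (check_mode and report are not existing tools): it reads the permission bits of each file with os.stat and stat.S_IMODE and compares them with the modes set by the chmod commands above, using the file names from this post.

```python
import os
import stat

# Expected modes, mirroring the chmod commands (file names used in this post)
EXPECTED_MODES = {
    "~/.ssh/config": 0o644,
    "~/.ssh/github_rsa": 0o600,
    "~/.ssh/github_rsa.pub": 0o644,
}


def check_mode(path, expected):
    """Return True if the permission bits of path equal the expected mode."""
    actual = stat.S_IMODE(os.stat(os.path.expanduser(path)).st_mode)
    return actual == expected


def report(expected_modes=EXPECTED_MODES):
    """Print one line per file: OK, WRONG MODE or MISSING."""
    for path, expected in expected_modes.items():
        if not os.path.exists(os.path.expanduser(path)):
            print(f"{path}: MISSING")
        elif check_mode(path, expected):
            print(f"{path}: OK")
        else:
            print(f"{path}: WRONG MODE (expected {oct(expected)})")


if __name__ == "__main__":
    report()
```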

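Finally add the key to the SSH agent (the -K option belongs to Apple's ssh-add, where it also stores the passphrase in the macOS keychain; recent macOS versions name it --apple-use-keychain):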

$ ssh-add -K ~/.ssh/github_rsa

@ GitHub

In GitHub select Settings:

[Image: location of Settings in the main menu]

Select SSH and GPG keys:

[Image: SSH and GPG keys entry in the Settings menu]

Press the New SSH key button:

[Image: New SSH key button]

Finally, copy and paste the public part of the previously generated key (the contents of github_rsa.pub). Give the key a descriptive name, because in the future you may end up with several keys (one for each device you want to push code from).

[Image: pasting the public part of the generated key]

Final remarks

Now you can communicate conveniently and securely with GitHub, without constantly typing your username and password.

You may need to reset the project origin on your local machine; please refer to the post Changing GitHub project origin.

Scala foldLeft

foldLeft is a curried method with two parameter lists: the first takes the initial value z, and the second takes a binary operation op that combines the accumulator with each element of the sequence, from left to right:

def foldLeft[B](z: B)(op: (B, A) ⇒ B): B

scala> val xs: List[Int] = List(1,2,3)
xs: List[Int] = List(1, 2, 3)

scala> xs.foldLeft(0){(acc, x) => acc + x}
res9: Int = 6

scala> xs.foldLeft(0)(_+_)
res10: Int = 6
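For comparison, Python's functools.reduce with an initializer performs the same left fold: reduce(op, xs, z) computes op(op(op(z, x1), x2), x3), just like xs.foldLeft(z)(op). This is only an analogy sketch, not Scala code. A non-associative operation such as subtraction makes the left-to-right order visible, and the type of the initial value determines the result type, as B does in foldLeft[B].

```python
from functools import reduce
import operator

xs = [1, 2, 3]

# Like xs.foldLeft(0){(acc, x) => acc + x}
total = reduce(lambda acc, x: acc + x, xs, 0)
print(total)  # 6

# Like xs.foldLeft(0)(_ - _), i.e. ((0 - 1) - 2) - 3
difference = reduce(operator.sub, xs, 0)
print(difference)  # -6

# Like xs.foldLeft("")(_ + _): a String accumulator over Int elements
as_string = reduce(lambda acc, x: acc + str(x), xs, "")
print(as_string)  # 123
```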

Scala string operations

Create an empty string:

scala> val emptyString: String = ""
emptyString: String = ""

Concatenate strings:

scala> "London " + "city"
res1: String = London city

String length:

scala> val str: String = "London " + "city"
str: String = London city

scala> str.length()
res7: Int = 11

scala> str.size
res8: Int = 11

Multiline String:

scala> val str: String = """I am a multiline
     | String
     | In Scala""".stripMargin
str: String =
I am a multiline
String
In Scala

Interpolate a String:

scala> val name: String = "London"
name: String = London

scala> val str: String = s"$name city"
str: String = London city

Concatenate a Sequence of Strings:

scala> val xs: List[String] = List("To", "be", "or", "not", "to", "be")
xs: List[String] = List(To, be, or, not, to, be)

scala> xs.mkString(" ")
res1: String = To be or not to be
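Scala collections also offer mkString(start, sep, end), which wraps the joined string. In Python the same effect comes from str.join plus string concatenation; the helper mk_string below is hypothetical, written only to mirror both Scala variants for readers who switch between the two languages.

```python
def mk_string(items, sep="", *, start="", end=""):
    """Hypothetical Python mirror of Scala's mkString(sep) and mkString(start, sep, end)."""
    # Convert every element with str(), as mkString does with toString
    return start + sep.join(str(x) for x in items) + end


xs = ["To", "be", "or", "not", "to", "be"]
print(mk_string(xs, " "))                          # To be or not to be
print(mk_string([1, 2, 3], ", ", start="[", end="]"))  # [1, 2, 3]
```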