Cast from String to Integer in Spark 2

While casting from String to Integer in Spark 2, I found a bug; here I present my workaround. I later verified that the bug does not exist in Spark 3.

To reproduce the error, create a file with these contents; I named it test_feature_engineering.csv:

ID,XACT_DT,DUE_DT,AFT_CHRG_XACT_AMI,BILL_AMT
1,2020-01-11,2020-01-11, 10, 10
2,2020-01-11,2020-01-12, 10, 10
3,2020-01-11 10:10:01,2020-01-12 10:20:00, 10, 10

Open the spark-shell and type the following commands:


scala> import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

scala> import org.apache.spark.sql.{DataFrame, DataFrameReader, SQLContext}
import org.apache.spark.sql.{DataFrame, DataFrameReader, SQLContext}

scala> val schemaFiveColumns: StructType = StructType(List(
     |     StructField("ID", StringType, nullable = false),
     |     StructField("XACT_DT", StringType, nullable = true),
     |     StructField("DUE_DT", StringType, nullable = true),
     |     StructField("AFT_CHRG_XACT_AMI", StringType, nullable = true),
     |     StructField("BILL_AMT", StringType, nullable = true)
     |   ))
schemaFiveColumns: org.apache.spark.sql.types.StructType = StructType(StructField(ID,StringType,false), StructField(XACT_DT,StringType,true), StructField(DUE_DT,StringType,true), StructField(AFT_CHRG_XACT_AMI,StringType,true), StructField(BILL_AMT,StringType,true))
                                                                                                                                                
scala> def getTestDataFrame(resourceFilePath: String, schema: StructType): DataFrame = {
     |   spark.read
     |     .schema(schema)
     |     .option("header", value = true)
     |     .csv(resourceFilePath)
     | }
getTestDataFrame: (resourceFilePath: String, schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame

scala> val df = getTestDataFrame("file:///path_to_file/test_feature_engineering.csv", schemaFiveColumns)
df: org.apache.spark.sql.DataFrame = [ID: string, XACT_DT: string ... 3 more fields]

scala> df.show()
+---+-------------------+-------------------+-----------------+--------+
| ID|            XACT_DT|             DUE_DT|AFT_CHRG_XACT_AMI|BILL_AMT|
+---+-------------------+-------------------+-----------------+--------+
|  1|         2020-01-11|         2020-01-11|               10|      10|
|  2|         2020-01-11|         2020-01-12|               10|      10|
|  3|2020-01-11 10:10:01|2020-01-12 10:20:00|               10|      10|
+---+-------------------+-------------------+-----------------+--------+


scala> val df2 = df.withColumn(colName = "res", df.col("AFT_CHRG_XACT_AMI").cast("Integer"))
df2: org.apache.spark.sql.DataFrame = [ID: string, XACT_DT: string ... 4 more fields]

scala> df2.show()
+---+-------------------+-------------------+-----------------+--------+----+
| ID|            XACT_DT|             DUE_DT|AFT_CHRG_XACT_AMI|BILL_AMT| res|
+---+-------------------+-------------------+-----------------+--------+----+
|  1|         2020-01-11|         2020-01-11|               10|      10|null|
|  2|         2020-01-11|         2020-01-12|               10|      10|null|
|  3|2020-01-11 10:10:01|2020-01-12 10:20:00|               10|      10|null|
+---+-------------------+-------------------+-----------------+--------+----+

scala> val df3 = df.withColumn(colName = "res", df.col("AFT_CHRG_XACT_AMI").cast("Float").cast("Integer"))
df3: org.apache.spark.sql.DataFrame = [ID: string, XACT_DT: string ... 4 more fields]

scala> df3.show()
+---+-------------------+-------------------+-----------------+--------+---+
| ID|            XACT_DT|             DUE_DT|AFT_CHRG_XACT_AMI|BILL_AMT|res|
+---+-------------------+-------------------+-----------------+--------+---+
|  1|         2020-01-11|         2020-01-11|               10|      10| 10|
|  2|         2020-01-11|         2020-01-12|               10|      10| 10|
|  3|2020-01-11 10:10:01|2020-01-12 10:20:00|               10|      10| 10|
+---+-------------------+-------------------+-----------------+--------+---+

Inspecting the output above, you can see that a direct cast from String to Integer leaves the res column full of nulls in df2, while df3 shows that casting first to Float and then to Integer produces the desired result. Note that the numeric fields in the CSV carry a leading space (" 10"); Spark 2's String-to-Integer cast does not tolerate the surrounding whitespace, which appears to be what produces the nulls, while the Float cast handles it. Spark 3 trims the whitespace before casting, which is presumably why the bug does not show up there.
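
Given that the leading whitespace appears to be the trigger, an alternative workaround is to trim the column before casting. This is a sketch of my own rather than part of the original session, and the name df4 is mine:

import org.apache.spark.sql.functions.{col, trim}

// Sketch: strip the surrounding whitespace first, avoiding the Float round trip
val df4 = df.withColumn("res", trim(col("AFT_CHRG_XACT_AMI")).cast("Integer"))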

CI/CD from scratch

To start a brand-new CI/CD project, three tools are necessary:

  • Git, a source control tool;
  • Nexus, a binary repository tool; and
  • Jenkins, an automation tool.

The tools above are widely used FLOSS projects, but any other set that fills the same roles will fit the bill. There are two rules to follow no matter what:

  • there is absolutely no room for manual steps, from development all the way to deployment in live environments: every test and every deployment must be automated and scripted; and
  • every piece of code, configuration, and scripting must be kept in source control.

spark-submit a Python application in cluster mode

To launch a Python Spark application in cluster mode, the application code must be shipped to the workers using the --py-files option. I concluded that the best way to do this is to create a fat egg with the .py files and extract the entry point Python file from it. The packaged code is then referenced from the Spark application by adding this to the entry point file:

import sys

# Make the modules packaged in the egg importable on the driver and workers.
sys.path.insert(0, <name of egg file>)

A simple working example can be found here:

https://github.com/sparkfireworks/spark-submit-cluster-python
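
For reference, launching such an application might look roughly like the following; the file names my_app.egg and main.py are hypothetical, and the exact master setting depends on your cluster manager:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --py-files my_app.egg \
  main.py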

Scala foldLeft

foldLeft is a curried function: it is first applied to an initial value z (the accumulator seed) and then to an operation op that combines the accumulator with each element of the sequence, from left to right:

def foldLeft[B](z: B)(op: (B, A) ⇒ B): B

scala> val xs: List[Int] = List(1,2,3)
xs: List[Int] = List(1, 2, 3)

scala> xs.foldLeft(0){(acc, x) => acc + x}
res9: Int = 6

scala> xs.foldLeft(0)(_+_)
res10: Int = 6
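
Because foldLeft walks the sequence from left to right, prepending each element to the accumulator reverses it. A small sketch of my own, not from the original session:

// foldLeft threads the accumulator left to right, so prepending reverses the list
val reversed = List(1, 2, 3).foldLeft(List.empty[Int])((acc, x) => x :: acc)
// reversed: List[Int] = List(3, 2, 1)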

Scala string operations

Create an empty string:

scala> val emptyString: String = ""
emptyString: String = ""

Concatenate strings:

scala> "London " + "city"
res1: String = London city

String length:

scala> val str: String = "London " + "city"
str: String = London city

scala> str.length()
res7: Int = 11

scala> str.size
res8: Int = 11

Multiline String (note that the "|" characters below are the REPL continuation prompt, not margin characters, so stripMargin has no effect here):

scala> val str: String = """I am a multiline
     | String
     | In Scala""".stripMargin
str: String =
I am a multiline
String
In Scala

Parametrize a String (string interpolation):

scala> val name: String = "London"
name: String = London

scala> val str: String = s"""${name} city""".stripMargin
str: String = London city

Concatenate a Sequence of Strings:

scala> val xs: List[String] = List("To", "be", "or", "not", "to", "be")
xs: List[String] = List(To, be, or, not, to, be)

scala> xs.mkString(" ")
res1: String = To be or not to be
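
mkString also has a three-argument form taking a start, a separator, and an end. A quick sketch of my own, reusing xs from above:

// mkString(start, sep, end) wraps the joined elements
val bracketed = xs.mkString("[", ", ", "]")
// bracketed: String = [To, be, or, not, to, be]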