Get keys and values from a Scala Map

scala> val states = Map("AK" -> "Alaska", "AL" -> "Alabama", "AR" -> "Arkansas")
states: scala.collection.immutable.Map[String,String] = Map(AK -> Alaska, AL -> Alabama, AR -> Arkansas)

scala> states.keySet
res0: scala.collection.immutable.Set[String] = Set(AK, AL, AR)

scala> states.keys
res1: Iterable[String] = Set(AK, AL, AR)

scala> states.values
res2: Iterable[String] = MapLike.DefaultValuesIterable(Alaska, Alabama, Arkansas)

scala> states("AK")
res3: String = Alaska

scala> states + ("NY" -> "New York")
res7: scala.collection.immutable.Map[String,String] = Map(AK -> Alaska, AL -> Alabama, AR -> Arkansas, NY -> New York)
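
The session above uses states("AK"), which throws a NoSuchElementException when the key is missing. As a minimal sketch (the object and method names here are illustrative, not part of the original session), a safe lookup and a walk over key/value pairs could look like this:

```scala
object StatesLookup {
  val states: Map[String, String] =
    Map("AK" -> "Alaska", "AL" -> "Alabama", "AR" -> "Arkansas")

  // get returns an Option, so a missing key does not throw.
  def nameOf(code: String): Option[String] = states.get(code)

  // Iterate over keys and values together by pattern matching on each pair.
  def describe: List[String] =
    states.map { case (code, name) => s"$code is $name" }.toList
}
```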

Idempotence

The more I work with systems that exchange messages, the more aware I become of idempotency.

Formally speaking, in mathematics, an idempotent function has the same effect no matter how many times it is applied. Think about a remote to lock/unlock a car: most remotes will just lock the doors no matter how many times you press the lock switch. In mathematical notation:

f(x)=f(f(x))
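
The car remote can be sketched in a few lines of Scala. This is only an illustration: the Doors type and the lock, toggle and isIdempotentAt names are assumptions made for the example. A lock button is idempotent, while a toggle button is not.

```scala
object IdempotenceSketch {
  // Hypothetical model of the car doors: either locked or unlocked.
  sealed trait Doors
  case object Locked extends Doors
  case object Unlocked extends Doors

  // Idempotent: pressing "lock" always ends in Locked, whatever the previous state.
  def lock(doors: Doors): Doors = Locked

  // Not idempotent: a toggle switch flips the state on every press.
  def toggle(doors: Doors): Doors = doors match {
    case Locked   => Unlocked
    case Unlocked => Locked
  }

  // Checks f(x) == f(f(x)) for one given input x.
  def isIdempotentAt[A](f: A => A)(x: A): Boolean = f(x) == f(f(x))
}
```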

Why is it so important? Well, if you have a set of decoupled systems, you can experience network partitions or other problems that prevent messages from being successfully processed. When that happens, the first thing you need is the ability to retry sending a message without worrying about a possible duplicate effect on the world.

How to implement idempotency

There are two ways to implement idempotency:

  • store state at the server so that it recognises a duplicate request and just ignores it, usually by storing the IDs of processed messages; or
  • make the algorithms idempotent, for example by having the message encode an intention rather than the way to do something. One example would be a game with the instruction grab sword: no matter how many times it is processed, the effect is the same.
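
Both options can be sketched as follows. This is a minimal illustration under assumptions: the Command and Server types, the counter standing in for a non-idempotent effect, and the grab function are all hypothetical names.

```scala
object IdempotentHandling {
  // Hypothetical message: `id` identifies the message across retries.
  final case class Command(id: String, item: String)

  // Option 1: remember processed IDs and ignore duplicates.
  // State is immutable here: each call returns the updated server state.
  final case class Server(processedIds: Set[String], counter: Int) {
    def handle(cmd: Command): Server =
      if (processedIds.contains(cmd.id)) this          // duplicate: ignore it
      else Server(processedIds + cmd.id, counter + 1)  // effect applied exactly once
  }

  // Option 2: the message encodes an intention ("the player holds this item"),
  // so applying it repeatedly leaves the same inventory.
  def grab(inventory: Set[String], cmd: Command): Set[String] = inventory + cmd.item
}
```

In the first option the server pays for extra storage; in the second the operation itself is safe to repeat, so no bookkeeping is needed.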

Final remarks

I strongly support writing idempotent code from the start, even if right now you cannot see the benefits. I always expect idempotency to emerge as an essential way to avoid unintended duplicates and the expensive work they cause.

Message attributes in messaging systems

For me the best way to create micro-services is to follow a small set of rules:

  • highly decoupled – each service does not know anything about other services;
  • asynchronous communication – reactive system;
  • stateless wherever possible; and
  • small services – a single developer should be able to maintain the entire micro-service in her head.

There is no better way to create decoupling than to use a messaging system. A service subscribes to one or several topics and publishes to one or more topics. This way a particular service does not know where messages originate and is not aware of what happens downstream.

Point to point systems

Traditional systems where components communicate directly rely on the knowledge that consumers have of server endpoints. This is the philosophy behind REST and gRPC. Each service passively waits for its services to be requested, exposing endpoints for clients to call. The client usually keeps the connection open while waiting for a response.

Event driven reactive systems

Event driven reactive systems communicate via asynchronous messages. These messages flow through channels in a publish-subscribe pattern. A service subscribes to topics and publishes to other topics. It is that simple. A message is an event. The system works by reacting to the flow of messages, without the need for a coordinator or orchestrator.
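
To make the pattern concrete, here is a minimal in-memory sketch. It is not a real broker: the InMemoryBus object and its methods are hypothetical, and delivery is synchronous to keep the example short.

```scala
import scala.collection.mutable

object InMemoryBus {
  // Topic name -> callbacks of the services subscribed to it.
  private val subscribers = mutable.Map.empty[String, List[String => Unit]]

  def subscribe(topic: String)(callback: String => Unit): Unit =
    subscribers(topic) = callback :: subscribers.getOrElse(topic, Nil)

  // Synchronous delivery for simplicity; a real messaging system delivers asynchronously.
  def publish(topic: String, event: String): Unit =
    subscribers.getOrElse(topic, Nil).foreach(callback => callback(event))
}
```

A service would then call InMemoryBus.subscribe("orders") with a callback that publishes its result to another topic, without knowing who produced the order or who consumes the result.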

Canonical data model

The first step is to create a canonical data model for messages to conform to. In Kafka this role is played by the schema registry. Each service must adapt its internal representation of events to the canonical data model when consuming or producing messages.

Message attributes

A very important aspect is the distinction between payload and attributes (in Kafka they are called headers). The payload is the content of the message, that is, the content of the event, for instance computation results or the reply to a query. Attributes give context, for instance a timestamp.

Attributes are a fundamental piece of a micro-services architecture but usually they do not receive the attention they deserve. Attributes are seldom mentioned.

Monitoring and observability

The most important use is to create a monitoring and observability layer. A service mesh leads to a complex web of paths, and the decoupling of its components makes it hard to understand the system at rest. It is the monitoring of the live system that makes system patterns emerge.

Every message should be enriched with metadata, and the place for metadata is in the attributes. Attributes are typically:

  • timestamp;
  • message ID;
  • correlation ID (if it is a reply to a message or part of a set of messages);
  • relevant time to reply;
  • sender;
  • owner; and
  • if it is a probe message (for instance a message sent periodically to assess system health).
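
As a minimal sketch, a producer could attach some of these attributes as follows. The enrich helper, the attribute key names and the subset of attributes chosen are assumptions made for illustration.

```scala
import java.time.Instant
import java.util.UUID

object MessageAttributes {
  // Payload plus metadata, with attributes as plain string key/value pairs.
  final case class Message(attributes: Map[String, String], payload: String)

  // Hypothetical helper: the attribute names used here are illustrative.
  def enrich(payload: String,
             sender: String,
             owner: String,
             correlationId: Option[String] = None,
             isProbe: Boolean = false): Message = {
    val base = Map(
      "timestamp"  -> Instant.now().toString,
      "message_id" -> UUID.randomUUID().toString,
      "sender"     -> sender,
      "owner"      -> owner,
      "probe"      -> isProbe.toString
    )
    // The correlation ID is only present for replies or for sets of messages.
    val attributes = correlationId.fold(base)(id => base + ("correlation_id" -> id))
    Message(attributes, payload)
  }
}
```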

Attributes are processed by other systems, for instance Elasticsearch, to provide metrics, warnings and errors.

You may be thinking that this is usually solved using logs. I argue that collecting logs is invaluable for post-mortem analysis and for some alerting, but total reliance on logs is an antipattern. I have seen logs used just to surface the time it takes to process a message.

If there is metadata to associate with messages, attributes are the correct way to handle it. An event should be directly linked with its metadata. This way metadata becomes data fed to control systems in a very direct way. As the system matures, metadata may be processed in ever more complex ways, increasing our understanding of the system and enhancing our control over it.

Currying callbacks to reach the next level

When talking with people new to functional programming, I find currying the hardest concept to explain. Usually, newcomers are very skeptical and keep asking “why is currying relevant?”.

In this post, I will show an example of currying a callback. A callback is a function passed as an argument to another function (a higher-order function).
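
Before the Kafka example, here is the idea in miniature. This is a sketch with made-up names (greet and forEachName): a function with two parameter lists is given its first argument up front, and what remains is a single-argument function that fits the callback type.

```scala
object CurryingSketch {
  // Curried function: the first parameter list carries configuration,
  // the second one the value that the caller of the callback supplies later.
  def greet(greeting: String)(name: String): String = s"$greeting, $name!"

  // Higher-order function: it only knows that the callback takes a single String.
  def forEachName(names: List[String])(callback: String => String): List[String] =
    names.map(callback)
}
```

Calling CurryingSketch.forEachName(List("Ada", "Alan"))(CurryingSketch.greet("Hello")) passes greet with its greeting already fixed; forEachName never learns that a greeting exists.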

A real-world practical problem

I created a Kafka consumer that runs in its own thread. To process messages, the Kafka consumer receives a callback with a single argument: a Kafka message to be processed. This way my Kafka consumer abstracts how a message should be processed, postponing this decision to runtime.

Sometimes this callback function requires complex message processing, for example, sending a reply message to a Kafka topic. Let’s refer to everything this complex processing requires as the message processing environment. The environment description, the name of the reply topic and everything else are known only at run time, after reading a configuration file.

Curry to the rescue

The Kafka consumer problem requires heavy usage of currying. I find this solution aesthetically appealing and I would like to hear your opinion.

First I define a Kafka message with headers and a payload (Kafka’s value):

/** Abstract message to be sent using a messaging system, for instance Pub/Sub or Kafka
 * attributes are message metadata, payload is the message content
 */
case class Message(attributes: Map[String, String], payload: String)

Here is the Kafka consumer code:

import java.time.Duration
import java.util.{Arrays, Properties}

import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}

import scala.collection.JavaConverters._

object KafkaSubscriber extends StrictLogging {

  /** Creates Properties for the Kafka consumer
   *
   * @param bootstrap_servers        Kafka broker addresses (host:port, comma-separated)
   * @param group_id                 Group ID to which the consumer belongs
   * @param key_deserializer_class   Key deserialization
   * @param value_deserializer_class Payload deserialization
   * @return Properties for the Kafka consumer
   */
  def kafkaConsumerSettings(bootstrap_servers: String,
                            group_id: String,
                            key_deserializer_class: String,
                            value_deserializer_class: String
                           ): Properties = {

    val properties: Properties = new Properties()

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group_id)
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class)

    properties
  }

  /** Subscribe to a topic consuming it with a callback.
   *
   * @param consumerSettings Properties for the consumer
   * @param topic            Topic to read from
   * @param callback         Function that performs an action over the message
   */
  def subscribe(consumerSettings: Properties, topic: String)(callback: Message => Unit): Unit = {

    logger.info(s"Starting consumer on Kafka topic: ${topic}.")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerSettings)
    consumer.subscribe(Arrays.asList(topic))

    try {
      while (true) {
        val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(100))

        records.asScala.foreach(rec => {
          val attributes: Map[String, String] =
            rec
              .headers()
              .toArray
              .map(x => Tuple2(x.key, new String(x.value)))
              .toMap
          val payload: String = rec.value()
          callback(Message(attributes = attributes, payload = payload))
        })
      }
    } finally {
      consumer.close()
    }
  }
}

Let’s look at the signature of a function to be sent as a callback to the above subscribe function. This callback takes the environment in its first parameter list and the Kafka message in the second:

/** Callback function to be passed to the Kafka consumer.
 *
 * @param environment This environment in real world use case would be a Kafka producer
 *                    or any other complex processing to do in the message consumption.
 * @param message     The Message to be processed.
 */
def processMessage(environment: String) (message: Message): Unit = {
  ...
}

A possible use of the processMessage function is:

// settings: Properties, built beforehand with kafkaConsumerSettings(...)
subscribe(consumerSettings = settings,
          topic = "example_topic")(
  callback = processMessage(environment = "Hello World"))

Final remarks

I hope this post has shown the power of currying with higher-order functions. This technique provides the programmer with a clean and easy way to postpone decisions to run time.

In Scala, launch Java Threads that stay alive forever

Recently I needed to spawn threads to have several Kafka consumers listening to a list of topics. I tried to do this using foreach from a parallel collection but it did not work. In the end I just used a for comprehension to launch a Java Thread for each Kafka consumer. Below is a synthesis of the code I used:

import scala.annotation.tailrec
import scala.util.Random

val is: Seq[Int] = 1 to 100

@tailrec
def printToScreen(i: Int): Unit = {
    println(i)
    Thread.sleep(1000 * Random.nextInt(10))
    printToScreen(i)
}

for (i: Int <- is) {
    val thread: Thread = new Thread {
        override def run(): Unit = {
            printToScreen(i)
        }
    }
    thread.start()
}

In the production code I have a list of Kafka consumer configurations, and printToScreen is replaced by a function that generates a callback that I send to the Kafka consumer.

The code above launches 100 threads, each printing its corresponding number at random intervals.

Cast from String to Integer in Spark 2

When casting from String to Integer in Spark 2 I found a bug and here I present my workaround. Later I verified the bug does not exist in Spark 3. The bug is related to the existence of spaces between commas and values.

To reproduce the error, create a file with these contents (I named it test_feature_engineering.csv):

ID,XACT_DT,DUE_DT,AFT_CHRG_XACT_AMI,BILL_AMT
1,2020-01-11,2020-01-11, 10, 10
2,2020-01-11,2020-01-12, 10, 10
3,2020-01-11 10:10:01,2020-01-12 10:20:00, 10, 10

Open the spark-shell and type the following commands:
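
A minimal sketch of such commands, consistent with the description that follows, is given here. The names df2, df3 and res come from the text; df1 and the choice of the BILL_AMT column are assumptions, and the file is assumed to be in the directory where spark-shell was started.

```scala
// Run inside spark-shell, where `spark` is already defined.
import org.apache.spark.sql.functions.col

// Without schema inference every column is read as String.
val df1 = spark.read.option("header", "true").csv("test_feature_engineering.csv")

// Direct cast from String to Integer.
val df2 = df1.withColumn("res", col("BILL_AMT").cast("int"))
df2.show()

// Cast to Float first, then to Integer.
val df3 = df1.withColumn("res", col("BILL_AMT").cast("float").cast("int"))
df3.show()
```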

Inspecting the output above it is possible to verify that a cast from String to Integer results in res column with null values in df2. df3 shows that casting first to Float and then to Integer produces the desired result.

CI/CD from scratch

To start a brand new CI/CD project three tools are necessary:

  • Git, a source control tool;
  • Nexus, a binary repository tool; and
  • Jenkins, an automation tool.

The tools listed above are widely used FLOSS projects. Any equivalent set of tools will fit the bill. There are two rules to follow no matter what:

  • there is absolutely no space for manual steps, from development to deployment in live environments. It means that every test and deployment must be automated and scripted; and
  • every piece of code, configuration and scripts must be kept in source control.

Source control

Git or any other source control tool should be used just to help the development process. Versioning is done with an artefactory, in our case, Nexus.

In source control, no user is allowed to merge directly to master. Merging to master is possible only via a pull request (PR). Use this rule everywhere, even in your personal projects where you are the sole contributor!

Artefactory

In an artefactory, for instance Nexus, release versions ready to be deployed are published. Imagine you want to deploy version 2.3.4 to the Prod environment. A deployment script (more about it later) grabs the artefact from Nexus and places it in the designated location in the Prod environment.

Continuous Integration/Continuous Deployment

Continuous Integration/Continuous Deployment (CI/CD) is the hardest part of a modern software project. In modern projects CI/CD is where the maturity level of a team really shines.

CI

CI uses Jenkins scripts. Here we enforce rules such as that you cannot open a PR unless the code compiles and passes static code analysis.

When a PR is accepted a Jenkins script should upload the corresponding release version to the artefactory.

CD

CD is hard, really hard. Never ever underestimate the effort a team needs to put into it to get it right. There are no established best practices, blueprints, or open-source projects from which to begin. Watch Yevgeniy Brikman’s YouTube presentation “Lessons from 300k+ Lines of Infrastructure Code”.

One of the difficulties inherent to CD is the number of tools you need to use, synchronize and decide how to use. For instance, in my current project we are using Ansible, Terraform, Helm, Jenkins, JenkinsX, and more. Do not forget that all the required scripting is itself stored in Git. Often these scripts are parametrized differently according to the deployment environment, and these parametrizations are stored in Git too.

spark-submit a Python application in cluster mode

To launch a Python Spark application in cluster mode it is necessary to distribute the application to the workers, using the --py-files option. I concluded that the best way to do it is to create a fat egg with the .py files, and extract the entry point Python file from it. The packaged code is made available to the Spark application by adding this reference to the entry point file:

import sys
sys.path.insert(0, <name of egg file>)

A simple working example can be found here:

https://github.com/sparkfireworks/spark-submit-cluster-python

Changing GitHub project origin

You may need to change a GitHub origin definition on the client side. Some reasons to do this are:

  • the repo name changed;
  • you want to switch from HTTPS to SSH or vice-versa.

Imagine I want to change my authentication method from HTTPS to SSH. The HTTPS URL I want to replace is:

https://github.com/sparkfireworks/spark-fireworks.git

The SSH URL I want to start using is:

git@github.com:sparkfireworks/spark-fireworks.git

Start by listing the current origin:

$ git remote -v
origin  https://github.com/sparkfireworks/spark-fireworks.git (fetch)
origin  https://github.com/sparkfireworks/spark-fireworks.git (push)

Now remove the origin no longer in use:

$ git remote remove origin

Finally add the new SSH URL:

$ git remote add origin git@github.com:sparkfireworks/spark-fireworks.git

Now check that you successfully switched to SSH:

$ git remote -v
origin  git@github.com:sparkfireworks/spark-fireworks.git (fetch)
origin  git@github.com:sparkfireworks/spark-fireworks.git (push)