Get keys and values from a Scala Map

scala> val states = Map("AK" -> "Alaska", "AL" -> "Alabama", "AR" -> "Arkansas")
states: scala.collection.immutable.Map[String,String] = Map(AK -> Alaska, AL -> Alabama, AR -> Arkansas)

scala> states.keySet
res0: scala.collection.immutable.Set[String] = Set(AK, AL, AR)

scala> states.keys
res1: Iterable[String] = Set(AK, AL, AR)

scala> states.values
res2: Iterable[String] = MapLike.DefaultValuesIterable(Alaska, Alabama, Arkansas)

scala> states("AK")
res3: String = Alaska

states + ("NY" -> "New York")
res7: scala.collection.immutable.Map[String,String] = Map(AK -> Alaska, AL -> Alabama, AR -> Arkansas, NY -> New York)

Idempotence

The more I work with systems exchanging messages among them, the more I am aware of idempotency.

Formally speaking, in mathematics, an idempotent function has the same effect no mater how many times it is applied. Think about a remote to lock/unlock a car, most remotes will just lock the doors no matter how many times you press the lock switch. In mathematical notation:

f(x)=f(f(x))

Why is it so important? Well, if you have a set of uncoupled systems, you can experiment partitions or other problems that prevent messages from being successfully processed. If this is the case the first thing you need is the ability to retry sending the message not warring about a possible duplicate effect on the world.

How to implement idempotency

There are two options to do idempotency:

  • store state at the server so that it recognises a duplicate request and just ignores it, usually it is achieved storing the processed messages IDs; or
  • make the algorithms idempotent, imagine something like when the message encodes an intention rather then the way to do something. One example would be in a game have the instruction grab sward, no matter the amount of times it is processed the effect would be the same.

Final remarks

I strongly support idempotent code, from the start, even if right now you do not figure out the benefits. I always expect idempotency to emerge as an essential way to avoid expensive tasks to avoid unintended duplicates.

Message attributes in messaging systems

For me the best way to create micro-services is to follow a small set of rules:

  • highly decoupled – each service does not know anything about other services;
  • asynchronous communication – reactive system;
  • stateless wherever possible; and
  • small services – a single developer should be able to maintain the entire micro-service in her head.

There is no better way to create decoupling than to use a messaging system. A service subscribes to one or several topics and publishes to one or more topics. This way a particular service does not know where messages are originated and is not aware of what happens downstream.

Point to point systems

Traditional systems where components communicate directly rely on the knowledge that consumers have on servers endpoints. This is the philosophy behind REST and gRPC. Each service is passively waiting for its services to be requested, exposing endpoints for clients to call. The client usually maintains the connection open waiting for a response.

Event driven reactive systems

Event driven reactive systems communicate via asynchronous messages. This messages should go flow through channels in a publish-subscribe pattern. A service subscribes to topics and publish to other topics. Just this easy. A message is an event. The system works reacting to messages flow, without need for a coordinator or orchestrator.

Canonical data model

The first step is to create a canonical data model for messages to conform to. In Kafka this is the schema registry. Each service must adapt its events internal representation to the canonical data model when consuming or producing messages.

Message attributes

A very important aspect is the distinction between payload and attributes (in Kafka they are called headers). The payload is the contents of the message, the event contents, for instance computation results or reply to a query. Attributes give context, for instance a timestamp.

Attributes are a fundamental piece of a micro-services architecture but usually they do not receive the attention they deserve. Attributes are seldom mentioned.

Monitoring and observability

The most important use is to create a monitoring and observability layer. A service mesh leads to a complex web of paths, and the decoupling of its components makes it hard to understand the system at rest. Its the monitoring of the live system that emerges system patterns.

Every message should be enriched with metadata, and metadata place is in the attributes. Attributes are typically:

  • timestamp;
  • message ID;
  • correlation ID (if it is a reply to a message or part of a set of messages);
  • relevant time to reply;
  • sender;
  • owner; and
  • if it is a probe message (for instance a message sent periodically to access system health).

Attributes are processed by other systems, for instance ElasticSearch, to provide metrics, warnings and errors.

You may be thinking that usually this is solved using logs. I argue that collecting logs is an invaluable post-mortem tool and some alerting but total reliance on logs is an antipattern. I could see it happen to surface the time it takes to process a message.

If there is metadata to associate with messages, it is the correct way to handle it. An event should be directly linked with its metadata. This way metadata becomes data fed to control systems in a very direct way. Metadata may start to be processed in the future as the system matures in ever more complex ways, increasing our understanding and enhancing our control over the system.

Currying callbacks to reach the next level

When talking with people new to functional programming currying is the hardest concept to explain. Usually, newcomers are very skeptical and keep asking “why is currying relevant?”.

In this post, I will show an example of currying a callback. A callback is a function passed as an argument to another function (a higher-order function).

A real-world practical problem

I created a Kafka consumer that runs in its own thread. To process messages, the Kafka consumer receives a callback with a single argument: a Kafka message to be processed. This way my Kafka consumer abstracts how a message should be processed, postponing this decision to runtime.

Sometimes this callback function requires a complex message processing, for example, sending a reply message to a Kafka topic. Let’s refer to this complex processing requirement as messaging the processing environment. The environment description, name of the reply topic and everything else is known at run time, reading a configuration file.

Curry to the rescue

The Kafka consumer problem requires heavy usage of Currying. I find this solution aesthetically appealing and I would like to hear about your opinion.

First I define a Kafka message with headers and a payload (Kafka’s value):

/** Abstract message to be sent using a messaging system, for instance Pub/Sub or Kafka
 * attributes are message metadata, payload is the message content
 */
case class Message(attributes: Map[String, String], payload: String)

Here is the Kafka consumer code:

import java.time.Duration
import java.util.{Arrays, Properties}

import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}

import scala.collection.JavaConverters._

object KafkaSubscriber extends StrictLogging {

  /** Creates Properties for the Kafka consumer
   *
   * @param bootstrap_servers        Zookeepers servers
   * @param group_id                 Group ID to which the consumer belongs to
   * @param key_deserializer_class   Key deserialization
   * @param value_deserializer_class Payload deserialization
   * @return Properties for the Kafka consumer
   */
  def kafkaConsumerSettings(bootstrap_servers: String,
                            group_id: String,
                            key_deserializer_class: String,
                            value_deserializer_class: String
                           ): Properties = {

    val properties: Properties = new Properties()

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group_id)
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class)

    properties
  }

  /** Subscribe to a topic consuming it with a callback.
   *
   * @param consumerSettings Properties for the consumer
   * @param topic            Topic to read from
   * @param callback         Function that performs an action over the message
   */
  def subscribe(consumerSettings: Properties, topic: String)(callback: Message => Unit): Unit = {

    logger.info(s"Starting consumer on Kafka topic: ${topic}.")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerSettings)
    consumer.subscribe(Arrays.asList(topic))

    try {
      while (true) {
        val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(100))

        records.asScala.foreach(rec => {
          val attributes: Map[String, String] =
            rec
              .headers()
              .toArray
              .map(x => Tuple2(x.key, new String(x.value)))
              .toMap
          val payload: String = rec.value()
          callback(Message(attributes = attributes, payload = payload))
        })
      }
    } finally {
      consumer.close()
    }
  }
}

Lets look at the signature of a function to be sent as a callback to the above subscribe function. This callback sends an environment for the Kafka consumer:

/** Callback function to be passed to the Kafka consumer.
 *
 * @param environment This environment in real world use case would be a Kafka producer
 *                    or any other complex processing to do in the message consumption.
 * @param message     The Message to be processed.
 */
def processMessage(environment: String) (message: Message): Unit = {
  ...
}

A possible use of the processMessage message is:

subscribe(consumerSettings = kafkaConsumerSettings, 
          topic = "example_topic")
         (callback: processMessage(environment = "Hello World"))

Final remarks

I hope this post has shown the power of currying with higher-order functions. This technique provides the programmer with a clean and easy way to postpone decisions to run time.

In Scala, launch Java Threads that stay alive forever

Recently I needed to spawn threads to have several Kafka consumers listening from a list of topics. I tried to do this using foreach from a parallel collection but it did not work. In the end I just used a for comprehension to launch a Java Thread for each Kafka consumer. Bellow is a synthesis of the code I used:

import scala.annotation.tailrec
import scala.util.Random

val is: Seq[Int] = 1 to 100

@tailrec
def printToScreen(i: Int): Unit = {
    println(i)
    Thread.sleep(1000 * Random.nextInt(10))
    printToScreen(i)
}

for (i: Int <- is) {
    val thread: Thread = new Thread {
        override def run {
            printToScreen(i)
        }
    }
    thread.start()
}

In the production code I have a list of Kafka consumers configurations and prinToScreen generates a call back that I send to the Kafka consumer.

The code above lunches 100 Threads and prints its corresponding number in random intervals.

Cast from String to Integer in Spark 2

When casting from String to Integer in Spark 2 I found a bug and here I present my workaround. Later I verified the bug does not exist in Spark 3. The bug is related to the existence of spaces between commas and values.

To reproduce the error create a file with these contents, I named it test_feature_engineering.csv:

ID,XACT_DT,DUE_DT,AFT_CHRG_XACT_AMI,BILL_AMT
1,2020-01-11,2020-01-11, 10, 10
2,2020-01-11,2020-01-12, 10, 10
3,2020-01-11 10:10:01,2020-01-12 10:20:00, 10, 10

Open the spark-shell and type the following commands:

Inspecting the output above it is possible to verify that a cast from String to Integer results in res column with null values in df2. df3 shows that casting first to Float and then to Integer produces the desired result.