Currying callbacks to reach the next level

When talking with people new to functional programming, currying is the hardest concept to explain. Newcomers are usually very skeptical and keep asking “why is currying relevant?”.

In this post, I will show an example of currying a callback. A callback is a function passed as an argument to another function (a higher-order function).
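Before getting to the real problem, here is a minimal sketch of both ideas in isolation. The names greet, applyToAll and CurryingSketch are made up for illustration and have nothing to do with the Kafka code: greet is a curried function with two parameter lists, and applyToAll is a higher-order function that receives a callback.

```scala
object CurryingSketch {
  // Curried function: two parameter lists instead of one.
  def greet(greeting: String)(name: String): String = s"$greeting, $name!"

  // Supplying only the first parameter list leaves a function String => String.
  val hello: String => String = greet("Hello")

  // Higher-order function: the callback is passed in as an argument.
  def applyToAll(names: Seq[String])(callback: String => String): Seq[String] =
    names.map(callback)

  // The partially applied function is used as the callback.
  val greetings: Seq[String] = applyToAll(Seq("Ada", "Alan"))(hello)
}
```

The point to notice is that applyToAll knows nothing about the greeting: that decision was fixed earlier, when greet received its first argument.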

A real-world practical problem

I created a Kafka consumer that runs in its own thread. To process messages, the Kafka consumer receives a callback with a single argument: a Kafka message to be processed. This way my Kafka consumer abstracts how a message should be processed, postponing this decision to runtime.

Sometimes this callback needs more than the message itself to do its work, for example when it has to send a reply message to a Kafka topic. Let’s refer to everything the callback needs besides the message as the processing environment. The environment description, the name of the reply topic and everything else are known only at run time, after reading a configuration file.

Curry to the rescue

The Kafka consumer problem makes heavy use of currying. I find this solution aesthetically appealing, and I would like to hear your opinion.

First I define a Kafka message with headers and a payload (Kafka’s value):

/** Abstract message to be sent using a messaging system, for instance Pub/Sub or Kafka
 * attributes are message metadata, payload is the message content
 */
case class Message(attributes: Map[String, String], payload: String)

Here is the Kafka consumer code:

import java.time.Duration
import java.util.{Arrays, Properties}

import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}

import scala.collection.JavaConverters._

object KafkaSubscriber extends StrictLogging {

  /** Creates Properties for the Kafka consumer
   *
   * @param bootstrap_servers        Kafka broker addresses (comma-separated host:port list)
   * @param group_id                 Group ID to which the consumer belongs
   * @param key_deserializer_class   Key deserialization
   * @param value_deserializer_class Payload deserialization
   * @return Properties for the Kafka consumer
   */
  def kafkaConsumerSettings(bootstrap_servers: String,
                            group_id: String,
                            key_deserializer_class: String,
                            value_deserializer_class: String
                           ): Properties = {

    val properties: Properties = new Properties()

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group_id)
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class)

    properties
  }

  /** Subscribe to a topic consuming it with a callback.
   *
   * @param consumerSettings Properties for the consumer
   * @param topic            Topic to read from
   * @param callback         Function that performs an action over the message
   */
  def subscribe(consumerSettings: Properties, topic: String)(callback: Message => Unit): Unit = {

    logger.info(s"Starting consumer on Kafka topic: ${topic}.")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerSettings)
    consumer.subscribe(Arrays.asList(topic))

    try {
      while (true) {
        val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(100))

        records.asScala.foreach(rec => {
          val attributes: Map[String, String] =
            rec
              .headers()
              .toArray
              .map(x => Tuple2(x.key, new String(x.value)))
              .toMap
          val payload: String = rec.value()
          callback(Message(attributes = attributes, payload = payload))
        })
      }
    } finally {
      consumer.close()
    }
  }
}

Let’s look at the signature of a function to be sent as a callback to the above subscribe function. This callback takes an environment in its first parameter list and the message to be processed in its second:

/** Callback function to be passed to the Kafka consumer.
 *
 * @param environment This environment in real world use case would be a Kafka producer
 *                    or any other complex processing to do in the message consumption.
 * @param message     The Message to be processed.
 */
def processMessage(environment: String)(message: Message): Unit = {
  ...
}

A possible use of the processMessage function is:

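// consumerSettings is assumed to have been built beforehand with kafkaConsumerSettings(...).
// The second argument list must open on the same line where the first one closes.
subscribe(consumerSettings = consumerSettings,
          topic = "example_topic")(
          callback = processMessage(environment = "Hello World"))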
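To try the idea without a running Kafka cluster, the following is a rough sketch under my own assumptions: subscribeInMemory is a hypothetical stand-in for subscribe that walks over an in-memory list instead of polling a topic, and processMessage simply records a string where the real code would send a reply. Only the shape of the callback matches the consumer above.

```scala
import scala.collection.mutable.ListBuffer

object CurriedCallbackSketch {
  case class Message(attributes: Map[String, String], payload: String)

  // Hypothetical stand-in for subscribe: no Kafka, just a list of messages.
  def subscribeInMemory(messages: Seq[Message])(callback: Message => Unit): Unit =
    messages.foreach(callback)

  // Collects what the callback produced so it can be inspected afterwards.
  val replies: ListBuffer[String] = ListBuffer.empty[String]

  // Curried callback: environment first, message second.
  def processMessage(environment: String)(message: Message): Unit = {
    replies += s"[$environment] ${message.payload}"
    ()
  }

  // Fixing the environment leaves exactly the type the consumer expects.
  val callback: Message => Unit = processMessage(environment = "Hello World")

  def run(): Seq[String] = {
    replies.clear()
    val messages = Seq(Message(Map.empty, "first"), Message(Map.empty, "second"))
    subscribeInMemory(messages)(callback)
    replies.toList
  }
}
```

The consumer only ever sees a Message => Unit, so the environment can be read from a configuration file at run time and fixed once, before the consumer starts.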

Final remarks

I hope this post has shown the power of currying with higher-order functions. This technique provides the programmer with a clean and easy way to postpone decisions to run time.

In Scala, launch Java Threads that stay alive forever

Recently I needed to spawn threads to have several Kafka consumers listening to a list of topics. I tried to do this using foreach from a parallel collection, but it did not work. In the end I just used a for comprehension to launch a Java Thread for each Kafka consumer. Below is a synthesis of the code I used:

import scala.annotation.tailrec
import scala.util.Random

val is: Seq[Int] = 1 to 100

@tailrec
def printToScreen(i: Int): Unit = {
    println(i)
    Thread.sleep(1000 * Random.nextInt(10))
    printToScreen(i)
}

for (i: Int <- is) {
    val thread: Thread = new Thread {
        override def run(): Unit = {
            printToScreen(i)
        }
    }
    thread.start()
}

In the production code I have a list of Kafka consumer configurations, and the counterpart of printToScreen generates a callback that I send to the Kafka consumer.

The code above launches 100 threads, and each one prints its corresponding number at random intervals.
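The production shape can be approximated with the sketch below. It is not the production code: ConsumerConfigSketch, consume and launch are hypothetical names, consume is a stand-in for a Kafka consumer loop, and the loop is bounded by a rounds parameter so that the example terminates instead of staying alive forever.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object ThreadPerConsumerSketch {
  // Hypothetical stand-in for one Kafka consumer configuration.
  case class ConsumerConfigSketch(topic: String, environment: String)

  // Thread-safe sink so the output of all callbacks can be inspected.
  val processed = new ConcurrentLinkedQueue[String]()

  // Curried callback: the environment is fixed per consumer.
  def processMessage(environment: String)(payload: String): Unit = {
    processed.add(s"$environment:$payload")
    ()
  }

  // Stand-in for a consumer loop; bounded so that the sketch terminates.
  def consume(topic: String, rounds: Int)(callback: String => Unit): Unit =
    (1 to rounds).foreach(n => callback(s"$topic-$n"))

  // Starts one thread per configuration and returns the started threads.
  def launch(configs: Seq[ConsumerConfigSketch], rounds: Int): Seq[Thread] =
    for (config <- configs) yield {
      val thread = new Thread {
        override def run(): Unit =
          consume(config.topic, rounds)(processMessage(config.environment))
      }
      thread.start()
      thread
    }
}
```

A caller would typically keep the returned threads and call join on each of them, which is only meaningful here because the loops are bounded.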