When talking with people new to functional programming currying is the hardest concept to explain. Usually, newcomers are very skeptical and keep asking “why is currying relevant?”.
In this post, I will show an example of currying a callback. A callback is a function passed as an argument to another function (a higher-order function).
A real-world practical problem
I created a Kafka consumer that runs in its own thread. To process messages, the Kafka consumer receives a callback with a single argument: a Kafka message to be processed. This way my Kafka consumer abstracts how a message should be processed, postponing this decision to runtime.
Sometimes this callback function requires a complex message processing, for example, sending a reply message to a Kafka topic. Let’s refer to this complex processing requirement as messaging the processing environment
. The environment
description, name of the reply topic and everything else is known at run time, reading a configuration file.
Curry to the rescue
The Kafka consumer problem requires heavy usage of Currying. I find this solution aesthetically appealing and I would like to hear about your opinion.
First I define a Kafka message with headers and a payload (Kafka’s value):
/** Abstract message to be sent using a messaging system, for instance Pub/Sub or Kafka
* attributes are message metadata, payload is the message content
*/
case class Message(attributes: Map[String, String], payload: String)
Here is the Kafka consumer code:
import java.time.Duration
import java.util.{Arrays, Properties}
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}
import scala.collection.JavaConverters._
object KafkaSubscriber extends StrictLogging {
/** Creates Properties for the Kafka consumer
*
* @param bootstrap_servers Zookeepers servers
* @param group_id Group ID to which the consumer belongs to
* @param key_deserializer_class Key deserialization
* @param value_deserializer_class Payload deserialization
* @return Properties for the Kafka consumer
*/
def kafkaConsumerSettings(bootstrap_servers: String,
group_id: String,
key_deserializer_class: String,
value_deserializer_class: String
): Properties = {
val properties: Properties = new Properties()
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group_id)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class)
properties
}
/** Subscribe to a topic consuming it with a callback.
*
* @param consumerSettings Properties for the consumer
* @param topic Topic to read from
* @param callback Function that performs an action over the message
*/
def subscribe(consumerSettings: Properties, topic: String)(callback: Message => Unit): Unit = {
logger.info(s"Starting consumer on Kafka topic: ${topic}.")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerSettings)
consumer.subscribe(Arrays.asList(topic))
try {
while (true) {
val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(100))
records.asScala.foreach(rec => {
val attributes: Map[String, String] =
rec
.headers()
.toArray
.map(x => Tuple2(x.key, new String(x.value)))
.toMap
val payload: String = rec.value()
callback(Message(attributes = attributes, payload = payload))
})
}
} finally {
consumer.close()
}
}
}
Lets look at the signature of a function to be sent as a callback to the above subscribe
function. This callback sends an environment for the Kafka consumer:
/** Callback function to be passed to the Kafka consumer.
*
* @param environment This environment in real world use case would be a Kafka producer
* or any other complex processing to do in the message consumption.
* @param message The Message to be processed.
*/
def processMessage(environment: String) (message: Message): Unit = {
...
}
A possible use of the processMessage
message is:
subscribe(consumerSettings = kafkaConsumerSettings,
topic = "example_topic")
(callback: processMessage(environment = "Hello World"))
Final remarks
I hope this post has shown the power of currying with higher-order functions. This technique provides the programmer with a clean and easy way to postpone decisions to run time.