It Is Not Just Functional Requirements – the Importance of CI/CD

Everybody is talking about cloud-native, and infrastructure is moving to the cloud. Meanwhile, non-technical staff often fail to realize that the code fulfilling functional requirements is only a small fraction of the final delivery.

There is an impedance mismatch between business and information technology professionals about what a software delivery actually is.

This impedance mismatch can only be reduced by tearing down the walls that separate business and development teams. This movement started with Agile and XP (eXtreme Programming), but it is a road that must be explored further. Communication must go beyond meetings: hybrid teams should have working sessions together, for instance, business people doing pair programming with developers.

This article presents a bird's-eye view of what makes up a cloud-native project, a description of its constituents, and why their presence is mandatory.

Fictitious software project

In this article I will use the following scenario: recently, a fictitious company decided to move to GCP (Google Cloud Platform) and adopted a cloud-native approach. The main development language is Python.

Cloud-native requires the extensive use of cloud provider serverless offerings and managed services. Everything must be automated, i.e., there is no room for manual deployments, manual environment or infrastructure commands, or any other non-repeatable, unscripted actions. In fact, nothing can be done using graphical user interfaces.

A new project has been launched and a set of functional requirements was passed to the development team. Functional requirements are product features or functions that developers must implement to enable users to accomplish their tasks. Naturally, functional requirements will be fulfilled in Python code.

Multiple levels of code

Now I want to give you the shocking news: the Python code that directly responds to the project requirements is just 10 to 20% of the final codebase!

In this article, I will explain what the development team will be doing and the software layers present in a modern cloud-native project.

Application code

Application code is the Python and SQL code that responds to functional requirements. This is the code everybody is expecting from the beginning. No surprises here.

Unit tests

Unit tests are also developed in Python.

Continuous Integration (CI)

Continuous Delivery (CD)

Continuous Deployment (CD)

Infrastructure as Code (IaC)

Tests everywhere

Unit tests

Integration tests

Saga pattern in Kafka-based micro-services – let’s do distributed commits

Oftentimes there is a need to create the concept of an atomic commit in a distributed system. Consider the following scenario: you split your former monolith into several micro-services that communicate via asynchronous message passing. If a business process requires the contribution of several services, a single service failure may imply the rollback of the entire flow. A practical example is the processing of a web store's shopping basket: if the credit card service is unable to charge the customer, the order must be canceled.

This article presents the saga pattern and discusses its implementation in a Kafka-based micro-services system.
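
Before getting into the Kafka specifics, here is a minimal local sketch of the core idea, not an actual implementation: each step pairs an action with a compensating action, and a failure triggers the compensations of the already executed steps in reverse order. All names below (SagaStep, runSaga, the shopping-basket steps) are illustrative.

// Minimal sketch of a saga: every step carries an action and a compensating action.
final case class SagaStep(name: String, action: () => Boolean, compensate: () => Unit)

def runSaga(steps: List[SagaStep]): Boolean = {
  @scala.annotation.tailrec
  def loop(remaining: List[SagaStep], executed: List[SagaStep]): Boolean =
    remaining match {
      case Nil => true                                    // every step succeeded
      case step :: rest =>
        if (step.action()) loop(rest, step :: executed)   // success: remember the step and continue
        else {                                            // failure: compensate in reverse order
          executed.foreach(_.compensate())
          false
        }
    }
  loop(steps, Nil)
}

// Hypothetical shopping-basket flow: if charging the card fails, the stock reservation is released.
val checkout = List(
  SagaStep("reserve stock", () => true,  () => println("release stock")),
  SagaStep("charge card",   () => false, () => println("refund card")),
  SagaStep("ship order",    () => true,  () => println("cancel shipment"))
)
runSaga(checkout) // false, and "release stock" is printed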

Moving away from 2PC

The saga pattern

Creating a saga

Injecting a saga into a Kafka message

Deduplication

Sagas benefits

Conclusions

Get keys and values from a Scala Map

scala> val states = Map("AK" -> "Alaska", "AL" -> "Alabama", "AR" -> "Arkansas")
states: scala.collection.immutable.Map[String,String] = Map(AK -> Alaska, AL -> Alabama, AR -> Arkansas)

scala> states.keySet
res0: scala.collection.immutable.Set[String] = Set(AK, AL, AR)

scala> states.keys
res1: Iterable[String] = Set(AK, AL, AR)

scala> states.values
res2: Iterable[String] = MapLike.DefaultValuesIterable(Alaska, Alabama, Arkansas)

scala> states("AK")
res3: String = Alaska

scala> states + ("NY" -> "New York")
res7: scala.collection.immutable.Map[String,String] = Map(AK -> Alaska, AL -> Alabama, AR -> Arkansas, NY -> New York)

Idempotence

The more I work with systems that exchange messages among themselves, the more aware I become of idempotency.

Formally speaking, in mathematics, an idempotent function has the same effect no matter how many times it is applied. Think about a remote that locks/unlocks a car: most remotes will just lock the doors no matter how many times you press the lock button. In mathematical notation:

f(x)=f(f(x))
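
In code, the car-remote example might look like this (the CarState type is illustrative): applying the function twice yields the same state as applying it once.

final case class CarState(locked: Boolean)

// Idempotent: locking an already locked car changes nothing.
def lockDoors(car: CarState): CarState = car.copy(locked = true)

val car = CarState(locked = false)
lockDoors(car) == lockDoors(lockDoors(car)) // true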

Why is it so important? Well, if you have a set of uncoupled systems, you can experience partitions or other problems that prevent messages from being successfully processed. If this is the case, the first thing you need is the ability to retry sending the message without worrying about a possible duplicate effect on the world.

How to implement idempotency

There are two options to implement idempotency:

  • store state at the server so that it recognises a duplicate request and just ignores it; usually this is achieved by storing the IDs of the processed messages (see the sketch after this list); or
  • make the algorithms idempotent, for instance when the message encodes an intention rather than the way to do something. One example would be a game instruction like grab sword: no matter how many times it is processed, the effect is the same.
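
As a minimal sketch of the first option, assuming an in-memory set for brevity (a real system would keep the processed IDs in a durable store):

import scala.collection.mutable

object DeduplicatingHandler {
  // IDs of the messages already processed; in production this would be persisted.
  private val processedIds = mutable.Set.empty[String]

  // Runs `process` only the first time a given message ID is seen.
  def handle(messageId: String, payload: String)(process: String => Unit): Unit =
    if (processedIds.add(messageId)) process(payload) // add returns false for duplicates
}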

Final remarks

I strongly support writing idempotent code from the start, even if you cannot see the benefits right now. I always expect idempotency to emerge as an essential way to avoid repeating expensive tasks and to prevent unintended duplicate effects.

Message attributes in messaging systems

For me the best way to create micro-services is to follow a small set of rules:

  • highly decoupled – each service does not know anything about other services;
  • asynchronous communication – reactive system;
  • stateless wherever possible; and
  • small services – a single developer should be able to maintain the entire micro-service in her head.

There is no better way to create decoupling than to use a messaging system. A service subscribes to one or several topics and publishes to one or more topics. This way, a particular service does not know where messages originate and is not aware of what happens downstream.

Point to point systems

Traditional systems, where components communicate directly, rely on the knowledge consumers have of server endpoints. This is the philosophy behind REST and gRPC. Each service passively waits for requests, exposing endpoints for clients to call. The client usually keeps the connection open while waiting for a response.

Event driven reactive systems

Event-driven reactive systems communicate via asynchronous messages. These messages flow through channels in a publish-subscribe pattern: a service subscribes to some topics and publishes to others. It is as simple as that. A message is an event, and the system works by reacting to the flow of messages, without the need for a coordinator or orchestrator.

Canonical data model

The first step is to create a canonical data model for messages to conform to; in Kafka this role is played by the schema registry. Each service must adapt its internal representation of events to the canonical data model when consuming or producing messages.
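
As a small illustrative sketch of the second sentence (the names and fields below are hypothetical, and the schema registry itself is left out), a service might adapt its internal event type to a shared canonical model right before publishing:

// Internal representation used inside the service.
final case class InternalOrderEvent(orderId: Long, totalCents: Long)

// Canonical model every message on the bus must conform to.
final case class CanonicalMessage(eventType: String, payload: Map[String, String])

def toCanonical(event: InternalOrderEvent): CanonicalMessage =
  CanonicalMessage(
    eventType = "order_created",
    payload = Map(
      "order_id"    -> event.orderId.toString,
      "total_cents" -> event.totalCents.toString))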

Message attributes

A very important aspect is the distinction between payload and attributes (in Kafka the latter are called headers). The payload is the content of the message, the event itself, for instance computation results or the reply to a query. Attributes give context, for instance a timestamp.

Attributes are a fundamental piece of a micro-services architecture, but they usually do not receive the attention they deserve; they are seldom even mentioned.

Monitoring and observability

The most important use is to create a monitoring and observability layer. A service mesh leads to a complex web of paths, and the decoupling of its components makes it hard to understand the system at rest. It is the monitoring of the live system that makes system-wide patterns emerge.

Every message should be enriched with metadata, and the right place for metadata is in the attributes. Typical attributes are (a sketch of how to attach them follows this list):

  • timestamp;
  • message ID;
  • correlation ID (if it is a reply to a message or part of a set of messages);
  • relevant time to reply;
  • sender;
  • owner; and
  • if it is a probe message (for instance a message sent periodically to assess system health).
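
A minimal sketch of enriching a Kafka record with such attributes, using the producer API's headers (the topic name, payload, and attribute values are illustrative):

import java.nio.charset.StandardCharsets.UTF_8
import java.util.UUID

import org.apache.kafka.clients.producer.ProducerRecord

val record = new ProducerRecord[String, String]("events_topic", "payload goes here")

// Metadata travels as headers (attributes), not inside the payload.
record.headers()
  .add("timestamp", System.currentTimeMillis().toString.getBytes(UTF_8))
  .add("message_id", UUID.randomUUID().toString.getBytes(UTF_8))
  .add("correlation_id", "order-1234".getBytes(UTF_8))
  .add("sender", "checkout-service".getBytes(UTF_8))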

Attributes are processed by other systems, for instance ElasticSearch, to provide metrics, warnings and errors.

You may be thinking that this is usually solved with logs. I argue that collecting logs is an invaluable post-mortem tool, and useful for some alerting, but total reliance on logs is an antipattern. I have seen this happen, for instance, when logs are used to surface the time it takes to process a message.

If there is metadata to associate with messages, attributes are the correct way to handle it: an event should be directly linked with its metadata. This way metadata becomes data fed to control systems in a very direct way. As the system matures, metadata may start to be processed in ever more complex ways, increasing our understanding and enhancing our control over the system.

Currying callbacks to reach the next level

When talking with people new to functional programming, currying is the hardest concept to explain. Usually, newcomers are very skeptical and keep asking “why is currying relevant?”.

In this post, I will show an example of currying a callback. A callback is a function passed as an argument to another function (a higher-order function).
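
Before the real-world example, here is a tiny illustration of what currying means in Scala: a function with several parameter lists can be applied to its first list now and to the remaining list later.

// A curried function: two parameter lists.
def add(x: Int)(y: Int): Int = x + y

// Apply the first list now, keep the rest as a function.
val addTen: Int => Int = add(10)

addTen(5) // 15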

A real-world practical problem

I created a Kafka consumer that runs in its own thread. To process messages, the Kafka consumer receives a callback with a single argument: a Kafka message to be processed. This way my Kafka consumer abstracts how a message should be processed, postponing this decision to runtime.

Sometimes this callback function requires complex message processing, for example, sending a reply message to a Kafka topic. Let’s refer to this complex processing requirement as the processing environment. The environment description, the name of the reply topic, and everything else is known only at run time, read from a configuration file.

Curry to the rescue

The Kafka consumer problem requires heavy usage of currying. I find this solution aesthetically appealing and I would like to hear your opinion.

First I define a Kafka message with headers and a payload (Kafka’s value):

/** Abstract message to be sent using a messaging system, for instance Pub/Sub or Kafka
 * attributes are message metadata, payload is the message content
 */
case class Message(attributes: Map[String, String], payload: String)

Here is the Kafka consumer code:

import java.time.Duration
import java.util.{Arrays, Properties}

import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}

import scala.collection.JavaConverters._

object KafkaSubscriber extends StrictLogging {

  /** Creates Properties for the Kafka consumer
   *
   * @param bootstrap_servers        Kafka bootstrap servers
   * @param group_id                 Group ID to which the consumer belongs
   * @param key_deserializer_class   Key deserialization
   * @param value_deserializer_class Payload deserialization
   * @return Properties for the Kafka consumer
   */
  def kafkaConsumerSettings(bootstrap_servers: String,
                            group_id: String,
                            key_deserializer_class: String,
                            value_deserializer_class: String
                           ): Properties = {

    val properties: Properties = new Properties()

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group_id)
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class)

    properties
  }

  /** Subscribe to a topic consuming it with a callback.
   *
   * @param consumerSettings Properties for the consumer
   * @param topic            Topic to read from
   * @param callback         Function that performs an action over the message
   */
  def subscribe(consumerSettings: Properties, topic: String)(callback: Message => Unit): Unit = {

    logger.info(s"Starting consumer on Kafka topic: ${topic}.")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerSettings)
    consumer.subscribe(Arrays.asList(topic))

    try {
      while (true) {
        val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(100))

        records.asScala.foreach(rec => {
          val attributes: Map[String, String] =
            rec
              .headers()
              .toArray
              .map(x => Tuple2(x.key, new String(x.value)))
              .toMap
          val payload: String = rec.value()
          callback(Message(attributes = attributes, payload = payload))
        })
      }
    } finally {
      consumer.close()
    }
  }
}

Let’s look at the signature of a function to be passed as a callback to the subscribe function above. This callback carries an environment into the Kafka consumer:

/** Callback function to be passed to the Kafka consumer.
 *
 * @param environment This environment in real world use case would be a Kafka producer
 *                    or any other complex processing to do in the message consumption.
 * @param message     The Message to be processed.
 */
def processMessage(environment: String) (message: Message): Unit = {
  ...
}

A possible use of processMessage, assuming consumerSettings is a Properties object built with kafkaConsumerSettings, is:

subscribe(consumerSettings = consumerSettings,
          topic = "example_topic")(
          callback = processMessage(environment = "Hello World"))

Final remarks

I hope this post has shown the power of currying with higher-order functions. This technique provides the programmer with a clean and easy way to postpone decisions to run time.

In Scala, launch Java Threads that stay alive forever

Recently I needed to spawn threads to have several Kafka consumers listening to a list of topics. I tried to do this using foreach on a parallel collection, but it did not work. In the end I just used a for comprehension to launch a Java Thread for each Kafka consumer. Below is a synthesis of the code I used:

import scala.annotation.tailrec
import scala.util.Random

val is: Seq[Int] = 1 to 100

@tailrec
def printToScreen(i: Int): Unit = {
    println(i)
    Thread.sleep(1000 * Random.nextInt(10))
    printToScreen(i)
}

for (i: Int <- is) {
    val thread: Thread = new Thread {
        override def run(): Unit = {
            printToScreen(i)
        }
    }
    thread.start()
}

In the production code I have a list of Kafka consumer configurations, and the equivalent of printToScreen generates a callback that I send to each Kafka consumer.

The code above launches 100 threads, each printing its corresponding number at random intervals.

Cast from String to Integer in Spark 2

When casting from String to Integer in Spark 2 I found a bug, and here I present my workaround. Later I verified that the bug does not exist in Spark 3. The bug is related to the existence of spaces between commas and values.

To reproduce the error, create a file with these contents; I named it test_feature_engineering.csv:

ID,XACT_DT,DUE_DT,AFT_CHRG_XACT_AMI,BILL_AMT
1,2020-01-11,2020-01-11, 10, 10
2,2020-01-11,2020-01-12, 10, 10
3,2020-01-11 10:10:01,2020-01-12 10:20:00, 10, 10

Open the spark-shell and run commands along these lines.
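
A minimal sketch of such a session, assuming the CSV sits in the current working directory and that the res column holds the cast of BILL_AMT:

import org.apache.spark.sql.functions.col

val df = spark.read.option("header", "true").csv("test_feature_engineering.csv")

// Direct cast from String to Integer: the leading space makes Spark 2 return null.
val df2 = df.withColumn("res", col("BILL_AMT").cast("integer"))
df2.show()

// Workaround: cast to Float first, then to Integer.
val df3 = df.withColumn("res", col("BILL_AMT").cast("float").cast("integer"))
df3.show()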

Inspecting the output above, it is possible to verify that a cast from String to Integer results in a res column with null values in df2. df3 shows that casting first to Float and then to Integer produces the desired result.

CI/CD from scratch

To start a brand new CI/CD project three tools are necessary:

  • Git, a source control tool;
  • Nexus, a binary repository tool; and
  • Jenkins, an automation tool.

The tools above are widely used FLOSS projects, but any other set of tools will fit the bill. There are two rules to follow no matter what:

  • there is absolutely no room for manual steps, from development to deployment in live environments, which means every test and deployment must be automated and scripted; and
  • every piece of code, configuration and scripts must be kept in source control.

Source control

Git, or any other source control tool, should be used just to support the development process. Release versioning is done in an artefactory, in our case Nexus.

In source control, no user is allowed to merge to master directly. Merging to master is possible only via a pull request (PR). Use this rule everywhere, even in personal projects where you are the sole contributor!

Artefactory

In an artefactory, for instance Nexus, the release versions that are ready to be deployed are published. Imagine you want to deploy version 2.3.4 to the Prod environment: a deployment script (more about it later) grabs the artefact from Nexus and places it in the designated location in the Prod environment.

Continuous Integration/Continuous Deployment

Continuous Integration/Continuous Deployment (CI/CD) is the hardest part of a modern software project. It is where the maturity level of a team really shines.

CI

CI uses Jenkins scripts. Here we enforce rules such as not being able to open a PR if the code does not compile or does not pass static code analysis.

When a PR is accepted a Jenkins script should upload the corresponding release version to the artefactory.

CD

CD is hard, really hard. Never ever underestimate the effort a team needs to put in to get it right. There are no established best practices, blueprints, or open-source projects to start from. Watch Yevgeniy Brikman’s YouTube presentation “Lessons from 300k+ Lines of Infrastructure Code”.

One of the difficulties inherent to CD is the number of tools you need to choose, synchronize, and decide how to use. For instance, in my current project we are using Ansible, Terraform, Helm, Jenkins, JenkinsX, and more. Do not forget that all the required scripting is itself stored in Git. Oftentimes these scripts have different parametrizations according to the deployment environment, and those parametrizations are stored in Git too.