Implementing Saga Pattern in Nodejs Microservices

In this article, we will see how to implement the saga pattern in Nodejs microservices.

Firstly, what is the saga pattern in microservices, and what kind of problem does it solve?

Saga Pattern

Let's try to understand the saga pattern with an example. Consider an application like Asana, where a project contains tasks and a task contains subtasks.

The project details will be stored in the project service, and the tasks and subtasks in the subtask service, with each service having its own database.

What happens to the task and subtask data when a user deletes the project? How can you maintain data consistency across all the services?

That is to say, the saga pattern solves the problem of data consistency across different services.

Saga Pattern Types

There are two kinds of methods used in the saga pattern. They are:

Orchestration-based Saga

Orchestration-based saga is a method where an orchestrator service maintains the communication (command/reply) between services.

So, it helps to maintain data consistency across services.

Choreography-Based Saga

In this method, there is no central orchestrator. Each service publishes and subscribes to command/reply events, and for every reply it updates its own database, keeping the data consistent.

Implementation in Nodejs Microservices

Now, we will look at an example of Nodejs microservices where data consistency is a crucial part.

The complete source code can be found here.

Note: This code is just to demonstrate how the saga pattern works in Nodejs microservices. It doesn't contain all the business logic of the services. Feel free to complete the services if you are interested. (PRs are always welcome.)


In this example, we have an e-commerce application. It contains an order service, a payment service and a stock service.

Whenever a user places an order, we need to implement the complete flow of ordering, payment and delivery of items, which involves the order service, the payment service and the stock service.

Here, data consistency plays a crucial role. Let's see how to implement it using the orchestration-based saga pattern.

Structure


  • KafkaBroker - it contains all the Kafka producer, consumer and routes logic. All the services use this to publish and receive events from Kafka.
  • orchestatorService - it contains all the logic to implement the orchestration part of the saga pattern.
  • orderService - this service handles all the order business logic.
  • paymentService - this service handles all the payment business logic.

We will be using kafka-node for Kafka communication in Nodejs. If you prefer kafkajs, feel free to use that instead.

KafkaBroker

If you are new to Kafka, read this article to get a good grasp of it.

Create a directory kafkaHandler inside kafkaBroker. Here, we are going to put the producer and consumer logic for Kafka.

After that, create a file called Producer.js and add the following code

const Kafka = require("kafka-node")

const Producer = Kafka.Producer
const client = new Kafka.KafkaClient()

// For creating topics.
// Only admins are able to create topics.
// (The admin client is created here, but the topics below are created via producer.createTopics.)
const admin = new Kafka.Admin(client)

let producer

let producerReady

const bindListeners = function bindListeners() {
  producerReady = new Promise((resolve, reject) => {
    producer.on("ready", () => {
      console.log("producer ready")
      resolve(producer)
    })

    producer.on("error", err => {
      console.log("producer err", err)
      reject(err)
    })
  })
}

const initializeProducer = () => {
  producer = new Producer(client)

  bindListeners()
}

/*
 * A higher level producer which sends a message to a particular topic
 */
const ProducerService = function ProducerService() {
  initializeProducer()
}

/*
 * Sends a message through the kafka instance
 **/
ProducerService.prototype.produce = function produce(
  topic,
  messages,
  partition = 0
) {
  // Resolves with the broker response once the producer is ready
  return producerReady.then(producer => {
    const payload = [{ topic, messages, partition }]
    return new Promise((resolve, reject) => {
      producer.send(payload, function(err, data) {
        if (err) {
          console.log("Error while producing data in this service")
          return reject(err)
        }
        resolve(data)
      })
    })
  })
}

ProducerService.prototype.createTopic = function createTopic(topics) {
  return producerReady.then(producer => {
    return new Promise((resolve, reject) => {
      producer.createTopics(topics, (err, res) => {
        if (err) {
          console.log("Error while creating a topic")
          return reject(err)
        }

        console.log("Topics created successfully")
        resolve(res)
      })
    })
  })
}

module.exports = ProducerService

Here, we have a few methods on the producer:

  • Initializing the producer by binding the on ready and on error callback functions.
  • produce method, which takes a topic and a message and sends the message to the specified topic (see the usage sketch after this list).
  • createTopic method, which creates the given topics if they don't exist.
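
Here is a minimal sketch of how a service could use this ProducerService. The require path and the topic/message values below are just examples for illustration, not part of the original code.

const ProducerService = require("./kafkaBroker/kafkaHandler/Producer")

const producer = new ProducerService()

// Publish a stringified event to the ORDER_SERVICE topic (example values)
producer
  .produce("ORDER_SERVICE", JSON.stringify({ type: "ORDER_CREATED", payload: {} }))
  .then(data => console.log("message produced", data))
  .catch(err => console.log("produce failed", err))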

Next, create a file called Consumer.js and add the following code

const kafkaNode = require("kafka-node")

const client = new kafkaNode.KafkaClient()
const offset = new kafkaNode.Offset(client)

const Consumer = kafkaNode.Consumer

let consumer

let consumerReady

var defaultOptions = {
  encoding: "utf8", // default is utf8, use 'buffer' for binary data
  fromOffset: -1, // default,
  autoCommit: true,
}

const bindEventListeners = function bindEventListeners(options, topic) {
  consumerReady = new Promise((resolve, reject) => {
    try {
      consumer = new Consumer(client, [], options)
      consumer.on("error", err => {
        console.log(`Error occurred on consumer group ${topic}`, err)
      })
      resolve(consumer)
    } catch (e) {
      reject(e)
    }
  })
}

const initializeConsumer = function initializeConsumer(defaultTopic) {
  const options = defaultOptions

  bindEventListeners(options, defaultTopic)
}

const ConsumerService = function ConsumerService(defaultTopic) {
  console.log("initializing consumer")
  initializeConsumer(defaultTopic)
}

ConsumerService.prototype.addTopics = function addTopics(topicArray) {
  return new Promise((resolve, reject) => {
    consumerReady
      .then(consumer => {
        console.log("adding topics ", topicArray)
        consumer.addTopics(topicArray, function(err, added) {
          console.log("topics added ", err, added)
          resolve(added)
        })
      })
      .catch(e => {
        console.log("error while adding topics ", e)
      })
  })
}

ConsumerService.prototype.consume = function consume(cb) {
  consumerReady
    .then(consumer => {
      console.log("consumer ready")
      consumer.on("message", message => {
        cb(message)
      })
    })
    .catch(e => {
      console.log("error while consuming", e)
    })
}

module.exports = ConsumerService

Here, we have a few methods on the consumer:

  • Initializing the consumer and binding the on error callback function.
  • addTopics method, which adds the topics for the consumer to consume (see the sketch after this list).
  • consume method, which receives messages from the subscribed topics and passes them to a callback.
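
A minimal usage sketch of this ConsumerService follows; the topic here is just an example, and the services below wire this up in their own app.js files.

const ConsumerService = require("./kafkaBroker/kafkaHandler/Consumer")

const consumer = new ConsumerService()

// Subscribe to a topic and log every message received on it (example topic)
consumer.addTopics(["ORDER_SERVICE"]).then(() => {
  consumer.consume(message => {
    console.log("received", message.topic, message.value)
  })
})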

After that, create a file called kafkaBootstrap.js and add the following code

const kafka = require("kafka-node")

const Producer = require("../kafkaBroker/kafkaHandler/Producer")

const producer = new Producer()

const topics = [
  { topic: "ORDER_SERVICE", partitions: 1, replicationFactor: 1 },
  { topic: "PAYMENT_SERVICE", partitions: 1, replicationFactor: 1 },
  { topic: "STOCK_SERVICE", partitions: 1, replicationFactor: 1 },
  { topic: "ORCHESTATOR_SERVICE", partitions: 1, replicationFactor: 1 },
]

producer
  .createTopic(topics)
  .then(res => {})
  .catch(err => {
    console.log(`Error ${err}`)
  })

Here, we create the topics if they don't exist. Run this code once, before starting the services, to create the topics.
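
For example, you could run it with node kafkaBootstrap.js from the directory where you created the file (the exact path depends on where you placed it in your project).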

Order Service


  • Controller - it handles the requests and the business logic.
  • eventHandler - it handles all the Kafka messages and maps them to the business logic.
  • Model - it contains all the database models (a minimal sketch of the order model follows this list).
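
The order model itself is not listed in this article. Here is a minimal sketch of what Model/orderModel.js could look like, based on the fields used later in createOrder.js (name, itemCount, transactionId, status); treat it as an assumption, not the exact schema from the repository.

// orderService/Model/orderModel.js (sketch)
const mongoose = require("mongoose")

const orderSchema = new mongoose.Schema({
  name: String,
  itemCount: Number,
  // Correlates this order with the saga transaction across services
  transactionId: String,
  // "PENDING" until the saga completes; other states are up to your business logic
  status: String,
})

module.exports = mongoose.model("Order", orderSchema)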

After that, create a file app.js and add the following code

const express = require("express")
const bodyParser = require("body-parser")
const mongoose = require("mongoose")

const Consumer = require("../../kafkaBroker/kafkaHandler/Consumer")
const eventHandler = require("./eventHandler")
const CreateOrder = require("./Controller/createOrder")

const app = express()

app.use(bodyParser.json())
app.use(bodyParser.urlencoded({ extended: false }))

mongoose
  .connect("mongodb://localhost:27017/orderdb", {
    useNewUrlParser: true,
    useUnifiedTopology: true,
  })
  .then(data => {
    app.post("/createorder", CreateOrder)

    const PORT = 3000

    app.listen(PORT, () => {
      console.log("server is running on port 3000")
    })

    const consumer = new Consumer()

    consumer.addTopics(["ORDER_SERVICE", "SERVICE_REPLY"]).then(() => {
      consumer.consume(message => {
        console.log("consumed message", message)
        eventHandler(JSON.parse(message.value))
      })
    })
  })
  .catch(err => {
    console.log(`Error in Mongo Connection ${err}`)
  })

Here, we set up a MongoDB connection and add the topics to the Kafka consumer of the order service.

Once it consumes a message, the eventHandler takes that message and performs the corresponding business logic.
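
The order service's eventHandler is not listed here. Below is a minimal sketch, assuming it switches on the message type and updates the order status by transactionId; the ORDER_COMPLETED and PAYMENT_FAILED types are hypothetical placeholders for whatever reply states your orchestator emits.

// orderService/eventHandler/index.js (sketch; adjust paths to your structure)
const OrderModel = require("../Model/orderModel")

module.exports = message => {
  switch (message.type) {
    // Hypothetical reply type: mark the order as completed
    case "ORDER_COMPLETED":
      OrderModel.findOneAndUpdate(
        { transactionId: message.payload.transactionId },
        { status: "COMPLETED" }
      )
        .exec()
        .catch(err => console.log("error updating order", err))
      break
    // Hypothetical reply type: compensate by marking the order as failed
    case "PAYMENT_FAILED":
      OrderModel.findOneAndUpdate(
        { transactionId: message.payload.transactionId },
        { status: "FAILED" }
      )
        .exec()
        .catch(err => console.log("error updating order", err))
      break
    default:
      break
  }
}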

Next, create a file createOrder.js in Controller and add the following code:

const uuidv1 = require("uuid/v1")

const OrderModel = require("../Model/orderModel")
const Producer = require("../../../kafkaBroker/kafkaHandler/routes")

const CreateOrder = async (req, res) => {
  try {
    const name = req.body.name
    const itemCount = req.body.itemCount
    const amount = req.body.amount

    const order = new OrderModel({
      name: name,
      itemCount: itemCount,
      transactionId: uuidv1(),
      status: "PENDING",
    })

    await order.save()

    res.send(order)

    // Kick off the saga by publishing the order creation transaction
    Producer({
      topic: "ORDER_CREATION_TRANSACTIONS",
      type: "ORDER_CREATED",
      payload: {
        data: {
          id: order._id,
          transactionId: order.transactionId,
          amount: amount,
        },
      },
    })
  } catch (e) {
    console.log(e)
  }
}

module.exports = CreateOrder

The controller takes the request and inserts the data into the database. Once it has done that, it sends the data to the Kafka producer (the routes helper from kafkaBroker), initiating the order creation transaction.
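
The routes helper required above is not listed in this article. Here is a minimal sketch, assuming it maps the logical topic on each message to one of the physical Kafka topics created in kafkaBootstrap.js and publishes the stringified message through ProducerService; the mapping below is an assumption for illustration.

// kafkaBroker/kafkaHandler/routes.js (sketch)
const ProducerService = require("./Producer")

const producer = new ProducerService()

// Hypothetical mapping from logical message topics to Kafka topics
const topicMapping = {
  ORDER_CREATION_TRANSACTIONS: "ORCHESTATOR_SERVICE",
  EXECUTE_PAYMENT: "PAYMENT_SERVICE",
}

module.exports = message => {
  const kafkaTopic = topicMapping[message.topic]

  if (!kafkaTopic) {
    console.log(`No route found for topic ${message.topic}`)
    return Promise.reject(new Error(`unknown topic ${message.topic}`))
  }

  // The consumers call JSON.parse(message.value), so the whole message is stringified here
  return producer.produce(kafkaTopic, JSON.stringify(message))
}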

Orchestator service


As mentioned earlier, the main purpose of the orchestator service is to orchestrate the commands and replies.

Everything is a transaction here. For every transaction, it orchestrates the state across the different services.

Firstly, create a file bootstrap.js and add the following code:

const Consumer = require("../../kafkaBroker/kafkaHandler/Consumer")
const Transactions = require("./Transactions")

try {
  const consumer = new Consumer()

  consumer.addTopics(["ORCHESTATOR_SERVICE"]).then(() => {
    consumer.consume(message => {
      console.log("consumed message", message)
      Transactions(JSON.parse(message.value))
    })
  })

  console.log("Orchestator Started successfully")
} catch (e) {
  console.log(`Orchestrator Error ${e}`)
}

Next, create a file orderCreationTransactions.js and add the following code:

const Producer = require("../../../kafkaBroker/kafkaHandler/routes")

module.exports = message => {
  switch (message.type) {
    case "ORDER_CREATED":
      // Ask the payment service to execute the payment for this order
      Producer({
        topic: "EXECUTE_PAYMENT",
        payload: {
          data: message.payload.data,
        },
      })
      break
    case "PAYMENT_COMPLETED_STATE":
      // The next step of the saga (e.g. updating the stock) goes here;
      // the target topic is left empty in this demo
      Producer({
        topic: "",
        payload: {
          data: message.payload.data,
        },
      })
      break
    default:
      break
  }
}

Once it receives a state, the orchestator directs the transaction to the appropriate service.
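
The ./Transactions module required in bootstrap.js is not listed here. A minimal sketch, assuming it simply dispatches each incoming message to the right transaction handler based on its logical topic:

// orchestatorService/Transactions/index.js (sketch)
const orderCreationTransactions = require("./orderCreationTransactions")

module.exports = message => {
  switch (message.topic) {
    case "ORDER_CREATION_TRANSACTIONS":
      orderCreationTransactions(message)
      break
    default:
      console.log(`No transaction handler for topic ${message.topic}`)
      break
  }
}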

Payment Service

Once the payment service receives a command from the orchestator service, it performs the business logic and reports the status back to the orchestator service.

Based on that status, the orchestator performs the further actions on the respective services.


Add the following code in app.js:

const Consumer = require("../../kafkaBroker/kafkaHandler/Consumer")
const eventHandler = require("./eventHandler")

try {
  const consumer = new Consumer()

  consumer.addTopics(["PAYMENT_SERVICE"]).then(() => {
    consumer.consume(message => {
      console.log("consumed message", message)
      eventHandler(JSON.parse(message.value))
    })
  })

  console.log("Payment service Started Successfully")
} catch (e) {
  console.log(`Payment Service Error ${e}`)
}

It adds the topics, and when a message is received, it sends it to the eventHandler.
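
The payment service's eventHandler is not listed in full. A minimal sketch, assuming it routes EXECUTE_PAYMENT messages to executePayment (the file layout is an assumption; adjust the require path to your structure):

// paymentService/eventHandler/index.js (sketch)
const executePayment = require("./executePayment")

module.exports = message => {
  switch (message.topic) {
    case "EXECUTE_PAYMENT":
      // message.payload.data carries the order id, transactionId and amount
      executePayment(message.payload.data)
      break
    default:
      console.log(`Unhandled payment event ${message.topic}`)
      break
  }
}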

Then, create executePayment.js and add the following code:

const Producer = require("../../../kafkaBroker/kafkaHandler/routes")

module.exports = data => {
  /** Database layer logic comes here */
  try {
    console.log("data", data)

    // Reply to the orchestator that the payment for this transaction is done
    Producer({
      topic: "ORDER_CREATION_TRANSACTIONS",
      type: "PAYMENT_COMPLETED_STATE",
      payload: {
        transactionId: data.transactionId,
      },
    })
  } catch (e) {
    console.log(e)
  }
}

Summary

In conclusion, maintaining microservices that implement the saga pattern can be a bit complex, but it is worth it for the data consistency problems it solves.

We will see how to implement the choreography-based saga in an upcoming article.

Author

Hey, I'm Ganesh, a full stack engineer. I love to write technical content and help developers like me grow in the industry. Please consider supporting me.
