Implementing a Microservices Application with CQRS (Command Query Responsibiltiy Segregation)

Building scalable microservices using Java and CQRS

Ruvani Jayaweera
Bits and Pieces

--

Have you ever implemented microservices where you needed different data shapes for reads and write?

For example, you might be working on an e-commerce application that uses a completely different write and read data shape:

  1. A query of a product might include a product name, quantity available and the ID.
  2. A command (write) operation might include a product name, quantity available, supplier, best before, manufactured date and a few more attributes, and might need to communicate with different databases.

In these cases, if you’re using a single database for data writes and access, you might need to perform additional computation to get the data necessary to show to the user. This would lead to performance bottlenecks, scalability issues and complex logic making your code unreadable.

This is where the Command Query Responsibility Segregation (CQRS) pattern comes into the picture. CQRS is an architectural pattern that separates the responsibilities of reading data (queries) and writing data (commands) into different components.

Simply put, you’ll be using different models to update information (commands) and to read information (queries) in a system.

If you’d like to directly explore the code for a CQRS implementation, checkout my GitHub repository:

The Need for CQRS

Data operations involve both reading and modifying data, often expressed as CRUD (Create, Read, Update, Delete) tasks.

These tasks are collectively known as CRUDing the data, where application code may not distinctly separate reading and changing data. This straightforward approach is effective when all clients share the same data structure with low contention.

However, this approach becomes inadequate when different clients require diverse views across multiple data sets, when data usage is extensive, and when multiple clients making updates may unintentionally conflict with each other.

For instance, in a microservices architecture, each microservice manages its own data, but a user interface may need to display data from multiple microservices, leading to inefficiencies.

What is the solution?

Come up with a domain model to distinctly handle operations for querying and updating data, allowing them to be managed independently. The CQRS pattern enforces a strict segregation between operations that involve reading data and those that involve updating it. In this pattern, an operation can either read data (representing the “R” in CRUD) or write data (representing the “CUD” in CRUD), but not both.

There are 3 steps involving applying the CQRS.

  1. Divide the read and write APIs
  2. Dividing Read and Write Models
  3. Dividing Read and Write Databases

Let’s deep dive in to those steps.

Step 1: Divide the read and write APIs

The initial and prominently visible action in implementing the CQRS pattern involves the division of the CRUD API into distinct read and write APIs. In this diagram, the existing domain model remains unchanged, but its singular CRUD API is now separated into retrieve and modify APIs.

Dividing the Two APIs:

The implementation of the CQRS pattern involves separating the CRUD API into distinct read and write APIs while sharing the existing domain model. This segregation of behavior results in two key functionalities:

  • Read (Retrieve) API: Utilized to access the current state of objects in the domain model without altering that state.
  • Write (Modify) API: Used to modify objects in the domain model through Create, Update, and Delete (CUD) tasks.

This API separation aligns with the Command Query Separation (CQS) pattern, advocating the clear division of methods that change state from those that do not. The model categorizes each of an object’s methods into two exclusive groups:

  • Query: Returns a result without altering the system’s state or causing side effects.
  • Command: Changes the system’s state, providing an indication of success or failure without returning a value.

Step 2: Dividing Read and Write Models

The next step in implementing the CQRS pattern involves dividing the domain model into distinct read and write models. The diagram illustrates that the domain model forms the foundation for a write model responsible for modifying domain objects, alongside a distinct read model utilized to access the application’s state.

Step 3: Dividing Read and Write Databases

The third step of implementing the CQRS pattern involves splitting the database of record into distinct read and write databases. This comprehensive CQRS pattern solution is depicted in the diagram, illustrating separate databases for the write model and read model.

In this advanced step of CQRS implementation, the write model operates with its dedicated read/write database, while the read model has a separate read-only database. To maintain synchronization between the databases, CQRS incorporates the following design features:

  • Event Bus for publishing update events: Whenever the write database is updated, a change notification is published as an event on the event bus. An event processor for the query database subscribes to these events, updating the query database accordingly and ensuring synchronization.
  • Command Bus for queuing commands: Commands from the modify API can be queued, enhancing throughput and serialization of updates. Clients making updates are not blocked synchronously, and commands are placed on a message queue, processed asynchronously by the write model.

How To Implement Microservices with CQRS

I am going to implement a sample CQRS application with Spring boot and Axon framework.

Axon Framework provides the ability to handle events and commands and save the events in the event store. First thing we are going to do is download and setup the axon serve and use it with the springboot application.

Download the Zip file from here and then follow the given steps to start the server.

Next I am going to create a new springboot application. You can use Spring Initializr and generate a fresh spring boot application. Then you can open it in IntelliJ idea.

For simplicity, let’s implement two microservices, ProductCommandService for handling product commands and ProductQueryService for handling queries.

First I have created the PoductRestModel as below.

package com.cqrs.ProductQueryService.query.api.model;

import lombok.Builder;
import lombok.Data;

import java.math.BigDecimal;

@Data
@Builder
public class ProductRestModel {
private String name;
private BigDecimal price;
private Integer quantity;
}

Then it is required to create the rest endpoint to post the products. Hence creating a ProductCommandController class. Commands are sending via the controller to the command gateway. For that I am using ProductRestModel.

Let me explain what this controller is supposed to do. Inside the controller add postmapping method as addProduct and add create product command within that. Then send this command via the command gateway. Then wait until it is completed. And send the result back. Controller is ready and can be used to send command to comandGateway.

Here is the discussed ProductCommandController class.

package com.cqrs.ProductCommandService.command.api.controller;

import com.cqrs.ProductCommandService.command.api.model.ProductRestModel;
import com.cqrs.ProductCommandService.command.api.commands.CreateProductCommand;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("/products")
public class ProductCommandController {

private CommandGateway commandGateway;

public ProductCommandController(CommandGateway commandGateway) {
this.commandGateway = commandGateway;
}

@PostMapping
public String addProduct(@RequestBody ProductRestModel productRestModel) {

CreateProductCommand createProductCommand =
CreateProductCommand.builder()
.productId(UUID.randomUUID().toString())
.name(productRestModel.getName())
.price(productRestModel.getPrice())
.build();
String result = commandGateway.sendAndWait(createProductCommand);
return result;
}
}

Here is the CreateProductCommand class that simply has the attributes of the product and annotated the productID as the TargetAggregateIdentifier.

package com.cqrs.ProductCommandService.command.api.commands;

import lombok.Builder;
import lombok.Data;
import org.axonframework.modelling.command.TargetAggregateIdentifier;

import java.math.BigDecimal;

@Data
@Builder
public class CreateProductCommand {

@TargetAggregateIdentifier
private String productId;
private String name;
private BigDecimal price;
}

The createProductCommand will be going through the command gateway.Then we have the aggregate class to handle the command. It handles each and every command. PoductAggregate class is annotated using @Aggregate. Product id is annotated as @AgetaggregateIdentifier. If there is any validations we can put them under the Command Handler.

Then Create an event sourcing handler injecting productCreatedEvent and annotate with @EventSourcingHandler, Whenever product created event is occurred and this event sourcing handler will handle the data. With that now we can publish the event.

package com.cqrs.ProductCommandService.command.api.aggregate;

import com.cqrs.ProductCommandService.command.api.events.ProductCreatedEvent;
import com.cqrs.ProductCommandService.command.api.commands.CreateProductCommand;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.modelling.command.AggregateLifecycle;
import org.axonframework.spring.stereotype.Aggregate;
import org.springframework.beans.BeanUtils;

import java.math.BigDecimal;

@Aggregate
public class ProductAggregate {

@AggregateIdentifier
private String productId;
private String name;
private BigDecimal price;

@CommandHandler
public ProductAggregate(CreateProductCommand createProductCommand) {
//Add anu validations here
ProductCreatedEvent productCreatedEvent =
new ProductCreatedEvent();

BeanUtils.copyProperties(createProductCommand,productCreatedEvent);

AggregateLifecycle.apply(productCreatedEvent);
}

public ProductAggregate() {
}

@EventSourcingHandler
public void on(ProductCreatedEvent productCreatedEvent) {
this.productId = productCreatedEvent.getProductId();
this.name = productCreatedEvent.getName();
this.price = productCreatedEvent.getPrice();
}
}

Then it is required to create productCreatEvent. These are the events those are stored in the event store.

package com.cqrs.ProductCommandService.command.api.events;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ProductCreatedEvent {

private String productId;
private String name;
private BigDecimal price;
}

Then we have to implement an event handler who is listing to the created events. Whenever a new productCreatEvent is store, event handler is listen to that and consume it. That will then be save to the database using the repository.

package com.cqrs.ProductCommandService.command.api.events;

import com.cqrs.ProductCommandService.command.api.data.Product;
import com.cqrs.ProductCommandService.command.api.data.ProductRepository;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.messaging.interceptors.ExceptionHandler;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;

@Component
@ProcessingGroup("product")
public class ProductEventsHandler {

private ProductRepository productRepository;

public ProductEventsHandler(ProductRepository productRepository) {
this.productRepository = productRepository;
}

@EventHandler
public void on(ProductCreatedEvent event) throws Exception {
Product product =
new Product();
BeanUtils.copyProperties(event,product);
productRepository.save(product);
throw new Exception("Exception Occurred");
}

@ExceptionHandler
public void handle(Exception exception) throws Exception {
throw exception;
}
}

In the Axon Server dashboard, we can see the created application.

Once we send a command or the post request using postman we can see the eventidentifier or the productID returns back.

Then we can see the same event via the Axon server dashboard.

You can see more details of the event by clicking on the event as below.

The productQueryService is created to handle the queries. It has the controller defined to handle the query endpoint as below. It also uses the same ProductRestModel.

package com.cqrs.ProductQueryService.query.api.controller;


import com.cqrs.ProductQueryService.query.api.model.ProductRestModel;
import com.cqrs.ProductQueryService.query.api.queries.GetProductsQuery;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.QueryGateway;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/products")
public class ProductQueryController {

private QueryGateway queryGateway;

public ProductQueryController(QueryGateway queryGateway) {
this.queryGateway = queryGateway;
}

@GetMapping
public List<ProductRestModel> getAllProducts() {
GetProductsQuery getProductsQuery =
new GetProductsQuery();

List<ProductRestModel> productRestModels =
queryGateway.query(getProductsQuery,
ResponseTypes.multipleInstancesOf(ProductRestModel.class))
.join();

return productRestModels;
}
}

It has a projection class, similar to aggregate class we saw above to handle the events above.

package com.cqrs.ProductQueryService.query.api.projection;


import com.cqrs.ProductQueryService.query.api.data.Product;
import com.cqrs.ProductQueryService.query.api.data.ProductRepository;
import com.cqrs.ProductQueryService.query.api.model.ProductRestModel;
import com.cqrs.ProductQueryService.query.api.queries.GetProductsQuery;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.stream.Collectors;

@Component
public class ProductProjection {

private ProductRepository productRepository;

public ProductProjection(ProductRepository productRepository) {
this.productRepository = productRepository;
}

@QueryHandler
public List<ProductRestModel> handle(GetProductsQuery getProductsQuery) {
List<Product> products =
productRepository.findAll();

List<ProductRestModel> productRestModels =
products.stream()
.map(product -> ProductRestModel
.builder()
.price(product.getPrice())
.name(product.getName())
.build())
.collect(Collectors.toList());

return productRestModels;
}
}

Once we send the Get request via the postman, we can see that in the Axon server as well.

Anyone who is more interested on play around the application above, you can check this git repository.

Conclusion

Implementing a microservices application with Command Query Responsibility Segregation (CQRS) introduces a revolutionary approach to system architecture. By embracing CQRS principles, the system undergoes a strategic division into separate models and databases for reading and writing, presenting unique advantages and challenges.

Thank you for reading!

--

--