Spring Cloud Stream - Publish Message to RabbitMQ
Spring cloud stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems. It uses the Spring Integration project to provide connectivity to a message broker. It unifies lots of popular messaging platforms behind one easy to use API including RabbitMQ, Apache Kafka, Google PubSub etc..Without this framework, we will be writing lot of boilerplate codes to support different messaging platform, and it will go beyond maintainable.
Spring Cloud Stream relies on the concept of binder to handle the integration with a messaging or event streaming framework. The binders support the core feature of modern messaging/eventing systems, such as Publish/Subscribe, Consumer Groups, Partitioning, Message-driven consumers, Polling based consumers.
Let's see some of the concepts, before we jump into the code
Binder: A binder handles the integration with single framework. This abstraction allows your code to be middleware-neutral, focusing only on core business logic. The application core only needs to interact with a canonical message. So depending upon the messaging system we will have to specify the dependency in this case it is RabbitMQ, so we just need to add the below dependency in the pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
The @EnableBinding configures the application to bind the channels INPUT and OUTPUT defined within the interface processor.
Channel: Represents the communication pipe between messaging-middleware and the application. A channel abstracts the queue that will either publish or consume the message. A channel is always associated with a queue. With this approach, we do not need to use queue name in the application code. So if tomorrow the queue needs to be changed, we don't need to change the application code.
Let's being the coding:
Getting RabbitMQ
It is quite easy to set-up RabbitMQ on your machine. You can follow the official download and installation guide. For this post, I will be using a docker image, to get my RabbitMQ up and running. I will be using a simple command:
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
This will get the official image with management console added. It will also expose ports 15672 for the management console and 5672 for the connection to the RabbitMQ. Now go to the browser and type http://localhost:15672, it will ask you for username and password, the default username and password is guest/guest. Once you logged into the console, you will should see the below screen.
Now using RabbitMQ with Spring Cloud Stream
Create a spring boot application named member-service, make sure that you added the dependency for the cloud stream using the RabbitMQ in the pom.xml.
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Next step is to define channels through which we want to communicate This will simply be an interface that defines ways of obtaining the MessageChannel object needed to send the message. Spring has provided 3 types of channels out of the box:
Sink.class - Consumer binding, input channel
Source.class - Producer binding, output channel
Processor.class - Combination of both Sink and Source, consumes message as well as produce it.
But for this article we will be using the custom channel. For using the default channels, please see here.
Here we defined the output channel named as "memberRegistrationChannel". The @Output annotation identifies an output channel, through which the publisher application publishes the messages to the exchange of the message broker middleware (RabbitMQ).
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface MemberCreationSource {
@Output("memberRegistrationChannel")
MessageChannel memberRegistration();
}
Now the magic happens with the annotation @EnableBinding(MemberCreationSource.class)
We used @EnableBinding and provided the source channel(MemberCreationSource.class), then we autowired the source (MemberCreationSource) to publish message to channel
@Service
@EnableBinding(MemberCreationSource.class)
public class MemberService {
@Autowired
private MemberCreationSource memberCreationSource;
public Member create(Member member) {
String routingKey = "member-created";
Member persistedMember = memberRepository.save(member);
if(persistedMember != null) {
....
....
memberCreationSource.memberRegistration().send(MessageBuilder.withPayload(persistedMember).build());
}
return persistedMember;
}
Finally we specify the properties, Here we specify the RabbitMQ properties and also we associate the channel to the queue to be used.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.memberRegistrationChannel.destination=memberRegistrations
spring.cloud.stream.default.contentType=application/json
The property spring.cloud.stream.bindings.<channel-name>.destination - this is the information from which topic this messages should come from.
Next start the Spring Boot Application, and go to the RabbitMQ console http://localhost:15672. We can see in the Exchanges section, an exchange named memberRegistrations has been created.
We have set everything to publish message. Once the MemberService.create has been invoked, it will send a message (serialized Member object) here. I didn't show the blueprint for the Member object, since it is a simple bean which holds basic member properties.
That's all, we will see, how to consume this message in the next post.
Happy Programming...!!!

Comments
Post a Comment