Getting Started with Spring Cloud Stream

Written by mcclain-pivotal | Published 2019/11/29


This post was co-written with Ben Wilcock, Product and Technical Marketing Manager for Spring at Pivotal.
🔔 A file has been uploaded! 🔔
🔔 A new user was registered! 🔔
🔔 An order was placed! 🔔
These sound like events that many parts of our application architecture might be interested in, right? For example, when an order is placed on our website, we’ll need a call to process the payment, a call to reserve inventory, and a call to begin the process of picking, packaging and shipping the product. 
For a single order, this isn’t too bad. Our store can make a few requests to these backend services directly and it shouldn’t introduce too much overhead. But what happens if we’re really good at selling our product? Processing 100 orders a second suddenly means our frontend is making 300 calls per second to our backend services.
If we add one more service to that (say, reporting to an internal sales dashboard), now that’s 400 calls per second. That’s a lot of overhead!
What if, instead, our website could simply alert our whole architecture at once? It could yell, “Hey! I made a sale!” to our whole stack, and any component that’s interested could take the appropriate action. This means we don’t need to update our frontend as we add additional services; our new services just need to know what to listen for.

Why Spring Cloud Stream?

The above is an example of an event-driven architecture: instead of reaching out to each service one by one, our services emit a change of state. If a file is uploaded, our file service can emit an event to a messaging platform, and then our Super Duper Image Resizer 3000 service can listen for that event and automatically generate differently sized profile images. Pivotal’s own Richard Seroter wrote about this very topic in detail, and it’s a great read. In his blog post, Richard talks about messaging as a way of reliably delivering events to many consumers quickly and in volume.
He also touches on something we want to talk about today: Spring Cloud Stream.
We’re big fans of both Kafka and RabbitMQ as event streaming platforms, so for this demo we’ll use Kafka. No matter which you choose to use, making it easy to produce and consume events is important for your developers. I’ve used a lot of frameworks that abstract away from the underlying message queue, but none quite as easy and flexible as Spring Cloud Stream.
My teammate Ben Wilcock put together a demo that really shows just how easy it is to get up and running. Let’s take it for a spin—and to follow along, you can download the full source code here.

Prepping For The Demo

We only need a few things for our demo: Docker, Docker Compose, and of course your favorite distribution of the JDK (perhaps even AdoptOpenJDK, which we sponsor). To keep things easy, the demo includes a Docker Compose config that will set up both Kafka and RabbitMQ, though for our purposes we’ll only be using Kafka. We can spin this up with a simple command:
docker-compose up
This will read our docker-compose.yml file, download the necessary container images, run them, and configure them. After just a few moments, Kafka should be up and running and ready to go.
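The demo’s own docker-compose.yml is the source of truth here, but as a rough sketch, a single-broker Kafka setup of this kind typically looks something like the following. The image names, ports, and environment variables below are illustrative assumptions, not the demo’s exact config:
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper  # illustrative image choice
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka  # illustrative image choice
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise on localhost so apps running on the host machine can connect
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092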

Sending Events

Our demo is made up of two Spring microservices, one to produce events and one to consume them. In our fictional scenario, the message producer will create a stream of applications for bank loans, and our processor will check if those applications should be approved or declined.
Let’s start by producing some messages that will be sent to Kafka, the code for which is in the loansource directory.
There are a few files of code here. The Loan.java file defines a Loan object and the Statuses.java file defines all the states a loan can be in. What’s interesting, though, is the LoansourceApplication.java file, which is what’s actually producing our messages.
As you can imagine, Spring and its dependencies handle a lot of the wiring up of components for us automatically. Let’s take a look at LoansourceApplication.java to see how this works.
@Bean
public Supplier<Loan> supplyLoan() {
  return () -> {
    // Pick a random applicant name and amount from lists defined elsewhere in this class
    String rName = names.get(new Random().nextInt(names.size()));
    Long rAmount = amounts.get(new Random().nextInt(amounts.size()));
    // Each invocation creates a brand-new loan application with a unique ID
    Loan loan = new Loan(UUID.randomUUID().toString(), rName, rAmount);
    log.info("{} {} for ${} for {}", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
    return loan;
  };
}
Supplier<> is a standard Java functional interface. Because there is only one @Bean method that returns this type, Spring Cloud Stream knows exactly what to do next. By default, it will trigger this function once every second and send the result to the default MessageChannel named output. What’s nice about this function method is that it only contains business logic, so you can test it using your favorite testing methods.
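For example, because supplyLoan() returns a plain Supplier, a unit test can call it directly with no broker and no Spring context at all. Here’s a minimal sketch, assuming the Loan accessors shown in the log statement above and that a new Loan starts out in the PENDING state (as the demo’s log output suggests):
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.junit.Test;

public class LoanSupplierTest {

  @Test
  public void suppliesAPendingLoan() {
    // Invoke the @Bean method directly and execute the Supplier it returns
    Loan loan = new LoansourceApplication().supplyLoan().get();

    assertNotNull(loan.getUuid());
    // Assumes new loans default to PENDING, per the demo's log output
    assertEquals(Statuses.PENDING.name(), loan.getStatus());
  }
}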
We could use the spring.cloud.function.definition property in the application.properties file to explicitly declare which function bean we want bound to the binding destinations, but for cases when you only have a single @Bean defined, this is not necessary.
Likewise, if we wanted to use a different polling interval, we can set the spring.integration.poller.fixed-delay property in the application.properties file.
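Together, those two properties might look something like this in application.properties (the five-second delay is just an illustrative value):
# Explicitly bind the supplyLoan bean (only needed when multiple function beans exist)
spring.cloud.function.definition=supplyLoan
# Produce a new loan every 5 seconds instead of the 1-second default
spring.integration.poller.fixed-delay=5000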
The only question that remains is, “How does Spring know it’s Kafka we’re writing to?” For that, we take a look at our pom.xml:
<dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Providing this dependency in our code tells Spring, “I’d like to send these messages to Kafka”. Since our Kafka server is listening on localhost on the default port, we don’t need to provide any additional configuration in our application.properties file, but if that’s not the case we can of course provide details such as hostname, port, and authentication there.
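If Kafka were running somewhere else, a hedged sketch of that configuration might look like this in application.properties (the hostname here is made up):
# Point the Kafka binder at a remote broker instead of localhost
spring.cloud.stream.kafka.binder.brokers=kafka.example.com:9092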
We can run our code and activate the kafka profile, which we’ve configured to be the profile that includes the Kafka SCS binder, and we should see it start producing messages:
cd loansource
./mvnw package spring-boot:run -DskipTests=true -Pkafka
After a few moments, we’ll see our application start creating new loans and sending them to Kafka:
2019-10-15...LoansourceApplication : PENDING 9eff9b58-e1f1-474d-8f1d-aa4db8dbb75a for $10000000 for Donald
2019-10-15...LoansourceApplication : PENDING d507c06c-81bb-4a98-8f85-38f74af36984 for $100 for Jacinda
2019-10-15...LoansourceApplication : PENDING 19fc86a4-d461-470c-8005-423ce1a258e7 for $100 for Jacinda
2019-10-15...LoansourceApplication : PENDING 33f3756c-ea9b-472f-bad2-73f1647188b1 for $10000 for Vladimir
2019-10-15...LoansourceApplication : PENDING 1625d10f-c1c8-4e75-8fe8-10ce363ef56f for $10000000 for Theresa
If you prefer, you can also see the messages in your browser using KafDrop. Simply point your browser to localhost:9000 and you should see a UI that allows you to browse the messages stored in Kafka.

Receiving Events

We’ve got half of the equation here, but we also need something to consume and process these events. For this, we’ll look in the loancheck directory. In this half of the demo, our loan checker will observe every application and approve or decline it. If approved, an approval message will be sent to the approved topic; otherwise, a denial message will be sent to the declined topic.
You can extrapolate from here that other systems down the line could listen for and pick up these messages for further processing. For example, maybe a payout system listens for an approved loan to start processing it.
The code here is a little different, pointing at different topics. In LoanCheckApplication.java, we have the @EnableBinding(LoanProcessor.class) annotation, meaning that all of our definitions for channel bindings are found in the LoanProcessor class.
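The application class itself stays tiny. A minimal sketch of what it plausibly looks like, given that annotation:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(LoanProcessor.class) // wire up the channels declared in LoanProcessor
public class LoanCheckApplication {

  public static void main(String[] args) {
    SpringApplication.run(LoanCheckApplication.class, args);
  }
}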
In our LoanProcessor.java file, we define that the MessageChannel we’re listening on is named output, matching the default topic our producer writes to. Additionally, we define two other MessageChannels that we’ll be writing to: approved and declined. For each of these channels, the interface declares a method that gives us access to it.
@Component
public interface LoanProcessor {
  String APPLICATIONS_IN = "output";
  String APPROVED_OUT = "approved";
  String DECLINED_OUT = "declined";

  @Input(APPLICATIONS_IN)
  SubscribableChannel sourceOfLoanApplications();

  @Output(APPROVED_OUT)
  MessageChannel approved();

  @Output(DECLINED_OUT)
  MessageChannel declined();
}
Finally, we can see how this ties into which method is invoked by taking a look at the LoanChecker.java file. We’ll see we have a method checkAndSortLoans with the @StreamListener annotation that matches the @Input channel we defined previously:
@StreamListener(LoanProcessor.APPLICATIONS_IN)
public void checkAndSortLoans(Loan loan) {
  log.info("{} {} for ${} for {}", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());

  // Anything above the MAX_AMOUNT threshold is declined; everything else is approved
  if (loan.getAmount() > MAX_AMOUNT) {
    loan.setStatus(Statuses.DECLINED.name());
    processor.declined().send(message(loan));
  } else {
    loan.setStatus(Statuses.APPROVED.name());
    processor.approved().send(message(loan));
  }
}
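The message() call isn’t shown in this snippet; presumably it’s a small static helper in LoanChecker.java that wraps Spring’s MessageBuilder, something like this sketch:
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Assumed helper: wrap a payload in a Spring Message envelope
private static <T> Message<T> message(T payload) {
  return MessageBuilder.withPayload(payload).build();
}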
We can start this code up much like we did our loansource, by opening up a separate terminal and running the following:
cd loancheck
./mvnw package spring-boot:run -DskipTests=true -Pkafka
After a few moments, we’ll start seeing our pending messages come through and then get sorted into approved or declined:
2019-10-15...LoanChecker : PENDING 95a887cf-ab5f-48c4-b03b-556675446cfc for $1000 for Kim
2019-10-15...LoanChecker : APPROVED 95a887cf-ab5f-48c4-b03b-556675446cfc for $1000 for Kim
2019-10-15...LoanChecker : PENDING a15f13fe-fc9a-40fb-b6f0-24106a18c0cd for $100000000 for Angela
2019-10-15...LoanChecker : DECLINED a15f13fe-fc9a-40fb-b6f0-24106a18c0cd for $100000000 for Angela
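Coming back to the payout idea from earlier: a hypothetical downstream service (not part of the demo) could subscribe to the approved topic using exactly the same annotations:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

// Hypothetical binding interface for a payout service
interface PayoutStreams {
  @Input("approved")
  SubscribableChannel approvedLoans();
}

@EnableBinding(PayoutStreams.class)
public class PayoutService {

  @StreamListener("approved")
  public void startPayout(Loan loan) {
    // Kick off payment processing for the newly approved loan
  }
}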

Wrapping Up

Spring Cloud Stream provides an extremely powerful abstraction over potentially complicated messaging platforms, turning the act of producing and consuming messages into just a couple of lines of code. Should your infrastructure needs change and you need to migrate to a new messaging platform, no application code changes; only your pom file does. Whether you’re using Kafka, RabbitMQ, or a cloud provider’s solution such as GCP Pub/Sub or Azure Event Hubs, Spring Cloud Stream makes it simple and quick to get up and running.
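For example, swapping from Kafka to RabbitMQ is a one-dependency change in the pom.xml:
<dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>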

About the Author

Brian is a Principal Product Marketing Manager on the Technical Marketing team at Pivotal, with a focus on technical educational content for Pivotal customers as well as the Cloud Foundry, BOSH, and Knative communities. Prior to Pivotal, Brian worked on both the development and operations of software, with a heavy focus on Cloud Foundry and BOSH at companies in many industries including finance, entertainment and technology. He loves learning and experimenting in many fields of technology, and more importantly sharing the lessons learned along the way.
