eBook – Guide Spring Cloud – NPI EA (cat=Spring Cloud)
announcement - icon

Let's get started with a Microservice Architecture with Spring Cloud:

>> Join Pro and download the eBook

eBook – Mockito – NPI EA (tag = Mockito)
announcement - icon

Mocking is an essential part of unit testing, and the Mockito library makes it easy to write clean and intuitive unit tests for your Java code.

Get started with mocking and improve your application tests using our Mockito guide:

Download the eBook

eBook – Java Concurrency – NPI EA (cat=Java Concurrency)
announcement - icon

Handling concurrency in an application can be a tricky process with many potential pitfalls. A solid grasp of the fundamentals will go a long way to help minimize these issues.

Get started with understanding multi-threaded applications with our Java Concurrency guide:

>> Download the eBook

eBook – Reactive – NPI EA (cat=Reactive)
announcement - icon

Spring 5 added support for reactive programming with the Spring WebFlux module, which has been improved upon ever since. Get started with the Reactor project basics and reactive programming in Spring Boot:

>> Join Pro and download the eBook

eBook – Java Streams – NPI EA (cat=Java Streams)
announcement - icon

Since its introduction in Java 8, the Stream API has become a staple of Java development. The basic operations like iterating, filtering, mapping sequences of elements are deceptively simple to use.

But these can also be overused and fall into some common pitfalls.

To get a better understanding on how Streams work and how to combine them with other language features, check out our guide to Java Streams:

>> Join Pro and download the eBook

eBook – Jackson – NPI EA (cat=Jackson)
announcement - icon

Do JSON right with Jackson

Download the E-book

eBook – HTTP Client – NPI EA (cat=Http Client-Side)
announcement - icon

Get the most out of the Apache HTTP Client

Download the E-book

eBook – Maven – NPI EA (cat = Maven)
announcement - icon

Get Started with Apache Maven:

Download the E-book

eBook – Persistence – NPI EA (cat=Persistence)
announcement - icon

Working on getting your persistence layer right with Spring?

Explore the eBook

eBook – RwS – NPI EA (cat=Spring MVC)
announcement - icon

Building a REST API with Spring?

Download the E-book

Course – LS – NPI EA (cat=Jackson)
announcement - icon

Get started with Spring and Spring Boot, through the Learn Spring course:

>> LEARN SPRING
Course – RWSB – NPI EA (cat=REST)
announcement - icon

Explore Spring Boot 3 and Spring 6 in-depth through building a full REST API with the framework:

>> The New “REST With Spring Boot”

Course – LSS – NPI EA (cat=Spring Security)
announcement - icon

Yes, Spring Security can be complex, from the more advanced functionality within the Core to the deep OAuth support in the framework.

I built the security material as two full courses - Core and OAuth, to get practical with these more complex scenarios. We explore when and how to use each feature and code through it on the backing project.

You can explore the course here:

>> Learn Spring Security

Course – LSD – NPI EA (tag=Spring Data JPA)
announcement - icon

Spring Data JPA is a great way to handle the complexity of JPA with the powerful simplicity of Spring Boot.

Get started with Spring Data JPA through the guided reference course:

>> CHECK OUT THE COURSE

Partner – Moderne – NPI EA (cat=Spring Boot)
announcement - icon

Refactor Java code safely — and automatically — with OpenRewrite.

Refactoring big codebases by hand is slow, risky, and easy to put off. That’s where OpenRewrite comes in. The open-source framework for large-scale, automated code transformations helps teams modernize safely and consistently.

Each month, the creators and maintainers of OpenRewrite at Moderne run live, hands-on training sessions — one for newcomers and one for experienced users. You’ll see how recipes work, how to apply them across projects, and how to modernize code with confidence.

Join the next session, bring your questions, and learn how to automate the kind of work that usually eats your sprint time.

Course – LJB – NPI EA (cat = Core Java)
announcement - icon

Code your way through and build up a solid, practical foundation of Java:

>> Learn Java Basics

Partner – LambdaTest – NPI EA (cat= Testing)
announcement - icon

Distributed systems often come with complex challenges such as service-to-service communication, state management, asynchronous messaging, security, and more.

Dapr (Distributed Application Runtime) provides a set of APIs and building blocks to address these challenges, abstracting away infrastructure so we can focus on business logic.

In this tutorial, we'll focus on Dapr's pub/sub API for message brokering. Using its Spring Boot integration, we'll simplify the creation of a loosely coupled, portable, and easily testable pub/sub messaging system:

>> Flexible Pub/Sub Messaging With Spring Boot and Dapr

1. Overview

This tutorial will be an introduction to Apache Storm, a distributed real-time computation system.

We’ll focus on and cover:

  • What exactly is Apache Storm and what problems it solves
  • Its architecture, and
  • How to use it in a project

2. What Is Apache Storm?

Apache Storm is free and open source distributed system for real-time computations.

It provides fault-tolerance, scalability, and guarantees data processing, and is especially good at processing unbounded streams of data. 

Some good use cases for Storm can be processing credit card operations for fraud detection or processing data from smart homes to detect faulty sensors.

Storm allows integration with various databases and queuing systems available on the market.

3. Maven Dependency

Before we use Apache Storm, we need to include the storm-core dependency in our project:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>

We should only use the provided scope if we intend to run our application on the Storm cluster.

To run the application locally, we can use a so-called local mode that will simulate the Storm cluster in a local process, in such case we should remove the provided.

4. Data Model

Apache Storm’s data model consists of two elements: tuples and streams.

4.1. Tuple

Tuple is an ordered list of named fields with dynamic types. This means that we don’t need to explicitly declare the types of the fields.

Storm needs to know how to serialize all values that are used in a tuple. By default, it can already serialize primitive types, Strings and byte arrays.

And since Storm uses Kryo serialization, we need to register the serializer using Config to use the custom types. We can do this in one of two ways:

First, we can register the class to serialize using its full name:

Config config = new Config();
config.registerSerialization(User.class);

In such a case, Kryo will serialize the class using FieldSerializerBy default, this will serialize all non-transient fields of the class, both private and public.

Or instead, we can provide both the class to serialize and the serializer we want Storm to use for that class:

Config config = new Config();
config.registerSerialization(User.class, UserSerializer.class);

To create the custom serializer, we need to extend the generic class Serializer that has two methods write and read.

4.2. Stream

Stream is the core abstraction in the Storm ecosystem. The Stream is an unbounded sequence of tuples.

Storms allows processing multiple streams in parallel.

Every stream has an id that is provided and assigned during declaration.

5. Topology

The logic of the real-time Storm application is packaged into the topology. The topology consists of spouts and bolts.

5.1. Spout

Spouts are the sources of the streams. They emit tuples to the topology.

Tuples can be read from various external systems like Kafka, Kestrel or ActiveMQ.

Spouts can be reliable or unreliable. Reliable means that the spout can reply that the tuple that has failed to be processed by Storm. Unreliable means that the spout doesn’t reply since it is going to use a fire-and-forget mechanism to emit the tuples.

To create the custom spout, we need to implement the IRichSpout interface or extend any class that already implements the interface, for example, an abstract BaseRichSpout class.

Let’s create an unreliable spout:

public class RandomIntSpout extends BaseRichSpout {

    private Random random;
    private SpoutOutputCollector outputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext,
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        outputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
    }
}

Our custom RandomIntSpout will generate random integer and timestamp every second.

5.2. Bolt

Bolts process tuples in the stream. They can perform various operations like filtering, aggregations or custom functions.

Some operations require multiple steps, and thus we will need to use multiple bolts in such cases.

To create the custom Bolt, we need to implement IRichBolt or for simpler operations IBasicBolt interface.

There are also multiple helper classes available for implementing Bolt. In this case, we’ll use BaseBasicBolt:

public class PrintingBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        System.out.println(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

This custom PrintingBolt will simply print all tuples to the console.

6. Creating a Simple Topology

Let’s put these ideas together into a simple topology. Our topology will have one spout and three bolts.

6.1. RandomNumberSpout

In the beginning, we’ll create an unreliable spout. It will generate random integers from the range (0,100) every second:

public class RandomNumberSpout extends BaseRichSpout {
    private Random random;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext, 
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        int operation = random.nextInt(101);
        long timestamp = System.currentTimeMillis();

        Values values = new Values(operation, timestamp);
        collector.emit(values);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.2. FilteringBolt

Next, we’ll create a bolt that will filter out all elements with operation equal to 0:

public class FilteringBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        int operation = tuple.getIntegerByField("operation");
        if (operation > 0) {
            basicOutputCollector.emit(tuple.getValues());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.3. AggregatingBolt

Next, let’s create a more complicated Bolt that will aggregate all positive operations from each day.

For this purpose, we’ll use a specific class created especially for implementing bolts that operate on windows instead of operating on single tuples: BaseWindowedBolt.

Windows are an essential concept in stream processing, splitting the infinite streams into finite chunks. We can then apply computations to each chunk. There are generally two types of windows:

Time windows are used to group elements from a given time period using timestamps. Time windows may have a different number of elements.

Count windows are used to create windows with a defined size. In such a case, all windows will have the same size and the window will not be emitted if there are fewer elements than the defined size.

Our AggregatingBolt will generate the sum of all positive operations from a time window along with its beginning and end timestamps:

public class AggregatingBolt extends BaseWindowedBolt {
    private OutputCollector outputCollector;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
    }

    @Override
    public void execute(TupleWindow tupleWindow) {
        List<Tuple> tuples = tupleWindow.get();
        tuples.sort(Comparator.comparing(this::getTimestamp));

        int sumOfOperations = tuples.stream()
          .mapToInt(tuple -> tuple.getIntegerByField("operation"))
          .sum();
        Long beginningTimestamp = getTimestamp(tuples.get(0));
        Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1));

        Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
        outputCollector.emit(values);
    }

    private Long getTimestamp(Tuple tuple) {
        return tuple.getLongByField("timestamp");
    }
}

Note that, in this case, getting the first element of the list directly is safe. That’s because each window is calculated using the timestamp field of the Tuple, so there has to be at least one element in each window.

6.4. FileWritingBolt

Finally, we’ll create a bolt that will take all elements with sumOfOperations greater than 2000, serialize them and write them to the file:

public class FileWritingBolt extends BaseRichBolt {
    public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
    private BufferedWriter writer;
    private String filePath;
    private ObjectMapper objectMapper;

    @Override
    public void cleanup() {
        try {
            writer.close();
        } catch (IOException e) {
            logger.error("Failed to close writer!");
        }
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, 
      OutputCollector outputCollector) {
        objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        
        try {
            writer = new BufferedWriter(new FileWriter(filePath));
        } catch (IOException e) {
            logger.error("Failed to open a file for writing.", e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
        long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
        long endTimestamp = tuple.getLongByField("endTimestamp");

        if (sumOfOperations > 2000) {
            AggregatedWindow aggregatedWindow = new AggregatedWindow(
                sumOfOperations, beginningTimestamp, endTimestamp);
            try {
                writer.write(objectMapper.writeValueAsString(aggregatedWindow));
                writer.newLine();
                writer.flush();
            } catch (IOException e) {
                logger.error("Failed to write data to file.", e);
            }
        }
    }
    
    // public constructor and other methods
}

Note that we don’t need to declare the output as this will be the last bolt in our topology

6.5. Running the Topology

Finally, we can pull everything together and run our topology:

public static void runTopology() {
    TopologyBuilder builder = new TopologyBuilder();

    Spout random = new RandomNumberSpout();
    builder.setSpout("randomNumberSpout");

    Bolt filtering = new FilteringBolt();
    builder.setBolt("filteringBolt", filtering)
      .shuffleGrouping("randomNumberSpout");

    Bolt aggregating = new AggregatingBolt()
      .withTimestampField("timestamp")
      .withLag(BaseWindowedBolt.Duration.seconds(1))
      .withWindow(BaseWindowedBolt.Duration.seconds(5));
    builder.setBolt("aggregatingBolt", aggregating)
      .shuffleGrouping("filteringBolt"); 
      
    String filePath = "./src/main/resources/data.txt";
    Bolt file = new FileWritingBolt(filePath);
    builder.setBolt("fileBolt", file)
      .shuffleGrouping("aggregatingBolt");

    Config config = new Config();
    config.setDebug(false);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Test", config, builder.createTopology());
}

To make the data flow through each piece in the topology, we need to indicate how to connect them. shuffleGroup allows us to state that data for filteringBolt will be coming from randomNumberSpout.

For each Bolt, we need to add shuffleGroup which defines the source of elements for this bolt.  The source of elements may be a Spout or another Bolt. And if we set the same source for more than one boltthe source will emit all elements to each of them.

In this case, our topology will use the LocalCluster to run the job locally.

7. Conclusion

In this tutorial, we introduced Apache Storm, a distributed real-time computation system. We created a spout, some bolts, and pulled them together into a complete topology.

The code backing this article is available on GitHub. Once you're logged in as a Baeldung Pro Member, start learning and coding on the project.

 

Baeldung Pro – NPI EA (cat = Baeldung)
announcement - icon

Baeldung Pro comes with both absolutely No-Ads as well as finally with Dark Mode, for a clean learning experience:

>> Explore a clean Baeldung

Once the early-adopter seats are all used, the price will go up and stay at $33/year.

eBook – HTTP Client – NPI EA (cat=HTTP Client-Side)
announcement - icon

The Apache HTTP Client is a very robust library, suitable for both simple and advanced use cases when testing HTTP endpoints. Check out our guide covering basic request and response handling, as well as security, cookies, timeouts, and more:

>> Download the eBook

eBook – Java Concurrency – NPI EA (cat=Java Concurrency)
announcement - icon

Handling concurrency in an application can be a tricky process with many potential pitfalls. A solid grasp of the fundamentals will go a long way to help minimize these issues.

Get started with understanding multi-threaded applications with our Java Concurrency guide:

>> Download the eBook

eBook – Java Streams – NPI EA (cat=Java Streams)
announcement - icon

Since its introduction in Java 8, the Stream API has become a staple of Java development. The basic operations like iterating, filtering, mapping sequences of elements are deceptively simple to use.

But these can also be overused and fall into some common pitfalls.

To get a better understanding on how Streams work and how to combine them with other language features, check out our guide to Java Streams:

>> Join Pro and download the eBook

eBook – Persistence – NPI EA (cat=Persistence)
announcement - icon

Working on getting your persistence layer right with Spring?

Explore the eBook

Course – LS – NPI EA (cat=REST)

announcement - icon

Get started with Spring Boot and with core Spring, through the Learn Spring course:

>> CHECK OUT THE COURSE

Partner – Moderne – NPI EA (tag=Refactoring)
announcement - icon

Modern Java teams move fast — but codebases don’t always keep up. Frameworks change, dependencies drift, and tech debt builds until it starts to drag on delivery. OpenRewrite was built to fix that: an open-source refactoring engine that automates repetitive code changes while keeping developer intent intact.

The monthly training series, led by the creators and maintainers of OpenRewrite at Moderne, walks through real-world migrations and modernization patterns. Whether you’re new to recipes or ready to write your own, you’ll learn practical ways to refactor safely and at scale.

If you’ve ever wished refactoring felt as natural — and as fast — as writing code, this is a good place to start.

eBook Jackson – NPI EA – 3 (cat = Jackson)