In this lab we will transition to using Spring Boot as the vehicle for producing messages to our Kafka topics. This will give us a basic understanding of how Spring Boot autoconfigures itself, how those configurations can be overridden, and how to produce messages with proper key and value structures.
As Josh Long frequently mentions, http://start.spring.io is one of his favorite places on the internet. It's also a good place to start this lab.
You can keep most of the defaults, but be sure to select:
- Spring for Apache Kafka
- Spring Boot Actuator
- Spring Reactive Web
- Lombok
NOTE: Do not pick Spring for Apache Kafka Streams at this time. We'll introduce that later. We want to start with a basic example of using the base APIs.
Download the .zip, unzip it, and import the project into your favorite IDE.
A Spring Boot application is generally runnable as-is, without additional modification. Run the project and confirm that you can see information at http://localhost:8080/actuator (if not, review the logs - Boot is usually pretty good at explaining the problem).
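For example, from the project root (this assumes the Maven wrapper that start.spring.io generates; use ./gradlew bootRun if you chose Gradle):

$ ./mvnw spring-boot:run
# in a second terminal:
$ curl http://localhost:8080/actuator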
If you want to be able to see the complete actuator and health details, rename application.properties in the /resources folder to application.yml and add:
# Local configs only - not for production!!!
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: always
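Restart and verify; the health endpoint should now include component details:

$ curl http://localhost:8080/actuator/health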
To create the most basic Producer, add the following code to the Spring Boot application class:
@Bean
ApplicationRunner produce(KafkaTemplate<String, String> kafkaTemplate) {
    return event -> {
        for (int i = 0; i < 10; i++) {
            // send to topic1 without a key
            kafkaTemplate.send("topic1", String.format("Testing %d", i));
        }
    };
}
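Note that send() is asynchronous: it returns a future rather than blocking until the broker acknowledges the record. A sketch of logging the outcome, assuming Spring Kafka 3.x, where send() returns a CompletableFuture<SendResult<K, V>> (2.x returns a ListenableFuture instead):

kafkaTemplate.send("topic1", String.format("Testing %d", i))
        .whenComplete((result, ex) -> {
            if (ex != null) {
                // the send failed after exhausting any retries
                System.err.println("Send failed: " + ex.getMessage());
            } else {
                // the broker acknowledged the record
                System.out.println("Sent to partition "
                        + result.getRecordMetadata().partition());
            }
        });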
Review the logs after restarting the application. There will be additional information about the connection to the Kafka cluster.
You may have wondered at this point how the topic topic1 works at all, since we never actually created it. Topics are created on demand thanks to a broker setting called auto.create.topics.enable, which defaults to true. We may not want this to happen in production code, so let's disable it now. Add this to the server.properties file in the broker's config folder and restart the broker:
auto.create.topics.enable=false
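You can confirm which topics currently exist with the console tools that ship with Kafka (the path here assumes you run from the Kafka installation directory, as with the broker itself):

$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092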
Change the topic target in the kafkaTemplate.send() call from topic1 to topic2 and restart the application.
There should be several repeating errors similar to this:
2021-01-06 15:29:45.756 WARN 113037 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 355 : {topic2=UNKNOWN_TOPIC_OR_PARTITION}
The Kafka producer will keep retrying until the topic exists, or simply continue to fail. In a production setting this is usually the more desirable behavior, since it prevents accidental, runaway topic creation.
We can have Spring Boot create the topics we need using the Kafka Admin API. Add the following code to the Spring Boot application class:
@Bean
public NewTopic topicExample() {
    return TopicBuilder.name("topic2")
            .partitions(3)
            .replicas(1)
            .build();
}
This enables the Spring Boot application to create and configure the topics essential to itself on start-up. There are some caveats, however: it is possible to change this configuration later and increase the partition count, which can impact how message keys get distributed (i.e. if you expect messages with the same key to keep going to the same partition, that change may break the guarantee).
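TopicBuilder can also apply per-topic settings. A minimal sketch, assuming you want explicit retention (the topic name and the seven-day value are illustrative, not part of this lab):

// Uses org.apache.kafka.common.config.TopicConfig for the config key constants.
@Bean
public NewTopic configuredTopicExample() {
    return TopicBuilder.name("topic3") // hypothetical topic, not used elsewhere in this lab
            .partitions(3)
            .replicas(1)
            .config(TopicConfig.RETENTION_MS_CONFIG, "604800000") // 7 days in milliseconds
            .build();
}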
So far we have only sent messages to Kafka without a key (a null key). In that case, Kafka balances the messages across the available partitions. This is not how messages are typically stored in Kafka. Introducing a key influences where Kafka stores each message: by default, Kafka hashes the message key and takes the result modulo the number of partitions (i.e. hash(key) % partitions). This ensures that messages with the same key always end up in the same partition.
This behavior is important to Kafka's ability to scale, and it preserves message ordering within the scope of a single partition key.
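To make the math concrete, here is a sketch of roughly what the default partitioner does for non-null keys, using Utils.murmur2 and Utils.toPositive from the Kafka clients library already on the classpath (the keys mirror the synthetic ones we are about to send):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionMath {
    public static void main(String[] args) {
        int partitions = 3; // matches the topic2 definition above
        for (int i = 0; i < 10; i++) {
            byte[] keyBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
            // hash the key bytes, clear the sign bit, then take the modulus
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % partitions;
            System.out.printf("key=%d -> partition %d%n", i, partition);
        }
    }
}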
Modify the Producer code as follows:
kafkaTemplate.send("topic2", String.valueOf(i), String.format("Testing %d", i));
We've simply added a synthetic partition key. If we re-run the program, messages with the key "1" will now always end up in the same partition. This will become useful when we start consuming the messages.
When using Kafka, messages are typically more complex than Strings and need some form of serialization/deserialization. Kafka supports registering custom serde (serializer/deserializer) implementations, and Spring for Apache Kafka provides a JsonSerializer that can be used to convert POJOs to JSON.
First, let's create a simple POJO:
@Data
@AllArgsConstructor
class CustomMessage {
    private final String message;
    private final LocalDateTime timestamp;
}
Change the main produce() method as well (we need to handle the new CustomMessage type):
@Bean
ApplicationRunner produce(KafkaTemplate<String, CustomMessage> kafkaTemplate) {
    return event -> {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("topic2", String.valueOf(i),
                    new CustomMessage(String.format("Testing %d", i), LocalDateTime.now()));
        }
    };
}
Finally, set the key and value serializers in application.yml:
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Use the Kafka Consumer CLI to verify the messages you produced:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2 --from-beginning
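To print the keys alongside the values, the console consumer accepts formatter properties:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2 \
    --from-beginning --property print.key=true --property key.separator=: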
You may have noticed that the output of LocalDateTime in CustomMessage is a little odd (e.g. "timestamp":[2021,1,7,12,52,17,626364000]).
- Why is that?
- What can you do to change that behavior?
- Can you customize it to produce output like "timestamp":"2021-01-07T12:48:00.227244"?
If you visited the actuator/health endpoint you may have expected to see a Kafka health check. There isn't one automatically provided by the current version of Spring Boot.
- Why?
- Can you think of what kinds of things you may want to monitor?
- What about metrics?
Make the Producer more dynamic and introduce a periodic scheduled task to send a new message on a fixed interval (hint: look up @EnableScheduling).
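If you get stuck, one possible shape (the class name, interval, and key scheme here are all illustrative; this assumes the CustomMessage producer configuration from above):

// Add @EnableScheduling to the Spring Boot application class, then:
@Component
class ScheduledProducer {

    private final KafkaTemplate<String, CustomMessage> kafkaTemplate;

    ScheduledProducer(KafkaTemplate<String, CustomMessage> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // send one message every five seconds
    @Scheduled(fixedRate = 5000)
    void sendPeriodically() {
        String key = String.valueOf(System.currentTimeMillis() % 10); // rotate keys across partitions
        kafkaTemplate.send("topic2", key,
                new CustomMessage("Scheduled message", LocalDateTime.now()));
    }
}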