1 Intro
Integrating Spring Cloud Stream with Kafka is straightforward. Let's go!
2 Install Kafka
We use a standalone Kafka installation. Download the package from the official Kafka download page, then unpack it and start ZooKeeper and Kafka:
$ tar -xzf kafka_2.13-2.8.0.tgz
$ cd kafka_2.13-2.8.0/
# start ZooKeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
# start Kafka (in a second terminal, from the same directory)
$ bin/kafka-server-start.sh config/server.properties
We use the default configuration of ZooKeeper and Kafka, so nothing needs to be changed. If neither process logs an error, continue.
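Besides reading the logs, you can confirm that both processes accept connections. The following is a minimal sketch, not part of Kafka or Spring: `PortCheck` and `isListening` are hypothetical names, and it assumes the default ports from the shipped configuration files (2181 for ZooKeeper, 9092 for Kafka). It only shows that something is listening on the port, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration: checks whether a TCP port accepts connections.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default ports: 2181 (ZooKeeper) and 9092 (Kafka).
        System.out.println("ZooKeeper (2181) listening: " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092) listening: " + isListening("localhost", 9092, 1000));
    }
}
```

If either line prints `false`, check the terminal output of the corresponding start script before moving on.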
3 Connect to Kafka
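Add the Kafka binder dependency to the Maven pom.xml. No version is given here, so it has to come from dependency management such as the Spring Cloud BOM: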
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Implement the publisher and the consumer as a Supplier bean and a Consumer bean:
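package com.pkslow.cloud.stream.binder.kafka;

import java.util.function.Consumer;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class StreamBinderKafka {
    private static final Logger log = LoggerFactory.getLogger(StreamBinderKafka.class);

    public static void main(String[] args) {
        SpringApplication.run(StreamBinderKafka.class, args);
    }

    // Polled by the framework; each returned value is published to pkslowSource-out-0.
    @Bean
    public Supplier<String> pkslowSource() {
        return () -> {
            String message = "www.pkslow.com";
            log.info("Sending value: " + message);
            return message;
        };
    }

    // Invoked for every message that arrives on pkslowSink-in-0.
    @Bean
    public Consumer<String> pkslowSink() {
        return message -> {
            log.info("Received message " + message);
        };
    }
}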
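To make the roles of the two beans concrete: Spring Cloud Stream polls the Supplier on a schedule and publishes each value to the output binding, and it calls the Consumer for each message arriving on the input binding. The sketch below imitates that flow in plain Java, with an in-memory queue standing in for the Kafka topic. `InMemoryBindingSketch` and `pump` are hypothetical names for illustration only; this is not how the binder is implemented.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java illustration only: an in-memory queue plays the role of the Kafka topic.
class InMemoryBindingSketch {

    // Polls the supplier every delayMillis and delivers the first `count` messages to the consumer.
    static void pump(Supplier<String> source, Consumer<String> sink, int count, long delayMillis)
            throws InterruptedException {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        // Producer side: poll the supplier on a fixed delay and publish each result.
        poller.scheduleWithFixedDelay(() -> topic.add(source.get()), 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            // Consumer side: take each message off the "topic" and pass it to the consumer.
            for (int i = 0; i < count; i++) {
                sink.accept(topic.take());
            }
        } finally {
            poller.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        pump(() -> "www.pkslow.com",
             message -> System.out.println("Received message " + message),
             3, 500);
    }
}
```

In the real application the scheduling, serialization, and delivery are handled by the framework and the Kafka binder, which is why the two bean definitions are all the code that is needed.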
Add the configuration to your YAML file (for example application.yml):
spring:
  cloud:
    stream:
      function:
        definition: pkslowSource;pkslowSink
      bindings:
        pkslowSource-out-0:
          destination: pkslow-topic
        pkslowSink-in-0:
          destination: pkslow-topic
      poller:
        fixed-delay: 500
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
          required-acks: 1
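The binding names in this configuration follow the Spring Cloud Stream convention for functional beans: the function name, then `-in-` or `-out-`, then an index. The function definition lists the bean names separated by `;`. The sketch below reproduces that naming in plain Java so the mapping is easy to see. `BindingNames` and its methods are hypothetical helpers for illustration; the framework derives these names internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; Spring Cloud Stream derives these names itself.
class BindingNames {

    // Input binding name: <functionName>-in-<index>
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding name: <functionName>-out-<index>
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Splits a ';'-separated function definition into individual function names.
    static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(functionNames("pkslowSource;pkslowSink")); // [pkslowSource, pkslowSink]
        System.out.println(outputBinding("pkslowSource", 0));         // pkslowSource-out-0
        System.out.println(inputBinding("pkslowSink", 0));            // pkslowSink-in-0
    }
}
```

Because both bindings point at the same destination, pkslow-topic, every message the supplier publishes is read back by the consumer.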
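Start the application. With the 500 ms poller delay, the log should show the "Sending value: www.pkslow.com" entry from the supplier repeating about twice per second, each followed by a matching "Received message www.pkslow.com" entry from the consumer.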
4 Summary
Spring Cloud Stream makes Kafka integration easy: a publisher and a consumer need only two bean definitions and a few lines of configuration.
The sample code is available at: https://github.com/LarryDpk/pkslow-samples