1 简介
之前讲解了Spring Cloud Stream整合RabbitMQ和GCP Pubsub,都是非常简单,而且代码没什么区别的。本文讲解Spring Cloud Stram与Kafka的整合,同样也是非常简单。
之前的文章:
《整合Spring Cloud Stream Binder与RabbitMQ进行消息发送与接收》
《整合Spring Cloud Stream Binder与GCP Pubsub进行消息发送与接收》
2 安装Kafka
为了演示简单,这里只是安装Standalone的版本,而不是集群。先到官网下载安装包,然后进行解压即可使用。
# 解压安装包
$ tar -xzf kafka_2.13-2.8.0.tgz
# 进入目录
$ cd kafka_2.13-2.8.0/
# 启动zookeeper
$ $ bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka(新的命令行终端)
$ bin/kafka-server-start.sh config/server.properties
这里Zookeeper和Kafka都使用了默认配置,所以不用修改了。启动的时候会有日志输出,如果没有报错,说明启动成功了。
3 整合
引入相关依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
实现简单的Publisher和Consumer:
package com.pkslow.cloud.stream.binder.kafka;
@SpringBootApplication
public class StreamBinderKafka {
private static final Logger log = LoggerFactory.getLogger(StreamBinderKafka.class);
public static void main(String[] args) {
SpringApplication.run(StreamBinderKafka.class, args);
}
@Bean
public Supplier<String> pkslowSource() {
return () -> {
String message = "www.pkslow.com";
log.info("Sending value: " + message);
return message;
};
}
@Bean
public Consumer<String> pkslowSink() {
return message -> {
log.info("Received message " + message);
};
}
}
配置必要的属性如下:
spring:
cloud:
stream:
function:
definition: pkslowSource;pkslowSink
bindings:
pkslowSource-out-0:
destination: pkslow-topic
pkslowSink-in-0:
destination: pkslow-topic
poller:
fixed-delay: 500
kafka:
binder:
brokers: localhost:9092
auto-create-topics: true
required-acks: 1
运行日志如下:
4 总结
三个MQ的整合下来几乎没有什么区别,也没有太大的代码改动,这就是Spring Cloud Stream给我们带来的便利。
代码请查看:https://github.com/LarryDpk/pkslow-samples
参考资料: