# 整合Spring Cloud Stream Binder与Kafka进行消息发送与接收

# 1 简介

之前讲解了Spring Cloud Stream整合RabbitMQ和GCP Pubsub,都是非常简单,而且代码没什么区别的。本文讲解Spring Cloud Stram与Kafka的整合,同样也是非常简单。

之前的文章:

整合Spring Cloud Stream Binder与RabbitMQ进行消息发送与接收 (opens new window)

整合Spring Cloud Stream Binder与GCP Pubsub进行消息发送与接收 (opens new window)

# 2 安装Kafka

为了演示简单,这里只是安装Standalone的版本,而不是集群。先到官网下载安装包,然后进行解压即可使用。

# 解压安装包
$ tar -xzf kafka_2.13-2.8.0.tgz
# 进入目录
$ cd kafka_2.13-2.8.0/
# 启动zookeeper
$ $ bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka(新的命令行终端)
$ bin/kafka-server-start.sh config/server.properties

这里Zookeeper和Kafka都使用了默认配置,所以不用修改了。启动的时候会有日志输出,如果没有报错,说明启动成功了。

# 3 整合

引入相关依赖:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

实现简单的Publisher和Consumer:

package com.pkslow.cloud.stream.binder.kafka;

@SpringBootApplication
public class StreamBinderKafka {
    private static final Logger log = LoggerFactory.getLogger(StreamBinderKafka.class);
    public static void main(String[] args) {
        SpringApplication.run(StreamBinderKafka.class, args);
    }

    @Bean
    public Supplier<String> pkslowSource() {
        return () -> {
            String message = "www.pkslow.com";
            log.info("Sending value: " + message);
            return message;
        };
    }

    @Bean
    public Consumer<String> pkslowSink() {
        return message -> {
            log.info("Received message " + message);
        };
    }
}

配置必要的属性如下:

spring:
  cloud:
    stream:
      function:
        definition: pkslowSource;pkslowSink
      bindings:
        pkslowSource-out-0:
         destination: pkslow-topic
        pkslowSink-in-0:
          destination: pkslow-topic
      poller:
        fixed-delay: 500
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
          required-acks: 1

运行日志如下:

# 4 总结

三个MQ的整合下来几乎没有什么区别,也没有太大的代码改动,这就是Spring Cloud Stream给我们带来的便利。

代码请查看:https://github.com/LarryDpk/pkslow-samples


参考资料:

Spring Cloud Stream官方 (opens new window)

Kafka Quickstart (opens new window)

上次更新: 2023/8/18 23:39:36