掃二維碼與項(xiàng)目經(jīng)理溝通
我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
開發(fā)中,服務(wù)與服務(wù)之間通信通常會(huì)用到消息中間件,如果我們使用了某一個(gè)MQ,那么消息中間件與我們的系統(tǒng)算是高耦合。將來有一天,要替換成另外的MQ,我們的改動(dòng)就會(huì)比較大。為了解決這個(gè)問題,我們可以使用Spring Cloud Stream 來整合我們的消息中間件,降低耦合度,使服務(wù)可以更多關(guān)注自己的業(yè)務(wù)邏輯等。

今天為大家?guī)硪粋€(gè)人人可實(shí)操的SpringCloudStream集成Kafka的快速入門示例。
SpringCloudStream是一個(gè)構(gòu)建高擴(kuò)展性的事件消息驅(qū)動(dòng)的微服務(wù)框架。簡(jiǎn)單點(diǎn)說就是幫助你操作MQ,可以與底層MQ框架解耦。將來想要替換MQ框架的時(shí)候會(huì)比較容易。
Kafka是一個(gè)分布式發(fā)布 - 訂閱消息系統(tǒng),源于LinkedIn的一個(gè)項(xiàng)目,2011年成為開源Apache項(xiàng)目。
ZooKeeper 是 Apache 軟件基金會(huì)的一個(gè)軟件項(xiàng)目,它為大型分布式計(jì)算提供開源的分布式配置服務(wù)、同步服務(wù)和命名注冊(cè),Kafka的實(shí)現(xiàn)同時(shí)也依賴于zookeeper。
使用Kafka首先需要啟動(dòng)zookeeper,windows中搭建zookeeper也很簡(jiǎn)單。以下幾步即可完成:
將conf文件夾下面的 zoo_sample.cfg 重命名zoo.cfg。并修改其工作目錄dataDir。
bin文件夾下面有zkEnv.cmd有zookeeper相關(guān)的配置,其中就包括JAVA_HOME,所以系統(tǒng)環(huán)境變量需要配置JAVA_HOME,或者直接用Java的路徑來替換。
默認(rèn)啟動(dòng)端口2181為。
正常啟動(dòng)如下:
本地使用kafka同樣也是如下的幾個(gè)步驟:
查看config文件下面的 server.properties配置文件中的zookeeper的配置。
zookeeper.connect=localhost:2181
在bin/windows文件夾下面kafka-run-class.bat文件中有JAVA_HOME的配置,同樣也可以直接改成系統(tǒng)的Java路徑。
# .\bin\windows\kafka-server-start.bat .\config\server.properties
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic test
#test topic的消息生產(chǎn)者
kafka-console-producer.bat --broker-list localhost:9092 --topic test
#test topic的消息消費(fèi)者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
#test topic的消息消費(fèi)者(從頭消費(fèi))
kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic
kafka啟動(dòng)windows界面如下:
由于我們直接使用Spring Cloud Stream 集成Kafka,官方也已經(jīng)有現(xiàn)成的starter。
org.springframework.cloud
spring-cloud-starter-stream-kafka
2.1.0.RELEASE
spring:
application:
name: shop-server
cloud:
stream:
bindings:
#配置自己定義的通道與哪個(gè)中間件交互
input: #MessageChannel里Input和Output的值
destination: test #目標(biāo)主題 相當(dāng)于kafka的topic
output:
destination: test1 #本例子創(chuàng)建了另外一個(gè)topic (test1)用于區(qū)分不同的功能區(qū)分。
default-binder: kafka #默認(rèn)的binder是kafka
kafka:
binder:
zk-nodes: localhost:2181
bootstrap-servers: localhost:9092 #kafka服務(wù)地址,集群部署的時(shí)候需要配置多個(gè),
consumer:
group-id: consumer1
producer:
key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
client-id: producer1
server:
port: 8100
首先需要定義SubscribableChannel 接口方法使用Input注解。
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
然后簡(jiǎn)單的使用 StreamListener 監(jiān)聽某一通道的消息。
@Service
@EnableBinding(Sink.class)
public class MessageSinkHandler {
@StreamListener(Sink.INPUT)
public void handler(Messagemsg){
System.out.println(" received message : "+msg);
}
}
cloud stream配置中綁定了對(duì)應(yīng)的Kafka topic,如下:
cloud:
stream:
bindings:
#配置自己定義的通道與哪個(gè)中間件交互
input: #SubscribableChannel里Input值
destination: test #目標(biāo)主題
我們使用Kafka console producer 生產(chǎn)消息。
kafka-console-producer.bat --broker-list localhost:9092 --topic test
同時(shí)啟動(dòng)我們的示例SpringBoot項(xiàng)目,使用producer推送幾條消息。
我們同時(shí)啟動(dòng)一個(gè)Kafka console consumer。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
消費(fèi)結(jié)果如下:
Spring Boot 項(xiàng)目消費(fèi)消息如下:
首先需要定義生產(chǎn)者M(jìn)essageChannel,這里會(huì)用到Output注解。
public interface KafkaSource {
String OUTPUT = "output";
@Output(KafkaSource.OUTPUT)
MessageChannel output();
}
使用MessageChannel 發(fā)送消息。
@Component
public class MessageService {
@Autowired
private KafkaSource source;
public Object sendMessage(Object msg) {
source.output().send(MessageBuilder.withPayload(msg).build());
return msg;
}
定義一個(gè)Rest API 來觸發(fā)消息發(fā)送。
@RestController
public class MessageController {
@Autowired
private MessageService messageService;
@GetMapping(value = "/sendMessage/{msg}")
public String sendMessage(@PathVariable("msg") String msg){
messageService.sendMessage("messageService send out : " + msg + LocalDateTime.now());
return "sent message";
}
}
配置中關(guān)于producer的配置如下:
cloud:
stream:
bindings:
input:
destination: test
output:
destination: test1 #目標(biāo)topic
啟動(dòng)SpringBoot App, 并觸發(fā)如下API call。
??http://localhost:8100/sendMessage/JavaNorthProducer??
我們同時(shí)啟動(dòng)一個(gè)Kafka console consumer,這里我們使用另一個(gè)test1 topic。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1
console consumer消費(fèi)消息如下:
本章初步介紹了Spring Cloud Stream 集成Kafka的簡(jiǎn)單示例,實(shí)現(xiàn)了簡(jiǎn)單的發(fā)布-訂閱功能。但是Spring Cloud Stream肯定還有更多的功能,我們后續(xù)還將繼續(xù)深入學(xué)習(xí)更多Stream的功能。
以上示例倉(cāng)庫(kù):https://github.com/javatechnorth/java-study-note/tree/master/kafka
下載鏈接:
??https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz??
??https://kafka.apache.org/downloads??

我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流