问题导读: 1. 如何设置配置文件? 2. 如何定义接口,进行channel绑定? 3. 如何编写输出应用? 4. 如何发送消息? 5. 如何接收消息? 6.如何实现与kafka整合? 最近在工作中会使用到spring cloud stream,网上中文资料几乎没有,阅读官网配置,好不容易搞定,在这里分享一下使用过程,也是自己做一个记录。
主页君自己使用场景是实现两个app之间的通信,第一个app输出数据,第二个app输入数据。这里的app也就是指一个service。
1.配置文件中进行绑定
app1中,配置输出通道相关信息。 application.yml 或者 application.properties或者配置文件中,配置 [mw_shl_code=applescript,true]spring:
cloud:
stream:
bindings:
output_channel: #channelName
destination: mydest #destination,或者可以认为是发布-订阅模型里面的topic
binder: rabbit1
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.1.1 #rabbitMQ服务器地址
port: 5672 #rabbitMQ服务器端口
username: username
password: pwd
virtual-host: /hostName [/mw_shl_code]
app2中,配置输入通道相关信息。[mw_shl_code=applescript,true]spring:
cloud:
stream:
bindings:
input_channel: #<span style="font-family: Arial, Helvetica, sans-serif;">position1。</span><span style="font-family: Arial, Helvetica, sans-serif;">channelName.</span>
destination: modest #position2。destination,或者可以认为是发布-订阅模型里面的topic,这里应该与输出app中发布的topic一致,表示订阅该主题
binder: rabbit1
binders:
rabbit1:
type: rabbit #可以是其它,比如kafka
environment:
spring:
rabbitmq:
host: 192.168.1.1 #rabbitMQ服务器地址
port: 5672 #rabbitMQ服务器端口
username: username
password: pwd
virtual-host: /hostName [/mw_shl_code]
2.定义接口,进行channel绑定
官网的例子中,使用的是sink和source接口,如果每个app里面只有一个通道,可以直接使用stream自带的接口,但是如果要实现多个接口,就需要自己进行接口定义了。下面是我自己定义的接口例子,具体用法会在注释中详细说明。
[mw_shl_code=java,true]import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface Barista {
String INPUT_CHANNEL = "input_channel"; #position3
String OUTPUT<span style="font-family: Arial, Helvetica, sans-serif;">_CHANNEL</span> = "output_channel";
String INPUT1 = "input1";
String OUTPUT1 = "output1";
#注解@Input声明了它是一个输入类型的通道,名字是<span style="font-family: Arial, Helvetica, sans-serif;">Barista</span><span style="font-family: Arial, Helvetica, sans-serif;">.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题
</span> @Input(<span style="font-family: Arial, Helvetica, sans-serif;">Barista</span><span style="font-family: Arial, Helvetica, sans-serif;">.INPUT_CHANNEL</span><span style="font-family: Arial, Helvetica, sans-serif;">)</span>
SubscribableChannel logInput();
#注解@Output声明了它是一个输出类型的通道,名字是<span style="font-family: Arial, Helvetica, sans-serif;">output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。</span>
@Output(Barista.<span style="font-family: Arial, Helvetica, sans-serif;">OUTPUT</span><span style="font-family: Arial, Helvetica, sans-serif;">_CHANNEL</span><span style="font-family: Arial, Helvetica, sans-serif;">)</span>
MessageChannel logOutPut();
@Input(Barista.INPUT1)
SubscribableChannel input1();
@Output(Barista.OUTPUT1)
MessageChannel output1();
} [/mw_shl_code]
这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
3.在输出应用的Application.java类中注入上述Barista接口
[mw_shl_code=java,true]import Barista;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;
/**
*
*/
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@EnableRedisHttpSession
@EnableBinding(Barista.class)
public class OutputServiceApplication {
public static void main(String[] args) {
SpringApplication.run(<span style="font-family: Arial, Helvetica, sans-serif;">OutputServiceApplication</span><span style="font-family: Arial, Helvetica, sans-serif;">.class, args);</span>
}
} [/mw_shl_code]
这里需要在Application类中注入接口,否则在引用接口的时候,会提示找不到bean。原因是stream只有找到@EnableBinding注解,才会自动给其中的参数Barista.class创建实例。这是spring cloud stream自带的操作,需要遵守。
4.发送消息
[mw_shl_code=java,true]import Barista;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Service
@Component
public class RabbitSender {
@Autowired
private Barista source;
// 发送消息
public String sendMessage(){
try{
Obj obj = new Obj();
source.logOutPut().send(MessageBuilder.withPayload(obj).build());
logger.info("发送");
}catch (Exception e){
e.printStackTrace();
}
return null;
}
} [/mw_shl_code]
source是一个实例化的Barista对象,调用其中的logOutPut()方法实际上是调用了一个MessageChannel,并使用它将消息发送出去。
5.接收消息
在输入应用中注入接口,并声明监听 [mw_shl_code=java,true]import Barista;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
@EnableBinding(Barista.class)
public class RabbitReceiver {
private static final Log logger = LogFactory.getLog(RabbitReceiver.class);
@StreamListener(Barista.LOG_INPUT)
public void receiver(Message<Obj> message){
Obj obj = message.getPayload();
logger.info("接受对象:"+<span style="font-family: Arial, Helvetica, sans-serif;">obj</span><span style="font-family: Arial, Helvetica, sans-serif;">+"\n");</span>
}
} [/mw_shl_code]
首先,在类上添加注解@EnableBinding(Barista.class),实现与消息代理的连接。在方法receiver()上添加注解@StreamListener,并以通道名称作为参数,可以实现对相应通道的监听。
至此,实现了两个app之间进行消息传递。
##################
spring cloud还可以与kafka整合。下面是整合配置
1、在pom.xml里面添加kafka的maven依赖
[mw_shl_code=xml,true]<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>[/mw_shl_code]
2、在properties 配置文件里面添加 kafka binder 参数
[mw_shl_code=bash,true]spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9092
spring.cloud.stream.kafka.binder.zk-nodes=127.0.0.1:2181
spring.cloud.stream.kafka.binder.minPartitionCount=1
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.autoAddPartitions=true[/mw_shl_code]
3、输入通道定义,供消费者使用
(1)在properties配置文件里面添加输入通道配置信息
[mw_shl_code=bash,true]spring.cloud.stream.bindings.testa.destination=test_spring_stream
spring.cloud.stream.bindings.testa.group=group-1
spring.cloud.stream.bindings.testa.consumer.concurrency=1
spring.cloud.stream.bindings.testa.consumer.partitioned=false [/mw_shl_code]
(2)定义输入通道并绑定输入通道配置信息
[mw_shl_code=java,true]public interface Sink {
//接收队列1
String INPUT_1 = "testa";
@Input(Sink.INPUT_1)
SubscribableChannel input1();
} [/mw_shl_code]
INPUT_1 = "testa" 跟配置文件里面的通道名称 testa 保持一致
4、输出通道定义,供生产者使用
(1)在properties配置文件里面添加输出通道配置信息
[mw_shl_code=bash,true]spring.cloud.stream.bindings.sourceA.destination=test_spring_stream
spring.cloud.stream.bindings.sourceA.producer.partitionCount=1 [/mw_shl_code]
(2)定义输出通道并绑定输出通道配置信息
[mw_shl_code=bash,true]public interface Source {
//发送队列1
String OUTPUT_1 = "sourceA";
@Output(Source.OUTPUT_1)
MessageChannel output1();
} [/mw_shl_code]
OUTPUT_1 = "sourceA" 跟配置文件里面的通道名称 sourceA 保持一致
5、生产者端代码
[mw_shl_code=java,true]@EnableBinding(Source.class)
public class KafkaSender {
private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);
@Autowired
private Source source;
public void sendMessage(String message) {
try {
source.output1().send(MessageBuilder.withPayload("message: " + message).build());
} catch (Exception e) {
logger.info("消息发送失败,原因:"+e);
e.printStackTrace();
}
}
} [/mw_shl_code]
调用sendMessage方法发送消息
6、消费者端代码
[mw_shl_code=java,true]
@EnableBinding(Sink.class)
public class KafkaReceiver {
private final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
@StreamListener(Sink.INPUT_1)
private void receive(String vote) {
logger.info("receive message : " + vote);
}
}[/mw_shl_code]
通过receive方法接收消息
来源:http://blog.csdn.net/phyllisy/article/details/51382018
作者:phyllisy
|