spring cloud stream升级到4.0版本后,消息收发的方式和以前有较大的差异,经过测试后,总结见下:
一、基本集成
application.yml内容见下:
spring:
profiles:
active: dev
application:
name: iios-keystone-broker-service
#配置中心
cloud:
stream:
rabbit:
bindings:
yypOutput-out-0:
# 生产者配置信息
producer:
# 生产者使用的交换机类型 如果已存在交换机名称,该类型必须与交换机类型一致
exchangeType: direct
# 用于指定 routing key 表达式
routing-key-expression: headers["routeTo"] # 该值表示使用头信息的routeTo字段作为 routing key
queueNameGroupOnly: true
yypInput-in-0:
# 消费者配置信息
consumer:
# 消费者使用的交换机类型 如果已存在交换机名称,该类型必须与交换机类型一致
exchangeType: direct
# 消息确认模式 具体查看AcknowledgeMode
acknowledge-mode: none
#queueNameGroupOnly: true
bindings:
yypOutput-out-0: #通道的名称
destination: yyp-test-001 #要使用的exchange名称
content-type: application/json
default-binder: iios_rabbit
yypInput-in-0: #通道的名称
destination: yyp-test-001 #要使用的exchange名称
#content-type: application/json
default-binder: iios_rabbit
group: yyp-test-001-queue-001 # 要使用的消息队列名称
binders:
iios_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.1.1
port:
1312
username: tiAAAAA
password: gd6pFWQFEQEQFEW
virtual-host: /
发送消息的代码
@GetMapping("/stream/{name}")
public String sendMsg(@PathVariable String name) {
String uuid = UUID.fastUUID().toString();
streamBridge.send("rec-obj", JSONUtil.toJsonStr(new Person(name)));
streamBridge.send("rec-str", uuid);
streamBridge
.send("yypOutput-out-0", MessageBuilder.withPayload("hello world!".getBytes(StandardCharsets.UTF_8))
.setHeader("routeTo", "routingkey-yyp-01").build());
return uuid;
}
接收消息的代码
@Service
@Slf4j
public class RCom {
@Bean
public Consumer<Message<byte[]>> yypInput() throws IOException {
return message -> {
String routingKey = String.valueOf(message.getHeaders().get("amqp_receivedRoutingKey"));
byte[] payload = message.getPayload();
String str = new String(payload, StandardCharsets.UTF_8);
log.info("yypInput Rev:"+str+"routingKey:"+routingKey);
};
}