Work queues
- 不需要设置交换机,只需指定唯一的消息队列即可进行消息传递
- 可以有多个消费者,多个消费者通过轮询从队列中取消息
- 消息被消费者接收并确认(ack)后,队列将消息移除
- 消费者可以在没有处理完当前消息的情况下继续获取(预取)消息
- 通过设置 spring.rabbitmq.listener.simple.prefetch: 1,可以限制消费者每次只预取一条消息,处理完后才能获取下一条

监听队列,自动绑定消息。通过设置休眠来模拟不同的消费能力
/**
 * Declares two listeners on the same queue, {@code simple.queue}. Each listener sleeps
 * for a different duration after printing, to simulate consumers of unequal speed.
 */
@Component
public class SpringRabbitListener {

    /**
     * Prints the received message with the current time to standard output, then sleeps 20 ms.
     *
     * @param msg the message payload
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue01(String msg) throws InterruptedException {
        final String line = "消费者01 接收到simple.queue的消息为:" + msg + "," + LocalTime.now();
        System.out.println(line);
        Thread.sleep(20);
    }

    /**
     * Prints the received message with the current time to standard error, then sleeps 200 ms.
     *
     * @param msg the message payload
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue02(String msg) throws InterruptedException {
        final String line = "消费者02 接收到simple.queue的消息为:" + msg + "," + LocalTime.now();
        System.err.println(line);
        Thread.sleep(200);
    }
}
|
/**
 * Sends 50 numbered messages to {@code simple.queue}, pausing 20 ms after each send.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
@Test
public void testSendMessage02() throws InterruptedException {
    final String destination = "simple.queue";
    final String prefix = "hello,spring amqp_";
    int sent = 0;
    while (sent < 50) {
        // The loop counter is appended so each message is distinguishable.
        rabbitTemplate.convertAndSend(destination, prefix + sent);
        Thread.sleep(20);
        sent++;
    }
}
|
Publish/Subscribe
- 需要设置交换机,并将队列绑定到交换机
- 常见的交换机类型有 fanout、direct、topic
- 可以通过基于配置和基于注解的方式来声明交换机、声明队列、绑定队列到交换机

- fanout:广播模式,忽略路由 key(发送时通常设为空字符串)
- direct:定义路由 key
- topic:路由key支持通配符
- *:匹配恰好一个单词(单词之间以 . 分隔)
- #:匹配零个或多个单词
- headers:
- whereAny:匹配任意一个
- whereAll:必须都满足
Config
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.time.LocalTime;
/**
 * Declares the fanout exchange {@code heroxin.fanout}, the queues {@code queue01} and
 * {@code queue02}, and binds both queues to that exchange.
 */
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE_NAME = "heroxin.fanout";
    private static final String FIRST_QUEUE = "queue01";
    private static final String SECOND_QUEUE = "queue02";

    /** @return the fanout exchange named {@code heroxin.fanout} */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    /** @return the queue named {@code queue01} */
    @Bean
    public Queue queue01() {
        return new Queue(FIRST_QUEUE);
    }

    /** @return the queue named {@code queue02} */
    @Bean
    public Queue queue02() {
        return new Queue(SECOND_QUEUE);
    }

    /** @return the binding of {@code queue01} to the fanout exchange */
    @Bean
    public Binding fanoutBinding01() {
        final Queue boundQueue = queue01();
        return BindingBuilder.bind(boundQueue).to(fanoutExchange());
    }

    /** @return the binding of {@code queue02} to the fanout exchange */
    @Bean
    public Binding fanoutBinding02() {
        final Queue boundQueue = queue02();
        return BindingBuilder.bind(boundQueue).to(fanoutExchange());
    }
}
|
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
/**
 * Declares the direct exchange {@code heroxin.direct} and two queues, binding
 * {@code queue01} with routing key {@code hero} and {@code queue02} with routing key
 * {@code xin}.
 */
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE_NAME = "heroxin.direct";
    private static final String FIRST_QUEUE = "queue01";
    private static final String SECOND_QUEUE = "queue02";

    /** @return the direct exchange named {@code heroxin.direct} */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    /** @return the queue named {@code queue01} */
    @Bean
    public Queue queue01() {
        return new Queue(FIRST_QUEUE);
    }

    /** @return the queue named {@code queue02} */
    @Bean
    public Queue queue02() {
        return new Queue(SECOND_QUEUE);
    }

    /** @return the binding of {@code queue01} to the direct exchange under key {@code hero} */
    @Bean
    public Binding directBinding01() {
        final Queue boundQueue = queue01();
        return BindingBuilder.bind(boundQueue).to(directExchange()).with("hero");
    }

    /** @return the binding of {@code queue02} to the direct exchange under key {@code xin} */
    @Bean
    public Binding directBinding02() {
        final Queue boundQueue = queue02();
        return BindingBuilder.bind(boundQueue).to(directExchange()).with("xin");
    }
}
|
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
/**
 * Declares the topic exchange {@code heroxin.topic} and two queues, binding
 * {@code queue01} with the pattern {@code #.hero.#} and {@code queue02} with the
 * pattern {@code *.xin.*}.
 */
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE_NAME = "heroxin.topic";
    private static final String FIRST_QUEUE = "queue01";
    private static final String SECOND_QUEUE = "queue02";

    /** @return the topic exchange named {@code heroxin.topic} */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /** @return the queue named {@code queue01} */
    @Bean
    public Queue queue01() {
        return new Queue(FIRST_QUEUE);
    }

    /** @return the queue named {@code queue02} */
    @Bean
    public Queue queue02() {
        return new Queue(SECOND_QUEUE);
    }

    /** @return the binding of {@code queue01} to the topic exchange with pattern {@code #.hero.#} */
    @Bean
    public Binding topicBinding01() {
        final Queue boundQueue = queue01();
        return BindingBuilder.bind(boundQueue).to(topicExchange()).with("#.hero.#");
    }

    /** @return the binding of {@code queue02} to the topic exchange with pattern {@code *.xin.*} */
    @Bean
    public Binding topicBinding02() {
        final Queue boundQueue = queue02();
        return BindingBuilder.bind(boundQueue).to(topicExchange()).with("*.xin.*");
    }
}
|
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
/**
 * Declares the headers exchange {@code heroxin.headers} and two queues, bound by
 * message-header values instead of a routing key.
 *
 * <p>{@code queue01} is bound with {@code whereAny} on {color=red, speed=low};
 * {@code queue02} is bound with {@code whereAll} on {color=red, speed=fast}.
 */
@Configuration
public class RabbitMQConfig {

    /** @return the headers exchange named {@code heroxin.headers} */
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("heroxin.headers");
    }

    /** @return the queue named {@code queue01} */
    @Bean
    public Queue queue01() {
        return new Queue("queue01");
    }

    /** @return the queue named {@code queue02} */
    @Bean
    public Queue queue02() {
        return new Queue("queue02");
    }

    /**
     * Binds {@code queue01} with {@code whereAny}: one matching header pair is enough.
     *
     * @return the headers binding for {@code queue01}
     */
    @Bean
    public Binding headersBinding01() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("color", "red");
        headers.put("speed", "low");
        return BindingBuilder.bind(queue01()).to(headersExchange()).whereAny(headers).match();
    }

    /**
     * Binds {@code queue02} with {@code whereAll}: every listed header pair must match.
     *
     * @return the headers binding for {@code queue02}
     */
    @Bean
    public Binding headersBinding02() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("color", "red");
        headers.put("speed", "fast");
        return BindingBuilder.bind(queue02()).to(headersExchange()).whereAll(headers).match();
    }
}
|
服务提供者
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
/**
 * Publishes messages to the fanout exchange {@code heroxin.fanout}.
 */
@Service
@Slf4j
public class MQSender {

    private static final String FANOUT_EXCHANGE = "heroxin.fanout";
    private static final String NO_ROUTING_KEY = "";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends the given payload to the fanout exchange with an empty routing key.
     *
     * @param msg the payload handed to the template for conversion and sending
     */
    public void sendFanout(Object msg) {
        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, NO_ROUTING_KEY, msg);
    }
}
|
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
/**
 * Publishes messages to the direct exchange {@code heroxin.direct} under the routing
 * keys {@code hero} and {@code xin}.
 */
@Service
@Slf4j
public class MQSender {

    private static final String DIRECT_EXCHANGE = "heroxin.direct";
    private static final String KEY_HERO = "hero";
    private static final String KEY_XIN = "xin";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the payload, then sends it with routing key {@code hero}.
     *
     * @param msg the payload to send
     */
    public void sendDirect01(Object msg) {
        final String logLine = "发送 [hero] 消息" + msg;
        log.info(logLine);
        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE, KEY_HERO, msg);
    }

    /**
     * Logs the payload, then sends it with routing key {@code xin}.
     *
     * @param msg the payload to send
     */
    public void sendDirect02(Object msg) {
        final String logLine = "发送 [xin] 消息" + msg;
        log.info(logLine);
        rabbitTemplate.convertAndSend(DIRECT_EXCHANGE, KEY_XIN, msg);
    }
}
|
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
/**
 * Publishes messages to the topic exchange {@code heroxin.topic}.
 */
@Service
@Slf4j
public class MQSender {

    private static final String TOPIC_EXCHANGE = "heroxin.topic";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the payload, then sends it with routing key {@code hero.xin}.
     *
     * @param msg the payload to send
     */
    public void sendTopic01(Object msg) {
        final String logLine = "发送 [#.hero.#] 消息" + msg;
        log.info(logLine);
        final String routingKey = "hero.xin";
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, routingKey, msg);
    }

    /**
     * Logs the payload, then sends it with routing key {@code hero.xin.heroxin}.
     *
     * @param msg the payload to send
     */
    public void sendTopic02(Object msg) {
        final String logLine = "发送 [*.xin.*] 消息" + msg;
        log.info(logLine);
        final String routingKey = "hero.xin.heroxin";
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, routingKey, msg);
    }
}
|
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
/**
 * Publishes messages to the headers exchange {@code heroxin.headers}. Routing is driven
 * by the {@code color} and {@code speed} message headers; the routing key is empty.
 */
@Service
@Slf4j
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the payload, then sends it with headers {@code color=red}, {@code speed=fast}.
     *
     * @param msg the text payload; must not be {@code null}
     */
    public void sendHeaders01(String msg) {
        log.info("发送 [都能接收到] 消息" + msg);
        publishWithHeaders(msg, "fast");
    }

    /**
     * Logs the payload, then sends it with headers {@code color=red}, {@code speed=low}.
     *
     * @param msg the text payload; must not be {@code null}
     */
    public void sendHeaders02(String msg) {
        log.info("发送 [只有队列一接受] 消息" + msg);
        publishWithHeaders(msg, "low");
    }

    /** Builds a message with {@code color=red} and the given {@code speed}, then sends it. */
    private void publishWithHeaders(String msg, String speed) {
        MessageProperties properties = new MessageProperties();
        properties.setHeader("color", "red");
        properties.setHeader("speed", speed);
        // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default
        // charset, so the bytes on the wire would vary with the producer's JVM settings.
        byte[] body = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = new Message(body, properties);
        rabbitTemplate.convertAndSend("heroxin.headers", "", message);
    }
}
|
服务消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;
import java.time.LocalTime;
/**
 * Consumes String payloads from {@code queue01} and {@code queue02}, printing each
 * message with the time it was received.
 */
@Service
public class MQReceiver {

    /**
     * Prints a message received from {@code queue01}, then sleeps 200 ms.
     *
     * @param msg the message payload
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @RabbitListener(queues = "queue01")
    public void listenQueue01(String msg) throws InterruptedException {
        StringBuilder line = new StringBuilder("消费者01 接收到queue01的消息为:");
        line.append(msg).append(",").append(LocalTime.now());
        System.out.println(line);
        Thread.sleep(200);
    }

    /**
     * Prints a message received from {@code queue02}, then sleeps 200 ms.
     *
     * @param msg the message payload
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @RabbitListener(queues = "queue02")
    public void listenQueue02(String msg) throws InterruptedException {
        StringBuilder line = new StringBuilder("消费者02 接收到queue02的消息为:");
        line.append(msg).append(",").append(LocalTime.now());
        System.out.println(line);
        Thread.sleep(200);
    }
}
|
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;
import java.time.LocalTime;
/**
 * Consumes raw {@link Message} objects from {@code queue01} and {@code queue02},
 * printing each decoded body with the time it was received.
 */
@Service
public class MQReceiver {

    /**
     * Prints the body of a message received from {@code queue01}, then sleeps 200 ms.
     *
     * @param msg the raw AMQP message
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @RabbitListener(queues = "queue01")
    public void listenQueue01(Message msg) throws InterruptedException {
        System.out.println("消费者01 接收到queue01的消息为:" + decodeBody(msg) + "," + LocalTime.now());
        Thread.sleep(200);
    }

    /**
     * Prints the body of a message received from {@code queue02}, then sleeps 200 ms.
     *
     * @param msg the raw AMQP message
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @RabbitListener(queues = "queue02")
    public void listenQueue02(Message msg) throws InterruptedException {
        System.out.println("消费者02 接收到queue02的消息为:" + decodeBody(msg) + "," + LocalTime.now());
        Thread.sleep(200);
    }

    /** Decodes the message body as UTF-8 text. */
    private static String decodeBody(Message msg) {
        // Decode explicitly as UTF-8: new String(byte[]) uses the platform default
        // charset, so the decoded text would vary with the consumer's JVM settings.
        return new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }
}
|
测试
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.ResponseBody;
/**
 * Exposes endpoints under {@code /mq}; each one calls a single {@code MQSender} method
 * with a fixed demo payload.
 */
@Controller
@RequestMapping("/mq")
public class UserController {

    /** Payload sent by every endpoint. */
    private static final String DEMO_PAYLOAD = "Heroxin Hello Fei";

    @Autowired
    private MQSender mqSender;

    /** Sends the demo payload via {@code sendFanout}. */
    @RequestMapping("/fanout")
    @ResponseBody
    public void mqFanout() {
        mqSender.sendFanout(DEMO_PAYLOAD);
    }

    /** Sends the demo payload via {@code sendDirect01}. */
    @RequestMapping("/direct01")
    @ResponseBody
    public void mqDirect01() {
        mqSender.sendDirect01(DEMO_PAYLOAD);
    }

    /** Sends the demo payload via {@code sendDirect02}. */
    @RequestMapping("/direct02")
    @ResponseBody
    public void mqDirect02() {
        mqSender.sendDirect02(DEMO_PAYLOAD);
    }

    /** Sends the demo payload via {@code sendTopic01}. */
    @RequestMapping("/topic01")
    @ResponseBody
    public void mqTopic01() {
        mqSender.sendTopic01(DEMO_PAYLOAD);
    }

    /** Sends the demo payload via {@code sendTopic02}. */
    @RequestMapping("/topic02")
    @ResponseBody
    public void mqTopic02() {
        mqSender.sendTopic02(DEMO_PAYLOAD);
    }

    /** Sends the demo payload via {@code sendHeaders01}. */
    @RequestMapping("/headers01")
    @ResponseBody
    public void mqHeaders01() {
        mqSender.sendHeaders01(DEMO_PAYLOAD);
    }

    /** Sends the demo payload via {@code sendHeaders02}. */
    @RequestMapping("/headers02")
    @ResponseBody
    public void mqHeaders02() {
        mqSender.sendHeaders02(DEMO_PAYLOAD);
    }
}
|
补充:基于注解配置
/**
 * Declares the queue {@code direct.queue01}, binds it to the exchange
 * {@code heroxin.direct} under the keys {@code red} and {@code blue}, and listens on it.
 * Prints each received message with the current time, then sleeps 200 ms.
 *
 * @param msg the message payload
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(name = "direct.queue01"),
                exchange = @Exchange(name = "heroxin.direct"),
                key = {"red", "blue"}
        )
)
public void listenDirectqueue01(String msg) throws InterruptedException {
    final String line = "消费者01 接收到direct.queue的消息为:" + msg + "," + LocalTime.now();
    System.out.println(line);
    Thread.sleep(200);
}
/**
 * Declares the queue {@code direct.queue02}, binds it to the exchange
 * {@code heroxin.direct} under the keys {@code red} and {@code yellow}, and listens on it.
 * Prints each received message with the current time, then sleeps 200 ms.
 *
 * @param msg the message payload
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(name = "direct.queue02"),
                exchange = @Exchange(name = "heroxin.direct"),
                key = {"red", "yellow"}
        )
)
public void listenDirectqueue02(String msg) throws InterruptedException {
    final String line = "消费者02 接收到direct.queue的消息为:" + msg + "," + LocalTime.now();
    System.out.println(line);
    Thread.sleep(200);
}
|