RabbitMQ的核心组成部分超详细

RabbitMQ的核心组成部分

核心概念: Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server

Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手

Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。

Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。

Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange

Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)

Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.

Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。

Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

RabbitMQ的运行流程

RabbitMQ支持消息的模式

参考rabbitmq官网:https://www.rabbitmq.com/getstarted.html

1. 简单模式功能:

功能:一个生产者P发送消息到队列Q,一个消费者C接收 生产者实现思路: 创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,关闭通道和连接。

生产者

public class Producer {

public static void main(String[] args) {

//所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp

// 1:创建连接工程

ConnectionFactory connectionFactory=new ConnectionFactory();

//设置ip

connectionFactory.setHost("139.196.122.115");

//设置默认端口

connectionFactory.setPort(5672);

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

//消息发送地址 根目录下

connectionFactory.setVirtualHost("/");

Connection connection=null;

Channel channel=null;

try {

// 2:创建连接获取通道Connection Rabbitmq为什么是基于channel去处理而不是connection ?长连接 ----信道channel 在高并发的情况,会创建多个通道(高性能)

connection=connectionFactory.newConnection("生产者");

// 3:通过连接获取通道Channel

channel=connection.createChannel();

// 4:通过通道创建交换机,生命队列,绑定关系,路由key,发送消息,和接收消息

//设置队列名称

String queueName="queue1";

/*

* @params1 队列的名称

* @params2 是否持久化durable=false 所谓持久化消息是否存盘,如果false非持久化,true是持久化

* @params3 是否排他性,是否是独占队列

* @params4 是否真的删除,随着最后一个消费者消息完毕以后是否把队列自动删除

* @params5 携带附属参数

*/

channel.queueDeclare(queueName,false,false,false,null);

// 5:准备消息内容

String message="Hello World ";

// 6:发送消息给队列queue

/*

* @param1:交换机 @param2:队列,路由key @params3 消息的状态控制,@params4消息主题

* 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机

*/

channel.basicPublish("",queueName,null,message.getBytes());

} catch (IOException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}finally {

// 7:关闭连接

if(channel!=null&& channel.isOpen()){

try {

channel.close();

} catch (IOException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}

}

// 8:关闭通道

if(connection!=null&& connection.isOpen()){

try {

connection.close();

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

消费者实现思路

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue, 创建消费者并监听队列,从队列中读取消息。

消费者

package com.newer.rabbitmq.simple;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Consumer {

public static void main(String[] args) {

//所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp

// 1:创建连接工程

ConnectionFactory connectionFactory=new ConnectionFactory();

//设置ip

connectionFactory.setHost("139.196.122.115");

//设置默认端口

connectionFactory.setPort(5672);

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

//消息发送地址 根目录下

connectionFactory.setVirtualHost("/");

Connection connection=null;

Channel channel=null;

try {

// 2:创建连接获取通道Connection

connection=connectionFactory.newConnection("生产者");

// 3:通过连接获取通道Channel

channel=connection.createChannel();

// 4:通过通道创建交换机,生命队列,绑定关系,路由key,发送消息,和接收消息

channel.basicConsume("queue1", true, new DeliverCallback() {

public void handle(String s, Delivery message) throws IOException {

System.out.println("收到消息是" + new String(message.getBody(), "UTF-8"));

}

}, new CancelCallback() {

public void handle(String s) throws IOException {

System.out.println("接收失败了...");

}

});

System.out.println("开始接收消息");

//对消息进行阻断不在往下运行

System.in.read();

} catch (IOException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}finally {

// 7:关闭连接

if(channel!=null&& channel.isOpen()){

try {

channel.close();

} catch (IOException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}

}

// 8:关闭通道

// 7:关闭连接

if(connection!=null&& connection.isOpen()){

try {

connection.close();

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

}

2. 工作队列模式Work Queue

功能:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列

任务队列:避免立即做一个资源密集型任务,必须等待它完成,而是把这个任务安排到稍后再做。我们将任务封装为消息并将其发送给队列。后台运行的工作进程将弹出任务并最终执行作业。当有多个worker同时运行时,任务将在它们之间共享。

生产者实现思路:

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,2条消息之间间隔一定时间,关闭通道和连接。

消费者实现思路:

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,创建消费者C1并监听队列,获取消息并暂停10ms,另外一个消费者C2暂停1000ms,由于消费者C1消费速度快,所以C1可以执行更多的任务。 当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢? 主要有两种模式: 1、轮询模式的分发:一个消费者一条,按均分配; 2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

轮询模式

生产者

package com.newer.rabbitmq.work.lunxun;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

/**

* @author: 学相伴-飞哥

* @description: Producer 简单队列生产者

* @Date : 2021/3/2

*/

public class Producer {

public static void main(String[] args) {

// 1: 创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

// 2: 设置连接属性

connectionFactory.setHost("139.196.122.115");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

Connection connection = null;

Channel channel = null;

try {

// 3: 从连接工厂中获取连接

connection = connectionFactory.newConnection("生产者");

// 4: 从连接中获取通道channel

channel = connection.createChannel();

// 6: 准备发送消息的内容

//===============================end topic模式==================================

for (int i = 1; i <= 20; i++) {

//消息的内容

String msg = "学相伴:" + i;

// 7: 发送消息给中间件rabbitmq-server

// @params1: 交换机exchange

// @params2: 队列名称/routingkey

// @params3: 属性配置

// @params4: 发送消息的内容

channel.basicPublish("", "queue5", null, msg.getBytes());

}

System.out.println("消息发送成功!");

} catch (Exception ex) {

ex.printStackTrace();

System.out.println("发送消息出现异常...");

} finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

}

}

消费者1

package com.newer.rabbitmq.work.lunxun;

import com.rabbitmq.client.*;

import java.io.IOException;

/**

* @author:楚风

* @description: Consumer

* @Date : 2021/3/2

*/

public class Work1 {

public static void main(String[] args) {

// 1: 创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

// 2: 设置连接属性

connectionFactory.setHost("139.196.122.115");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

Connection connection = null;

Channel channel = null;

try {

// 3: 从连接工厂中获取连接

connection = connectionFactory.newConnection("消费者-Work1");

// 4: 从连接中获取通道channel

channel = connection.createChannel();

// 5: 申明队列queue存储消息

/*

* 如果队列不存在,则会创建

* Rabbitmq不允许创建两个相同的队列名称,否则会报错。

*

* @params1: queue 队列的名称

* @params2: durable 队列是否持久化

* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭

* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。

* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。

* */

// 这里如果queue已经被创建过一次了,可以不需要定义

// channel.queueDeclare("queue1", false, false, false, null);

// 同一时刻,服务器只会推送一条消息给消费者

// 6: 定义接受消息的回调

Channel finalChannel = channel;

// finalChannel.basicQos(1);

finalChannel.basicConsume("queue5", false, new DeliverCallback() {

@Override

public void handle(String s, Delivery delivery) throws IOException {

try{

System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));

Thread.sleep(2000);

}catch(Exception ex){

ex.printStackTrace();

}

}

}, new CancelCallback() {

@Override

public void handle(String s) throws IOException {

}

});

System.out.println("Work1-开始接受消息");

System.in.read();

} catch (Exception ex) {

ex.printStackTrace();

System.out.println("发送消息出现异常...");

} finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null && connection.isOpen()) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

}

}

消费者2

package com.newer.rabbitmq.work.lunxun;

import com.rabbitmq.client.*;

import java.io.IOException;

/**

* @author: 楚风

* @description: Consumer

* @Date : 2021/3/2

*/

public class Work2 {

public static void main(String[] args) {

// 1: 创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

// 2: 设置连接属性

connectionFactory.setHost("139.196.122.115");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

Connection connection = null;

Channel channel = null;

try {

// 3: 从连接工厂中获取连接

connection = connectionFactory.newConnection("消费者-Work2");

// 4: 从连接中获取通道channel

channel = connection.createChannel();

// 5: 申明队列queue存储消息

/*

* 如果队列不存在,则会创建

* Rabbitmq不允许创建两个相同的队列名称,否则会报错。

*

* @params1: queue 队列的名称

* @params2: durable 队列是否持久化

* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭

* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。

* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。

* */

// 这里如果queue已经被创建过一次了,可以不需要定义

//channel.queueDeclare("queue1", false, true, false, null);

// 同一时刻,服务器只会推送一条消息给消费者

//channel.basicQos(1);

// 6: 定义接受消息的回调

Channel finalChannel = channel;

// finalChannel.basicQos(1);

finalChannel.basicConsume("queue5", false, new DeliverCallback() {

@Override

public void handle(String s, Delivery delivery) throws IOException {

try{

System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));

Thread.sleep(200);

}catch(Exception ex){

ex.printStackTrace();

}

}

}, new CancelCallback() {

@Override

public void handle(String s) throws IOException {

}

});

System.out.println("Work2-开始接受消息");

System.in.read();

} catch (Exception ex) {

ex.printStackTrace();

System.out.println("发送消息出现异常...");

} finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null && connection.isOpen()) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

}

}

小结:work1和work2的消息处理能力不同,但是最后处理的消息条数相同,是“按均分配”。

公共分发模式

生产者

package com.newer.rabbitmq.work.fair;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

/**

* @author: 楚风

* @description: Producer 简单队列生产者

* @Date : 2021/3/2

*/

public class Producer {

public static void main(String[] args) {

// 1: 创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

// 2: 设置连接属性

connectionFactory.setHost("139.196.122.115");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

Connection connection = null;

Channel channel = null;

try {

// 3: 从连接工厂中获取连接

connection = connectionFactory.newConnection("生产者");

// 4: 从连接中获取通道channel

channel = connection.createChannel();

// 6: 准备发送消息的内容

//===============================end topic模式==================================

for (int i = 1; i <= 20; i++) {

//消息的内容

String msg = "楚风:" + i;

// 7: 发送消息给中间件rabbitmq-server

// @params1: 交换机exchange

// @params2: 队列名称/routingkey

// @params3: 属性配置

// @params4: 发送消息的内容

channel.basicPublish("", "queue5", null, msg.getBytes());

}

System.out.println("消息发送成功!");

} catch (Exception ex) {

ex.printStackTrace();

System.out.println("发送消息出现异常...");

} finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

}

}

消费者1

package com.newer.rabbitmq.work.fair;

import com.rabbitmq.client.*;

import java.io.IOException;

/**

* @author: 楚风

* @description: Consumer

* @Date : 2021/3/2

*/

public class Work1 {

public static void main(String[] args) {

// 1: 创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

// 2: 设置连接属性

connectionFactory.setHost("139.196.122.115");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

Connection connection = null;

Channel channel = null;

try {

// 3: 从连接工厂中获取连接

connection = connectionFactory.newConnection("消费者-Work1");

// 4: 从连接中获取通道channel

channel = connection.createChannel();

// 5: 申明队列queue存储消息

/*

* 如果队列不存在,则会创建

* Rabbitmq不允许创建两个相同的队列名称,否则会报错。

*

* @params1: queue 队列的名称

* @params2: durable 队列是否持久化

* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭

* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。

* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。

* */

// 这里如果queue已经被创建过一次了,可以不需要定义

// channel.queueDeclare("queue1", false, false, false, null);

// 同一时刻,服务器只会推送一条消息给消费者

// 6: 定义接受消息的回调

Channel finalChannel = channel;

//公平分发指标一定要定义出来 速度快的消费者会多执行 每次执行一次 根据磁盘Cpu内存去决定

finalChannel.basicQos(1);

//公平分发必须手动应答

finalChannel.basicConsume("queue5", false, new DeliverCallback() {

@Override

public void handle(String s, Delivery delivery) throws IOException {

try{

System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));

Thread.sleep(2000);

finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}catch(Exception ex){

ex.printStackTrace();

}

}

}, new CancelCallback() {

@Override

public void handle(String s) throws IOException {

}

});

System.out.println("Work1-开始接受消息");

System.in.read();

} catch (Exception ex) {

ex.printStackTrace();

System.out.println("发送消息出现异常...");

} finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null && connection.isOpen()) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

}

}

消费者2

package com.newer.rabbitmq.work.fair;

import com.rabbitmq.client.*;

import java.io.IOException;

/**

* @author: 楚风

* @description: Consumer

* @Date : 2021/3/2

*/

public class Work2 {

public static void main(String[] args) {

// 1: 创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

// 2: 设置连接属性

connectionFactory.setHost("139.196.122.115");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

Connection connection = null;

Channel channel = null;

try {

// 3: 从连接工厂中获取连接

connection = connectionFactory.newConnection("消费者-Work2");

// 4: 从连接中获取通道channel

channel = connection.createChannel();

// 5: 申明队列queue存储消息

/*

* 如果队列不存在,则会创建

* Rabbitmq不允许创建两个相同的队列名称,否则会报错。

*

* @params1: queue 队列的名称

* @params2: durable 队列是否持久化

* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭

* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。

* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。

* */

// 这里如果queue已经被创建过一次了,可以不需要定义

//channel.queueDeclare("queue1", false, true, false, null);

// 同一时刻,服务器只会推送一条消息给消费者

//channel.basicQos(1);

// 6: 定义接受消息的回调

Channel finalChannel = channel;

finalChannel.basicQos(1);

finalChannel.basicConsume("queue5", false, new DeliverCallback() {

@Override

public void handle(String s, Delivery delivery) throws IOException {

try{

System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));

Thread.sleep(200);

finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}catch(Exception ex){

ex.printStackTrace();

}

}

}, new CancelCallback() {

@Override

public void handle(String s) throws IOException {

}

});

System.out.println("Work2-开始接受消息");

System.in.read();

} catch (Exception ex) {

ex.printStackTrace();

System.out.println("发送消息出现异常...");

} finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null && connection.isOpen()) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

}

}

小结: 从结果可以看到,消费者1在相同时间内,处理了更多的消息;以上代码我们实现了公平分发模式; 消费者一次接收一条消息,代码channel.BasicQos(0, 1, false); 公平分发需要消费者开启手动应答,关闭自动应答 关闭自动应答代码channel.BasicConsume(“queue_test”, false, consumer); 消费者开启手动应答代码:channel.BasicAck(ea.DeliveryTag, false);

总结

(1)当队列里消息较多时,我们通常会开启多个消费者处理消息;公平分发和轮询分发都是我们经常使用的模式。

(2)轮询分发的主要思想是“按均分配”,不考虑消费者的处理能力,所有消费者均分;这种情况下,处理能力弱的服务器,一直都在处理消息,而处理能力强的服务器,在处理完消息后,处于空闲状态;

(3) 公平分发的主要思想是”能者多劳”,按需分配,能力强的干的多。

3. 发布/订阅模式Publish/Subscribe

功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者

生产者:可以将消息发送到队列或者是交换机。

消费者:只能从队列中获取消息。

如果消息发送到没有队列绑定的交换机上,那么消息将丢失。

交换机不能存储消息,消息存储在队列中 我们可以通过两种方式实现图形化界面实现和代码实现 通过图形化界面实现 1.添加一个新的交换机,类型选择fanout 2.添加新的队列,因为需要多个队列进行操作 3.将队列跟交换机进行绑定 绑定成功,多绑定几个队列进行操作 4.发送消息

小结:只要消费者订阅了交换机,发送消息时所有订阅的消费者都能收到,这就是fanout模式

应用场景:微信,短信,发送邮件

生产者实现思路:

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel创建交换机并指定交换机类型为fanout,使用通道向交换机发送消息,关闭通道和连接。

package com.newer.rabbitmq.all;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**

* @Producer:订阅与发布模式生产者

*/

public class Producer {

public static void main(String[] args) {

//1.创建工厂

ConnectionFactory connectionFactory=new ConnectionFactory();

//2.设置连接属性

//设置ip地址

connectionFactory.setHost("139.196.122.115");

//设置端口号

connectionFactory.setPort(5672);

//设置根目录

connectionFactory.setVirtualHost("/");

//账号密码

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

Connection connection=null;

Channel channel=null;

try {

//3.从连接工厂获取连接

connection=connectionFactory.newConnection("生产者");

//4.从连接中获取通道channel

channel=connection.createChannel();

//5.准备发送消息的内容

String message="hello,MC,chu Feng";

//交换机的名字 如果在图形化界面已经创建好了记得复制

String exchangeName="fanout-Exchanges";

//由于是发布与订阅模式,所以routingkey绑定也没有意义,所以给一个空的路由key

String routingkey="";

//6.发送消息给中间件rabbitmq-server

// @params1: 交换机exchange

// @params2: 队列名称/routingkey

// @params3: 属性配置

// @params4: 发送消息的内容

channel.basicPublish(exchangeName,routingkey,null,message.getBytes());

System.out.println("消息发送成功");

} catch (IOException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

}

}

消费者实现思路:

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,绑定队列到交换机,设置Qos=1,创建消费者并监听队列,使用手动方式返回完成。可以有多个队列绑定到交换机,多个消费者进行监听。

package com.newer.rabbitmq.all;

import com.rabbitmq.client.*;

import java.io.IOException;

/**

* Consumer:发布与订阅模式消费者

*/

public class Consumer {

private static Runnable runnable = () -> {

// 1: 创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

// 2: 设置连接属性

connectionFactory.setHost("139.196.122.115");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

//获取队列的名称

final String queueName = Thread.currentThread().getName();

Connection connection = null;

Channel channel = null;

try {

// 3: 从连接工厂中获取连接

connection = connectionFactory.newConnection("生产者");

// 4: 从连接中获取通道channel

channel = connection.createChannel();

// 5: 申明队列queue存储消息

/*

* 如果队列不存在,则会创建

* Rabbitmq不允许创建两个相同的队列名称,否则会报错。

*

* @params1: queue 队列的名称

* @params2: durable 队列是否持久化

* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭

* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。

* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。

* */

// 这里如果queue已经被创建过一次了,可以不需要定义

//channel.queueDeclare("queue1", false, false, false, null);

// 6: 定义接受消息的回调

Channel finalChannel = channel;

finalChannel.basicConsume(queueName, true, new DeliverCallback() {

@Override

public void handle(String s, Delivery delivery) throws IOException {

System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

}

}, new CancelCallback() {

@Override

public void handle(String s) throws IOException {

}

});

System.out.println(queueName + ":开始接受消息");

System.in.read();

} catch (Exception ex) {

ex.printStackTrace();

System.out.println("发送消息出现异常...");

} finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null && connection.isOpen()) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

};

public static void main(String[] args) {

// 启动两个线程去执行

new Thread(runnable, "queue2").start();

new Thread(runnable, "queue3").start();

new Thread(runnable, "queue4").start();

}

}

4. 路由模式Routing

说明:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key

图形化界面实现

路由模式说白了就是比消息订阅模式多了一个路由key,学过sql的同学,可以理解为where条件,通过路由key去将消息发送到指定的队列,比如我们需要给微信,短信,邮箱发送消息,但是经理说不用给邮箱发送消息了,那么我们就可以指定路由key,那么最后接收到消息的就是微信和短信

1.添加交换机 2.绑定队列并添加路由key,绑定后我们发送消息就可以不填写队列名直接通过路由key去发送了 只有添加了sms路由key的能收到消息

生产者实现思路:

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel创建交换机并指定交换机类型为direct,使用通道向交换机发送消息并指定key=b,关闭通道和连接。

package com.newer.rabbitmq.routing;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

/**

* 路由模式

*/

public class Producer {

public static void main(String[] args) {

// 1: 创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

// 2: 设置连接属性

connectionFactory.setHost("139.196.122.115");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

Connection connection = null;

Channel channel = null;

try {

// 3: 从连接工厂中获取连接

connection = connectionFactory.newConnection("生产者");

// 4: 从连接中获取通道channel

channel = connection.createChannel();

// 6: 准备发送消息的内容

String message = "你好,楚风!!!";

String exchangeName = "direct-Exchanges";

// String routingKey1 = "qqemail";

String routingKey2 = "sms";

// 7: 发送消息给中间件rabbitmq-server

// @params1: 交换机exchange

// @params2: 队列名称/routingkey

// @params3: 属性配置

// @params4: 发送消息的内容

channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());

// channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());

System.out.println("消息发送成功!");

} catch (Exception ex) {

ex.printStackTrace();

System.out.println("发送消息出现异常...");

} finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

}

}

消费者实现思路:

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,绑定队列到交换机,设置Qos=1,创建消费者并监听队列,使用手动方式返回完成。可以有多个队列绑定到交换机,但只要绑定key=b的队列key接收到消息,多个消费者进行监听。

package com.newer.rabbitmq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

/**

* Consumer:路由

*/

public class Consumer {

private static Runnable runnable = () -> {

// 1: 创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

// 2: 设置连接属性

connectionFactory.setHost("139.196.122.115");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

//获取队列的名称

final String queueName = Thread.currentThread().getName();

Connection connection = null;

Channel channel = null;

try {

// 3: 从连接工厂中获取连接

connection = connectionFactory.newConnection("生产者");

// 4: 从连接中获取通道channel

channel = connection.createChannel();

// 5: 申明队列queue存储消息

/*

* 如果队列不存在,则会创建

* Rabbitmq不允许创建两个相同的队列名称,否则会报错。

*

* @params1: queue 队列的名称

* @params2: durable 队列是否持久化

* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭

* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。

* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。

* */

// 这里如果queue已经被创建过一次了,可以不需要定义

//channel.queueDeclare("queue1", false, false, false, null);

// 6: 定义接受消息的回调

Channel finalChannel = channel;

finalChannel.basicConsume(queueName, true, new DeliverCallback() {

@Override

public void handle(String s, Delivery delivery) throws IOException {

System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

}

}, new CancelCallback() {

@Override

public void handle(String s) throws IOException {

}

});

System.out.println(queueName + ":开始接受消息");

System.in.read();

} catch (Exception ex) {

ex.printStackTrace();

System.out.println("发送消息出现异常...");

} finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null && connection.isOpen()) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

};

public static void main(String[] args) {

// 启动两个线程去执行

new Thread(runnable, "queue2").start();

new Thread(runnable, "queue3").start();

new Thread(runnable, "queue4").start();

}

}

5.主题模式Topics

功能:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor

符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor

生产者实现思路:

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel创建交换机并指定交换机类型为topic,使用通道向交换机发送消息并指定key=key.1,关闭通道和连接。

package com.newer.rabbitmq.topic;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

/**

* 主题模式

*/

public class Producer {

public static void main(String[] args) {

// 1: 创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

// 2: 设置连接属性

connectionFactory.setHost("139.196.122.115");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

Connection connection = null;

Channel channel = null;

try {

// 3: 从连接工厂中获取连接

connection = connectionFactory.newConnection("生产者");

// 4: 从连接中获取通道channel

channel = connection.createChannel();

// 6: 准备发送消息的内容

String message = "你好,学相伴!!!";

String exchangeName = "topic_Exchanges";

String routingKey1 = "com.course.order";//都可以收到 queue-1 queue-2

String routingKey2 = "com.order.user";//都可以收到 queue-1 queue-3

// 7: 发送消息给中间件rabbitmq-server

// @params1: 交换机exchange

// @params2: 队列名称/routingkey

// @params3: 属性配置

// @params4: 发送消息的内容

channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());

System.out.println("消息发送成功!");

} catch (Exception ex) {

ex.printStackTrace();

System.out.println("发送消息出现异常...");

} finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

}

}

消费者实现思路:

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,绑定队列到交换机,设置Qos=1,创建消费者并监听队列,使用手动方式返回完成。可以有多个队列绑定到交换机,凡是绑定规则符合通配符规则的队列均可以接收到消息,比如key.*,key.#,多个消费者进行监听。

package com.newer.rabbitmq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;

/**

* Consumer:路由

*/

public class Consumer {

private static Runnable runnable = () -> {

// 1: 创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

// 2: 设置连接属性

connectionFactory.setHost("139.196.122.115");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

//获取队列的名称

final String queueName = Thread.currentThread().getName();

Connection connection = null;

Channel channel = null;

try {

// 3: 从连接工厂中获取连接

connection = connectionFactory.newConnection("生产者");

// 4: 从连接中获取通道channel

channel = connection.createChannel();

// 5: 申明队列queue存储消息

/*

* 如果队列不存在,则会创建

* Rabbitmq不允许创建两个相同的队列名称,否则会报错。

*

* @params1: queue 队列的名称

* @params2: durable 队列是否持久化

* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭

* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。

* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。

* */

// 这里如果queue已经被创建过一次了,可以不需要定义

//channel.queueDeclare("queue1", false, false, false, null);

// 6: 定义接受消息的回调

Channel finalChannel = channel;

finalChannel.basicConsume(queueName, true, new DeliverCallback() {

@Override

public void handle(String s, Delivery delivery) throws IOException {

System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

}

}, new CancelCallback() {

@Override

public void handle(String s) throws IOException {

}

});

System.out.println(queueName + ":开始接受消息");

System.in.read();

} catch (Exception ex) {

ex.printStackTrace();

System.out.println("发送消息出现异常...");

} finally {

// 7: 释放连接关闭通道

if (channel != null && channel.isOpen()) {

try {

channel.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

if (connection != null && connection.isOpen()) {

try {

connection.close();

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

};

public static void main(String[] args) {

// 启动两个线程去执行

new Thread(runnable, "queue2").start();

new Thread(runnable, "queue3").start();

new Thread(runnable, "queue4").start();

}

}

小结:

rabbitmq发送消息一定有一个交换机