一、概念介绍
RabbitMQ是一个消息中介(message broker):它接收和发送消息。你可以把它当成邮局:当你把想要寄的信投递进信箱,你可以确信邮递员最终会把信件送到收件人手中。在这个比喻中,RabbitMQ是信箱、邮局和邮递员。
RabbitMQ和邮局的主要不同在于它不处理信件,而是接收、保存和发送二进制数据块——消息(messages)。
RabbitMQ和一般的消息传递使用了一些术语:
生产(Producing )意味着仅发送消息。生产者(producer )就是一段发送消息的程序
队列(queue )是存在于RabbitMQ内部的信箱的名字。虽然消息流转于RabbitMQ和你的应用之间,但是它们只能存储在队列(queue)之中。队列的容量只受到宿主机的内存和硬盘的限制,它本质上是一个大的消息缓冲区。多个生产者可以发送消息到同一个队列,同时多个消费者也可以从队列中获取消息。我们用下图表示一个队列:
消费者(Consuming )表示消息的接收者。消费者(consumer )就是一段等待接收消息的程序。
注意:生产者(producer)、消费者(consumer)和消息中心(broker)不必放在同一台机器上;事实上在大多数应用中也确实如此。
二、“Hello World”
在这部分教程中我们将会写两个java程序;一个发送单条消息的生产者,和一个接收消息并打印出来的消费者。我们将详细介绍Java API中的一些细节,着重于这个简单的需求便于新人入门。这是一个消息传递的“Hello World”。
RabbitMQ Java客户端
RabbitMQ支持多种协议。这篇教程使用一个开源、通用的消息协议——AMQP 0-9-1。RabbitMQ有许多不同语言的客户端。在这里我们使用RabbitMQ官方提供的Java客户端
下载 和它的依赖 ( and )。把这些jar包粘贴到工程目录下,与你的java文件放在一起
请注意本教程使用SLF4J Simple 已经足够,但是在生产环境中你需要使用logback之类的更全面的日志库
(RabbitMQ Java客户端也存在于maven仓库中,groupId是com.rabbitmq,artifactId 是amqp-client)
现在我们拥有了Java客户端和相关依赖,可以开始写代码了!
Sending
我们把我们的消息发布者(sender)叫做Send ,把消息消费者(receiver)叫做Recv。发布者将会连接到RabbitMQ,发送一条消息,然后退出。
在中,我们需要引入一些包:
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;
设置类和队列的名字:
ublic class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { //... }}
然后我们可以创建一个到server的连接:
ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();
这里的connection 抽象了socket连接,并帮我们处理协议版本的协商和认证等。这里我们连接到了本地的一个消息中心——因此是localhost。如果我们想要连接到其他机器上的消息中心,我们只需要在这里简单的指定它的名字或者IP地址即可。
下一步我们会创建一个channel,这是大部分用于完成任务的API驻留的地方。
为了发送消息,我们必须声明一个队列让我们去发送消息;然后我们可以发送消息到这个队列:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");
声明一个队列是幂等的(idempotent)——它仅仅会在不存在的时候被创建。消息的内容是一个字节数组,所以你可以把它编码成任意你想要的形式。
最后,我们关闭channel 和connection:
channel.close();connection.close();
下边是完整的Send.java代码:
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }}
发送没有成功!
如果这是你第一次使用RabbitMQ并且你没有看到“Sent”消息,你可能会急的抓耳挠腮想要知道什么地方出了问题。或许是消息中心启动的时候硬盘没有足够的空间(默认情况下至少需要200MB剩余)所以它拒绝接收消息。检查消息中心的日志来确认并在必要情况下减小这个限制。这份会告诉你怎么设置disk_free_limit。
Receiving
以上是我们的消息发布者。我们的消费者接收RabbitMQ的消息推送,所以与发布者发布一个简单的消息不同,我们将会保持消费者运行来监听消息并打印。
Recv.java与Send.java有很多相同的导入:
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;
这里额外的DefaultConsumer是一个实现了Consumer接口的类,我们会用它来缓存从消息中心推送过来的消息。
初始化代码与发布者相同;我们开启了一个connection和一个channel,并声明了一个用于消费的队列。注意这个队列要与消息发布者声明的队列相同。
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... }}
注意这里我们也声明了一个队列。因为我们可能在发布者启动之前先启动消费者,所以我们要确认在消费消息前队列存在。
我们将告诉消息中心从队列中传递消息给我们。因为它推送消息是异步的,所以我们以对象的形式提供回调来缓存消息直到我们准备好使用它们。这就是DefaultConsumer所做的事情。
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }};channel.basicConsume(QUEUE_NAME, true, consumer);
以下是Recv.java的完整代码:
import com.rabbitmq.client.*;import java.io.IOException;public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); }}
这一部分介绍的是如何运行,使用IDE(Eclipse、IDEA等等)直接run就好,略
消费者会打印出通过RabbitMQ获得的提供者发布的消息。消费者会一直运行,等待获取消息
列出已有的队列
rabbitmqctl.bat list_queues
是时候学习第二部分来构建一个简单的工作的队列了!
原文地址:http://www.rabbitmq.com/tutorials/tutorial-one-java.html