博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ实例教程:用Java搞定工作队列
阅读量:6627 次
发布时间:2019-06-25

本文共 8847 字,大约阅读时间需要 29 分钟。

  在上一节中,我们学会了使用编程的方式发送和接收一个命名好的队列。本节中我们将会使用工作队列在多个工作者之间分发任务。

  工作队列的核心思想是避免立即处理高密集度必须等待完成的任务。它采用了安排任务的方式,将一个任务封装成一个消息把它放进队列。在后台运行的工作进程到时候会将它弹出并执行,这样任务队列中的任务就会被工作进程共享执行。

  工作队列适用于Web应用中在一个短的HTTP请求中处理复杂任务的场景。

  在上节中,我们发送了一个“Hello World!”字符串消息。现在发送多个字符串消息表示复杂任务。我们现在像图片重置大小,渲染PDF文件这样的真实任务,但我们使用 Thread.sleep() 假装正在我们忙。我们将字符串中的点的数量作为其复杂性;每个点都占1秒钟“工作”。例如,一个包含“...”这样的假任务就会需要三秒钟。

  NewTask.java

package com.favccxx.favrabbit;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;public class NewTask {	  private static final String TASK_QUEUE_NAME = "task_queue";	  public static void main(String[] argv) throws Exception {	    ConnectionFactory factory = new ConnectionFactory();	    factory.setHost("localhost");	    Connection connection = factory.newConnection();	    Channel channel = connection.createChannel();	    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);	    String[] args = {"Shuai Ge","ai","MeiNv","..."};	    String message = getMessage(args);	    channel.basicPublish("", TASK_QUEUE_NAME,	        MessageProperties.PERSISTENT_TEXT_PLAIN,	        message.getBytes("UTF-8"));	    System.out.println(" [x] Sent '" + message + "'");	    	    for(int i=0;i<10;i++){	    	channel.basicPublish("", TASK_QUEUE_NAME,	    	        MessageProperties.PERSISTENT_TEXT_PLAIN,	    	        (message+i).getBytes("UTF-8"));	    	System.out.println("Sent Message:" + message+i);	    }	    channel.close();	    connection.close();	  }	  private static String getMessage(String[] strings) {	    if (strings.length < 1)	      return "Hello World!";	    return joinStrings(strings, " ");	  }	  private static String joinStrings(String[] strings, String delimiter) {	    int length = strings.length;	    if (length == 0) return "";	    StringBuilder words = new StringBuilder(strings[0]);	    for (int i = 1; i < length; i++) {	      words.append(delimiter).append(strings[i]);	    }	    return words.toString();	  }}

  控制台输出

 [x] Sent 'Shuai Ge ai MeiNv ...'

Sent Message:Shuai Ge ai MeiNv ...0

Sent Message:Shuai Ge ai MeiNv ...1

Sent Message:Shuai Ge ai MeiNv ...2

Sent Message:Shuai Ge ai MeiNv ...3

Sent Message:Shuai Ge ai MeiNv ...4

Sent Message:Shuai Ge ai MeiNv ...5

Sent Message:Shuai Ge ai MeiNv ...6

Sent Message:Shuai Ge ai MeiNv ...7

Sent Message:Shuai Ge ai MeiNv ...8

Sent Message:Shuai Ge ai MeiNv ...9

  Worker.java

package com.favccxx.favrabbit;import java.io.IOException;import java.text.DateFormat;import java.text.SimpleDateFormat;import java.util.Date;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class Worker {	private static final String TASK_QUEUE_NAME = "task_queue";		private static DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");	public static void main(String[] argv) throws Exception {		ConnectionFactory factory = new ConnectionFactory();		factory.setHost("localhost");		final Connection connection = factory.newConnection();		final Channel channel = connection.createChannel();		channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");		channel.basicQos(1);		final Consumer consumer = new DefaultConsumer(channel) {			@Override			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,					byte[] body) throws IOException {				String message = new String(body, "UTF-8");				System.out.println(df.format(new Date()) + " [x] Received '" + message + "'");				try {					doWork(message);				} finally {					System.out.println(" [x] Done");					channel.basicAck(envelope.getDeliveryTag(), false);				}			}		};		channel.basicConsume(TASK_QUEUE_NAME, false, consumer);	}	private static void doWork(String task) {		for (char ch : task.toCharArray()) {			if (ch == '.') {				try {					Thread.sleep(1000);				} catch (InterruptedException _ignored) {					Thread.currentThread().interrupt();				}			}		}	}}

  控制台输出

 [*] Waiting for messages. To exit press CTRL+C

2015-10-08 15:41:36 [x] Received 'Shuai Ge ai MeiNv ...'

 [x] Done

2015-10-08 15:41:39 [x] Received 'Shuai Ge ai MeiNv ...0'

 [x] Done

2015-10-08 15:41:42 [x] Received 'Shuai Ge ai MeiNv ...1'

 [x] Done

2015-10-08 15:41:45 [x] Received 'Shuai Ge ai MeiNv ...2'

 [x] Done

2015-10-08 15:41:48 [x] Received 'Shuai Ge ai MeiNv ...3'

 [x] Done

2015-10-08 15:41:51 [x] Received 'Shuai Ge ai MeiNv ...4'

 [x] Done

2015-10-08 15:41:54 [x] Received 'Shuai Ge ai MeiNv ...5'

 [x] Done

2015-10-08 15:41:57 [x] Received 'Shuai Ge ai MeiNv ...6'

 [x] Done

2015-10-08 15:42:00 [x] Received 'Shuai Ge ai MeiNv ...7'

 [x] Done

2015-10-08 15:42:03 [x] Received 'Shuai Ge ai MeiNv ...8'

 [x] Done

2015-10-08 15:42:06 [x] Received 'Shuai Ge ai MeiNv ...9'

 [x] Done

2015-10-08 15:42:46 [x] Received 'Shuai Ge ai MeiNv ...'

 [x] Done

2015-10-08 15:42:49 [x] Received 'Shuai Ge ai MeiNv ...0'

 [x] Done

2015-10-08 15:42:52 [x] Received 'Shuai Ge ai MeiNv ...1'

 [x] Done

2015-10-08 15:42:55 [x] Received 'Shuai Ge ai MeiNv ...2'

 [x] Done

2015-10-08 15:42:58 [x] Received 'Shuai Ge ai MeiNv ...3'

 [x] Done

2015-10-08 15:43:01 [x] Received 'Shuai Ge ai MeiNv ...4'

 [x] Done

2015-10-08 15:43:04 [x] Received 'Shuai Ge ai MeiNv ...5'

 [x] Done

2015-10-08 15:43:07 [x] Received 'Shuai Ge ai MeiNv ...6'

 [x] Done

2015-10-08 15:43:10 [x] Received 'Shuai Ge ai MeiNv ...7'

 [x] Done

2015-10-08 15:43:13 [x] Received 'Shuai Ge ai MeiNv ...8'

 [x] Done

2015-10-08 15:43:16 [x] Received 'Shuai Ge ai MeiNv ...9'

 [x] Done

  循环分发消息(Round-robin dispatching)

  使用任务队列的一个好处是轻松处理并行工作,如果我们有一个积压的工作,通过添加更多的工人就可以完成。

  首先,现在有两个worker实例在同时工作,他们都从队列中读取消息。接下来这么做:

  (1)运行NewTask类,发送10个消息队列,控制台输出如下内容:

 [x] Sent 'Shuai Ge ai MeiNv ...'Sent Message:Shuai Ge ai MeiNv ...0Sent Message:Shuai Ge ai MeiNv ...1Sent Message:Shuai Ge ai MeiNv ...2Sent Message:Shuai Ge ai MeiNv ...3Sent Message:Shuai Ge ai MeiNv ...4Sent Message:Shuai Ge ai MeiNv ...5Sent Message:Shuai Ge ai MeiNv ...6Sent Message:Shuai Ge ai MeiNv ...7Sent Message:Shuai Ge ai MeiNv ...8Sent Message:Shuai Ge ai MeiNv ...9

   (2)启动一个worker实例,其输出内容如下:

2015-10-08 15:53:45 [x] Received 'Shuai Ge ai MeiNv ...' [x] Done2015-10-08 15:53:48 [x] Received 'Shuai Ge ai MeiNv ...1' [x] Done2015-10-08 15:53:51 [x] Received 'Shuai Ge ai MeiNv ...3' [x] Done2015-10-08 15:53:54 [x] Received 'Shuai Ge ai MeiNv ...5' [x] Done2015-10-08 15:53:57 [x] Received 'Shuai Ge ai MeiNv ...7' [x] Done2015-10-08 15:54:00 [x] Received 'Shuai Ge ai MeiNv ...9' [x] Done

  (3)启动另外一个worker实例,其输出内容如下:

2015-10-08 15:53:45 [x] Received 'Shuai Ge ai MeiNv ...0' [x] Done2015-10-08 15:53:48 [x] Received 'Shuai Ge ai MeiNv ...2' [x] Done2015-10-08 15:53:51 [x] Received 'Shuai Ge ai MeiNv ...4' [x] Done2015-10-08 15:53:54 [x] Received 'Shuai Ge ai MeiNv ...6' [x] Done2015-10-08 15:53:57 [x] Received 'Shuai Ge ai MeiNv ...8' [x] Done

  RabbitMQ可能会出现下述所示的队列变化图  

  默认情况下,RabbitMQ会按顺序将消息发送给下一个消费者,每个消费者都有相同数量的信息,跟消息的持续时长没有关系。这种分发消息的模式就是循环分发(round-robin)。

  消息应答模式(Message acknowledgment)

  每个任务执行都会占用几秒钟时间,如果一个任务启动用了很长时间后因为某种原因死掉了,但只完成了部分任务,该怎么办呢?在上面的round-robin模式下,一旦RabbitMQ将消息分发给一个消费者就会立即将其从内存中移除。在这种情况下,如果杀掉worker进程就会丢失正在处理的消息,当然也会丢失分发给该worker的未处理的消息。

  但我们不想丢失任何任务。如果一个worker进程死掉了,我们希望将该任务分发给其它工作进程。

  为了解决上面的问题,RabbitMQ支持应答模式让消费者告诉RabbitMQ特定的消息是否已经收到并处理,如果处理了就从内存中移除。

  如果一个消息消费者没有应答的话,RabbitMQ会假设该消息没有处理并将它转发给其它消费者。这样就能确保消息不会丢失,即便工作进程意外死掉。

  消息没有超时一说,RabbitMQ只有在工作进程连接死掉的时候才会重新投递消息。即便一个消息需要很长很长的时间处理也是不会出问题。

  消息应答模式默认是开启的,在前面的例子我们通过autoAck=true显式的关闭了。现在将该属性设置为true即可。


  消息持久化(Message durability)

  上面我们知道了如何处理消息消费者死机的问题,但是如果RabbitMQ服务器宕机呢?

  当RabbitMQ退出或崩溃时,除非你提醒它,否则它会忘记队列和消息。若想消息不丢失的话,就必须让队列和消息都设为持久化。

  若想RabbitMQ不会丢失队列的话,可以通过下面的方式将其声明为持久化:

boolean durable = true;channel.queueDeclare("hello", durable, false, false, null);

  尽管上面的代码是正确的,但是它不会起作用的,因为我们已经定义了非持久化的“hello”队列。RabbitMQ不允许使用不同的参数重新定义已存在的队列,那样的话会返回错误。我们可以采用将其声明为不同的队列名字作为解决方案,如:

boolean durable = true;channel.queueDeclare("task_queue", durable, false, false, null);

  队列声明改变后需要同时应用到消息生产者和消息消费者身上。

  这时,我们就能确保RabbitMQ重启后task_queue队列不会丢失。现在需要通过设置 MessageProperties 属性值为 PERSISTENT_TEXT_PLAIN 将消息标记为持久化。

import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",  MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

  公平分发消息(Fair dispatch)

  你可能注意到分发有时候并不像我们想象的那样,比如,有两个消息消费者时有一个一边的消息是复杂耗时的,而另一边消息是简单快速的,这样一个队列经常是繁忙的,而另一个队列非常轻松。RabbitMQ并不知道这些仍然是平均分发消息。

  造成这样的原因是RabbitMQ仅仅是当消息到达队列的出口时才转发消息,它并不在乎未到达消息消费者的消息数量。它只是盲目的将奇数消息发送给一个消费者,偶数消息发送给另一个消费者。

  解决上面问题的方法就是设置 prefetchCount = 1,这就好比告诉RabbitMQ每个只给工作进程一个消息。换句话说,就是在工作进程处理完并应答该消息前,不会发送给它新的消息,它会把它消息发送给其它的空闲工作进程。

int prefetchCount = 1;channel.basicQos(prefetchCount);

转载地址:http://sumpo.baihongyu.com/

你可能感兴趣的文章
[LeetCode] Two Sum III - Data Structure Design
查看>>
课后作业-阅读任务-阅读笔记-4
查看>>
【转】ARC下dealloc过程及.cxx_destruct的探究
查看>>
NGUI的窗体的推动和调节大小(drag object和drag resize object)
查看>>
关于WordPress中字体加载慢的问题解决方案(转)
查看>>
PhotoShop常用快捷键
查看>>
关于 MySQL LEFT JOIN 你可能需要了解的三点
查看>>
mysql filesort 的解决方案
查看>>
GDAL源码剖析(十一)之OGR投影说明
查看>>
第七章例题、心得及问题。
查看>>
windows7系统下一些常用工具的总结
查看>>
Python垃圾回收机制(转)
查看>>
02-CSS基础与进阶-day9_2018-09-12-21-37-34
查看>>
OBJECT_ID 技巧整理
查看>>
【转】 cin、cin.get()、cin.getline()、getline()、gets()等函数的用法
查看>>
对链表的操作(数据结构线性表算法设计练习)
查看>>
关于 this 和 prototype 的理解
查看>>
面试总结——Java高级工程师(三)
查看>>
java开始到熟悉62
查看>>
hd acm2035
查看>>