RabbitMQ的是一个复杂的野兽。 它灵活,强大,但也很难完全把控和掌握。 许多不同的使用情况和使用模式都可以建立在这个强大的软件之上,但在第一次尝试为一个特定的解决方案编写代码时,差错和设计错误也是司空见惯的事情。 在本文中,我将讨论 Palermo 的设计和实现,它是一个实现了可以将RabbitMQ用作底层排队机制构建的那些可能的使用模式其中之一:批处理作业处理系统。 通过批处理作业处理系统,我们引用了一种机制来由 作业程序从不同队列中提取作业的自动执行。客户可以排入新的作业到这些队列,它们最终将被传递给将执行作业的程序。如果在执行一个任务失败了,作业程序线程将会把失败的作业放入一个特殊的队列,它可以重新执行,或者检查出来进行错误调试。 创建Palermo的灵感主要来自于Resque ,它是一个作业处理系统,由Github使用Ruby和Redis创建. 对于系统的使用者,Resque让这些变成可能:定义拥有不同名字的队列;作业排队时,在系统中通过某个输入参数匹配Ruby类名;在不同的机器中启动工作者进程--它们将处理作业,实例化Ruby类,并使用提供的输入参数来执行作业.如果作业执行失败,作业将被路由到一个特殊的"失败"作业队列.在这个队列中的作业会重新执行或被删除.Resque工作者和底层的操作系统很好的集成在一起,它可以像系统操作者控制作业执行的方式,来处理即将到来的信号.它也提供了一个web接口,可以监控作业和工作者的状态,同时也可以执行某种动作,像作业的重新排队或队列中作业的清理. 从一个开发者的角度来讲,Resque的主要优势是整个系统的使用超级简单.定义和排队作业只需要几行Ruby代码即可,同时系统以一种持续的和鲁棒的方式运行. Palermo的目标是针对JVM语言,创建一个简单易用的作业处理系统,像Resque一样的健壮,并使用RabbitMQ作为底层的队列技术而不是Redis. 从一个正式的角度来讲,一个作业处理系统的队列技术可以定义为一个元组空间(tuple space),一种连续关联的内存,一些进程排队作业使用如下的方式编写元组: write(queue_name, job_type, input_argument) 工作者进程从内存删除元组,一次一个,使用谓词匹配队列名: read(queue_name, ?, ?) 一个仅有的,必须以元组空间,加入到作业处理队列技术模型中的限制是,来自分布式内存中的读函数必须遵从先进先出(FIFO)的语义.RabbitMQ队列可以看作是这种类型的,包含FIFO语义的内存.队列名作为第一个参数传递到"读"谓词,用来提取存储在内存中的下一个匹配的元组.
为了获取想要读取的元组空间语义,我们需要处理RabbitMQ内部的一些设置,并配置如下的选项:
- 队列的持久性 由于要使用可持续的内存,我们需要为RabbitMQ所管理的队列和消息增加可持续性.为了达到这个效果,在RabbitMQ之中,队列需要声明为"durable",消息需要声明为"persistent".通过这种方式,即便RabbitMQ broker崩溃了,对于已经创建的队列信息和未决的消息将在重启时恢复. 当超过一个工作者连接到同一个队列时,RabbitMQ会使用round robin的方式,在所有的可用工作者中分发消息.这里主要的问题在于,只要消息到达队列,RabbitMQ就将发送一个消息到下一个工作者,而不管这个工作者是否正在处理一个不同的消息.如果一个作业需要很长的时间来处理,到来的消息将堆叠在这个繁忙工作者的本地缓存中,而其他的工作者将处于闲置状态.我们可以使用RabbitMQ的两个特性来处理这种情况:消息确认和服务质量/预取数量. 首先,工作者采用显式确认的方式处理完消息后可以通知RabbitMQ.消息只有确认后才会从队列删除.如果工作者停止运行而没有确认消息,RabbitMQ会自动将消息重新排队. 同时,服务质量(qos)配置可以告知RabbitMQ,发送给特定工作者的最大未确认消息数.如果设置了这个值,即从预取数量到1,只有最后一条消息被工作者进程确认,其他消息才能从队列发送出去.
我们仍然需要找到一种方式实现写内存的操作.RabbitMQ不同于其他队列排队系统的一个重要的方面,是强烈的订阅者和消费者分离的考量.在RabbitMQ体系结构中,这点是通过队列和订阅者交换的介绍来实现的.订阅者不知道消息的最终目的地,他们只知道一个交换的名字和路由关键字,这个关键字可以看作是发送消息的地址. 针对不同的交换类型,RabbitMQ支持不同的语义,这将影响一个带有地址的消息匹配一个交换的方式.它将传递到实际的信箱(队列),消费者将从这里接收消息. 在我们的例子中,写方法的第一个参数是队列名称。 队列名称也是工作者预测下一个内存中待处理的任务时使用的参数。 Palermo中该方法下一个参数是一种直接交换类型。 在这种交换类型中,路由关键字必须和传输消息的队列名称匹配。 如果一条消息被发往 RabbitMQ 进行交换,并且没有队列连接到交换,那么这条消息会被丢弃或者被发往相关的备用交换。 为了避免这种情况, Palermo中一个发布者每次把一个新的任务加入到队列中时,在发送任务的数据之前,发布者会声明匹配路由关键字的队列。 通过这种方式,我们可以确定当没有工作者在等待处理任务时消息也不会被丢弃。 在 RabbitMQ中,使用相同的参数重新声明一个已存在的队列是一个合法的操作,且没有任何影响。 使用之前为RabbitMQ所描述的设置,我们可以建立工作处理系统的核心。然而,某些特性,像失败事务的管理或者消息的序列化,它们不能被编址放到RabbitMQ的特性中。在Palermo中,这个特性已经被实现,并被作为一个附加的应用软件层,这个层可以被分配作为一个Java库。 首要的问题是如何处理失败的事务。我们来看它是怎么(工作的),过去我们使用显式的人工确认,在人工处理或超时时,RabbitMQ将会处理致命的错误。这套机制也可以用于处理在事务处理逻辑中(产生的)任何类型的异常条件,除了我们期望的功能需求外,失败信息必定会发送给一个特别的失败事务队列,他们可以被查阅,移除和重入队。这个功能已经在Palermo中被完成,被包装为一个通用的Java事务用来处理try/catch块,并通过管道将失败信息添加到队列中,并添加一些关于事务的错误信息,(例如)重试次数和在消息元数据的头中的原始队列一并发送消息给RabbitMQ。 |