当前位置:首页 » 操作系统 » rabbitmq源码

rabbitmq源码

发布时间: 2022-11-29 14:15:32

‘壹’ net开源项目整理

整理一些平时收藏和应用的开源代码,方便自己学习和查阅

1.应用

nopcommerce ,开源电商网站,开发环境asp.net mvc(未支持.net core),使用技术(autofac,ef,页面插件等)

https://github.com/nopSolutions/nopCommerce

OrchardCMS ,内容管理网站

https://github.com/OrchardCMS/Orchard(.net版本)

https://github.com/OrchardCMS/Orchard2(.net core版本)

ABP(aspnetboilerplate) ,提供一系列工具用于web应用创建,支持 ASP.NET Core, ASP.NET MVC & Web API,也提供了web应用的模板

https://github.com/aspnetboilerplate/aspnetboilerplate(.net core 版本,tag分支有支持.net版本的)

IdentityServer ,用户授权网站(支持openid和OAuth 2.0),可用于单点登录和第三方授权等

https://github.com/IdentityServer/IdentityServer3(.net版本)

https://github.com/IdentityServer/IdentityServer4(.net core版本)

eShopOnContainers 微软提供的微服务实例

https://github.com/dotnet-architecture/eShopOnContainers

PetShop 三层架构经典例子,用于新手学习,不过aspx有点过时了

https://github.com/songhhwd01/PetShop

BlogEngine.NET 博客网站,也是aspx

https://github.com/rxtur/BlogEngine.NET

2.组件

Lucene.Net 全文检索开发组件

https://github.com/apache/lucenenet

ServiceStack 半开源,用于创建web服务

https://github.com/ServiceStack/ServiceStack

MassTransit 可用于创建基于消息的服务和应用,依赖于RabbitMQ

https://github.com/MassTransit/MassTransit

stateless 简单的工作流开发组件,不支持在线定制工作流

https://github.com/dotnet-state-machine/stateless

Hangfire 任务调度开发利器

https://github.com/HangfireIO/Hangfire

Jwt.Net 用于生成JWT (JSON Web Token) 和JWT校验

https://github.com/jwt-dotnet/jwt

npoi 支持office文件的读写

https://github.com/tonyqus/npoi

StackExchange.Redis Redis的.net客户端

https://github.com/StackExchange/StackExchange.Redis

CacheManager 用于缓存的管理,支持Redis.Memcached,couchbase等

https://github.com/MichaCo/CacheManager

Autofac Ioc组件,用于依赖注入

https://github.com/autofac/Autofac

LightGBM 用于机器学习

https://github.com/Microsoft/LightGBM

3.框架

asp.net mvc

https://github.com/aspnet/Mvc

Nancy 类似asp.net mvc,web开发框架

https://github.com/NancyFx/Nancy

4.其他

dotnet core 主页 ,提供dotnet core相关知识的索引和例子,方便快速入门

https://github.com/dotnet/core

.net源码

https://github.com/Microsoft/referencesource

‘贰’ 在linux下安装rabbitmq失败怎么解决

RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。
AMQP 里主要要说两个组件:Exchange 和 Queue (在 AMQP 1.0 里还会有变动),如下图所示,绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Procer 和 Consumer 两种类型:

1:mq的安装需要Erlang,所以首先下载Erlang,下载地址:http://www.erlang.org/download.html直接下载源码,编译安装即可。
将下载好的tar包解压编译安装,如下命令:
tar -zxvf otp_src_R16B03-1.tar.gz

cd otp_src_R16B03-1
./configure && make install

安装过程中可能出现如下错误:
configure:error:
No curses library functions found
configure: error:/bin/sh'/home/niewf/software/erlang_R13B01/erts/configure'
failed for erts

解决方法:
yum list|grep ncurses
yum -y install ncurses-devel
yum install ncurses-devel

或者直接下载ncurses包编译安装。
下载地址:http://download.chinaunix.net/download/0008000/7242.shtml
tar zxvf ncurses.tar.gz #解压缩并且释放 文件包
cd ncurses #进入解压缩的目录(注意版本)
./configure #按照你的系统环境制作安装配置文件
make #编译源代码并且编译NCURSES库
su root #切换到root用户环境
make install #安装编译好的NCURSES库

完成后继续返回上一步操作。

2:安装python,如果系统中python版本低于2.5的话需要升级python到2.6以上,具体可参考:http://gavinshaw.blog.51cto.com/385947/610585

3:安装simplejson,直接下载simplejson源码包编译安装即可,下载地址:https://pypi.python.org/pypi/simplejson/。
下载simplejson源码包后,运行python setup.py install即可完成安装。

4:安装rabbit mq,下载地址:https://www.rabbitmq.com/install-generic-unix.html
下载后放入相应目录解压,进入%RABBITMQ_HOME%/sbin目录下运行:./rabbitmq-server start即可启动mq。
如果遇到如下错误,则参考http://leeon.me/a/rabbitmq-start-fail-note解决方案
ERROR: epmd error for host "xxx": address (cannot connect to host/port)
到此mq已经安装完成。
在%RABBITMQ_HOME%/sbin目录运行./rabbitmqctl status可查看当前mq状态。
同时mq也提供了界面查看当前mq状态,但是需要启用该插件功能,运行如下命令:
rabbitmq-plugins enable rabbitmq_management,然后在浏览器中输入:http://host-name:15672/#/即可访问,页面结果如下:

‘叁’ [Android源码分析] - 异步通信Handler机制

一、问题:在Android启动后会在新进程里创建一个主线程,也叫UI线程( 非线程安全 )这个线程主要负责监听屏幕点击事件与界面绘制。当Application需要进行耗时操作如网络请求等,如直接在主线程进行容易发生ANR错误。所以会创建子线程来执行耗时任务,当子线程执行完毕需要通知UI线程并修改界面时,不可以直接在子线程修改UI,怎么办?

解决方法:Message Queue机制可以实现子线程与UI线程的通信。

该机制包括Handler、Message Queue、Looper。Handler可以把消息/ Runnable对象 发给Looper,由它把消息放入所属线程的消息队列中,然后Looper又会自动把消息队列里的消息/Runnable对象 广播 到所属线程里的Handler,由Handler处理接收到的消息或Runnable对象。

1、Handler

每次创建Handler对象时,它会自动绑定到创建它的线程上。如果是主线程则默认包含一个Message Queue,否则需要自己创建一个消息队列来存储

Handler是多个线程通信的信使。比如在线程A中创建AHandler,给它绑定一个ALooper,同时创建属于A的消息队列AMessageQueue。然后在线程B中使用AHandler发送消息给ALooper,ALooper会把消息存入到AMessageQueue,然后再把AMessageQueue广播给A线程里的AHandler,它接收到消息会进行处理。从而实现通信。

2、Message Queue

在主线程里默认包含了一个消息队列不需要手动创建。在子线程里,使用Looper.prepare()方法后,会先检查子线程是否已有一个looper对象,如果有则无法创建,因为每个线程只能拥有一个消息队列。没有的话就为子线程创建一个消息队列。

Handler类包含Looper指针和MessageQueue指针,而Looper里包含实际MessageQueue与当前线程指针。

下面分别就UI线程和worker线程讲解handler创建过程:

首先,创建handler时,会自动检查当前线程是否包含looper对象,如果包含,则将handler内的消息队列指向looper内部的消息队列,否则,抛出异常请求执行looper.prepare()方法。

 - 在 UI线程 中,系统自动创建了Looper 对象,所以,直接new一个handler即可使用该机制;

- 在 worker线程 中,如果直接创建handler会抛出运行时异常-即通过查‘线程-value’映射表发现当前线程无looper对象。所以需要先调用Looper.prepare()方法。在prepare方法里,利用ThreadLocal<Looper>对象为当前线程创建一个Looper(利用了一个Values类,即一个Map映射表,专为thread存储value,此处为当前thread存储一个looper对象)。然后继续创建handler, 让handler内部的消息队列指向该looper的消息队列(这个很重要,让handler指向looper里的消息队列,即二者共享同一个消息队列,然后handler向这个消息队列发送消息,looper从这个消息队列获取消息) 。然后looper循环消息队列即可。当获取到message消息,会找出message对象里的target,即原始发送handler,从而回调handler的handleMessage() 方法进行处理。

 - handler与looper共享消息队列 ,所以handler发送消息只要入列,looper直接取消息即可。

 - 线程与looper映射表 :一个线程最多可以映射一个looper对象。通过查表可知当前线程是否包含looper,如果已经包含则不再创建新looper。

5、基于这样的机制是怎样实现线程隔离的,即在线程中通信呢。 

核心在于 每一个线程拥有自己的handler、message queue、looper体系 。而 每个线程的Handler是公开 的。B线程可以调用A线程的handler发送消息到A的共享消息队列去,然后A的looper会自动从共享消息队列取出消息进行处理。反之一样。

二、上面是基于子线程中利用主线程提供的Handler发送消息出去,然后主线程的Looper从消息队列中获取并处理。那么还有另外两种情况:

1、主线程发送消息到子线程中;

采用的方法和前面类似。要在子线程中实例化AHandler并设定处理消息的方法,同时由于子线程没有消息队列和Looper的轮询,所以要加上Looper.prepare(),Looper.loop()分别创建消息队列和开启轮询。然后在主线程中使用该AHandler去发送消息即可。

2、子线程A与子线程B之间的通信。

1、 Handler为什么能够实现不同线程的通信?核心点在哪?

不同线程之间,每个线程拥有自己的Handler、消息队列和Looper。Handler是公共的,线程可以通过使用目标线程的Handler对象来发送消息,这个消息会自动发送到所属线程的消息队列中去,线程自带的Looper对象会不断循环从里面取出消息并把消息发送给Handler,回调自身Handler的handlerMessage方法,从而实现了消息的线程间传递。

2、 Handler的核心是一种事件激活式(类似传递一个中断)的还是主要是用于传递大量数据的?重点在Message的内容,偏向于数据传输还是事件传输。

目前的理解,它所依赖的是消息队列,发送的自然是消息,即类似事件中断。

0、 Android消息处理机制(Handler、Looper、MessageQueue与Message)

1、 Handler、Looper源码阅读

2、 Android异步消息处理机制完全解析,带你从源码的角度彻底理解

谢谢!

wingjay

![](https://avatars0.githubusercontent.com/u/9619875?v=3&s=460)

‘肆’ Spring boot集成RabbitMQ中Exchange与Queue参数详解

关于springBoot整合RabbitMQ及基本使用可以参考: springBoot整合RabbitMQ及基本使用

设置exchange为持久化之后,并不能保证消息不丢失,因为此时发送往exchange中的消息并不是持久化的,需要配置delivery_mode=2指明message为持久的。

在FanoutExchange中,会将发送的消息封装为 Message 对象,该对象中有一个 MessageProperties 对象,用来指定消息的属性,而 MessageProperties 中默认的 deliveryMode 属性为 MessageDeliveryMode.PERSISTENT ,查看源码得知,MessageDeliveryMode.PERSISTENT = 2,所以 FanoutExchange 发送的消息默认就是持久化的。

‘伍’ RabbitMQ笔记十:MessageConverter详解

org.springframework.amqp.support.converter.MessageConverter

Message toMessage(Object object, MessageProperties messageProperties);
java对象和属性对象转换成Message对象。

Object fromMessage(Message message) throws MessageConversionException;
将消息对象转换成java对象。

定义Config类

MessageListenerAdapter中定义的消息转换器,消费端接收的消息就从Message类型转换成了String类型

消费者处理消息的Handler

启动类

启动应用类,发送消息到 hao.miao.order 队列,控制台打印:

从控制台打印我们知道了在消费者处理消息之前会进行消息类型转换,调用 TestMessageConverter 的 fromMessage 方法,然后执行消息处理器的 onMessage 方法,方法参数就是 String 类型。

自定义一个MyBody类型,将消息从Message转换成MyBody类型

然后修改 TestMessageConverter 的 fromMessage 方法,返回了 MyBody 类型,那么消息处理器的消费方法也是MyBody参数的消费方法

此时的消息处理器,处理器中的方法的入参就是MyBody类型了,

此时控制台打印:

我们还测试如下如果不使用自定义的 Converter ,那么当消息的属性中含有属性content_type的值为text,那么默认的转换成的java类型就是String类型,如果不指定那么默认的转换类型就是byte[]

我们跟进去 MessageListenerAdapte r的 setMessageConverter 方法,

我们发现默认的 MessageConverter 是 SimpleMessageConverter ,我们进入 SimpleMessageConverter 类中看其默认的转换逻辑

源码分析总结:
1. MessageConverter 可以把 java 对象转换成 Message 对象,也可以把 Message 对象转换成 java 对象
2. MessageListenerAdapter 内部通过 MessageConverter 把 Message 转换成java对象,然后找到相应的处理方法,参数为转换成的java对象。
3. SimpleMessageConverter 处理逻辑:
如果 content_type 是以text开头,则把消息转换成 String 类型
如果 content_type的 值是 application/x-java-serialized-object 则把消息序列化为java对象,否则,把消息转换成字节数组。

‘陆’ 如何正确生成RabbitMQ-C客户端库

1. 下载 rabbitmq-c-master源码
2. 下载 rabbitmq-codegen 源码
3. 将 rabbitmq-codegen 中的内容拷贝到 rabbitmq-c-master 中的 codegen 目录下(如果没有该目录请自行创建)
4.打开openssl目录,修改ms文件夹下的ntdll.mak文件,将CFLAG的/WX选项去掉
5.执行Configure 运行perl Configure VC-WIN32
6.运行 ms\do_ms
7.运行 nmake -f ms\ntdll.mak 执行make进行编译.....nmake -f ms\ntdll.mak 命令将OpenSSL编译成动态库,如果想编译成静态库应使用 nmake -f ms\nt.mak
8.运行 nmake -f ms\ntdll.mak test.检查上一部编译是否成功。正常的话会执行编译后的测试程序
至此OpenSSL在windows下编译完成,编译得到的dll和lib文件位置:静态库libeay32.lib和ssleay32.lib位于out32文件夹下,动态库libeay32.dll,libeay32.lib,ssleay32.dll,ssleay32.lib位于out32dll文件夹下。
9.使用VS2010编译OpenSSL的过程记录
10. 通过 win32 的 cmd 或者 cmake-gui 执行相应的命令进行相关文件生成。
11.生成的sln用vs2010打开 编译 若成功后在librabbitmq文件夹debug中有rabbitmq.1.lib和rabbitmq.1.dll文件
12. 下载的 rabbit-c-master 源码中没有文件 amqp_framing.h 和 amqp_framing.c,这两个文件是通过 codegen.py 和 amqp_codegen.py 产生的。
13.使用 CMake 时路径中最好不要出现中文。

‘柒’ Spring整合rabbitmq实践(一):基础

Spring整合rabbitmq实践(二):扩展
Spring整合rabbitmq实践(三):源码

procer:消息生产者;

consumer:消息消费者;

queue:消息队列;

exchange:接收procer发送的消息按照binding规则转发给相应的queue;

binding:exchange与queue之间的关系;

virtualHost:每个virtualHost持有自己的exchange、queue、binding,用户只能在virtualHost粒度控制权限。

fanout:

群发到所有绑定的queue;

direct:

根据routing key routing到相应的queue,routing不到任何queue的消息扔掉;可以不同的key绑到同一个queue,也可以同一个key绑到不同的queue;

topic:

类似direct,区别是routing key是由一组以“.”分隔的单词组成,可以有通配符,“*”匹配一个单词,“#”匹配0个或多个单词;

headers:

根据arguments来routing。

arguments为一组key-value对,任意设置。

“x-match”是一个特殊的key,值为“all”时必须匹配所有argument,值为“any”时只需匹配任意一个argument,不设置默认为“all”。

通过以下配置,可以获得最基础的发送消息到queue,以及从queue接收消息的功能。

这个包同时包含了一些其它的包:spring-context、spring-tx、spring-web、spring-messaging、spring-retry、spring-amqp、amqp-client,如果想单纯一点,可以单独引入。

最主要的是以下几个包,

spring-amqp:

spring-rabbit:

amqp-client:

个人理解就是,spring-amqp是spring整合的amqp,spring-rabbit是spring整合的rabbitmq(rabbitmq是amqp的一个实现,所以可能spring-rabbit也是类似关系),amqp-client提供操作rabbitmq的java api。

目前最新的是2.0.5.RELEASE版本。如果编译报错,以下信息或许能有所帮助:

(1)

解决方案:spring-amqp版本改为2.0.5.RELEASE。

(2)

解决方案:spring-context版本改为5.0.7.RELEASE。

(3)

解决方案:spring-core版本改为5.0.7.RELEASE。

(4)

解决方案:spring-beans版本改为5.0.7.RELEASE。

(5)

解决方案:spring-aop版本改为5.0.7.RELEASE。

总之,需要5.0.7.RELEASE版本的spring,及相匹配版本的amqp-client。

后面所讲的这些bean配置,spring-amqp中都有默认配置,如果不需要修改默认配置,则不用人为配置这些bean。后面这些配置也没有涉及到所有的属性。

这里的ConnectionFactory指的是spring-rabbit包下面的ConnectionFactory接口,不是amqp-client包下面的ConnectionFactory类。

上面这个bean是spring-amqp的核心,不论是发送消息还是接收消息都需要这个bean,下面描述一下里面这些配置的含义。

setAddresses :设置了rabbitmq的地址、端口,集群部署的情况下可填写多个,“,”分隔。

setUsername :设置rabbitmq的用户名。

setPassword :设置rabbitmq的用户密码。

setVirtualHost :设置virtualHost。

setCacheMode :设置缓存模式,共有两种, CHANNEL 和 CONNECTION 模式。

CHANNEL 模式,程序运行期间ConnectionFactory会维护着一个Connection,所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,操作rabbitmq之前都必须先获取到一个Channel,否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);

CONNECTION 模式,这个模式下允许创建多个Connection,会缓存一定数量的Connection,每个Connection中同样会缓存一些Channel,除了可以有多个Connection,其它都跟CHANNEL模式一样。

这里的Connection和Channel是spring-amqp中的概念,并非rabbitmq中的概念,官方文档对Connection和Channel有这样的描述:

关于 CONNECTION 模式中,可以存在多个Connection的使用场景,官方文档的描述:

setChannelCacheSize :设置每个Connection中(注意是每个Connection)可以缓存的Channel数量,注意只是缓存的Channel数量,不是Channel的数量上限,操作rabbitmq之前(send/receive message等)要先获取到一个Channel,获取Channel时会先从缓存中找闲置的Channel,如果没有则创建新的Channel,当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。

注意,改变这个值不会影响已经存在的Connection,只影响之后创建的Connection。

setChannelCheckoutTimeout :当这个值大于0时, channelCacheSize 不仅是缓存数量,同时也会变成数量上限,从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数,到时间仍然获取不到可用的Channel会抛出AmqpTimeoutException异常。

同时,在 CONNECTION 模式,这个值也会影响获取Connection的等待时间,超时获取不到Connection也会抛出AmqpTimeoutException异常。

setPublisherReturns、setPublisherConfirms :procer端的消息确认机制(confirm和return),设为true后开启相应的机制,后文详述。

官方文档描述publisherReturns设为true打开return机制,publisherComfirms设为true打开confirm机制,但测试结果(2.0.5.RELEASE版本)是,任意一个设为true,两个都会打开。

addConnectionListener、addChannelListener、setRecoveryListener :添加或设置相应的Listener,后文详述。

setConnectionCacheSize :仅在 CONNECTION 模式使用,设置Connection的缓存数量。

setConnectionLimit :仅在 CONNECTION 模式使用,设置Connection的数量上限。

上面的bean配置,除了需要注入的几个listener bean以外,其它设置的都是其默认值(2.0.5.RELEASE版本),后面的bean示例配置也是一样,部分属性不同版本的默认值可能有所不同。

一般不用配置这个bean,这里简单提一下。

这个ConnectionFactory是rabbit api中的ConnectionFactory类,这里面是连接rabbitmq节点的Connection配置。

如果想修改这些配置,可以按如下方式配置:

consumer端如果通过@RabbitListener注解的方式接收消息,不需要这个bean。

不建议直接通过ConnectionFactory获取Channel操作rabbitmq,建议通过amqpTemplate操作。

setConnectionFactory :设置spring-amqp的ConnectionFactory。

setRetryTemplate :设置重试机制,详情见后文。

setMessageConverter :设置MessageConverter,用于java对象与Message对象(实际发送和接收的消息对象)之间的相互转换,详情见后文。

setChannelTransacted :打开或关闭Channel的事务,关于amqp的事务后文描述。

setReturnCallback、setConfirmCallback :return和confirm机制的回调接口,后文详述。

setMandatory :设为true使ReturnCallback生效。

这个bean仅在consumer端通过@RabbitListener注解的方式接收消息时使用,每一个@RabbitListener注解的方法都会由这个创建一个MessageListenerContainer,负责接收消息。

setConnectionFactory :设置spring-amqp的ConnectionFactory。

setMessageConverter :对于consumer端,MessageConverter也可以在这里配置。

setAcknowledgeMode :设置consumer端的应答模式,共有三种:NONE、AUTO、MANUAL。

NONE,无应答,这种模式下rabbitmq默认consumer能正确处理所有发出的消息,所以不管消息有没有被consumer收到,有没有正确处理都不会恢复;

AUTO,由Container自动应答,正确处理发出ack信息,处理失败发出nack信息,rabbitmq发出消息后将会等待consumer端的应答,只有收到ack确认信息才会把消息清除掉,收到nack信息的处理办法由setDefaultRequeueRejected()方法设置,所以在这种模式下,发生错误的消息是可以恢复的。

MANUAL,基本同AUTO模式,区别是需要人为调用方法给应答。

setConcurrentConsumers :设置每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个。

setMaxConcurrentConsumers :设置每个MessageListenerContainer将会创建的Consumer的最大数量,默认等于最小数量。

setPrefetchCount :设置每次请求发送给每个Consumer的消息数量。

setChannelTransacted :设置Channel的事务。

setTxSize :设置事务当中可以处理的消息数量。

setDefaultRequeueRejected :设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。

setErrorHandler :实现ErrorHandler接口设置进去,所有未catch的异常都会由ErrorHandler处理。

AmqpTamplate里面有下面几个方法可以向queue发送消息:

这里,exchange必须存在,否则消息发不出去,会看到错误日志,但不影响程序运行:

Message是org.springframework.amqp.core.Message类,spring-amqp发送和接收的都是这个Message。

从Message类源码可以看到消息内容放在byte[]里面,MessageProperties对象包含了非常多的一些其它信息,如Header、exchange、routing key等。

这种方式,需要将消息内容(String,或其它Object)转换为byte[],示例:

也可以直接调用下面几个方法,Object将会自动转为Message对象发送:

有两种方法接收消息:

1.polling consumer,轮询调用方法一次获取一条;

2.asynchronous consumer,listener异步接收消息。

polling consumer

直接通过AmqpTemplate的方法从queue获取消息,有如下方法:

如果queue里面没有消息,会立刻返回null;传入timeoutMillis参数后可阻塞等待一段时间。

如果想直接从queue获取想要的java对象,可调用下面这一组方法:

后面4个方法是带泛型的,示例如下:

使用这四个方法需要配置org.springframework.amqp.support.converter.SmartMessageConverter,这是一个接口,Jackson2JsonMessageConverter已经实现了这个接口,所以只要将Jackson2JsonMessageConverter设置到RabbitTemplate中即可。

asynchronous consumer

有多种方式可以实现,详情参考官方文档。

最简单的实现方式是@RabbitListener注解,示例:

这里接收消息的对象用的是Message,也可以是自定义的java对象,但调用Converter转换失败会报错。

注解上指定的queue必须是已经存在并且绑定到某个exchange的,否则会报错:

如果在@RabbitListener注解中指明binding信息,就能自动创建queue、exchange并建立binding关系。

direct和topic类型的exchange需要routingKey,示例:

fanout类型的exchange,示例:

2.0版本之后,可以指定多个routingKey,示例:

并且支持arguments属性,可用于headers类型的exchange,示例:

@Queue有两个参数exclusive和autoDelete顺便解释一下:

exclusive,排他队列,只对创建这个queue的Connection可见,Connection关闭queue删除;

autoDelete,没有consumer对这个queue消费时删除。

对于这两种队列,rable=true是不起作用的。

另外,如果注解申明的queue和exchange及binding关系都已经存在,但与已存在的设置不同,比如,已存在的exchange的是direct类型,这里尝试改为fanout类型,结果是不会有任何影响,不论是修改或者新增参数都不会生效。

如果queue存在,exchange存在,但没有binding,那么程序启动后会自动建立起binding关系。

‘捌’ linux下安装rabbitmq报错

1.建议使用rpm包安装
2.相对来说比源码编译方便
3.你源码编译安装,自己没注意或者没加载库,报错了自己都不知道
4.希望可以帮助你,请采纳,谢谢

‘玖’ 如何在同一台服务器上安装两个RabbitMQ

源码编译方式RabbitMQ,配置不同端口即可。

‘拾’ RabbitMQ消息过滤的一个思路

生产者 Procer 向 一个 队列发送消息,并且为消息打上不同的 Tag。假设这个队列有 3 个消费者:Consumer #[1:3],Consumer #1 只想消费 tag1 标记的消息,Consumer #2 只想消费 tag2 标记的消息,Consumer #3 只想消费 tag3 标记的消息。

生产者 publish 消息时,将 Tag 保存在 Map<String, Object> 类型的 header 字段,作为构建 AMQP.BasicProperties 参数

消费者如何告知 Broker 只消费特定 Tag?

假设 Consumer #1 只希望消费带 tag1 标记的消息,那么 Consumer #1 可以在向 Broker 请求 Basic.Consume 指令时,捎带自己期望的 Tag 字符串。Client 在具体生成 consumerTag 时可以用 Tag 关键字加上随机字符串(避免 consumerTag 重复):

消费者通过 Basic.Consume 指令来监听队列的消息,这些消费者信息服务端是如何存储的?

保存在队列主进程(Pid)的 state 中(具体调试可以通过 sys:get_state(Pid) )

并且队列进程在初始化时,会进行 consumers 初始化:

consumers 字段实际由 priority_queue:new() 初始化。当有新的 consumer 注册到队列进程,那么会调用 rabbit_queue_consumers 模块的 add_consumer 方法来向 priority_queue 添加一个元素;同理当有 consumer下线时,最终也会调用该模块的 remove_consumer 方法。 priority_queue 完整实现见 附二

Broker 向 Consumer 投递消息时,底层是通过 rabbit_amqqueue_process 调用 rabbit_queue_consumers 模块的 deliver 方法。默认采用

从 priority_queue 中获取一个 QEntry( {ChPid, Consumer} ),然后通过 FetchFun 从队列中获取消息,发送到 ChPid(Channel 进程)

在 consumers 不为空的情况下,通过 FetchFun 获取消息,此时可以获取该消息的 header,取出 Tag 值(如果消息打了 Tag 标记),然后通过 priority_queue 的 filter/2 方法

在 Pred 实现中,我们可以判断当前消息 Tag 值是否被包含在 consumerTag 中,从而可以过滤出消费特定 tag 的consumers,最后向这些 consumers 中的一个发送 Message 消息。

附一 (队列进程 state 中的 consumers 信息例子)

附二 (priority_queue 模块实现
rabbit_common )

注 :上述思路建议在测试环境测试,考虑到有可能出现的性能问题,作为一个调研也会有很多工作要做,整个过程会涉及 RabbitMQ 服务端源码改造、编译、打包( rabbitmq-public-umbrella )以及客户端的相关改造,如果能实际尝试下,也会有不小的收获。

热点内容
新名图配置怎么样 发布:2024-05-19 09:31:30 浏览:94
php获取子节点 发布:2024-05-19 09:21:18 浏览:160
php生成html 发布:2024-05-19 09:20:24 浏览:795
keil编译步骤 发布:2024-05-19 08:58:12 浏览:702
ipad有哪些好用的c语言编译器 发布:2024-05-19 08:41:56 浏览:767
征途手游版脚本 发布:2024-05-19 08:38:11 浏览:165
安卓咪咕音乐怎么录制视频 发布:2024-05-19 07:56:06 浏览:838
如何搞出超大声的听声辨位安卓版 发布:2024-05-19 07:46:21 浏览:927
linux安全模式 发布:2024-05-19 07:27:25 浏览:176
为什么安卓手机安装不了cpk 发布:2024-05-19 07:22:21 浏览:313