當前位置:首頁 » 操作系統 » rabbitmq源碼

rabbitmq源碼

發布時間: 2022-11-29 14:15:32

『壹』 net開源項目整理

整理一些平時收藏和應用的開源代碼,方便自己學習和查閱

1.應用

nopcommerce ,開源電商網站,開發環境asp.net mvc(未支持.net core),使用技術(autofac,ef,頁面插件等)

https://github.com/nopSolutions/nopCommerce

OrchardCMS ,內容管理網站

https://github.com/OrchardCMS/Orchard(.net版本)

https://github.com/OrchardCMS/Orchard2(.net core版本)

ABP(aspnetboilerplate) ,提供一系列工具用於web應用創建,支持 ASP.NET Core, ASP.NET MVC & Web API,也提供了web應用的模板

https://github.com/aspnetboilerplate/aspnetboilerplate(.net core 版本,tag分支有支持.net版本的)

IdentityServer ,用戶授權網站(支持openid和OAuth 2.0),可用於單點登錄和第三方授權等

https://github.com/IdentityServer/IdentityServer3(.net版本)

https://github.com/IdentityServer/IdentityServer4(.net core版本)

eShopOnContainers 微軟提供的微服務實例

https://github.com/dotnet-architecture/eShopOnContainers

PetShop 三層架構經典例子,用於新手學習,不過aspx有點過時了

https://github.com/songhhwd01/PetShop

BlogEngine.NET 博客網站,也是aspx

https://github.com/rxtur/BlogEngine.NET

2.組件

Lucene.Net 全文檢索開發組件

https://github.com/apache/lucenenet

ServiceStack 半開源,用於創建web服務

https://github.com/ServiceStack/ServiceStack

MassTransit 可用於創建基於消息的服務和應用,依賴於RabbitMQ

https://github.com/MassTransit/MassTransit

stateless 簡單的工作流開發組件,不支持在線定製工作流

https://github.com/dotnet-state-machine/stateless

Hangfire 任務調度開發利器

https://github.com/HangfireIO/Hangfire

Jwt.Net 用於生成JWT (JSON Web Token) 和JWT校驗

https://github.com/jwt-dotnet/jwt

npoi 支持office文件的讀寫

https://github.com/tonyqus/npoi

StackExchange.Redis Redis的.net客戶端

https://github.com/StackExchange/StackExchange.Redis

CacheManager 用於緩存的管理,支持Redis.Memcached,couchbase等

https://github.com/MichaCo/CacheManager

Autofac Ioc組件,用於依賴注入

https://github.com/autofac/Autofac

LightGBM 用於機器學習

https://github.com/Microsoft/LightGBM

3.框架

asp.net mvc

https://github.com/aspnet/Mvc

Nancy 類似asp.net mvc,web開發框架

https://github.com/NancyFx/Nancy

4.其他

dotnet core 主頁 ,提供dotnet core相關知識的索引和例子,方便快速入門

https://github.com/dotnet/core

.net源碼

https://github.com/Microsoft/referencesource

『貳』 在linux下安裝rabbitmq失敗怎麼解決

RabbitMQ 是由 LShift 提供的一個 Advanced Message Queuing Protocol (AMQP) 的開源實現,由以高性能、健壯以及可伸縮性出名的 Erlang 寫成,因此也是繼承了這些優點。
AMQP 里主要要說兩個組件:Exchange 和 Queue (在 AMQP 1.0 里還會有變動),如下圖所示,綠色的 X 就是 Exchange ,紅色的是 Queue ,這兩者都在 Server 端,又稱作 Broker ,這部分是 RabbitMQ 實現的,而藍色的則是客戶端,通常有 Procer 和 Consumer 兩種類型:

1:mq的安裝需要Erlang,所以首先下載Erlang,下載地址:http://www.erlang.org/download.html直接下載源碼,編譯安裝即可。
將下載好的tar包解壓編譯安裝,如下命令:
tar -zxvf otp_src_R16B03-1.tar.gz

cd otp_src_R16B03-1
./configure && make install

安裝過程中可能出現如下錯誤:
configure:error:
No curses library functions found
configure: error:/bin/sh'/home/niewf/software/erlang_R13B01/erts/configure'
failed for erts

解決方法:
yum list|grep ncurses
yum -y install ncurses-devel
yum install ncurses-devel

或者直接下載ncurses包編譯安裝。
下載地址:http://download.chinaunix.net/download/0008000/7242.shtml
tar zxvf ncurses.tar.gz #解壓縮並且釋放 文件包
cd ncurses #進入解壓縮的目錄(注意版本)
./configure #按照你的系統環境製作安裝配置文件
make #編譯源代碼並且編譯NCURSES庫
su root #切換到root用戶環境
make install #安裝編譯好的NCURSES庫

完成後繼續返回上一步操作。

2:安裝python,如果系統中python版本低於2.5的話需要升級python到2.6以上,具體可參考:http://gavinshaw.blog.51cto.com/385947/610585

3:安裝simplejson,直接下載simplejson源碼包編譯安裝即可,下載地址:https://pypi.python.org/pypi/simplejson/。
下載simplejson源碼包後,運行python setup.py install即可完成安裝。

4:安裝rabbit mq,下載地址:https://www.rabbitmq.com/install-generic-unix.html
下載後放入相應目錄解壓,進入%RABBITMQ_HOME%/sbin目錄下運行:./rabbitmq-server start即可啟動mq。
如果遇到如下錯誤,則參考http://leeon.me/a/rabbitmq-start-fail-note解決方案
ERROR: epmd error for host "xxx": address (cannot connect to host/port)
到此mq已經安裝完成。
在%RABBITMQ_HOME%/sbin目錄運行./rabbitmqctl status可查看當前mq狀態。
同時mq也提供了界面查看當前mq狀態,但是需要啟用該插件功能,運行如下命令:
rabbitmq-plugins enable rabbitmq_management,然後在瀏覽器中輸入:http://host-name:15672/#/即可訪問,頁面結果如下:

『叄』 [Android源碼分析] - 非同步通信Handler機制

一、問題:在Android啟動後會在新進程里創建一個主線程,也叫UI線程( 非線程安全 )這個線程主要負責監聽屏幕點擊事件與界面繪制。當Application需要進行耗時操作如網路請求等,如直接在主線程進行容易發生ANR錯誤。所以會創建子線程來執行耗時任務,當子線程執行完畢需要通知UI線程並修改界面時,不可以直接在子線程修改UI,怎麼辦?

解決方法:Message Queue機制可以實現子線程與UI線程的通信。

該機制包括Handler、Message Queue、Looper。Handler可以把消息/ Runnable對象 發給Looper,由它把消息放入所屬線程的消息隊列中,然後Looper又會自動把消息隊列里的消息/Runnable對象 廣播 到所屬線程里的Handler,由Handler處理接收到的消息或Runnable對象。

1、Handler

每次創建Handler對象時,它會自動綁定到創建它的線程上。如果是主線程則默認包含一個Message Queue,否則需要自己創建一個消息隊列來存儲

Handler是多個線程通信的信使。比如在線程A中創建AHandler,給它綁定一個ALooper,同時創建屬於A的消息隊列AMessageQueue。然後在線程B中使用AHandler發送消息給ALooper,ALooper會把消息存入到AMessageQueue,然後再把AMessageQueue廣播給A線程里的AHandler,它接收到消息會進行處理。從而實現通信。

2、Message Queue

在主線程里默認包含了一個消息隊列不需要手動創建。在子線程里,使用Looper.prepare()方法後,會先檢查子線程是否已有一個looper對象,如果有則無法創建,因為每個線程只能擁有一個消息隊列。沒有的話就為子線程創建一個消息隊列。

Handler類包含Looper指針和MessageQueue指針,而Looper里包含實際MessageQueue與當前線程指針。

下面分別就UI線程和worker線程講解handler創建過程:

首先,創建handler時,會自動檢查當前線程是否包含looper對象,如果包含,則將handler內的消息隊列指向looper內部的消息隊列,否則,拋出異常請求執行looper.prepare()方法。

 - 在 UI線程 中,系統自動創建了Looper 對象,所以,直接new一個handler即可使用該機制;

- 在 worker線程 中,如果直接創建handler會拋出運行時異常-即通過查『線程-value』映射表發現當前線程無looper對象。所以需要先調用Looper.prepare()方法。在prepare方法里,利用ThreadLocal<Looper>對象為當前線程創建一個Looper(利用了一個Values類,即一個Map映射表,專為thread存儲value,此處為當前thread存儲一個looper對象)。然後繼續創建handler, 讓handler內部的消息隊列指向該looper的消息隊列(這個很重要,讓handler指向looper里的消息隊列,即二者共享同一個消息隊列,然後handler向這個消息隊列發送消息,looper從這個消息隊列獲取消息) 。然後looper循環消息隊列即可。當獲取到message消息,會找出message對象里的target,即原始發送handler,從而回調handler的handleMessage() 方法進行處理。

 - handler與looper共享消息隊列 ,所以handler發送消息只要入列,looper直接取消息即可。

 - 線程與looper映射表 :一個線程最多可以映射一個looper對象。通過查表可知當前線程是否包含looper,如果已經包含則不再創建新looper。

5、基於這樣的機制是怎樣實現線程隔離的,即在線程中通信呢。 

核心在於 每一個線程擁有自己的handler、message queue、looper體系 。而 每個線程的Handler是公開 的。B線程可以調用A線程的handler發送消息到A的共享消息隊列去,然後A的looper會自動從共享消息隊列取出消息進行處理。反之一樣。

二、上面是基於子線程中利用主線程提供的Handler發送消息出去,然後主線程的Looper從消息隊列中獲取並處理。那麼還有另外兩種情況:

1、主線程發送消息到子線程中;

採用的方法和前面類似。要在子線程中實例化AHandler並設定處理消息的方法,同時由於子線程沒有消息隊列和Looper的輪詢,所以要加上Looper.prepare(),Looper.loop()分別創建消息隊列和開啟輪詢。然後在主線程中使用該AHandler去發送消息即可。

2、子線程A與子線程B之間的通信。

1、 Handler為什麼能夠實現不同線程的通信?核心點在哪?

不同線程之間,每個線程擁有自己的Handler、消息隊列和Looper。Handler是公共的,線程可以通過使用目標線程的Handler對象來發送消息,這個消息會自動發送到所屬線程的消息隊列中去,線程自帶的Looper對象會不斷循環從裡面取出消息並把消息發送給Handler,回調自身Handler的handlerMessage方法,從而實現了消息的線程間傳遞。

2、 Handler的核心是一種事件激活式(類似傳遞一個中斷)的還是主要是用於傳遞大量數據的?重點在Message的內容,偏向於數據傳輸還是事件傳輸。

目前的理解,它所依賴的是消息隊列,發送的自然是消息,即類似事件中斷。

0、 Android消息處理機制(Handler、Looper、MessageQueue與Message)

1、 Handler、Looper源碼閱讀

2、 Android非同步消息處理機制完全解析,帶你從源碼的角度徹底理解

謝謝!

wingjay

![](https://avatars0.githubusercontent.com/u/9619875?v=3&s=460)

『肆』 Spring boot集成RabbitMQ中Exchange與Queue參數詳解

關於springBoot整合RabbitMQ及基本使用可以參考: springBoot整合RabbitMQ及基本使用

設置exchange為持久化之後,並不能保證消息不丟失,因為此時發送往exchange中的消息並不是持久化的,需要配置delivery_mode=2指明message為持久的。

在FanoutExchange中,會將發送的消息封裝為 Message 對象,該對象中有一個 MessageProperties 對象,用來指定消息的屬性,而 MessageProperties 中默認的 deliveryMode 屬性為 MessageDeliveryMode.PERSISTENT ,查看源碼得知,MessageDeliveryMode.PERSISTENT = 2,所以 FanoutExchange 發送的消息默認就是持久化的。

『伍』 RabbitMQ筆記十:MessageConverter詳解

org.springframework.amqp.support.converter.MessageConverter

Message toMessage(Object object, MessageProperties messageProperties);
java對象和屬性對象轉換成Message對象。

Object fromMessage(Message message) throws MessageConversionException;
將消息對象轉換成java對象。

定義Config類

MessageListenerAdapter中定義的消息轉換器,消費端接收的消息就從Message類型轉換成了String類型

消費者處理消息的Handler

啟動類

啟動應用類,發送消息到 hao.miao.order 隊列,控制台列印:

從控制台列印我們知道了在消費者處理消息之前會進行消息類型轉換,調用 TestMessageConverter 的 fromMessage 方法,然後執行消息處理器的 onMessage 方法,方法參數就是 String 類型。

自定義一個MyBody類型,將消息從Message轉換成MyBody類型

然後修改 TestMessageConverter 的 fromMessage 方法,返回了 MyBody 類型,那麼消息處理器的消費方法也是MyBody參數的消費方法

此時的消息處理器,處理器中的方法的入參就是MyBody類型了,

此時控制台列印:

我們還測試如下如果不使用自定義的 Converter ,那麼當消息的屬性中含有屬性content_type的值為text,那麼默認的轉換成的java類型就是String類型,如果不指定那麼默認的轉換類型就是byte[]

我們跟進去 MessageListenerAdapte r的 setMessageConverter 方法,

我們發現默認的 MessageConverter 是 SimpleMessageConverter ,我們進入 SimpleMessageConverter 類中看其默認的轉換邏輯

源碼分析總結:
1. MessageConverter 可以把 java 對象轉換成 Message 對象,也可以把 Message 對象轉換成 java 對象
2. MessageListenerAdapter 內部通過 MessageConverter 把 Message 轉換成java對象,然後找到相應的處理方法,參數為轉換成的java對象。
3. SimpleMessageConverter 處理邏輯:
如果 content_type 是以text開頭,則把消息轉換成 String 類型
如果 content_type的 值是 application/x-java-serialized-object 則把消息序列化為java對象,否則,把消息轉換成位元組數組。

『陸』 如何正確生成RabbitMQ-C客戶端庫

1. 下載 rabbitmq-c-master源碼
2. 下載 rabbitmq-codegen 源碼
3. 將 rabbitmq-codegen 中的內容拷貝到 rabbitmq-c-master 中的 codegen 目錄下(如果沒有該目錄請自行創建)
4.打開openssl目錄,修改ms文件夾下的ntdll.mak文件,將CFLAG的/WX選項去掉
5.執行Configure 運行perl Configure VC-WIN32
6.運行 ms\do_ms
7.運行 nmake -f ms\ntdll.mak 執行make進行編譯.....nmake -f ms\ntdll.mak 命令將OpenSSL編譯成動態庫,如果想編譯成靜態庫應使用 nmake -f ms\nt.mak
8.運行 nmake -f ms\ntdll.mak test.檢查上一部編譯是否成功。正常的話會執行編譯後的測試程序
至此OpenSSL在windows下編譯完成,編譯得到的dll和lib文件位置:靜態庫libeay32.lib和ssleay32.lib位於out32文件夾下,動態庫libeay32.dll,libeay32.lib,ssleay32.dll,ssleay32.lib位於out32dll文件夾下。
9.使用VS2010編譯OpenSSL的過程記錄
10. 通過 win32 的 cmd 或者 cmake-gui 執行相應的命令進行相關文件生成。
11.生成的sln用vs2010打開 編譯 若成功後在librabbitmq文件夾debug中有rabbitmq.1.lib和rabbitmq.1.dll文件
12. 下載的 rabbit-c-master 源碼中沒有文件 amqp_framing.h 和 amqp_framing.c,這兩個文件是通過 codegen.py 和 amqp_codegen.py 產生的。
13.使用 CMake 時路徑中最好不要出現中文。

『柒』 Spring整合rabbitmq實踐(一):基礎

Spring整合rabbitmq實踐(二):擴展
Spring整合rabbitmq實踐(三):源碼

procer:消息生產者;

consumer:消息消費者;

queue:消息隊列;

exchange:接收procer發送的消息按照binding規則轉發給相應的queue;

binding:exchange與queue之間的關系;

virtualHost:每個virtualHost持有自己的exchange、queue、binding,用戶只能在virtualHost粒度控制許可權。

fanout:

群發到所有綁定的queue;

direct:

根據routing key routing到相應的queue,routing不到任何queue的消息扔掉;可以不同的key綁到同一個queue,也可以同一個key綁到不同的queue;

topic:

類似direct,區別是routing key是由一組以「.」分隔的單片語成,可以有通配符,「*」匹配一個單詞,「#」匹配0個或多個單詞;

headers:

根據arguments來routing。

arguments為一組key-value對,任意設置。

「x-match」是一個特殊的key,值為「all」時必須匹配所有argument,值為「any」時只需匹配任意一個argument,不設置默認為「all」。

通過以下配置,可以獲得最基礎的發送消息到queue,以及從queue接收消息的功能。

這個包同時包含了一些其它的包:spring-context、spring-tx、spring-web、spring-messaging、spring-retry、spring-amqp、amqp-client,如果想單純一點,可以單獨引入。

最主要的是以下幾個包,

spring-amqp:

spring-rabbit:

amqp-client:

個人理解就是,spring-amqp是spring整合的amqp,spring-rabbit是spring整合的rabbitmq(rabbitmq是amqp的一個實現,所以可能spring-rabbit也是類似關系),amqp-client提供操作rabbitmq的java api。

目前最新的是2.0.5.RELEASE版本。如果編譯報錯,以下信息或許能有所幫助:

(1)

解決方案:spring-amqp版本改為2.0.5.RELEASE。

(2)

解決方案:spring-context版本改為5.0.7.RELEASE。

(3)

解決方案:spring-core版本改為5.0.7.RELEASE。

(4)

解決方案:spring-beans版本改為5.0.7.RELEASE。

(5)

解決方案:spring-aop版本改為5.0.7.RELEASE。

總之,需要5.0.7.RELEASE版本的spring,及相匹配版本的amqp-client。

後面所講的這些bean配置,spring-amqp中都有默認配置,如果不需要修改默認配置,則不用人為配置這些bean。後面這些配置也沒有涉及到所有的屬性。

這里的ConnectionFactory指的是spring-rabbit包下面的ConnectionFactory介面,不是amqp-client包下面的ConnectionFactory類。

上面這個bean是spring-amqp的核心,不論是發送消息還是接收消息都需要這個bean,下面描述一下裡面這些配置的含義。

setAddresses :設置了rabbitmq的地址、埠,集群部署的情況下可填寫多個,「,」分隔。

setUsername :設置rabbitmq的用戶名。

setPassword :設置rabbitmq的用戶密碼。

setVirtualHost :設置virtualHost。

setCacheMode :設置緩存模式,共有兩種, CHANNEL 和 CONNECTION 模式。

CHANNEL 模式,程序運行期間ConnectionFactory會維護著一個Connection,所有的操作都會使用這個Connection,但一個Connection中可以有多個Channel,操作rabbitmq之前都必須先獲取到一個Channel,否則就會阻塞(可以通過setChannelCheckoutTimeout()設置等待時間),這些Channel會被緩存(緩存的數量可以通過setChannelCacheSize()設置);

CONNECTION 模式,這個模式下允許創建多個Connection,會緩存一定數量的Connection,每個Connection中同樣會緩存一些Channel,除了可以有多個Connection,其它都跟CHANNEL模式一樣。

這里的Connection和Channel是spring-amqp中的概念,並非rabbitmq中的概念,官方文檔對Connection和Channel有這樣的描述:

關於 CONNECTION 模式中,可以存在多個Connection的使用場景,官方文檔的描述:

setChannelCacheSize :設置每個Connection中(注意是每個Connection)可以緩存的Channel數量,注意只是緩存的Channel數量,不是Channel的數量上限,操作rabbitmq之前(send/receive message等)要先獲取到一個Channel,獲取Channel時會先從緩存中找閑置的Channel,如果沒有則創建新的Channel,當Channel數量大於緩存數量時,多出來沒法放進緩存的會被關閉。

注意,改變這個值不會影響已經存在的Connection,隻影響之後創建的Connection。

setChannelCheckoutTimeout :當這個值大於0時, channelCacheSize 不僅是緩存數量,同時也會變成數量上限,從緩存獲取不到可用的Channel時,不會創建新的Channel,會等待這個值設置的毫秒數,到時間仍然獲取不到可用的Channel會拋出AmqpTimeoutException異常。

同時,在 CONNECTION 模式,這個值也會影響獲取Connection的等待時間,超時獲取不到Connection也會拋出AmqpTimeoutException異常。

setPublisherReturns、setPublisherConfirms :procer端的消息確認機制(confirm和return),設為true後開啟相應的機制,後文詳述。

官方文檔描述publisherReturns設為true打開return機制,publisherComfirms設為true打開confirm機制,但測試結果(2.0.5.RELEASE版本)是,任意一個設為true,兩個都會打開。

addConnectionListener、addChannelListener、setRecoveryListener :添加或設置相應的Listener,後文詳述。

setConnectionCacheSize :僅在 CONNECTION 模式使用,設置Connection的緩存數量。

setConnectionLimit :僅在 CONNECTION 模式使用,設置Connection的數量上限。

上面的bean配置,除了需要注入的幾個listener bean以外,其它設置的都是其默認值(2.0.5.RELEASE版本),後面的bean示例配置也是一樣,部分屬性不同版本的默認值可能有所不同。

一般不用配置這個bean,這里簡單提一下。

這個ConnectionFactory是rabbit api中的ConnectionFactory類,這裡面是連接rabbitmq節點的Connection配置。

如果想修改這些配置,可以按如下方式配置:

consumer端如果通過@RabbitListener註解的方式接收消息,不需要這個bean。

不建議直接通過ConnectionFactory獲取Channel操作rabbitmq,建議通過amqpTemplate操作。

setConnectionFactory :設置spring-amqp的ConnectionFactory。

setRetryTemplate :設置重試機制,詳情見後文。

setMessageConverter :設置MessageConverter,用於java對象與Message對象(實際發送和接收的消息對象)之間的相互轉換,詳情見後文。

setChannelTransacted :打開或關閉Channel的事務,關於amqp的事務後文描述。

setReturnCallback、setConfirmCallback :return和confirm機制的回調介面,後文詳述。

setMandatory :設為true使ReturnCallback生效。

這個bean僅在consumer端通過@RabbitListener註解的方式接收消息時使用,每一個@RabbitListener註解的方法都會由這個創建一個MessageListenerContainer,負責接收消息。

setConnectionFactory :設置spring-amqp的ConnectionFactory。

setMessageConverter :對於consumer端,MessageConverter也可以在這里配置。

setAcknowledgeMode :設置consumer端的應答模式,共有三種:NONE、AUTO、MANUAL。

NONE,無應答,這種模式下rabbitmq默認consumer能正確處理所有發出的消息,所以不管消息有沒有被consumer收到,有沒有正確處理都不會恢復;

AUTO,由Container自動應答,正確處理發出ack信息,處理失敗發出nack信息,rabbitmq發出消息後將會等待consumer端的應答,只有收到ack確認信息才會把消息清除掉,收到nack信息的處理辦法由setDefaultRequeueRejected()方法設置,所以在這種模式下,發生錯誤的消息是可以恢復的。

MANUAL,基本同AUTO模式,區別是需要人為調用方法給應答。

setConcurrentConsumers :設置每個MessageListenerContainer將會創建的Consumer的最小數量,默認是1個。

setMaxConcurrentConsumers :設置每個MessageListenerContainer將會創建的Consumer的最大數量,默認等於最小數量。

setPrefetchCount :設置每次請求發送給每個Consumer的消息數量。

setChannelTransacted :設置Channel的事務。

setTxSize :設置事務當中可以處理的消息數量。

setDefaultRequeueRejected :設置當rabbitmq收到nack/reject確認信息時的處理方式,設為true,扔回queue頭部,設為false,丟棄。

setErrorHandler :實現ErrorHandler介面設置進去,所有未catch的異常都會由ErrorHandler處理。

AmqpTamplate裡面有下面幾個方法可以向queue發送消息:

這里,exchange必須存在,否則消息發不出去,會看到錯誤日誌,但不影響程序運行:

Message是org.springframework.amqp.core.Message類,spring-amqp發送和接收的都是這個Message。

從Message類源碼可以看到消息內容放在byte[]裡面,MessageProperties對象包含了非常多的一些其它信息,如Header、exchange、routing key等。

這種方式,需要將消息內容(String,或其它Object)轉換為byte[],示例:

也可以直接調用下面幾個方法,Object將會自動轉為Message對象發送:

有兩種方法接收消息:

1.polling consumer,輪詢調用方法一次獲取一條;

2.asynchronous consumer,listener非同步接收消息。

polling consumer

直接通過AmqpTemplate的方法從queue獲取消息,有如下方法:

如果queue裡面沒有消息,會立刻返回null;傳入timeoutMillis參數後可阻塞等待一段時間。

如果想直接從queue獲取想要的java對象,可調用下面這一組方法:

後面4個方法是帶泛型的,示例如下:

使用這四個方法需要配置org.springframework.amqp.support.converter.SmartMessageConverter,這是一個介面,Jackson2JsonMessageConverter已經實現了這個介面,所以只要將Jackson2JsonMessageConverter設置到RabbitTemplate中即可。

asynchronous consumer

有多種方式可以實現,詳情參考官方文檔。

最簡單的實現方式是@RabbitListener註解,示例:

這里接收消息的對象用的是Message,也可以是自定義的java對象,但調用Converter轉換失敗會報錯。

註解上指定的queue必須是已經存在並且綁定到某個exchange的,否則會報錯:

如果在@RabbitListener註解中指明binding信息,就能自動創建queue、exchange並建立binding關系。

direct和topic類型的exchange需要routingKey,示例:

fanout類型的exchange,示例:

2.0版本之後,可以指定多個routingKey,示例:

並且支持arguments屬性,可用於headers類型的exchange,示例:

@Queue有兩個參數exclusive和autoDelete順便解釋一下:

exclusive,排他隊列,只對創建這個queue的Connection可見,Connection關閉queue刪除;

autoDelete,沒有consumer對這個queue消費時刪除。

對於這兩種隊列,rable=true是不起作用的。

另外,如果註解申明的queue和exchange及binding關系都已經存在,但與已存在的設置不同,比如,已存在的exchange的是direct類型,這里嘗試改為fanout類型,結果是不會有任何影響,不論是修改或者新增參數都不會生效。

如果queue存在,exchange存在,但沒有binding,那麼程序啟動後會自動建立起binding關系。

『捌』 linux下安裝rabbitmq報錯

1.建議使用rpm包安裝
2.相對來說比源碼編譯方便
3.你源碼編譯安裝,自己沒注意或者沒載入庫,報錯了自己都不知道
4.希望可以幫助你,請採納,謝謝

『玖』 如何在同一台伺服器上安裝兩個RabbitMQ

源碼編譯方式RabbitMQ,配置不同埠即可。

『拾』 RabbitMQ消息過濾的一個思路

生產者 Procer 向 一個 隊列發送消息,並且為消息打上不同的 Tag。假設這個隊列有 3 個消費者:Consumer #[1:3],Consumer #1 只想消費 tag1 標記的消息,Consumer #2 只想消費 tag2 標記的消息,Consumer #3 只想消費 tag3 標記的消息。

生產者 publish 消息時,將 Tag 保存在 Map<String, Object> 類型的 header 欄位,作為構建 AMQP.BasicProperties 參數

消費者如何告知 Broker 只消費特定 Tag?

假設 Consumer #1 只希望消費帶 tag1 標記的消息,那麼 Consumer #1 可以在向 Broker 請求 Basic.Consume 指令時,捎帶自己期望的 Tag 字元串。Client 在具體生成 consumerTag 時可以用 Tag 關鍵字加上隨機字元串(避免 consumerTag 重復):

消費者通過 Basic.Consume 指令來監聽隊列的消息,這些消費者信息服務端是如何存儲的?

保存在隊列主進程(Pid)的 state 中(具體調試可以通過 sys:get_state(Pid) )

並且隊列進程在初始化時,會進行 consumers 初始化:

consumers 欄位實際由 priority_queue:new() 初始化。當有新的 consumer 注冊到隊列進程,那麼會調用 rabbit_queue_consumers 模塊的 add_consumer 方法來向 priority_queue 添加一個元素;同理當有 consumer下線時,最終也會調用該模塊的 remove_consumer 方法。 priority_queue 完整實現見 附二

Broker 向 Consumer 投遞消息時,底層是通過 rabbit_amqqueue_process 調用 rabbit_queue_consumers 模塊的 deliver 方法。默認採用

從 priority_queue 中獲取一個 QEntry( {ChPid, Consumer} ),然後通過 FetchFun 從隊列中獲取消息,發送到 ChPid(Channel 進程)

在 consumers 不為空的情況下,通過 FetchFun 獲取消息,此時可以獲取該消息的 header,取出 Tag 值(如果消息打了 Tag 標記),然後通過 priority_queue 的 filter/2 方法

在 Pred 實現中,我們可以判斷當前消息 Tag 值是否被包含在 consumerTag 中,從而可以過濾出消費特定 tag 的consumers,最後向這些 consumers 中的一個發送 Message 消息。

附一 (隊列進程 state 中的 consumers 信息例子)

附二 (priority_queue 模塊實現
rabbit_common )

注 :上述思路建議在測試環境測試,考慮到有可能出現的性能問題,作為一個調研也會有很多工作要做,整個過程會涉及 RabbitMQ 服務端源碼改造、編譯、打包( rabbitmq-public-umbrella )以及客戶端的相關改造,如果能實際嘗試下,也會有不小的收獲。

熱點內容
加密視頻怎麼解密 發布:2024-05-17 11:02:52 瀏覽:570
柳工挖機密碼多少合適 發布:2024-05-17 11:00:40 瀏覽:187
android工程嘆號 發布:2024-05-17 10:56:21 瀏覽:480
在蘋果手機應用怎麼比安卓貴 發布:2024-05-17 10:56:20 瀏覽:547
賽歐313配置怎麼樣 發布:2024-05-17 10:43:16 瀏覽:988
c語言預算 發布:2024-05-17 10:43:16 瀏覽:492
推薦對稱加密演算法 發布:2024-05-17 10:43:15 瀏覽:822
有存儲功能計算器 發布:2024-05-17 10:42:34 瀏覽:118
小米賬號密碼保險箱在哪裡 發布:2024-05-17 10:17:00 瀏覽:752
抖音引流腳本推薦 發布:2024-05-17 10:11:16 瀏覽:724