- AMQP(Advanced Message Queue Protocol) + elang으로 구성.
클라이언트들에서 메시지를 받아서 분배하는 역할
* 나의 경우는 웹으로 모니터링 할 수 있게끔 c#으로 client를 구축
1. 홈페이지에서 rabbitmq-dotnet-client-3.3.4-dotnet-3.0.zip 을 받아, RabbitMQ.Client.dll 을 참조하여 RabbitMQ 사용할 준비를 끝낸다.
2. 이 토끼씨는... 예제가 잘 되어 있다. 언어별로... ㅎ 여기를 차근차근 따라하면 okay! (Tutorials : http://www.rabbitmq.com/getstarted.html)
또한, pdf도 제공하고 있으니,, 자세한것은 pdf 참고!! ( 나의 경우는 //2.8. Retrieving messages by subscription used. )
3. 기본 알고리즘
var factory = new
ConnectionFactory()
{
UserName = "username",
Password = "password",
Protocol = Protocols.FromEnvironment(),
Port = AmqpTcpEndpoint.UseDefaultPort,
//factory.Uri = "amqp://user:pass@hostName:port/vhost";
Uri = "amqp://username:password@uri"
};
public
class
ConnectionFactory
{
public
const
ushort DefaultChannelMax = 0;
public
const
int DefaultConnectionTimeout = 30000;
public
const
uint DefaultFrameMax = 0;
public
const
ushort DefaultHeartbeat = 0;
public
const
string DefaultPass = "guest";
public
const
string DefaultUser = "guest";
public
const
string DefaultVHost = "/";
public
AuthMechanismFactory[] AuthMechanisms;
public
IDictionary<string, object> ClientProperties;
public
static
AuthMechanismFactory[] DefaultAuthMechanisms;
public
string HostName;
public
string Password;
public
int Port;
public
IProtocol Protocol;
public
ushort RequestedChannelMax;
public
int RequestedConnectionTimeout;
public
uint RequestedFrameMax;
public
ushort RequestedHeartbeat;
public
ConnectionFactory.ObtainSocket SocketFactory;
public
SslOption Ssl;
public
string UserName;
public
string VirtualHost;
public ConnectionFactory();
public
AmqpTcpEndpoint Endpoint { get; set; }
public
string Uri { set; }
public
Uri uri { set; }
public
AuthMechanismFactory AuthMechanismFactory(string[] mechs);
public
virtual
IConnection CreateConnection();
public
virtual
IConnection CreateConnection(int maxRedirects);
protected
virtual
IConnection CreateConnection(int maxRedirects, IDictionary<AmqpTcpEndpoint, int> connectionAttempts, IDictionary<AmqpTcpEndpoint, Exception> connectionErrors, params
AmqpTcpEndpoint[] endpoints);
public
static
TcpClient DefaultSocketFactory(AddressFamily addressFamily);
protected
virtual
IConnection FollowRedirectChain(int maxRedirects, IDictionary<AmqpTcpEndpoint, int> connectionAttempts, IDictionary<AmqpTcpEndpoint, Exception> connectionErrors, ref
AmqpTcpEndpoint[] mostRecentKnownHosts, AmqpTcpEndpoint endpoint);
public
delegate
TcpClient
ObtainSocket(AddressFamily addressFamily);
}
IConnection conn = factory.CreateConnection();
public
interface
IConnection : IDisposable
{
bool AutoClose { get; set; }
ushort ChannelMax { get; }
IDictionary<string, object> ClientProperties { get; }
ShutdownEventArgs CloseReason { get; }
AmqpTcpEndpoint Endpoint { get; }
uint FrameMax { get; }
ushort Heartbeat { get; }
bool IsOpen { get; }
AmqpTcpEndpoint[] KnownHosts { get; }
IProtocol Protocol { get; }
IDictionary<string, object> ServerProperties { get; }
IList<ShutdownReportEntry> ShutdownReport { get; }
event
CallbackExceptionEventHandler CallbackException;
event
ConnectionBlockedEventHandler ConnectionBlocked;
event
ConnectionShutdownEventHandler ConnectionShutdown;
event
ConnectionUnblockedEventHandler ConnectionUnblocked;
void Abort();
void Abort(int timeout);
void Abort(ushort reasonCode, string reasonText);
void Abort(ushort reasonCode, string reasonText, int timeout);
void Close();
void Close(int timeout);
void Close(ushort reasonCode, string reasonText);
void Close(ushort reasonCode, string reasonText, int timeout);
IModel CreateModel();
void HandleConnectionBlocked(string reason);
void HandleConnectionUnblocked();
}
IModel channel = conn.CreateModel();
channel.QueueDeclare(inputQueueName, durable, autoDelete, autoDelete, arguments);
public
interface
IModel : IDisposable
{
ShutdownEventArgs CloseReason { get; }
IBasicConsumer DefaultConsumer { get; set; }
bool IsClosed { get; }
bool IsOpen { get; }
ulong NextPublishSeqNo { get; }
event
BasicAckEventHandler BasicAcks;
event
BasicNackEventHandler BasicNacks;
event
BasicRecoverOkEventHandler BasicRecoverOk;
event
BasicReturnEventHandler BasicReturn;
event
CallbackExceptionEventHandler CallbackException;
event
FlowControlEventHandler FlowControl;
event
ModelShutdownEventHandler ModelShutdown;
[AmqpMethodDoNotImplement("")]
void Abort();
[AmqpMethodDoNotImplement("")]
void Abort(ushort replyCode, string replyText);
void BasicAck(ulong deliveryTag, bool multiple);
[AmqpMethodDoNotImplement("")]
void BasicCancel(string consumerTag);
[AmqpMethodDoNotImplement("")]
string BasicConsume(string queue, bool noAck, IBasicConsumer consumer);
[AmqpMethodDoNotImplement("")]
string BasicConsume(string queue, bool noAck, string consumerTag, IBasicConsumer consumer);
[AmqpMethodDoNotImplement("")]
string BasicConsume(string queue, bool noAck, string consumerTag, IDictionary<string, object> arguments, IBasicConsumer consumer);
[AmqpMethodDoNotImplement("")]
string BasicConsume(string queue, bool noAck, string consumerTag, bool noLocal, bool exclusive, IDictionary<string, object> arguments, IBasicConsumer consumer);
[AmqpMethodDoNotImplement("")]
BasicGetResult BasicGet(string queue, bool noAck);
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
void BasicNack(ulong deliveryTag, bool multiple, bool requeue);
[AmqpMethodDoNotImplement("")]
void BasicPublish(PublicationAddress addr, IBasicProperties basicProperties, byte[] body);
[AmqpMethodDoNotImplement("")]
void BasicPublish(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body);
[AmqpMethodDoNotImplement("")]
void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body);
[AmqpMethodDoNotImplement("")]
void BasicPublish(string exchange, string routingKey, bool mandatory, bool immediate, IBasicProperties basicProperties, byte[] body);
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
[AmqpMethodDoNotImplement("")]
void BasicRecover(bool requeue);
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
void BasicRecoverAsync(bool requeue);
void BasicReject(ulong deliveryTag, bool requeue);
[AmqpMethodDoNotImplement("")]
void Close();
[AmqpMethodDoNotImplement("")]
void Close(ushort replyCode, string replyText);
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
[AmqpMethodDoNotImplement("")]
void ConfirmSelect();
[AmqpContentHeaderFactory("basic")]
IBasicProperties CreateBasicProperties();
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]
[AmqpContentHeaderFactory("file")]
IFileProperties CreateFileProperties();
[AmqpContentHeaderFactory("stream")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]
IStreamProperties CreateStreamProperties();
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]
void DtxSelect();
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]
void DtxStart(string dtxIdentifier);
[AmqpMethodDoNotImplement("")]
void ExchangeBind(string destination, string source, string routingKey);
[AmqpMethodDoNotImplement("")]
void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments);
[AmqpMethodDoNotImplement("")]
void ExchangeDeclare(string exchange, string type);
[AmqpMethodDoNotImplement("")]
void ExchangeDeclare(string exchange, string type, bool durable);
[AmqpMethodDoNotImplement("")]
void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
[AmqpMethodDoNotImplement("")]
void ExchangeDeclarePassive(string exchange);
[AmqpMethodDoNotImplement("")]
void ExchangeDelete(string exchange);
[AmqpMethodDoNotImplement("")]
void ExchangeDelete(string exchange, bool ifUnused);
[AmqpMethodDoNotImplement("")]
void ExchangeUnbind(string destination, string source, string routingKey);
[AmqpMethodDoNotImplement("")]
void ExchangeUnbind(string destination, string source, string routingKey, IDictionary<string, object> arguments);
[AmqpMethodDoNotImplement("")]
void QueueBind(string queue, string exchange, string routingKey);
[AmqpMethodDoNotImplement("")]
void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
[AmqpMethodDoNotImplement("")]
QueueDeclareOk QueueDeclare();
[AmqpMethodDoNotImplement("")]
QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
[AmqpMethodDoNotImplement("")]
QueueDeclareOk QueueDeclarePassive(string queue);
[AmqpMethodDoNotImplement("")]
uint QueueDelete(string queue);
[AmqpMethodDoNotImplement("")]
uint QueueDelete(string queue, bool ifUnused, bool ifEmpty);
[AmqpMethodDoNotImplement("")]
uint QueuePurge(string queue);
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
void TxCommit();
void TxRollback();
void TxSelect();
[AmqpMethodDoNotImplement("")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
bool WaitForConfirms();
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
[AmqpMethodDoNotImplement("")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
bool WaitForConfirms(TimeSpan timeout, out
bool timedOut);
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
[AmqpMethodDoNotImplement("")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
void WaitForConfirmsOrDie();
[AmqpMethodDoNotImplement("")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
void WaitForConfirmsOrDie(TimeSpan timeout);
}