- AMQP(Advanced Message Queue Protocol) + elang으로 구성.
- 클라이언트들에서 메시지를 받아서 분배하는 역할
* 나의 경우는 웹으로 모니터링 할 수 있게끔 c#으로 client를 구축
1. 홈페이지에서 rabbitmq-dotnet-client-3.3.4-dotnet-3.0.zip 을 받아, RabbitMQ.Client.dll 을 참조하여 RabbitMQ 사용할 준비를 끝낸다.
2. 이 토끼씨는... 예제가 잘 되어 있다. 언어별로... ㅎ 여기를 차근차근 따라하면 okay! (Tutorials : http://www.rabbitmq.com/getstarted.html)
또한, pdf도 제공하고 있으니,, 자세한것은 pdf 참고!! ( 나의 경우는 //2.8. Retrieving messages by subscription used. )
3. 기본 알고리즘
var factory = new ConnectionFactory()
{
UserName = "username",
Password = "password",
Protocol = Protocols.FromEnvironment(),
Port = AmqpTcpEndpoint.UseDefaultPort,
//factory.Uri = "amqp://user:pass@hostName:port/vhost";
Uri = "amqp://username:password@uri"
};
public class ConnectionFactory
{
public const ushort DefaultChannelMax = 0;
public const int DefaultConnectionTimeout = 30000;
public const uint DefaultFrameMax = 0;
public const ushort DefaultHeartbeat = 0;
public const string DefaultPass = "guest";
public const string DefaultUser = "guest";
public const string DefaultVHost = "/";
public AuthMechanismFactory[] AuthMechanisms;
public IDictionary<string, object> ClientProperties;
public static AuthMechanismFactory[] DefaultAuthMechanisms;
public string HostName;
public string Password;
public int Port;
public IProtocol Protocol;
public ushort RequestedChannelMax;
public int RequestedConnectionTimeout;
public uint RequestedFrameMax;
public ushort RequestedHeartbeat;
public ConnectionFactory.ObtainSocket SocketFactory;
public SslOption Ssl;
public string UserName;
public string VirtualHost;
public ConnectionFactory();
public AmqpTcpEndpoint Endpoint { get; set; }
public string Uri { set; }
public Uri uri { set; }
public AuthMechanismFactory AuthMechanismFactory(string[] mechs);
public virtual IConnection CreateConnection();
public virtual IConnection CreateConnection(int maxRedirects);
protected virtual IConnection CreateConnection(int maxRedirects, IDictionary<AmqpTcpEndpoint, int> connectionAttempts, IDictionary<AmqpTcpEndpoint, Exception> connectionErrors, params AmqpTcpEndpoint[] endpoints);
public static TcpClient DefaultSocketFactory(AddressFamily addressFamily);
protected virtual IConnection FollowRedirectChain(int maxRedirects, IDictionary<AmqpTcpEndpoint, int> connectionAttempts, IDictionary<AmqpTcpEndpoint, Exception> connectionErrors, ref AmqpTcpEndpoint[] mostRecentKnownHosts, AmqpTcpEndpoint endpoint);
public delegate TcpClient ObtainSocket(AddressFamily addressFamily);
}
IConnection conn = factory.CreateConnection();
public interface IConnection : IDisposable
{
bool AutoClose { get; set; }
ushort ChannelMax { get; }
IDictionary<string, object> ClientProperties { get; }
ShutdownEventArgs CloseReason { get; }
AmqpTcpEndpoint Endpoint { get; }
uint FrameMax { get; }
ushort Heartbeat { get; }
bool IsOpen { get; }
AmqpTcpEndpoint[] KnownHosts { get; }
IProtocol Protocol { get; }
IDictionary<string, object> ServerProperties { get; }
IList<ShutdownReportEntry> ShutdownReport { get; }
event CallbackExceptionEventHandler CallbackException;
event ConnectionBlockedEventHandler ConnectionBlocked;
event ConnectionShutdownEventHandler ConnectionShutdown;
event ConnectionUnblockedEventHandler ConnectionUnblocked;
void Abort();
void Abort(int timeout);
void Abort(ushort reasonCode, string reasonText);
void Abort(ushort reasonCode, string reasonText, int timeout);
void Close();
void Close(int timeout);
void Close(ushort reasonCode, string reasonText);
void Close(ushort reasonCode, string reasonText, int timeout);
IModel CreateModel();
void HandleConnectionBlocked(string reason);
void HandleConnectionUnblocked();
}
IModel channel = conn.CreateModel();
channel.QueueDeclare(inputQueueName, durable, autoDelete, autoDelete, arguments);
public interface IModel : IDisposable
{
ShutdownEventArgs CloseReason { get; }
IBasicConsumer DefaultConsumer { get; set; }
bool IsClosed { get; }
bool IsOpen { get; }
ulong NextPublishSeqNo { get; }
event BasicAckEventHandler BasicAcks;
event BasicNackEventHandler BasicNacks;
event BasicRecoverOkEventHandler BasicRecoverOk;
event BasicReturnEventHandler BasicReturn;
event CallbackExceptionEventHandler CallbackException;
event FlowControlEventHandler FlowControl;
event ModelShutdownEventHandler ModelShutdown;
[AmqpMethodDoNotImplement("")]
void Abort();
[AmqpMethodDoNotImplement("")]
void Abort(ushort replyCode, string replyText);
void BasicAck(ulong deliveryTag, bool multiple);
[AmqpMethodDoNotImplement("")]
void BasicCancel(string consumerTag);
[AmqpMethodDoNotImplement("")]
string BasicConsume(string queue, bool noAck, IBasicConsumer consumer);
[AmqpMethodDoNotImplement("")]
string BasicConsume(string queue, bool noAck, string consumerTag, IBasicConsumer consumer);
[AmqpMethodDoNotImplement("")]
string BasicConsume(string queue, bool noAck, string consumerTag, IDictionary<string, object> arguments, IBasicConsumer consumer);
[AmqpMethodDoNotImplement("")]
string BasicConsume(string queue, bool noAck, string consumerTag, bool noLocal, bool exclusive, IDictionary<string, object> arguments, IBasicConsumer consumer);
[AmqpMethodDoNotImplement("")]
BasicGetResult BasicGet(string queue, bool noAck);
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
void BasicNack(ulong deliveryTag, bool multiple, bool requeue);
[AmqpMethodDoNotImplement("")]
void BasicPublish(PublicationAddress addr, IBasicProperties basicProperties, byte[] body);
[AmqpMethodDoNotImplement("")]
void BasicPublish(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body);
[AmqpMethodDoNotImplement("")]
void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body);
[AmqpMethodDoNotImplement("")]
void BasicPublish(string exchange, string routingKey, bool mandatory, bool immediate, IBasicProperties basicProperties, byte[] body);
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
[AmqpMethodDoNotImplement("")]
void BasicRecover(bool requeue);
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
void BasicRecoverAsync(bool requeue);
void BasicReject(ulong deliveryTag, bool requeue);
[AmqpMethodDoNotImplement("")]
void Close();
[AmqpMethodDoNotImplement("")]
void Close(ushort replyCode, string replyText);
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
[AmqpMethodDoNotImplement("")]
void ConfirmSelect();
[AmqpContentHeaderFactory("basic")]
IBasicProperties CreateBasicProperties();
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]
[AmqpContentHeaderFactory("file")]
IFileProperties CreateFileProperties();
[AmqpContentHeaderFactory("stream")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]
IStreamProperties CreateStreamProperties();
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]
void DtxSelect();
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]
void DtxStart(string dtxIdentifier);
[AmqpMethodDoNotImplement("")]
void ExchangeBind(string destination, string source, string routingKey);
[AmqpMethodDoNotImplement("")]
void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments);
[AmqpMethodDoNotImplement("")]
void ExchangeDeclare(string exchange, string type);
[AmqpMethodDoNotImplement("")]
void ExchangeDeclare(string exchange, string type, bool durable);
[AmqpMethodDoNotImplement("")]
void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
[AmqpMethodDoNotImplement("")]
void ExchangeDeclarePassive(string exchange);
[AmqpMethodDoNotImplement("")]
void ExchangeDelete(string exchange);
[AmqpMethodDoNotImplement("")]
void ExchangeDelete(string exchange, bool ifUnused);
[AmqpMethodDoNotImplement("")]
void ExchangeUnbind(string destination, string source, string routingKey);
[AmqpMethodDoNotImplement("")]
void ExchangeUnbind(string destination, string source, string routingKey, IDictionary<string, object> arguments);
[AmqpMethodDoNotImplement("")]
void QueueBind(string queue, string exchange, string routingKey);
[AmqpMethodDoNotImplement("")]
void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
[AmqpMethodDoNotImplement("")]
QueueDeclareOk QueueDeclare();
[AmqpMethodDoNotImplement("")]
QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
[AmqpMethodDoNotImplement("")]
QueueDeclareOk QueueDeclarePassive(string queue);
[AmqpMethodDoNotImplement("")]
uint QueueDelete(string queue);
[AmqpMethodDoNotImplement("")]
uint QueueDelete(string queue, bool ifUnused, bool ifEmpty);
[AmqpMethodDoNotImplement("")]
uint QueuePurge(string queue);
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
void TxCommit();
void TxRollback();
void TxSelect();
[AmqpMethodDoNotImplement("")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
bool WaitForConfirms();
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
[AmqpMethodDoNotImplement("")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
bool WaitForConfirms(TimeSpan timeout, out bool timedOut);
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
[AmqpMethodDoNotImplement("")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
void WaitForConfirmsOrDie();
[AmqpMethodDoNotImplement("")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]
[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]
void WaitForConfirmsOrDie(TimeSpan timeout);
}