- AMQP(Advanced Message Queuing Protocol) + Erlang으로 구성.

  1. 클라이언트들에서 메시지를 받아서 분배하는 역할

 

* 나의 경우는 웹으로 모니터링할 수 있게끔 C#으로 client를 구축

1. 홈페이지에서 rabbitmq-dotnet-client-3.3.4-dotnet-3.0.zip 을 받아, RabbitMQ.Client.dll 을 참조하여 RabbitMQ 사용할 준비를 끝낸다.

2. 이 토끼씨는... 예제가 잘 되어 있다. 언어별로... ㅎ 여기를 차근차근 따라하면 okay! (Tutorials : http://www.rabbitmq.com/getstarted.html)

또한, PDF도 제공하고 있으니 자세한 것은 PDF 참고!! (나의 경우는 2.8절 "Retrieving messages by subscription"을 사용)

 

3. 기본 알고리즘

// Create the factory first, then configure it one property at a time.
// The assignments run in the same order as in an object initializer.
var factory = new ConnectionFactory();

factory.UserName = "username";

factory.Password = "password";

factory.Protocol = Protocols.FromEnvironment();

factory.Port = AmqpTcpEndpoint.UseDefaultPort;

// Full URI form for reference: "amqp://user:pass@hostName:port/vhost"
// Uri is assigned last, after the individual settings above.
factory.Uri = "amqp://username:password@uri";

 

// Metadata view (signatures only, no method bodies) of the ConnectionFactory
// type that the snippet above instantiates and configures.
public class ConnectionFactory

{

// Default values exposed as public constants.
public const ushort DefaultChannelMax = 0;

// NOTE(review): presumably milliseconds — confirm against the client documentation.
public const int DefaultConnectionTimeout = 30000;

public const uint DefaultFrameMax = 0;

public const ushort DefaultHeartbeat = 0;

public const string DefaultPass = "guest";

public const string DefaultUser = "guest";

public const string DefaultVHost = "/";

 

// Public configuration fields. UserName, Password, Protocol and Port are the
// ones assigned by the factory snippet earlier in this article.
public AuthMechanismFactory[] AuthMechanisms;

public IDictionary<string, object> ClientProperties;

public static AuthMechanismFactory[] DefaultAuthMechanisms;

public string HostName;

public string Password;

public int Port;

public IProtocol Protocol;

public ushort RequestedChannelMax;

public int RequestedConnectionTimeout;

public uint RequestedFrameMax;

public ushort RequestedHeartbeat;

// Typed with the ObtainSocket delegate declared at the end of this class.
public ConnectionFactory.ObtainSocket SocketFactory;

public SslOption Ssl;

public string UserName;

public string VirtualHost;

 

// Parameterless constructor (the one used with the initializer earlier in this article).
public ConnectionFactory();

 

public AmqpTcpEndpoint Endpoint { get; set; }

// Write-only (setter only). This string-typed property is the one the
// factory snippet assigns an "amqp://..." string to.
public string Uri { set; }

// Write-only (setter only) variant that takes a System.Uri-style object instead of a string.
public Uri uri { set; }

 

public AuthMechanismFactory AuthMechanismFactory(string[] mechs);

// Parameterless overload called in this article; returns an IConnection.
public virtual IConnection CreateConnection();

public virtual IConnection CreateConnection(int maxRedirects);

// Protected overload; only reachable from derived classes.
protected virtual IConnection CreateConnection(int maxRedirects, IDictionary<AmqpTcpEndpoint, int> connectionAttempts, IDictionary<AmqpTcpEndpoint, Exception> connectionErrors, params AmqpTcpEndpoint[] endpoints);

// Static method whose signature matches the ObtainSocket delegate below.
public static TcpClient DefaultSocketFactory(AddressFamily addressFamily);

protected virtual IConnection FollowRedirectChain(int maxRedirects, IDictionary<AmqpTcpEndpoint, int> connectionAttempts, IDictionary<AmqpTcpEndpoint, Exception> connectionErrors, ref AmqpTcpEndpoint[] mostRecentKnownHosts, AmqpTcpEndpoint endpoint);

 

// Delegate type of the SocketFactory field: takes an AddressFamily, returns a TcpClient.
public delegate TcpClient ObtainSocket(AddressFamily addressFamily);

}

 

 

 

// Open a connection with the factory configured above; CreateConnection() returns an IConnection.
IConnection conn = factory.CreateConnection();

 

// Metadata view of the IConnection interface, the type returned by
// ConnectionFactory.CreateConnection(). It extends IDisposable.
public interface IConnection : IDisposable

{

// Properties. AutoClose is the only read/write property; the rest are read-only.
bool AutoClose { get; set; }

ushort ChannelMax { get; }

IDictionary<string, object> ClientProperties { get; }

ShutdownEventArgs CloseReason { get; }

AmqpTcpEndpoint Endpoint { get; }

uint FrameMax { get; }

ushort Heartbeat { get; }

bool IsOpen { get; }

AmqpTcpEndpoint[] KnownHosts { get; }

IProtocol Protocol { get; }

IDictionary<string, object> ServerProperties { get; }

IList<ShutdownReportEntry> ShutdownReport { get; }

 

// Events exposed by the connection.
event CallbackExceptionEventHandler CallbackException;

event ConnectionBlockedEventHandler ConnectionBlocked;

event ConnectionShutdownEventHandler ConnectionShutdown;

event ConnectionUnblockedEventHandler ConnectionUnblocked;

 

// Abort and Close each come in four overloads: no arguments, timeout only,
// reason code + text, and reason code + text + timeout.
// NOTE(review): timeout units are not shown in this listing — confirm in the client documentation.
void Abort();

void Abort(int timeout);

void Abort(ushort reasonCode, string reasonText);

void Abort(ushort reasonCode, string reasonText, int timeout);

void Close();

void Close(int timeout);

void Close(ushort reasonCode, string reasonText);

void Close(ushort reasonCode, string reasonText, int timeout);

// Returns an IModel; this article calls it to obtain the channel.
IModel CreateModel();

void HandleConnectionBlocked(string reason);

void HandleConnectionUnblocked();

}

 

 

// Open a channel (IModel) on the connection.
IModel channel = conn.CreateModel();

// IModel.QueueDeclare takes (queue, durable, exclusive, autoDelete, arguments).
// The third argument is the "exclusive" flag, not a second copy of autoDelete,
// so it gets its own variable. false is used here on the assumption that the
// queue is shared with other clients — adjust if an exclusive queue is wanted.
bool exclusive = false;

channel.QueueDeclare(inputQueueName, durable, exclusive, autoDelete, arguments);

 

 

// Metadata view of the IModel interface, the channel type returned by
// IConnection.CreateModel(). It extends IDisposable.
// The [AmqpMethodDoNotImplement], [AmqpUnsupported] and [AmqpContentHeaderFactory]
// attributes are reproduced as they appear in the metadata; their exact meaning is
// not shown in this listing (NOTE(review): see the client library documentation).
public interface IModel : IDisposable

{

// Properties. DefaultConsumer is the only read/write property; the rest are read-only.
ShutdownEventArgs CloseReason { get; }

IBasicConsumer DefaultConsumer { get; set; }

bool IsClosed { get; }

bool IsOpen { get; }

ulong NextPublishSeqNo { get; }

 

// Events exposed by the channel.
event BasicAckEventHandler BasicAcks;

event BasicNackEventHandler BasicNacks;

event BasicRecoverOkEventHandler BasicRecoverOk;

event BasicReturnEventHandler BasicReturn;

event CallbackExceptionEventHandler CallbackException;

event FlowControlEventHandler FlowControl;

event ModelShutdownEventHandler ModelShutdown;

 

// Abort overloads.
[AmqpMethodDoNotImplement("")]

void Abort();

[AmqpMethodDoNotImplement("")]

void Abort(ushort replyCode, string replyText);

// Basic* members: ack, cancel, consume, get, nack, publish, qos, recover, reject.
void BasicAck(ulong deliveryTag, bool multiple);

[AmqpMethodDoNotImplement("")]

void BasicCancel(string consumerTag);

// BasicConsume overloads; each takes an IBasicConsumer and returns a string.
[AmqpMethodDoNotImplement("")]

string BasicConsume(string queue, bool noAck, IBasicConsumer consumer);

[AmqpMethodDoNotImplement("")]

string BasicConsume(string queue, bool noAck, string consumerTag, IBasicConsumer consumer);

[AmqpMethodDoNotImplement("")]

string BasicConsume(string queue, bool noAck, string consumerTag, IDictionary<string, object> arguments, IBasicConsumer consumer);

[AmqpMethodDoNotImplement("")]

string BasicConsume(string queue, bool noAck, string consumerTag, bool noLocal, bool exclusive, IDictionary<string, object> arguments, IBasicConsumer consumer);

[AmqpMethodDoNotImplement("")]

BasicGetResult BasicGet(string queue, bool noAck);

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]

void BasicNack(ulong deliveryTag, bool multiple, bool requeue);

// BasicPublish overloads; all take the message properties and a byte[] body.
[AmqpMethodDoNotImplement("")]

void BasicPublish(PublicationAddress addr, IBasicProperties basicProperties, byte[] body);

[AmqpMethodDoNotImplement("")]

void BasicPublish(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body);

[AmqpMethodDoNotImplement("")]

void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body);

[AmqpMethodDoNotImplement("")]

void BasicPublish(string exchange, string routingKey, bool mandatory, bool immediate, IBasicProperties basicProperties, byte[] body);

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);

[AmqpMethodDoNotImplement("")]

void BasicRecover(bool requeue);

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]

void BasicRecoverAsync(bool requeue);

void BasicReject(ulong deliveryTag, bool requeue);

// Close overloads.
[AmqpMethodDoNotImplement("")]

void Close();

[AmqpMethodDoNotImplement("")]

void Close(ushort replyCode, string replyText);

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]

[AmqpMethodDoNotImplement("")]

void ConfirmSelect();

// Factories for content-header property objects (basic / file / stream).
[AmqpContentHeaderFactory("basic")]

IBasicProperties CreateBasicProperties();

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]

[AmqpContentHeaderFactory("file")]

IFileProperties CreateFileProperties();

[AmqpContentHeaderFactory("stream")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]

IStreamProperties CreateStreamProperties();

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]

void DtxSelect();

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9_1")]

void DtxStart(string dtxIdentifier);

// Exchange* members: bind, declare, delete, unbind.
[AmqpMethodDoNotImplement("")]

void ExchangeBind(string destination, string source, string routingKey);

[AmqpMethodDoNotImplement("")]

void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments);

[AmqpMethodDoNotImplement("")]

void ExchangeDeclare(string exchange, string type);

[AmqpMethodDoNotImplement("")]

void ExchangeDeclare(string exchange, string type, bool durable);

[AmqpMethodDoNotImplement("")]

void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);

[AmqpMethodDoNotImplement("")]

void ExchangeDeclarePassive(string exchange);

[AmqpMethodDoNotImplement("")]

void ExchangeDelete(string exchange);

[AmqpMethodDoNotImplement("")]

void ExchangeDelete(string exchange, bool ifUnused);

[AmqpMethodDoNotImplement("")]

void ExchangeUnbind(string destination, string source, string routingKey);

[AmqpMethodDoNotImplement("")]

void ExchangeUnbind(string destination, string source, string routingKey, IDictionary<string, object> arguments);

// Queue* members: bind, declare, delete, purge, unbind.
[AmqpMethodDoNotImplement("")]

void QueueBind(string queue, string exchange, string routingKey);

[AmqpMethodDoNotImplement("")]

void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);

[AmqpMethodDoNotImplement("")]

QueueDeclareOk QueueDeclare();

// Five-argument overload, the shape used by the QueueDeclare call in this article.
// Note the parameter order: queue, durable, exclusive, autoDelete, arguments.
[AmqpMethodDoNotImplement("")]

QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);

[AmqpMethodDoNotImplement("")]

QueueDeclareOk QueueDeclarePassive(string queue);

[AmqpMethodDoNotImplement("")]

uint QueueDelete(string queue);

[AmqpMethodDoNotImplement("")]

uint QueueDelete(string queue, bool ifUnused, bool ifEmpty);

[AmqpMethodDoNotImplement("")]

uint QueuePurge(string queue);

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]

void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);

// Tx* members: commit, rollback, select.
void TxCommit();

void TxRollback();

void TxSelect();

// WaitForConfirms / WaitForConfirmsOrDie overloads, each with and without a TimeSpan timeout.
[AmqpMethodDoNotImplement("")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]

bool WaitForConfirms();

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]

[AmqpMethodDoNotImplement("")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]

bool WaitForConfirms(TimeSpan timeout, out bool timedOut);

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]

[AmqpMethodDoNotImplement("")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]

void WaitForConfirmsOrDie();

[AmqpMethodDoNotImplement("")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_9")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8qpid")]

[AmqpUnsupported("RabbitMQ.Client.Framing.v0_8")]

void WaitForConfirmsOrDie(TimeSpan timeout);

}

 

 

 

반응형

+ Recent posts