RabbitMq 之客户端(publish,subscrbe)
时间:2011-04-19 来源:陈力
{
// Uri uri = new Uri("amqp://192.168.1.100:5672/");
Uri uri = new Uri("amqp://localhost:5672/");
string exchange = "ex1";
string exchangeType = "direct";
string routingKey = "m1";
bool persistMode = true;
ConnectionFactory cf = new ConnectionFactory();
cf.UserName = "lee";
cf.Password = "123456";
cf.VirtualHost = "/";
cf.RequestedHeartbeat = 0;
cf.Endpoint = new AmqpTcpEndpoint(uri);
using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
if (exchangeType != null)
{
ch.ExchangeDeclare(exchange, exchangeType);//,true,true,false,false, true,null);
//ch.QueueDeclare("q1", true);//true, true, true, false, false, null);
//ch.QueueBind("q1", "ex1", "m1", false, null);
ch.QueueDeclare("q1", false, false, true, null);
//string queue, string exchange, string routingKey, IDictionary arguments
ch.QueueBind("q1", "ex1", "m1",null);
// durable: 是否持久,如果是true,则在服务重启之后还是存在,否则不存在
//exclusive: 仅创建者可以使用的私有队列,断开后自动删除。(如果为true则只有创建者线程才可以使用,如果线程停止则queue也会自动删除)
//auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
int i = 0;
while (true)
{
IMapMessageBuilder b = new MapMessageBuilder(ch);
IDictionary target = b.Headers;
target["header"] = "hello world";
IDictionary targetBody = b.Body;
targetBody["body"] = "lee"+i++.ToString();
if (persistMode)
{
((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;
}
ch.BasicPublish(exchange, routingKey,
(IBasicProperties)b.GetContentHeader(),
b.GetContentBody());
}
}
}
}
}
___________________________________________ 客户端publish代码
public static void InitCustmerMq()
{
string exchange = "ex1";
string exchangeType = "direct";
string routingKey = "m1";
// string serverAddress = "amqp://192.168.1.100:5672";
Uri uri = new Uri("amqp://localhost:5672/");
ConnectionFactory cf = new ConnectionFactory();
cf.Endpoint = new AmqpTcpEndpoint(uri);
cf.UserName = "lee";
cf.Password = "123456";
cf.VirtualHost = "/";
cf.RequestedHeartbeat = 0;
using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
//普通使用方式BasicGet
//noAck = true,不需要回复,接收到消息后,queue上的消息就会清除
//noAck = false,需要回复,接收到消息后,queue上的消息不会被清除,直到调用channel.basicAck(deliveryTag, false); queue上的消息才会被清除 而且,在当前连接断开以前,其它客户端将不能收到此queue上的消息
// BasicGetResult res = ch.BasicGet("q1", false/*noAck*/);
//if (res != null)
//{
// bool t = res.Redelivered;
// t = true;
// Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
// ch.BasicAck(res.DeliveryTag, false);
//}
//else
//{
// Console.WriteLine("No message!");
//}
//return 0;
while (true)
{
BasicGetResult res = ch.BasicGet("q1", false/*noAck*/);
if (res != null)
{
try
{
bool t = res.Redelivered;
t = true;
Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
ch.BasicAck(res.DeliveryTag, false);
}
catch { }
}
//else
// break;
}
}
}
}
通常的操作流程是:
(1) 消费者: 创建信息通道。
(2) 消费者: 定义消息队列。
(3) 消费者: 定义特定类型的交换机。
(4) 消费者: 设定绑定规则 (包括交换机名称、队列名称以及路由键)。
(5) 消费者: 等待消息。
(6) 生产者: 创建消息。
(7) 生产者: 将消息投递给信息通道 (注明接收交换机名称和路由键)。
(8) 交换机: 获取消息,依据交换机类型决定是否匹配路由规则 (如需匹配,则对比消息路由键和绑定路由键)。
(9) 消费者: 获取并处理消息,发送反馈。
(10) 结束: 关闭通道和连接
——————————————————————————客户端消费者代码
队列定义参数:
durable: 是否持久,如果是true,则在服务重启之后还是存在,否则不存在
exclusive: 仅创建者可以使用的私有队列,断开后自动删除。(如果为true则只有创建者线程才可以使用,如果线程停止则queue也会自动删除)
auto_delete: 当所有消费客户端连接断开后,是否自动删除队列。
交换机定义参数:
type: 交换机类型,包括 fanout, direct 和 topic。
auto_delete: 当所有绑定队列都不再使用时,是否自动删除该交换机。
交换机类型:
Fanout: 不处理路由键,将消息广播给绑定到该交换机的所有队列。 不论消息的路由关键字是什么,这条消息都会被路由到所有与该交换器绑定的队列中。
广播式交换器类型的工作方式如下:
不使用任何参数将消息队列与交换器绑定在一起。 发布者(直接式交换器类型描述中的producer变成了publisher,已经隐含了二种交换器类型的区别)向交换器发送一条消息。 消息被无条件的传递到所有和这个交换器绑定的消息队列中。
Direct: 处理路由键,对消息路径进行全文匹配。消息路由键 "dog" 只能匹配 "dog" 绑定,不匹配 "dog.puppy" 这类绑定。
通过精确匹配消息的路由关键字,将消息路由到零个或者多个队列中,绑定关键字用来将队列和交换器绑定到一起。这让我们可以构建经典的点对点队列消息传输模型,不过和任何已定义的交换器类型一样,当消息的路由关键字与多个绑定关键字匹配时,消息可能会被发送到多个队列中。
Topic: 处理路由键,按模式匹配路由键。模式符号 "#" 表示一个或多个单词,"*" 仅匹配一个单词。如 "audit.#" 可匹配 "audit.irs.corporate",但 "audit.*" 只匹配 "audit.irs"。
主题式交换器类型提供了这样的路由机制:通过消息的路由关键字和绑定关键字的模式匹配,将消息路由到被绑定的队列中。这种路由器类型可以被用来支持经典的发布/订阅消息传输模型——使用主题名字空间作为消息寻址模式,将消息传递给那些部分或者全部匹配主题模式的多个消费者。
主题交换器类型的工作方式如下:
绑定关键字用零个或多个标记构成,每一个标记之间用“.”字符分隔。绑定关键字必须用这种形式明确说明,并支持通配符:“*”匹配一个词组,“#”零个或多个词组。
因此绑定关键字“*.stock.#”匹配路由关键字“usd.stock”和“eur.stock.db”,但是不匹配“stock.nasdaq”。
这种交换器类型是可选的。
生产者无需定义队列、交换机和绑定,只需将消息投递给信息通道即可。
如果定义一durable的queue,那么bind此queue的exchange也必须是durable的
定义用户权限
set_permissions [-p vhostpath] [-s scope] {user} {conf} {write} {read}
vhostpath
The name of the virtual host to which to grant the user access, defaulting to /.
scope
Scope of the permissions: either client (the default) or all. This determines whether permissions are checked for server-generated resource names (all) or only for client-specified resource names (client).
user
The name of the user to grant access to the specified virtual host.
conf
A regular expression matching resource names for which the user is granted configure permissions.
write
A regular expression matching resource names for which the user is granted write permissions.
read
A regular expression matching resource names for which the user is granted read permissions.
Sets user permissions.
参考链接:http://blog.csdn.net/ykdsg/archive/2010/11/16/6013448.aspx