新网创想网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记。
专注于为中小企业提供成都网站设计、成都做网站服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业五河免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了近1000家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。
上一篇介绍了AMQP消息通信,包括队列、交换器和绑定,通过虚拟主机还可以隔离数据和权限,消息持久化和发送方确认模式确保了消息不丢失。
本篇主要介绍如何运行和管理RabbitMQ,在介绍之前,会有个DEMO演示消息发送和接收,一方面对AMQP的元素有更直观的认识,一方面为后面介绍监控做数据来源。
通过介绍,你会了解到:
该Demo主要用于收集日志,消息发送者是各个应用子系统,消息接收者是日志收集服务,使用RabbitMQ可以很容易实现。
基于Spring Boot框架实现,主要类的作用如下:
消息模型如下:
首先,配置spring boot和rabbitmq依赖:
org.springframework.boot
spring-boot-starter-parent
2.0.1.RELEASE
org.springframework.boot
spring-boot-starter
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-starter-amqp
然后在application.properties文件中配置rabbitmq地址:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
使用Spring的@Configuration定义配置类,可替换xml配置文件,被注解的类内部包含有一个或多个被@Bean注解的方法,用于构建bean定义,初始化Spring容器。
@Configuration
public class LogRabbitConfig {
final static String QUEUE_LOG_ERROR = "log.error";
final static String QUEUE_LOG_ALL = "log.all";
//创建log.error队列
@Bean
public Queue logError() {
return new Queue(QUEUE_LOG_ERROR);
}
//创建log.all队列
@Bean
public Queue logAll() {
return new Queue(QUEUE_LOG_ALL);
}
//创建exchange,命名为log
@Bean
TopicExchange exchange() {
return new TopicExchange("log");
}
//绑定log.error队列到exchange,routingkey为log.error
@Bean
Binding bindingExchangeError(Queue logError, TopicExchange exchange) {
return BindingBuilder.bind(logError).to(exchange).with("log.error");
}
//绑定log.all队列到exchange,routingkey为log.#
@Bean
Binding bindingExchangeAll(Queue logAll, TopicExchange exchange) {
return BindingBuilder.bind(logAll).to(exchange).with("log.#");
}
}
各个子系统向rabbitmq服务器发送消息:
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
//向mq服务端发送消息,exchange为log,routingkey为log.error
String context = "error log";
this.rabbitTemplate.convertAndSend("log", "log.error", context);
//向mq服务端发送消息,exchange为log,routingkey为log.info
context = "info log";
System.out.println("send msg : " + context);
this.rabbitTemplate.convertAndSend("log", "log.info", context);
//向mq服务端发送消息,exchange为log,routingkey为log.warn
context = "warn log";
System.out.println("send msg : " + context);
this.rabbitTemplate.convertAndSend("log", "log.warn", context);
}
}
从rabbitmq服务器接收消息。
AllReceiver从服务器的log.all队列获取消息,因为它绑定的routingkey为"log.#",所以,会收到所有级别的日志:
@Component
@RabbitListener(queues = "log.all")
public class AllReceiver {
@RabbitHandler
public void process(String context) {
System.out.println("receive log : " + context);
}
}
ErrorReceiver从服务器的log.error队列获取消息,因为它绑定的routingkey为"log.error",所以,只会收到error级别的日志:
@Component
@RabbitListener(queues = "log.error")
public class ErrorReceiver {
@RabbitHandler
public void process(String context) {
System.out.println("receive error : " + context);
}
}
测试用例很简单,就是调用Sender发送消息,观察消息的接收情况。
@RunWith(SpringRunner.class)
@SpringBootTest
@SpringBootApplication
public class LogSenderTest {
@Autowired
Sender sender;
@Test
public void sendLog() {
sender.send();
}
}
运行日志如下:
可以看到,error收到了2次,说明exchange同时分发给了log.all和log.error队列,其他级别的日志分发给了log.all队列。
RabbitMQ是用Erlang编写的,Erlang天生就能让应用程序无需知道对方是否在同一台机器上即可相互通信,这让集群和可靠的消息路由变得简单。
和Java有JVM虚拟机类似,Erlang也有虚拟机,虚拟机的每个实例称之为「节点」,不同的是,多个Erlang应用程序可以运行在同一个节点之上,如果应用程序崩溃了,Erlang节点会自动尝试自动重启应用程序。
节点的操作:
配置文件的格式本质上是原始的Erlang数据结构,是一个包含了嵌套哈希表的数组,如下:
[
[mnesia , [{dump_log_write_threshold , 1000}]],
[rabbit , [{vm_memory_high_wateremark , 0.4}]]
]
上面配置了2个应用,每个应用会有自己的哈希表来配置选项:
每个应用如果有多个选项,用逗号隔开。
RabbitMQ权限系统中,单个用户可以跨越多个vhost进行授权,而且可以对读、写、配置分别授权。
首先创建一个用户dongqingqing,密码为123456:
./rabbitmqctl add_user dongqingqing 123456
授予dongqingqing用户权限,可以读取所有队列和交换器,只可写log.*格式的队列和交换器,无法创建或删除队列和交换器
./rabbitmqctl set_permissions dongqingqing ".*" "log.*" ""
set_permissions 后面的参数分别为用户名、读权限、写权限、配置权限。
其他详细用法可查看文档。
可通过rabbitmqctl命令查看数据统计信息,比如队列和消息数目、交换器和绑定等。
查看所有队列,包含上面demo定义的log.all和log.error:
查看所有交换器,包含上面demo定义的log
另外,rabbitmq提供了管理界面插件,更方便的查看各种统计,可以通过下面的命令开启:
sudo ./rabbitmq-plugins enable rabbitmq_management
可以在文件系统中查看日志,启动rabbitmq后,会显示日志的路径:
另外,可以通过AMQP获取实时日志信息,有一个amq.rabbitmq.log的topic交换器,监听对应的队列即可。
下一篇将介绍消息通信模式和最佳实践,感谢大家持续关注。
欢迎扫描下方二维码,关注我的个人微信公众号 ~
创新互联www.cdcxhl.cn,专业提供香港、美国云服务器,动态BGP最优骨干路由自动选择,持续稳定高效的网络助力业务部署。公司持有工信部办法的idc、isp许可证, 机房独有T级流量清洗系统配攻击溯源,准确进行流量调度,确保服务器高可用性。佳节活动现已开启,新人活动云服务器买多久送多久。