新网创想网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本文介绍了java远程连接调用Rabbitmq,分享给大家,希望此文章对各位有所帮助。
富县网站建设公司创新互联公司,富县网站设计制作,有大型网站制作公司丰富经验。已为富县超过千家提供企业网站建设服务。企业网站搭建\外贸营销网站建设要多少钱,请找那个售后服务好的富县做网站的公司定做!
打开IDEA创建一个maven工程(Java就可以了)。
pom.xml文件如下
4.0.0 com.zhenqi rabbitmq-study 1.0-SNAPSHOT jar rabbitmq-study http://maven.apache.org UTF-8 junit junit 4.12 test com.rabbitmq amqp-client 4.1.0 org.slf4j slf4j-api org.slf4j slf4j-log4j12 1.7.21 commons-lang commons-lang 2.6
为了能远程访问rabbitmq,则需要编辑 /etc/rabbitmq/rabbitmq.conf,添加以下内容。
[ {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]} ]
添加administrator角色
rabbitmqctl set_user_tags openstack administrator
创建抽象队列 EndPoint.java
package com.zhenqi; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * Created by wuming on 2017/7/16. */ public abstract class EndPoint { protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws Exception { this.endPointName = endpointName; //创建一个连接工厂 connection factory ConnectionFactory factory = new ConnectionFactory(); //设置rabbitmq-server服务IP地址 factory.setHost("192.168.146.128"); factory.setUsername("openstack"); factory.setPassword("rabbitmq"); factory.setPort(5672); factory.setVirtualHost("/"); //得到 连接 connection = factory.newConnection(); //创建 channel实例 channel = connection.createChannel(); channel.queueDeclare(endpointName, false, false, false, null); } /** * 关闭channel和connection。并非必须,因为隐含是自动调用的。 * @throws IOException */ public void close() throws Exception{ this.channel.close(); this.connection.close(); } }
生产者Producer.java
生产者类的任务是向队列里写一条消息
package com.zhenqi; import org.apache.commons.lang.SerializationUtils; import java.io.Serializable; /** * Created by wuming on 2017/7/16. */ public class Producer extends EndPoint { public Producer(String endpointName) throws Exception { super(endpointName); } public void sendMessage(Serializable object) throws Exception { channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object)); } }
消费者QueueConsumer.java
消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。
package com.zhenqi; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; import org.apache.commons.lang.SerializationUtils; import org.apache.log4j.Logger; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * Created by wuming on 2017/7/16. */ public class QueueConsumer extends EndPoint implements Runnable, Consumer { private Logger LOG=Logger.getLogger(QueueConsumer.class); public QueueConsumer(String endpointName) throws Exception { super(endpointName); } public void handleConsumeOk(String s) { } public void handleCancelOk(String s) { } public void handleCancel(String s) throws IOException { } public void handleShutdownSignal(String s, ShutdownSignalException e) { } public void handleRecoverOk(String s) { LOG.info("Consumer "+s +" registered"); } public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { Map map = (HashMap) SerializationUtils.deserialize(bytes); LOG.info("Message Number "+ map.get("message number") + " received."); } public void run() { try{ channel.basicConsume(endPointName, true,this); }catch(IOException e){ e.printStackTrace(); } } }
测试
运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走
package com.zhenqi; import java.util.HashMap; /** * Created by wuming on 2017/7/16. */ public class TestRabbitmq { public static void main(String[] args){ try{ QueueConsumer consumer = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); consumerThread.start(); Producer producer = new Producer("queue"); for (int i = 0; i < 100000; i++){ HashMap message = new HashMap(); message.put("message number", i); producer.sendMessage(message); System.out.println("Message Number "+ i +" sent."); } }catch(Exception e){ e.printStackTrace(); } } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持创新互联。