java+rabbitMQ实现一对一聊天
源碼地址: https://download.csdn.net/download/weixin_40461281/10321780
上一篇文章講了RabbitMQ的安裝
接下來介紹一下具體的應(yīng)用
使用java + rabbitMQ實現(xiàn)聊天功能的demo , 非常有助于理解和上手rabbitMQ , 該demo僅限于用來學習rabbitMQ , 實際工作中實現(xiàn)聊天功能不推薦使用rabbitMQ
首先創(chuàng)建一個maven項目,然后在pom.xml文件中導入RabbitMQ的jar包
地址如下:
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.0</version></dependency> </dependencies>工作模式采用-工作隊列 接下來具體講解一下代碼實現(xiàn)
首先創(chuàng)建一個類A 并創(chuàng)建連接工廠和創(chuàng)建一個新的連接
//創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory();//設(shè)置RabbitMQ地址factory.setHost("localhost");//連接地址factory.setUsername("guest");//用戶名factory.setPassword("guest");//密碼factory.setPort(5672);//端口號//創(chuàng)建一個新的連接final Connection connection = factory.newConnection();然后創(chuàng)建線程T1用來發(fā)送消息:
//發(fā)送消息線程 Thread t1 = new Thread(new Runnable() {public void run() {//創(chuàng)建一個頻道Channel channel = null;try {channel = connection.createChannel();//聲明要關(guān)注的頻道channel.exchangeDeclare("logs", "fanout");//channel.queueDeclare(QUEUE_NAME, false, false, false, null);} catch (Exception e) {e.printStackTrace();}while(true) {Scanner scan = new Scanner(System.in);System.out.println("請輸入消息");String message = scan.nextLine();//發(fā)送消息到隊列中try {channel.basicPublish("logs", QUEUE_NAME, null, message.getBytes());//channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));} catch (Exception e) {e.printStackTrace();}System.out.println("B發(fā)送消息:" + message);}} });創(chuàng)建T2用來監(jiān)聽接收消息:
//接收消息線程 Thread t2 = new Thread(new Runnable() {public void run() {Channel channel = null;try {channel = connection.createChannel();//聲明要關(guān)注的頻道channel.exchangeDeclare("logs", "fanout");//channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.queueBind(QUEUE_NAME, "logs", "");//創(chuàng)建消費者 ---- 得到消息后會自動觸發(fā)Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//body為消息體String message = new String(body, "UTF-8");System.out.println("B接收消息:" + message);}};//消息消費完成確認channel.basicConsume(QUEUE_NAME, true, consumer);} catch (Exception e) {e.printStackTrace();}} });最后別忘了啟動兩個線程:
t1.start(); t2.start();然后我們在創(chuàng)建一個一模一樣的類B
public class B {private final static String QUEUE_NAME = "test";public static void main(String[] args) throws Exception{//創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory();//設(shè)置RabbitMQ地址factory.setHost("localhost");//連接地址factory.setUsername("guest");//用戶名factory.setPassword("guest");//密碼factory.setPort(5672);//端口號//創(chuàng)建一個新的連接final Connection connection = factory.newConnection();//發(fā)送消息線程Thread t1 = new Thread(new Runnable() {public void run() {//創(chuàng)建一個頻道Channel channel = null;try {channel = connection.createChannel();//聲明要關(guān)注的頻道channel.exchangeDeclare("logs", "fanout");//channel.queueDeclare(QUEUE_NAME, false, false, false, null);} catch (Exception e) {e.printStackTrace();}while(true) {Scanner scan = new Scanner(System.in);System.out.println("請輸入消息");String message = scan.nextLine();//發(fā)送消息到隊列中try {channel.basicPublish("logs", QUEUE_NAME, null, message.getBytes());//channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));} catch (Exception e) {e.printStackTrace();}System.out.println("B發(fā)送消息:" + message);}}});//接收消息線程Thread t2 = new Thread(new Runnable() {public void run() {Channel channel = null;try {channel = connection.createChannel();//聲明要關(guān)注的頻道channel.exchangeDeclare("logs", "fanout");//channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.queueBind(QUEUE_NAME, "logs", "");//創(chuàng)建消費者 ---- 得到消息后會自動觸發(fā)Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//body為消息體String message = new String(body, "UTF-8");System.out.println("B接收消息:" + message);}};//消息消費完成確認channel.basicConsume(QUEUE_NAME, true, consumer);} catch (Exception e) {e.printStackTrace();}}});t1.start();t2.start();} }然后分別運行兩個類:
最后由于本文的demo需要持續(xù)監(jiān)聽 , 所以未做關(guān)閉連接
如果大家在實際中使用一定記得要關(guān)閉連接,不然小心你的內(nèi)存
}finally {try {if (channel != null) {channel.close();}if (connection != null) {connection.close();}}catch (Exception e) {e.printStackTrace();} }?
好了 , 這樣一個簡單的一對一聊天功能就完成了
?
我們也可以用 -- 發(fā)布訂閱模式 實現(xiàn)多人在線聊天 , 在這里我就不演示了,有興趣的小伙伴可以自己做一下
好了,本篇文章就到這了
?
總結(jié)
以上是生活随笔為你收集整理的java+rabbitMQ实现一对一聊天的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python 局域网即时通讯工具
- 下一篇: 【bypass】403绕过