概述
JMS(Java Message Service) 是Java平台上有关面向消息中间件的技术规范。它提供标准的产生、发送、接收消息的接口简化企业应用的开发。它支持两种消息通信模型:点到点(point-to-point)(P2P)模型和发布/订阅(Pub/Sub)模型。
消息中间件
(1)ActiveMQ
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完 全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。我们在本次课程中介绍 ActiveMQ 的使 用。
(2)RabbitMQ
AMQP 协议的领导实现,支持多种场景。淘宝的 MySQL 集群内部有使用它进行通讯, OpenStack 开源云平台的通信组件,最先在金融行业得到运用。
(3)ZeroMQ
史上最快的消息队列系统
(4)Kafka
Apache 下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到 10W/s 的吞吐速率;完全的分布式系统。适合处理海量数据。
正文格式
TextMessage--一个字符串对象
MapMessage--一套名称-值对
ObjectMessage--一个序列化的 Java 对象
BytesMessage--一个字节的数据流
StreamMessage -- Java 原始值的数据流
XMLMessage:一个XML类型的消息。
最常用的是TextMessage和ObjectMessage。
JMS 消息传递类型
对于消息的传递有两种类型:
Queue,一种是点对点的,即一个生产者和一个消费者一一对应;
Topic,另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收;
ActiveMQ使用
消息生产者(点对点)
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.30.155:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息生产者 MessageProducer producer = session.createProducer(queue); //7.创建消息 TextMessage textMessage = session.createTextMessage("点对点生产者的消息"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close();
消息消费者(点对点)
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.30.155:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(queue); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close();
消息生产者(发布订阅)
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.30.155:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息生产者 MessageProducer producer = session.createProducer(topic); //7.创建消息 TextMessage textMessage = session.createTextMessage("发布订阅生产者消息"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close();
消息消费者(发布订阅)
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.30.155:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(topic); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close();
session 的两个参数:
第 1 个参数 是否使用事务
第 2 个参数 消息的确认模式
SESSION_TRANSACTED = 0 事务提交并确认
AUTO_ACKNOWLEDGE = 1 自动确认
CLIENT_ACKNOWLEDGE = 2 客户端手动确认
DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
Q.E.D.