ActiveMQ安装配置_编程入门ActiveMQ基础_ActiveMQ消息模型高级特性

消息队列核心概念解析:从入门到实战

迎接踏入,由消息队列(Message Queue)所构成的编程天地。针对那些全然没有基础的开发者而言,知晓消息队列这个事物,则成为构建起基于该种队列搭建之更为庞大体系应用支撑架构,以及面向高并发场景下应用程序之核心关键步骤。它好似一个具备超级职能的传递者,专门负责于不同程序之间,能够做到十足可靠地去传递包裹的数据单元,也就是被称作消息(Message)的这些内容。不论是微服务相互间的通信,还是应对海量用户请求时,消息队列都充当着通信中枢这一角色,它提供高效率、可扩展式的消息传递服务,支持多种编程语言的客户端,并且能够轻松实现不同应用程序的集成,是构建现代化、具备高弹性系统架构的根本基础。

# 示例命令,根据实际安装路径调整
D:apache-activemq-5.x.x> binactivemq start

两大核心模型:Queue与Topic的深度对比

于消息队列的范畴之内,存在着两种最为基础的消息模型,其一为Queue(队列),其二为Topic(主题),它们之间所存在的区别,直接对消息的流向以及系统的设计逻辑产生决定作用。

Queue(点对点模型):一条只有一位 消费者 能够成功接收进而处理的消息。此等模型在 任务分发 以及 负载均衡 方面天然适配而加以沿用且可有所应用。比如说,于一场电商秒杀活动里头,数量众多的订单创建请求能够当作消息放进Queue,后端所挂载的好些订单处理服务,即所谓的消费者,会去争抢这些任务,每一条订单消息最终仅仅会被当中的一个服务实例予以处理,借此避免重复的操作行为,达成高效并行的状态。

一个被称作发布 - 订阅模型的主题中,一条消息能够被多个订阅者同时接收到,这情形犹如广播电台一样,每当节目那种消息发出过后,并且前提是所有已打开收音机当作处于订阅这种准备状态的听众,都能够收到,这种模型适用于事件通知的场景,也适用于日志分发的场景。比如说,用户进行修改密码这一事件,被发布到了一个Topic,那么,负责发送短信通知的服务,能够独立地接收到这条消息,负责更新缓存的服务,也能够独立地接收到这条消息,负责记录审计日志的服务,同样能够独立地接收到这条消息,并且会依据自身职责,各自执行相应的操作。

数据安全基石:消息持久化机制揭秘

// 创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 创建一个带有优先级和TTL的TextMessage
TextMessage message = session.createTextMessage("This is a high-priority message with TTL");
message.setJMSPriority(9); // 设置消息优先级
message.setExpiration(System.currentTimeMillis() + 30000); // 设置TTL为30秒
// 发送消息
producer.send(message);

系统重启之后,消息是不是还会存在呢?答案是确定的,条件是你开启了,被称作消息持久化的功能。这是确保数据不会丢失的关键机制。消息队列默认运用的存储方法,是一种专门针对消息队列构建的、具备事务性的日志数据进行存储的系统,就像是Kafka的底层存储,或者RocketMQ的CommitLog。那它可不是把消息零零散散地去保存,而是运用顺序追加写进去的方式,把消息持久化到磁盘文件当中,极大程度地增强了IO性能。

寻找适配的持久化机制相当关键,要是你追寻极致的吞吐量并且能够忍受一小部分数据遗失,那么能够挑选纯内存存储,相反的,在像金融、电商这些对数据一致性有着严格要求的情景里,就一定要开启同步刷盘等持久化策略。开发者要依据实际存在的性能方面的需求,以及数据恢复方面的需求,还要参照服务器具有的资源限制去做权衡,进而寻得性能跟可靠性之间的最佳平衡点。

进阶特性实战:优先级与过期时间(TTL)

消息模型不是单纯的“存”与“取”,它还给出了消息优先级,以及过期时间(TTL,Time To Live)等较高级的特性,从而使消息处理具备更大的灵活性。

// Java代码示例:消息发送与确认
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
    connection = factory.createConnection();
    connection.start();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("TEST_QUEUE");
    producer = session.createProducer(destination);
    TextMessage message = session.createTextMessage("Hello ActiveMQ");
    producer.send(message);
    producer.close();
    session.commit();
} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (producer != null) {
        try {
            producer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    if (session != null) {
        try {
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    if (connection != null) {
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

这儿有个属于概念性的示例代码有着Java的编写形式,它能够表现呈现出在进行消息发送关联动作之际怎样去设定设置这些属性内容:

// 创建一个消息对象,假设使用某种消息队列的API
Message message = new Message("orderTopic", "orderCancelled".getBytes());
// 设置消息优先级为9(假设优先级范围是0-9,数值越高越优先)
message.setPriority(9);
// 设置消息的过期时间为30秒,30秒后如果未被消费,则自动从队列中清除
message.setExpiration(30000); // 单位毫秒
// 通过生产者发送消息
producer.send(message);

// Java代码示例:消息接收与处理
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try {
    connection = factory.createConnection();
    connection.start();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("TEST_QUEUE");
    consumer = session.createConsumer(destination);
    Message message = consumer.receive();
    if (message instanceof TextMessage) {
        System.out.println(((TextMessage) message).getText());
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (consumer != null) {
        try {
            consumer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    if (session != null) {
        try {
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    if (connection != null) {
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

ActiveMQ消息模型高级特性_编程入门ActiveMQ基础_ActiveMQ安装配置

对此例子而言,具有优先级为9的,名为订单取消消息的内容,会于队列里插队,被名为消费者的对象优先予以处理,这对那些需要快速做出响应的业务来讲是极其关键重要的。并且过期时间能够有效地防止无效消息出现堆积的情况,就像一个在30秒之内都没有完成支付操作的订单,与之相对应的取消消息在过期之后会自动被删除掉,如此便避免了逻辑上出现错误的情况。这些特性要是被合理地加以使用,那么系统灵活性是能够得到大幅提升的,然而要是滥用的话(就好比把数量较多的消息设置成最高优先级这种情况),反倒有可能致使普通消息出现“饿死”情形,进而造成性能瓶颈。

高并发利器:异步消息处理实战

身处互联网应用范畴内,异步这一概念乃是促使系统吞吐量得以提升的关键核心思想所在。异步消息处理这种方式,赋予了消息的生产者以及消费者彼此之间不存在直接等待依赖这样一种特性。

场景示例:订单处理系统

假设存在一个订单系统,用户提交订单属于高频操作,要是进行同步处理,也就是包括写数据库、调用库存以及发送短信这些操作,那么用户就得等待很长时间,在采用异步消息之后。

1. 用户递交订单,系统马上把订单信息打包成消息传送到队列那儿,随后径直返回“订单提交成功”。

2. 后台多个订单处理消费者监听这个队列。

3. 消息抵达之际,消息监听器容器(比如Spring的DefaultMessageListenerContainer),会自行调用监听器的回调方法(像是onMessage),去开展切实的业务处理(像扣减库存、发送短信之类的)。

采用此种方式,系统达成了生产者与消费者的解耦,前端响应极为迅速,后端处理能力能够借由增添消费者数量来进行横向扩展,从容面对高并发洪峰。

数据一致性保障:事务性会话详解

MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        // 处理接收到的消息
    }
});

于金融转账之类场景当中,保障消息既不丢失,又不重复,是极其关键重要的,而这种情况就会运用使用到事务性会话。它是借鉴了数据库事务的ACID原则,去确保消息发送以及接收的原子性。

在消息中间件中,事务性会话通常涉及以下概念:

事务性生产者,于事务范畴之内,所发送的多条消息,存在这样的情况,要么是全部成功地提交至服务器,要么是完全进行回滚操作,进而导致消费者无法看到尚未提交的消息。

承担事务性职责的消费者,在处置完消息之后,得于事务范畴之内对消息予以确认,该确认即ACK,也就是Acknowledge。唯有服务器接收此确认之后,才会真的将消息从存储渠道中加以移除。

倘若于确认进程里出现故障,并且事务进行回滚,那么消息会再次被投递。虽说事务会话致使了性能方面的开销,可是在对于数据一致性有着极高要求的核心链路当中,它是不可或缺的可靠性保障举措。开发者需要对业务需求予以权衡考量,在必要的时候能够选用半事务消息等更为轻量的模式去达成最终的一致性。

Connection connection = null;
Session session = null;
try {
    connection = connectionFactory.createConnection();
    connection.start();
    session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("TEST.QUEUE");
    MessageProducer producer = session.createProducer(destination);
    TextMessage message = session.createTextMessage("Hello World");
    // 开启事务
    session.beginTransaction();
    producer.send(message);
    // 提交事务
    session.commitTransaction();
} catch (JMSException e) {
    // 异常情况下的回滚操作
    try {
        if (session != null) {
            session.rollback();
        }
    } catch (JMSException ex) {
        ex.printStackTrace();
    }
    e.printStackTrace();
} finally {
    if (session != null) try { session.close(); } catch (JMSException ex) { ex.printStackTrace(); }
    if (connection != null) try { connection.close(); } catch (JMSException ex) { ex.printStackTrace(); }
}