MQ整合

# 消息队列

        消息队列也被称之为MQ,消息队列并不是什么高端的技术,只是微服务架构中的,解决应用的解耦、异步消息、流量削峰、消息分发的一个中间件;什么是消息队列呢?消息应该都理解是什么,队列呢,队列是大家在银行办理业务时人多了需要排队是一个概念,这也就不难理解消息队列了(可以理解成在排队的消息,并且里面有多个窗口)。 clipboard

# 消息队列的组成

# Broker

        消息服务器,作为server提供消息核心服务

# Producer

        消息生产者,业务的发起方,负责生产消息传输给broker,

# Consumer

        消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理

# Topic

        主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的 广播

# Queue

        队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收

# Message

        消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输

# 应用解耦

        以微服务中的订单系统,支付系统、库存系统为例,三个代表每个系统模块的微服务,当 用户下完订单后,需要告诉支付系统有用户已下单-->需要付款、需要减少库存等操作;如果耦合调用的话,当其中任何一个环节出现了问题,可能需要几分钟才能修复,都会订单失败,会导致服务在这段未修复的时间段中,服务不能用,可能会导致一部分用户的丢失或投诉等相关问题;如果使用消息队列降解耦后,订单系统只需要把下的订单发送到消息队列中,当其中某个环节在此出现上述的问题时,只需要先把消息存放在消息队列中,等系统修复完成之后,重新监听系统中的消息对消息进行消费操作即可。

应用解耦

# 流量削峰

        微服务中常见的订单系统为例,假如订单系统每秒钟只能处理10W条订单量,在正常的情况下是没有问题的,突然有一天每秒来了20W的订单需要处理,但是这时已经超出系统能承受的范围了,一般有时候会把订单限制在10W条之内,超出10W的直接做阻断的操作;这种虽然也可以解决,但是对用户可能就不是很友好了。消息队列中的流量削峰的概念可以解决以上出现的问题,用消息队列作为缓冲,当用户操作的时候先把用户的订单信息存放在消息队列中,当流量到达处理的顶峰的时候,可以使用消息堆积的方式,只处理到达顶峰之前的订单,另一部分等处理完成之后再进行处理。

# MQ保证消息的不丢失

# RabbitMQ

        1、选择MQ提供的事物功能,生产者在发布消息之前开启一个事物,然后进行消息的发送,mq接受到消息后,会自动提交事物,如果mq没有收到成功的消息,会异常报错,进行错误捕获,消息重新发送。

提示

缺点:效率低下、因为开启了mq的事物,会变成阻塞状态,等待处理结果结束后返回,再进行下一步的处理。

channel.txSelect();//开启事物
  try{
      //发送消息
  }catch(Exection e){
      channel.txRollback()//回滚事物
      //重新提交
  }

        2、在生产者哪里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

    //开启confirm
    channel.confirm();
    //发送成功回调
    public void ack(String messageId){
      
    }
    // 发送失败回调
    public void nack(String messageId){
        //重发该消息
    }

# 消费者弄丢数据解决方案

        使用rabbitmq提供的ack机制,首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。

# kafka

# 消费端丢失

        1、消费端消息丢失是通过,关闭自定提交offset,在自己处理完毕之后手动提交offset,这样就可以保证消费端的不会把消息弄丢

# 自身消息丢失

        1、kafka自身消息丢失问题,可以同过设置参数来保证消息的不丢失: ① 给topic设置 replication.factor参数:这个值必须大于1,表示每个partition必须至少有两个副本; ②在kafka服务端设置min.isync.replicas参数:这个值必须大于1,表示 要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,这样才能保证leader挂了之后还有一个follower。 ③在生产者端设置acks=all:表示 要求每条每条数据,必须是写入所有replica副本之后,才能认为是写入成功了 ④在生产者端设置retries=MAX(很大的一个值,表示无限重试):表示 这个是要求一旦写入事变,就无限重试。

# 生产消息丢失

        如果按照上面设置了ack=all,则一定不会丢失数据,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

# MQ之间的对比

特性 ActiveMQ RabbitMQ RocketMQ Kafka
API完备性
多语言支持 支持,Java优先 语言无关 只支持Java 支持,Java优先
单机吞吐量 万级 万级 万级 十万级
消息延迟 微秒级 毫秒级 毫秒级
可用性 (主从) 高(主从) 非常高(分布式)
消息丢失 理论上不会丢失 理论上不会丢失
最近更新时间: 8/24/2020, 11:25:59 AM