RabbitMQ 客户端源码系列 – Channel

2022-04-02 0 888
本次分享 RabbitMQ Client 与 RabbitMQ Broker 根据 AMQP 协议交互流程中 根据 channel 源码进行分析,其中还有很多 channel 源码细节感兴趣的读者可以进行深入了解

RabbitMQ 客户端源码系列 – Channel

前言

续上次分享 RabbitMQ 客户端源码系列 – Connection ,继续分享Channel相关的源码分析 (com.rabbitmq:amqp-client:4.8.3)。

友情提醒:本次分享适合的人群,需要对 RabbitMQ 有一定的了解

Channels

https://www.rabbitmq.com/channels.html。

RabbitMQ client Demo

基于上次 Java Client Connecting to RabbitMQ Demo 针对 RabbitMQ Channel 继续深入分析。

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
//本次重点分析内容
Channel channel = connection.createChannel();
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
channel.close();
connection.close();
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.

AMQP 协议交互 — Channel

可以看到简单地调用了 Channel channel = connection.createChannel(); 方法创建Channel,以及可以看到 Channel 相应的 AMQP 协议交互:「客户端发送 Channel.Open,broker 接收到后返回 Channel.OpenOk (客户端 创建通道;broker 收到并创建通道完成)」。

RabbitMQ 客户端源码系列 – Channel

整个 AMQP 协议的交互流程(172.30.0.74 为客户端即本机 ip;192.168.17.160 为 RabbitMQ Broker 的 ip)

RabbitMQ client 缓存模式为 Channel

本次分析 RabbitMQ client 采用缓存模式为 Channel:一个 Connection 对应多个 Channel(默认情况下 2048个 channel,其中一个是特殊 channel0)

  • 「Connection」:主要用于AMQP协议解析,信道复用。
  • 「Channel」:路由、安全性、协调。
  • 「Queue」:内存中的消息,永久队列索引(在 channel 与 queue 之间还有一个 exchange作为交换机,此处就不展开说了)。

RabbitMQ 客户端源码系列 – Channel

RabbitMQ client CacheMode为 Channel模式

channel 源码分析

上面简单地介绍 AMQP 协议交互流程中 Channel 连接、Connection 与 Channel的关系。

开始本次主要介绍 Channel 以及涉及到 Connection 相关的源码,从connection.createChannel开始深入分析。

/** Public API - {@inheritDoc} */
    @Override
    public Channel createChannel() throws IOException {
        // 确认 connection 为打开的状态
        ensureIsOpen();
       // 管理channel
        ChannelManager cm = _channelManager;
        if (cm == null) return null;
        // 创建 channel 核心的方法
        Channel channel = cm.createChannel(this);
        // 用于暴露指标
        metricsCollector.newChannel(channel);
        return channel;
    }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.

可以看到 channel 由 connection 调用并管理:

  • ensureIsOpen() — 确认 connection 为打开的状态,逻辑比较简单判断 shutdownCause 为空即可(connection关闭的话,shutdownCause同时会附带指示关闭的情况)。
  • channelManager — 统一由 connection 进行初始化及管理,在之前connection与broker 创建连接交互(Connection.Tune –> Connection.TuneOk)中初始化完成,默认 ChannelMax 为 2047 (2048 – 1,这个1对应的特殊的 channel0 )。

重点看下 channelManager.createChannel(this) 逻辑。

public ChannelN createChannel(AMQConnection connection) throws IOException {
        ChannelN ch;
     // 该 monitor 主要监控 _channelMap 和 channelNumberAllocator
        synchronized (this.monitor) {
           // 获取 channel 分配的编号
            int channelNumber = channelNumberAllocator.allocate();
            if (channelNumber == -1) {
                return null;
            } else {
                // 新增新的 channel
                ch = addNewChannel(connection, channelNumber);
            }
        }
     // 将新增的 channel 打开
        ch.open(); // now that it's been safely added
        return ch;
    }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

channelManager 管理着 channel 的创建连接释放等:

  • synchronized (this.monitor) — 首先获取 channelManager 的 monitor 锁,防止多线程并发操作。
  • channelNumberAllocator.allocate — 获取范围内未被分配的 channelNumber,返回 -1 则认为不可再分配新的 channel,内部主要的逻辑由 BitSet 实现的(感兴趣的可以了解下)。

后续重点分析 addNewChannel 和 open 逻辑。

private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
     // 判重
        if (_channelMap.containsKey(channelNumber)) {
            // That number's already allocated! Can't do it
            // This should never happen unless something has gone
            // badly wrong with our implementation.
            throw new IllegalStateException("We have attempted to "
                    + "create a channel with a number that is already in "
                    + "use. This should never happen. "
                    + "Please report this as a bug.");
        }
     // 构建
        ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
     // 放入 _channelMap 统一管理
        _channelMap.put(ch.getChannelNumber(), ch);
        return ch;
    }
public ChannelN(AMQConnection connection, int channelNumber,
        ConsumerWorkService workService, MetricsCollector metricsCollector) {
        // AMQChannel 构造函数
        super(connection, channelNumber);
        // 构建 消费分配器
        this.dispatcher = new ConsumerDispatcher(connection, this, workService);
        this.metricsCollector = metricsCollector;
    }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.

这块逻辑比较简单,执行 instantiateChannel 构建和初始化 channel,主要涉及到 连接、channel编号、超时时间、dispatcher等等,每一个 channel 都拥有一个 dispatcher,但是 「连接和线程池」 是与同一个 connection 共享。

最终获取到新创建的 channel,进行打开 ch.open()。

public void open() throws IOException {
        // 对rabbitmq broker 发送Channel.Open,并等待broker返回 Channel.OpenOk
        exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
    }
public AMQCommand exnWrappingRpc(Method m)
        throws IOException
    {
        try {
          // 针对该方法进行rpc调用
            return privateRpc(m);
        } catch (AlreadyClosedException ace) {
            // Do not wrap it since it means that connection/channel
            // was closed in some action in the past
            throw ace;
        } catch (ShutdownSignalException ex) {
            throw wrap(ex);
        }
    }
private AMQCommand privateRpc(Method m)
        throws IOException, ShutdownSignalException
    {
      // 用于 rpc调用过程中 阻塞等待
        SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
        rpc(m, k);
        // 不超时等待
        if(_rpcTimeout == NO_RPC_TIMEOUT) {
            return k.getReply();
        } else {
            try {
               // 超时等待
                return k.getReply(_rpcTimeout);
            } catch (TimeoutException e) {
                throw wrapTimeoutException(m, e);
            }
        }
    }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.

打开新的 channel 逻辑比较简单:主要是和 rabbitmq broker 进行 rpc 调用:「客户端发送 Channel.Open,broker 接收到后返回 Channel.OpenOk,这个过程完成后 创建通道完成,也就可以进行后续的 channel使用」。

最后

本次分享 RabbitMQ Client 与 RabbitMQ Broker 根据 AMQP 协议交互流程中 根据 channel 源码进行分析,其中还有很多 channel 源码细节感兴趣的读者可以进行深入了解。

申明:本文由第三方发布,内容仅代表作者观点,与本网站无关。对本文以及其中全部或者部分内容的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。本网发布或转载文章出于传递更多信息之目的,并不意味着赞同其观点或证实其描述,也不代表本网对其真实性负责。

七爪网 行业资讯 RabbitMQ 客户端源码系列 – Channel https://www.7claw.com/7788.html

七爪网源码交易平台

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务