本次继续分享 RabbitMQ Client pulish -- 发送消息,客户先将 之前分享过的端源 RabbitMQ 客户端源码 - Connection 和 RabbitMQ 客户端源码 - Channel 和 发布消息 - Pulish Message 做个小总结(还是基于之前的 Java Client Connecting to RabbitMQ Demo )。 由图可知 RabbitMQ 发布消息流程:ConnectionFactory --> Connection --> Channel --> Pulish Message。码系 老套路 -- 分享源码之前先 抓包看看(「助于快速理解」)。客户 抓包可以看到:pulisher(RabbitMQ 消息发送者) 与 Broker(RabbitMQ Broker)打开 Channel 后,端源又发起了 Confirm.Select/Select-Ok -- 通知 Broker 接收到发布消息需要确认,码系因此也就有了后续的客户 Basic.Pulish/Ack。 梳理完交互流程 我们开始进入今天的端源主题 Pulish Message。 「发布消息总入口 -- ChannelN.basicPublish()」。码系 /** Public API - { @inheritDoc} */ @Override public void basicPublish(String exchange,客户 String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException { basicPublish(exchange, routingKey, mandatory, false, props, body); } /** Public API - { @inheritDoc} */ @Override public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException { // Pulisher 配置了 `Confirm.Select` nextPublishSeqNo 设置从 1 开始 // 将未确认的消息 放入 unconfirmedSet,并 自增加一 if (nextPublishSeqNo > 0) { unconfirmedSet.add(getNextPublishSeqNo()); nextPublishSeqNo++; } BasicProperties useProps = props; if (props == null) { useProps = MessageProperties.MINIMAL_BASIC; } // 构造 AMQCommand 并传输 transmit(new AMQCommand(new Basic.Publish.Builder() .exchange(exchange) .routingKey(routingKey) .mandatory(mandatory) .immediate(immediate) .build(),端源 useProps, body)); // 用于指标统计和监控,默认是码系 NoOpMetricsCollector,需要配置才会可以使用 提供的客户 MicrometerMetricsCollector 和 StandardMetricsCollector(引入对应的包和配置 开箱即可食用~) metricsCollector.basicPublish(this); 值得一提,亿华云计算RabbitMQ Client 应用消息的端源最小单位是 Frame (帧,在Connection篇提到过),码系Frame 主要由 type 类型、channel 通道、payload 消息内容字节、accumulator 写出数据、NON_BODY_SIZE 构成。 「Frame结构」public class Frame { /** Frame type code */ // FRAME_HEARTBEAT :心跳, FRAME_METHOD: 方法, FRAME_HEADER : 头部信息, FRAME_BODY 内容主题 public final int type; /** Frame channel number, 0-65535 */ // channel 序列号 public final int channel; /** Frame payload bytes (for inbound frames) */ // 消息内容字节 private final byte[] payload; /** Frame payload (for outbound frames) */ // 写出数据 private final ByteArrayOutputStream accumulator; private static final int NON_BODY_SIZE = 1 /* type */ + 2 /* channel */ + 4 /* payload size */ + 1 /* end character */; ... AMQP 0-9-1 特定的 「Command」 读取,是从一系列帧中累积 方法、头部和正文。 / * Construct a command with a specified method, header and body. * @param method the wrapped method * @param contentHeader the wrapped content header * @param body the message body data */ public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) { this.assembler = new CommandAssembler((Method) method, contentHeader, body); } // AMQP 0-9-1 特定的Command,构造 方法、头部和正文 public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) { this.method = method; this.contentHeader = contentHeader; this.bodyN = new ArrayList this.bodyLength = 0; this.remainingBodyBytes = 0; appendBodyFragment(body); if (method == null) { this.state = CAState.EXPECTING_METHOD; } else if (contentHeader == null) { this.state = method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE; } else { this.remainingBodyBytes = contentHeader.getBodySize() - this.bodyLength; updateContentBodyState(); } }「传输 AMQCommand -- Channel.transmit()」public void transmit(AMQCommand c) throws IOException { synchronized (_channelMutex) { // 确认 channel 是高防服务器否打开(逻辑比较简单:判断 shutdownCause 为空即是打开) ensureIsOpen(); quiescingTransmit(c); } } public void quiescingTransmit(AMQCommand c) throws IOException { // 防止并发同时使用 同一个channel synchronized (_channelMutex) { // 判断 该消息是否 携带content,如果有 需要判断该 channel是否是阻塞(如果channel state为 `FLOW` 即为 阻塞 _blockContent = true) if (c.getMethod().hasContent()) { while (_blockContent) { try { _channelMutex.wait(); } catch (InterruptedException ignored) { } // 防止 从阻塞中被唤醒时,channel 已经关闭(挺好的一个 多线程操作的案例) ensureIsOpen(); } } c.transmit(this); } }「AMQCommand.transmit」/ * Sends this command down the named channel on the channels * connection, possibly in multiple frames. * @param channel the channel on which to transmit the command * @throws IOException if an error is encountered */ public void transmit(AMQChannel channel) throws IOException { // 每个 channel 都有序列号 从 0开始,(0是特殊的channel) int channelNumber = channel.getChannelNumber(); AMQConnection connection = channel.getConnection(); synchronized (assembler) { // 方法:FRAME_HEARTBEAT :心跳, FRAME_METHOD: 方法, FRAME_HEADER : 头部信息, FRAME_BODY 内容主题 Method m = this.assembler.getMethod(); if (m.hasContent()) { byte[] body = this.assembler.getContentBody(); // FRAME_HEADER : 头部信息 Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length); int frameMax = connection.getFrameMax(); boolean cappedFrameMax = frameMax > 0; int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length; if (cappedFrameMax && headerFrame.size() > frameMax) { String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax); throw new IllegalArgumentException(msg); } // 1. 写 channelNumber帧 FRAME_METHOD connection.writeFrame(m.toFrame(channelNumber)); // 2. 写 头部信息帧 AMQP.FRAME_HEADER connection.writeFrame(headerFrame); // 3. 如果 body过多,会拆成多个帧 AMQP.FRAME_BODY for (int offset = 0; offset < body.length; offset += bodyPayloadMax) { int remaining = body.length - offset; int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax; Frame frame = Frame.fromBodyFragment(channelNumber, body, offset, fragmentLength); connection.writeFrame(frame); } } else { // 1. 写 channelNumber帧 FRAME_METHOD connection.writeFrame(m.toFrame(channelNumber)); } } // 最后刷新 输出缓冲区 connection.flush(); }「最后分析下 connection.writeFrame(frame)」/ * Public API - sends a frame directly to the broker. */ public void writeFrame(Frame f) throws IOException { _frameHandler.writeFrame(f); // lastActivityTime _heartbeatSender.signalActivity(); } @Override public void writeFrame(Frame frame) throws IOException { synchronized (_outputStream) { frame.writeTo(_outputStream); } } / * Public API - writes this Frame to the given DataOutputStream */ public void writeTo(DataOutputStream os) throws IOException { // 1. 写type 类型 os.writeByte(type); // 2. 写channel 序列号 os.writeShort(channel); if (accumulator != null) { // 3. 写出数据大小 os.writeInt(accumulator.size()); // 4. 输出数据 accumulator.writeTo(os); } else { // 3. 写消息内容字节大小 os.writeInt(payload.length); // 4. 写消息内容 os.write(payload); } // 5. 帧结束标志位 os.write(AMQP.FRAME_END); 希望可以让你们 对于 RabbitMQ Client 与 RabbitMQ Broker 根据 AMQP协议 发布消息 有个清晰的认识,并有助于你们熟悉ChannelN.basicPublish 源码,当然其中还有很多细节源码需要读者慢慢品味。RabbitMQ pulisher 小总结
Basic Publish & Ack