52.源代码解读-RocketMQ消息写入机制

一. 前言

RocketMQ采用内存和磁盘存储来存储消息。那现在来分析一下消息存储的流程

成都创新互联公司主要从事网站设计、网站建设、网页设计、企业做网站、公司建网站等业务。立足成都服务白碱滩,10余年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:028-86922220

二. 代码流程

在Broker启动的时候会拉起相关服务
流程如下:

52.源代码解读-RocketMQ消息写入机制

流程图引用网址
http://blog.csdn.net/akfly/article/details/53447000

三. 代码流程

由于是Broker来存储消息,那么消息入口的代码应该是在Broker里面,而Broker的入口是BrokerStartup,以及重要的BrokerController。
具体流程可以参考Broker启动源代码分析。

Broker启动流程

以发送消息为例

1. Broker启动注册发送消息处理器

Broker启动的时候,会注册一个SendMessageProcesser来响应netty的发送消息请求,如下:

public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
}

2. 消息处理器处理发送者发送过来的消息

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    @Override
    public RemotingCommand proce***equest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
                ...
        switch (request.getCode()) {
            response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
        }
    }
}       

继续看sendMessage..

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
        ...
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}               

调用MessageStore.putMessage(msgInner)


分享名称:52.源代码解读-RocketMQ消息写入机制
分享链接:http://bzwzjz.com/article/pojohp.html

其他资讯

Copyright © 2007-2020 广东宝晨空调科技有限公司 All Rights Reserved 粤ICP备2022107769号
友情链接: 泸州网站建设 定制级高端网站建设 网站设计制作 成都网站制作 成都网站建设推广 成都网站建设 攀枝花网站设计 成都网站制作 成都网站设计 移动网站建设 品牌网站建设 成都网站建设 自适应网站设计 营销网站建设 营销型网站建设 成都网站设计制作公司 专业网站设计 成都网站设计 专业网站设计 H5网站制作 成都网站建设 响应式网站设计