sofa-rpc是阿里开源的一款高性能的rpc框架,这篇文章主要是对sofa-rpc provider启动服务流程的一个代码走读,下面是我简单绘制的一个基本的关系流程图

下面我们根据sofa-rpc代码,对流程进行一个跟踪与走读。我们以BoltServer的为例

public static void main(String[] args) {

ApplicationConfig application = new ApplicationConfig().setAppName("test-server");

ServerConfig serverConfig = new ServerConfig()

.setPort(22000)

.setDaemon(false);

ProviderConfig providerConfig = new ProviderConfig()

.setInterfaceId(HelloService.class.getName())

.setApplication(application)

.setRef(new HelloServiceImpl())

.setServer(serverConfig)

.setRegister(false);

ProviderConfig providerConfig2 = new ProviderConfig()

.setInterfaceId(EchoService.class.getName())

.setApplication(application)

.setRef(new EchoServiceImpl())

.setServer(serverConfig)

.setRegister(false);

providerConfig.export();

providerConfig2.export();

LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);

}

可以看到sofa-rpc通过ProviderConfig类对服务提供方Provider进行了配置信息的初始化,同时也提供了export做为服务启动的入口。

public synchronized void export() {        if (providerBootstrap == null) {

providerBootstrap = Bootstraps.from(this);

}

providerBootstrap.export();

}

根据ProviderConfig中setBootstrap()配置的Bootstrap类型,我们通过Bootstaps.from(this)可以获取到不同的Bootstrap引导服务,分别是DefaultProviderBootstrap与 DubboProviderBootstrap

/**

* 发布一个服务

*

* @param providerConfig 服务发布者配置

* @param             接口类型

* @return 发布启动类     */

public static  ProviderBootstrap from(ProviderConfig providerConfig) {

String bootstrap = providerConfig.getBootstrap();        if (StringUtils.isEmpty(bootstrap)) {            // Use default provider bootstrap 无的话就返回默认DefaultProviderBootstrap

bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);

providerConfig.setBootstrap(bootstrap);

}

ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)

.getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig });        return (ProviderBootstrap) providerBootstrap;

}

DefaultProviderBootstrap与 DubboProviderBootstrap 都继承自ProviderBootstrap。

DefaultProviderBootstrap又被BoltProviderBootstrap、Http2ClearTextProviderBootstrap、RestProviderBootstrap三个类所继承,这其实对应了sofa-rpc中的三种server服务方式。

我们看下DefaultProviderBootstrap服务启动源码

@Override    public void export() {        if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒

Thread thread = factory.newThread(new Runnable() {

@Override                public void run() {                    try {

Thread.sleep(providerConfig.getDelay());

} catch (Throwable ignore) { // NOPMD                    }

doExport();

}

});

thread.start();

} else {

doExport();

}

}    private void doExport() {        if (exported) {            return;

}        // 检查参数        checkParameters();

String appName = providerConfig.getAppName();        //key  is the protocol of server,for concurrent safe

Map hasExportedInCurrent = new ConcurrentHashMap();        // 将处理器注册到server

List serverConfigs = providerConfig.getServer();        for (ServerConfig serverConfig : serverConfigs) {

String protocol = serverConfig.getProtocol();

String key = providerConfig.buildKey() + ":" + protocol;            if (LOGGER.isInfoEnabled(appName)) {

LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());

}            // 注意同一interface,同一uniqleId,不同server情况

AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器

if (cnt == null) { // 没有发布过

cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));

}            int c = cnt.incrementAndGet();

hasExportedInCurrent.put(serverConfig.getProtocol(), true);            int maxProxyCount = providerConfig.getRepeatedExportLimit();            if (maxProxyCount > 0) {                if (c > maxProxyCount) {

decrementCounter(hasExportedInCurrent);                    // 超过最大数量,直接抛出异常

throw new SofaRpcRuntimeException("Duplicate provider config with key " + key                        + " has been exported more than " + maxProxyCount + " times!"

+ " Maybe it's wrong config, please check it."

+ " Ignore this if you did that on purpose!");

} else if (c > 1) {                    if (LOGGER.isInfoEnabled(appName)) {

LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"

+ " Maybe it's wrong config, please check it."

+ " Ignore this if you did that on purpose!", key);

}

}

}

}        try {            // 构造请求调用器

providerProxyInvoker = new ProviderProxyInvoker(providerConfig);            // 初始化注册中心

if (providerConfig.isRegister()) {

List registryConfigs = providerConfig.getRegistry();                if (CommonUtils.isNotEmpty(registryConfigs)) {                    for (RegistryConfig registryConfig : registryConfigs) {

RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry                    }

}

}            // 将处理器注册到server

for (ServerConfig serverConfig : serverConfigs) {                try {                    //构建Server

Server server = serverConfig.buildIfAbsent();                    // 注册序列化接口                    server.registerProcessor(providerConfig, providerProxyInvoker);                    if (serverConfig.isAutoStart()) {                        //启动服务                        server.start();

}

} catch (SofaRpcRuntimeException e) {                    throw e;

} catch (Exception e) {

LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "

+ serverConfig.getId(), e);

}

}            // 注册到注册中心

providerConfig.setConfigListener(new ProviderAttributeListener());

register();

} catch (Exception e) {

decrementCounter(hasExportedInCurrent);            if (e instanceof SofaRpcRuntimeException) {                throw (SofaRpcRuntimeException) e;

} else {                throw new SofaRpcRuntimeException("Build provider proxy error!", e);

}

}        // 记录一些缓存数据

RpcRuntimeContext.cacheProviderConfig(this);

exported = true;

}

代码中通过serverConfig.buildIfAbsent()构建Server服务对象,而在buildIfAbsent()函数中我们可以看到,sever是通过SeverFactory工厂获取到的,在SeverFactory的getSever()中根据SeverConfig的配置获取Sever的具体实例,并执行Init()进行初始化。

/**

* 启动服务

*

* @return the server     */

public synchronized Server buildIfAbsent() {        if (server != null) {            return server;

}        // 提前检查协议+序列化方式        // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),        //                SerializationType.valueOf(getSerialization()));

//在sever工厂中拿到sever实例

server = ServerFactory.getServer(this);        return server;

}

/**

* 初始化Server实例

*

* @param serverConfig 服务端配置

* @return Server     */

public synchronized static Server getServer(ServerConfig serverConfig) {        try {

Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));            if (server == null) {                // 算下网卡和端口                resolveServerConfig(serverConfig);

ExtensionClass ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)

.getExtensionClass(serverConfig.getProtocol());                if (ext == null) {                    throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),                        "Unsupported protocol of server!");

}

server = ext.getExtInstance();                //服务初始化                server.init(serverConfig);

SERVER_MAP.put(serverConfig.getPort() + "", server);

}            return server;

} catch (SofaRpcRuntimeException e) {            throw e;

} catch (Throwable e) {            throw new SofaRpcRuntimeException(e.getMessage(), e);

}

}

sofa-rpc提供了三种server类型 BoltServer,RestServer与AbstractHttpServer

BoltServer中通讯底层通过RemotingServer实现的,RemotingServer是基于阿里sofa-bolt通信框架开发的。

/**

* Bolt服务端     */

protected RemotingServer       remotingServer;

@Override    public void start() {        if (started) {            return;

}        synchronized (this) {            if (started) {                return;

}            // 生成阿里基于netty的bolt服务Server对象

remotingServer = initRemotingServer();            try {                if (remotingServer.start(serverConfig.getBoundHost())) {                    if (LOGGER.isInfoEnabled()) {

LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),

serverConfig.getPort());

}

} else {                    throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");

}

started = true;                if (EventBus.isEnable(ServerStartedEvent.class)) {

EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));

}

} catch (SofaRpcRuntimeException e) {                throw e;

} catch (Exception e) {                throw new SofaRpcRuntimeException("Failed to start bolt server!", e);

}

}

}

AbstractHttpServer 提供http服务,底层通信通过ServerTransport类实现的

/**

* 服务端通讯层     */

private ServerTransport         serverTransport;

@Override    public void init(ServerConfig serverConfig) {        this.serverConfig = serverConfig;        this.serverTransportConfig = convertConfig(serverConfig);        // 启动线程池

this.bizThreadPool = initThreadPool(serverConfig);        // 服务端处理器

this.serverHandler = new HttpServerHandler();        // set default transport config

this.serverTransportConfig.setContainer(container);        this.serverTransportConfig.setServerHandler(serverHandler);

}

@Override    public void start() {        if (started) {            return;

}        synchronized (this) {            if (started) {                return;

}            try {                // 启动线程池

this.bizThreadPool = initThreadPool(serverConfig);                this.serverHandler.setBizThreadPool(bizThreadPool);                //实例化服务,具体代码见

serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);

started = serverTransport.start();                if (started) {                    if (EventBus.isEnable(ServerStartedEvent.class)) {

EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));

}

}

} catch (SofaRpcRuntimeException e) {                throw e;

} catch (Exception e) {                throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e);

}

}

}

ServerTransport是个抽象类,具体实现为transport包下AbstractHttp2ServerTransport

/**

* 构造函数

*

* @param transportConfig 服务端配置     */

protected AbstractHttp2ServerTransport(ServerTransportConfig transportConfig) {        super(transportConfig);

}

@Override    public boolean start() {        if (serverBootstrap != null) {            return true;

}        synchronized (this) {            if (serverBootstrap != null) {                return true;

}            boolean flag = false;

SslContext sslCtx = SslContextBuilder.build();            // Configure the server.

EventLoopGroup bossGroup = NettyHelper.getServerBossEventLoopGroup(transportConfig);            //可以看到然是基于Netty

HttpServerHandler httpServerHandler = (HttpServerHandler) transportConfig.getServerHandler();

bizGroup = NettyHelper.getServerBizEventLoopGroup(transportConfig, httpServerHandler.getBizThreadPool());

serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup, bizGroup)

.channel(transportConfig.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())

.option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr())

.option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator())

.option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())

.childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive())

.childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay())

.childOption(ChannelOption.SO_RCVBUF, 8192 * 128)

.childOption(ChannelOption.SO_SNDBUF, 8192 * 128)

.handler(new LoggingHandler(LogLevel.DEBUG))

.childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())

.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(

transportConfig.getBufferMin(), transportConfig.getBufferMax()))

.childHandler(new Http2ServerChannelInitializer(bizGroup, sslCtx,

httpServerHandler, transportConfig.getPayload()));            // 绑定到全部网卡 或者 指定网卡

ChannelFuture future = serverBootstrap.bind(                new InetSocketAddress(transportConfig.getHost(), transportConfig.getPort()));

ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {

@Override                public void operationComplete(ChannelFuture future) throws Exception {                    if (future.isSuccess()) {                        if (LOGGER.isInfoEnabled()) {

LOGGER.info("HTTP/2 Server bind to {}:{} success!",

transportConfig.getHost(), transportConfig.getPort());

}

} else {

LOGGER.error("HTTP/2 Server bind to {}:{} failed!",

transportConfig.getHost(), transportConfig.getPort());

stop();

}

}

});            try {

channelFuture.await();                if (channelFuture.isSuccess()) {

flag = Boolean.TRUE;

} else {                    throw new SofaRpcRuntimeException("Server start fail!", future.cause());

}

} catch (InterruptedException e) {

LOGGER.error(e.getMessage(), e);

}            return flag;

}

}

RestServer 提供Rest服务,底层通信实现具体可见SofaNettyJaxrsServer。

/**

* Rest服务端     */

protected SofaNettyJaxrsServer httpServer;

@Override    public void init(ServerConfig serverConfig) {        this.serverConfig = serverConfig;

httpServer = buildServer();

}

SofaNettyJaxrsServer中服务启动的具体代码

@Override    public void start() {        // CHANGE: 增加线程名字

boolean daemon = serverConfig.isDaemon();        boolean isEpoll = serverConfig.isEpoll();

NamedThreadFactory ioFactory = new NamedThreadFactory("SEV-REST-IO-" + port, daemon);

NamedThreadFactory bizFactory = new NamedThreadFactory("SEV-REST-BIZ-" + port, daemon);

eventLoopGroup = isEpoll ? new EpollEventLoopGroup(ioWorkerCount, ioFactory)

: new NioEventLoopGroup(ioWorkerCount, ioFactory);

eventExecutor = isEpoll ? new EpollEventLoopGroup(executorThreadCount, bizFactory)

: new NioEventLoopGroup(executorThreadCount, bizFactory);        // Configure the server.

bootstrap = new ServerBootstrap()

.group(eventLoopGroup)

.channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)

.childHandler(createChannelInitializer())

.option(ChannelOption.SO_BACKLOG, backlog)

.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isKeepAlive()); // CHANGE: setKeepAlive

for (Map.Entry entry : channelOptions.entrySet()) {

bootstrap.option(entry.getKey(), entry.getValue());

}        for (Map.Entry entry : childChannelOptions.entrySet()) {

bootstrap.childOption(entry.getKey(), entry.getValue());

}        final InetSocketAddress socketAddress;        if (null == hostname || hostname.isEmpty()) {

socketAddress = new InetSocketAddress(port);

} else {

socketAddress = new InetSocketAddress(hostname, port);

}

bootstrap.bind(socketAddress).syncUninterruptibly();

}

OK,以上就是sofa-rpc服务端启动的一个基本的流程,这里关注的只是简单的服务启动流程,没有深入代码功能进行分析,在此基础上,我们可以进一步探究代码的具体实现。

sofa协议服务器,sofa-rpc 服务端源码流程走读相关推荐

  1. java sofa rpc_sofa-rpc服务端源码的详细分析(附流程图)

    本篇文章给大家带来的内容是关于sofa-rpc服务端源码的详细分析(附流程图),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. sofa-rpc是阿里开源的一款高性能的rpc框架,这篇 ...

  2. 畅玩mt3单机游戏服务器维护,【梦幻西游】MT3仿端手工游戏服务端源码[教程+授权物品后台]...

    [梦幻西游]MT3仿端手工游戏服务端源码[教程+授权物品后台] 架设教程 系统:CentOS 6.8  64位 1.关闭防火墙 chkconfig iptables off service iptab ...

  3. Netty源码阅读(2)之——服务端源码梗概

    上文我们把客户端源码梗概大致了解了一下,这样再了解服务端源码就轻松一点,我们将从服务端和客户端的区别着手去解析. 目录 区别 ④ ③ ① ⑤ 区别 ④ 客户端:.option(ChannelOptio ...

  4. 五子棋服务端程序java_9网上五子棋对战(java)服务端源码

    9网上五子棋对战(java)服务端源码 网上五子棋对战(java)服务端源码 /* 五子棋游戏是本人在学习java swing时写的一个程序,程序分两部分:服务器端和客户端.运行程序时先运行服务器端, ...

  5. java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运行部署

    java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运行部署 java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运 ...

  6. java毕业设计融呗智慧金融微资讯移动平台服务端源码+lw文档+mybatis+系统+mysql数据库+调试

    java毕业设计融呗智慧金融微资讯移动平台服务端源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计融呗智慧金融微资讯移动平台服务端源码+lw文档+mybatis+系统+my ...

  7. java计算机毕业设计融呗智慧金融微资讯移动平台服务端源码+系统+数据库+lw文档+mybatis+运行部署

    java计算机毕业设计融呗智慧金融微资讯移动平台服务端源码+系统+数据库+lw文档+mybatis+运行部署 java计算机毕业设计融呗智慧金融微资讯移动平台服务端源码+系统+数据库+lw文档+myb ...

  8. java计算机毕业设计教育辅导班信息网服务端源码+mysql数据库+系统+lw文档+部署

    java计算机毕业设计教育辅导班信息网服务端源码+mysql数据库+系统+lw文档+部署 java计算机毕业设计教育辅导班信息网服务端源码+mysql数据库+系统+lw文档+部署 本源码技术栈: 项目 ...

  9. java毕业设计社区养老综合服务平台服务端源码+lw文档+mybatis+系统+mysql数据库+调试

    java毕业设计社区养老综合服务平台服务端源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计社区养老综合服务平台服务端源码+lw文档+mybatis+系统+mysql数据库 ...

最新文章

  1. Debug常用命令 精简版本
  2. 【Git 第2课】 GitHub是什么?
  3. eBay数据科学家李睿:自然语言处理在eBay的技术实践 数据 网络 类别 技术 分类器 阅读1593 近日,在飞马网主办的“FMI人工智能大数据高峰论坛”上,来自eBay的数据科学家李睿
  4. Python内置函数zip map filter的使用
  5. 视频 + PPT | 数字化运营,从理论到实践
  6. WINDOWS键盘事件的挂钩监控原理及其应用技术
  7. windows下的虚拟机中的ubuntu系统如何连接无线网(wifi)
  8. arduino nano 蓝牙_贸泽开售结合蓝牙5.2与USB 2.0的 Nordic Semiconductor nRF52820多协议SoC...
  9. Java 基础【01】 This 用法
  10. 软考高项历年作文真题
  11. 利用spring集成redis使用
  12. 安全事件关联分析方法
  13. 程序员合同日期不到想辞职_在职场,辞职有时是难免的,要怎样写辞职信才好呢...
  14. imageJ执行宏脚本出现了灰蒙蒙的图片。
  15. 2020年中国各省GDP简析
  16. 朋也bbs开源学习(一)
  17. execjs 模块 call() 方法报错 AttributeError: ‘NoneType‘ object has no attribute ‘replace‘
  18. 黑马程序员--JavaJAVA 正则表达式 (超详细)
  19. 苏炳添博士论文研究自己,奥运学术两兼顾,还是暨大副教授,网友:真正的Run数据...
  20. 全屏Dialog的几种实现

热门文章

  1. C语言版--单链表排序,冒泡排序,选择排序,插入排序,快速排序,应有尽有,保证看懂,没有bug!交换节点版本!
  2. android实现计算器功能吗,简单实现Android计算器功能
  3. 你还在找免费的室内家装SU模型吗?
  4. 1skp素材 su模型在enscape不显示?怎么办?
  5. 可靠的UDP (RUDP)
  6. 负载均衡Load Balance(F5 \ nginx \ LVS \ DNS轮询)
  7. Python爬取哔哩哔哩视频的相关信息后续
  8. VR系列——Oculus Rift 开发者指南:五、色差
  9. 读书之一--《程序员修炼之道》
  10. C语言单链表基本操作总结