编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

motan源码分析一:服务发布及注册(moloch 源码分析)

wxchong 2024-08-18 00:38:54 开源技术 13 ℃ 0 评论

motan是新浪微博开源的服务治理框架,具体介绍请看:http://tech.sina.com.cn/i/2016-05-10/doc-ifxryhhh1869879.shtml.

本系列的文章将分析它的底层源码,分析的源码版本为:0.1.2。第一篇文章将以服务的发布和注册开始,注册服务使用zookeeper来分析。源码地址:https://github.com/weibocom/motan

本文涉及到的主要类和接口:MotanApiExportDemo、MotanDemoService、MotanDemoServiceImpl、ServiceConfig、RegistryConfig、ProtocolConfig、DefaultProvider、ZookeeperRegistryFactory、ZookeeperRegistry、SimpleConfigHandler、ProtocolFilterDecorator等。

1.首先来看demo源码:MotanApiExportDemo

demo中先后创建了ServiceConfig、RegistryConfig和ProtocolConfig相关的对象,其中ServiceConfig是我们提供服务的相关配置(每个服务一个配置,例如一个服务接口一个配置,本文中的具体服务是:MotanDemoServiceImpl)、RegistryConfig是注册中心相关的配置信息、ProtocolConfig是应用协议相关的配置(在客户端还负责集群相关的配置)。

 ServiceConfig<MotanDemoService> motanDemoService = new ServiceConfig<MotanDemoService>();
 // 设置接口及实现类
 motanDemoService.setInterface(MotanDemoService.class);//设置服务接口,客户端在rpc调用时,会在协议中传递接口名称,从而实现与具体实现类一一对应
 motanDemoService.setRef(new MotanDemoServiceImpl());//设置接口实现类,实际的业务代码

 // 配置服务的group以及版本号
 motanDemoService.setGroup("motan-demo-rpc");//服务所属的组
 motanDemoService.setVersion("1.0");

 // 配置注册中心直连调用
 RegistryConfig registry = new RegistryConfig();
 //use local registry
 registry.setRegProtocol("local");
 // use ZooKeeper registry
// registry.setRegProtocol("zookeeper");
// registry.setAddress("127.0.0.1:2181");
 // registry.setCheck("false"); //是否检查是否注册成功
 motanDemoService.setRegistry(registry);

 // 配置RPC协议
 ProtocolConfig protocol = new ProtocolConfig();
 protocol.setId("motan");//使用motan应用协议
 protocol.setName("motan");
 motanDemoService.setProtocol(protocol);
 motanDemoService.setExport("motan:8002");//本服务的监控端口号是8002
 motanDemoService.export();//发布及在zookeeper上注册此服务
 MotanSwitcherUtil.setSwitcherValue(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, true);
 System.out.println("server start...");

2.从上面的代码可知ServiceConfig类是服务的发布及注册的核心是motanDemoService.export()方法,我们来看一下此方法的实现细节:

public synchronized void export() {
 if (exported.get()) {
 LoggerUtil.warn(String.format("%s has already been expoted, so ignore the export request!", interfaceClass.getName()));
 return;
 }
 checkInterfaceAndMethods(interfaceClass, methods);
 List<URL> registryUrls = loadRegistryUrls();//加载注册中心的url,支持多个注册中心
 if (registryUrls == null || registryUrls.size() == 0) {
 throw new IllegalStateException("Should set registry config for service:" + interfaceClass.getName());
 }
 Map<String, Integer> protocolPorts = getProtocolAndPort();
 for (ProtocolConfig protocolConfig : protocols) {
 Integer port = protocolPorts.get(protocolConfig.getId());
 if (port == null) {
 throw new MotanServiceException(String.format("Unknow port in service:%s, protocol:%s", interfaceClass.getName(), protocolConfig.getId()));
 }
 doExport(protocolConfig, port, registryUrls);//发布服务
 }
 afterExport();
 }

方法中调用了doExport和afterExport方法:

private void doExport(ProtocolConfig protocolConfig, int port, List<URL> registryURLs) {
 String protocolName = protocolConfig.getName();//获取协议名称,此处为motan
 if (protocolName == null || protocolName.length() == 0) {
 protocolName = URLParamType.protocol.getValue();
 }
 String hostAddress = host;//本机地址
 if (StringUtils.isBlank(hostAddress) && basicService != null) {
 hostAddress = basicService.getHost();
 }
 if (NetUtils.isInvalidLocalHost(hostAddress)) {
 hostAddress = getLocalHostAddress(registryURLs);
 }
 Map<String, String> map = new HashMap<String, String>();
 map.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_SERVICE);
 map.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
 collectConfigParams(map, protocolConfig, basicService, extConfig, this);
 collectMethodConfigParams(map, this.getMethods());
 URL serviceUrl = new URL(protocolName, hostAddress, port, interfaceClass.getName(), map);//组装serviceUrl信息
 if (serviceExists(serviceUrl)) {//判断服务之前是否已经加载过
 LoggerUtil.warn(String.format("%s configService is malformed, for same service (%s) already exists ", interfaceClass.getName(),
 serviceUrl.getIdentity()));
 throw new MotanFrameworkException(String.format("%s configService is malformed, for same service (%s) already exists ",
 interfaceClass.getName(), serviceUrl.getIdentity()), MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);//抛出同名服务异常
 }
 List<URL> urls = new ArrayList<URL>();
 // injvm 协议只支持注册到本地,其他协议可以注册到local、remote
 if (MotanConstants.PROTOCOL_INJVM.equals(protocolConfig.getId())) {
 URL localRegistryUrl = null;
 for (URL ru : registryURLs) {
 if (MotanConstants.REGISTRY_PROTOCOL_LOCAL.equals(ru.getProtocol())) {
 localRegistryUrl = ru.createCopy();
 break;
 }
 }
 if (localRegistryUrl == null) {
 localRegistryUrl =
 new URL(MotanConstants.REGISTRY_PROTOCOL_LOCAL, hostAddress, MotanConstants.DEFAULT_INT_VALUE,
 RegistryService.class.getName());
 }
 urls.add(localRegistryUrl);
 } else {
 for (URL ru : registryURLs) {
 urls.add(ru.createCopy());
 }
 }
 for (URL u : urls) {
 u.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(serviceUrl.toFullStr()));
 registereUrls.add(u.createCopy());
 }
 //使用spi机制加载SimpleConfigHandler
 ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);
 //调用SimpleConfigHandler的export方法
 exporters.add(configHandler.export(interfaceClass, ref, urls));
 }
 
 
 private void afterExport() {
 exported.set(true);
 for (Exporter<T> ep : exporters) {
 existingServices.add(ep.getProvider().getUrl().getIdentity());
 }
 }

再来看一下SimpleConfigHandler的export方法

@Override
public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls) {
 String serviceStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
 URL serviceUrl = URL.valueOf(serviceStr);
 // export service
 // 利用protocol decorator来增加filter特性
 String protocolName = serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());
 Protocol orgProtocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(protocolName);//对于Protocol对象增强filter
 Provider<T> provider = getProvider(orgProtocol, ref, serviceUrl, interfaceClass);//服务的代理提供者,包装ref的服务
 Protocol protocol = new ProtocolFilterDecorator(orgProtocol);
 Exporter<T> exporter = protocol.export(provider, serviceUrl);//发布服务,将代理对象provider与具体的serviceUrl关联
 // register service
 register(registryUrls, serviceUrl);
 return exporter;
}

3.下面我们来看一下,motan如何对filter进行相应的增强处理

/**
 *
 * Decorate the protocol, to add more features.
 *
 * @author fishermen
 * @version V1.0 created at: 2013-5-30
 */
public class ProtocolFilterDecorator implements Protocol { //实现Protocol的接口,联系到上文中使用此类对实际的Protocol进行包装
 private Protocol protocol;
 public ProtocolFilterDecorator(Protocol protocol) {
 if (protocol == null) {
 throw new MotanFrameworkException("Protocol is null when construct ProtocolFilterDecorator",
 MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
 }
 this.protocol = protocol;//给实际的Protocol进行赋值
 }
 @Override
 public <T> Exporter<T> export(Provider<T> provider, URL url) {
 return protocol.export(decorateWithFilter(provider, url), url);//发布服务时,调用filter增强处理方法
 }
 private <T> Provider<T> decorateWithFilter(final Provider<T> provider, URL url) {
 List<Filter> filters = getFilters(url, MotanConstants.NODE_TYPE_SERVICE);//获取实际需要增强的filter
 if (filters == null || filters.size() == 0) {
 return provider;
 }
 Provider<T> lastProvider = provider;
 for (Filter filter : filters) { //对于代理对象provider进行包装,包装成一个provider链,返回最后一个provider
 final Filter f = filter;
 if (f instanceof InitializableFilter) {
 ((InitializableFilter) f).init(lastProvider);
 }
 final Provider<T> lp = lastProvider;
 lastProvider = new Provider<T>() {
 @Override
 public Response call(Request request) {
 return f.filter(lp, request);//对于后面调用的call方法时,首先调用最外层的filter,最后再调用实际的provider的call方法
 }
 @Override
 public String desc() {
 return lp.desc();
 }
 @Override
 public void destroy() {
 lp.destroy();
 }
 @Override
 public Class<T> getInterface() {
 return lp.getInterface();
 }
 @Override
 public Method lookupMethod(String methodName, String methodDesc) {
 return lp.lookupMethod(methodName, methodDesc);
 }
 @Override
 public URL getUrl() {
 return lp.getUrl();
 }
 @Override
 public void init() {
 lp.init();
 }
 @Override
 public boolean isAvailable() {
 return lp.isAvailable();
 }
 @Override
 public T getImpl() {
 return provider.getImpl();
 }
 };
 }
 return lastProvider;
 }
 /**
 * <pre>
 * 获取方式:
 * 1)先获取默认的filter列表;
 * 2)根据filter配置获取新的filters,并和默认的filter列表合并;
 * 3)再根据一些其他配置判断是否需要增加其他filter,如根据accessLog进行判断,是否需要增加accesslog
 * </pre>
 *
 * @param url
 * @param key
 * @return
 */
 private List<Filter> getFilters(URL url, String key) {
 // load default filters
 List<Filter> filters = new ArrayList<Filter>();
 List<Filter> defaultFilters = ExtensionLoader.getExtensionLoader(Filter.class).getExtensions(key);//使用spi机制初始化filer对象
 if (defaultFilters != null && defaultFilters.size() > 0) {
 filters.addAll(defaultFilters);
 }
 // add filters via "filter" config
 String filterStr = url.getParameter(URLParamType.filter.getName());
 if (StringUtils.isNotBlank(filterStr)) {
 String[] filterNames = MotanConstants.COMMA_SPLIT_PATTERN.split(filterStr);
 for (String fn : filterNames) {
 addIfAbsent(filters, fn);
 }
 }
 // add filter via other configs, like accessLog and so on
 boolean accessLog = url.getBooleanParameter(URLParamType.accessLog.getName(), URLParamType.accessLog.getBooleanValue());
 if (accessLog) {
 addIfAbsent(filters, AccessLogFilter.class.getAnnotation(SpiMeta.class).name());
 }
 // sort the filters
 Collections.sort(filters, new ActivationComparator<Filter>());
 Collections.reverse(filters);
 return filters;
 }
}

4.服务发布完成后,需要像注册中心注册此服务

private void register(List<URL> registryUrls, URL serviceUrl) {
 for (URL url : registryUrls) { //循环遍历多个注册中心的信息
 // 根据check参数的设置,register失败可能会抛异常,上层应该知晓
 RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(url.getProtocol());//文中使用的是zookeeper
 if (registryFactory == null) {
 throw new MotanFrameworkException(new MotanErrorMsg(500, MotanErrorMsgConstant.FRAMEWORK_REGISTER_ERROR_CODE,
 "register error! Could not find extension for registry protocol:" + url.getProtocol()
 + ", make sure registry module for " + url.getProtocol() + " is in classpath!"));
 }
 Registry registry = registryFactory.getRegistry(url);//获取registry
 registry.register(serviceUrl);//将服务注册到zookeeper,也就是把节点信息写入到zookeeper中
 }
}

我们来看一下zookeeper注册中心的工厂类:每个Registry都需要独立维护一个ZkClient与zookeeper的链接

import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.registry.Registry;
import com.weibo.api.motan.registry.support.AbstractRegistryFactory;
import com.weibo.api.motan.rpc.URL;
import com.weibo.api.motan.util.LoggerUtil;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkException;

@SpiMeta(name = "zookeeper")
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
 @Override
 protected Registry createRegistry(URL registryUrl) {
 try {
 int timeout = registryUrl.getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
 int sessionTimeout =
 registryUrl.getIntParameter(URLParamType.registrySessionTimeout.getName(),
 URLParamType.registrySessionTimeout.getIntValue());
 ZkClient zkClient = new ZkClient(registryUrl.getParameter("address"), sessionTimeout, timeout, new StringSerializer());//创建zookeeper的客户端
 return new ZookeeperRegistry(registryUrl, zkClient);//创建实际的Registry
 } catch (ZkException e) {
 LoggerUtil.error("[ZookeeperRegistry] fail to connect zookeeper, cause: " + e.getMessage());
 throw e;
 }
 }
}

我们再来分析ZookeeperRegistry中的代码

 public ZookeeperRegistry(URL url, ZkClient client) {
 super(url);
 this.zkClient = client;
 IZkStateListener zkStateListener = new IZkStateListener() {
 @Override
 public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
 // do nothing
 }
 @Override
 public void handleNewSession() throws Exception { //响应zkClient的事件
 LoggerUtil.info("zkRegistry get new session notify.");
 reconnectService();//重新注册服务
 reconnectClient();
 }
 };
 zkClient.subscribeStateChanges(zkStateListener);
 ShutDownHook.registerShutdownHook(this);
 }

 private void reconnectService() {
 Collection<URL> allRegisteredServices = getRegisteredServiceUrls();
 if (allRegisteredServices != null && !allRegisteredServices.isEmpty()) {
 try {
 serverLock.lock();
 for (URL url : getRegisteredServiceUrls()) {
 doRegister(url);//注册
 }
 LoggerUtil.info("[{}] reconnect: register services {}", registryClassName, allRegisteredServices);
 for (URL url : availableServices) {
 if (!getRegisteredServiceUrls().contains(url)) {
 LoggerUtil.warn("reconnect url not register. url:{}", url);
 continue;
 }
 doAvailable(url);//标识服务可以提供服务
 }
 LoggerUtil.info("[{}] reconnect: available services {}", registryClassName, availableServices);
 } finally {
 serverLock.unlock();
 }
 }
 }

@Override
protected void doRegister(URL url) {
 try {
 serverLock.lock();
 // 防止旧节点未正常注销
 removeNode(url, ZkNodeType.AVAILABLE_SERVER);
 removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
 createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
 } catch (Throwable e) {
 throw new MotanFrameworkException(String.format("Failed to register %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
 } finally {
 serverLock.unlock();
 }
}

@Override
protected void doUnregister(URL url) {
 try {
 serverLock.lock();
 removeNode(url, ZkNodeType.AVAILABLE_SERVER);
 removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
 } catch (Throwable e) {
 throw new MotanFrameworkException(String.format("Failed to unregister %s to zookeeper(%s), cause: %s", url, getUrl(), e.getMessage()), e);
 } finally {
 serverLock.unlock();
 }
}

@Override
protected void doAvailable(URL url) {
 try{
 serverLock.lock();
 if (url == null) {
 availableServices.addAll(getRegisteredServiceUrls());
 for (URL u : getRegisteredServiceUrls()) {
 removeNode(u, ZkNodeType.AVAILABLE_SERVER);
 removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
 createNode(u, ZkNodeType.AVAILABLE_SERVER);
 }
 } else {
 availableServices.add(url);
 removeNode(url, ZkNodeType.AVAILABLE_SERVER);
 removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
 createNode(url, ZkNodeType.AVAILABLE_SERVER);
 }
 } finally {
 serverLock.unlock();
 }
}

@Override
protected void doUnavailable(URL url) {
 try{
 serverLock.lock();
 if (url == null) {
 availableServices.removeAll(getRegisteredServiceUrls());
 for (URL u : getRegisteredServiceUrls()) {
 removeNode(u, ZkNodeType.AVAILABLE_SERVER);
 removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
 createNode(u, ZkNodeType.UNAVAILABLE_SERVER);
 }
 } else {
 availableServices.remove(url);
 removeNode(url, ZkNodeType.AVAILABLE_SERVER);
 removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
 createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
 }
 } finally {
 serverLock.unlock();
 }
}

private void createNode(URL url, ZkNodeType nodeType) {
 String nodeTypePath = ZkUtils.toNodeTypePath(url, nodeType);
 if (!zkClient.exists(nodeTypePath)) {
 zkClient.createPersistent(nodeTypePath, true);//对于服务的标识信息,创建持久化节点
 }
 //对于服务的ip和端口号信息使用临时节点,当服务断了后,zookeeper自动摘除目标服务器
 zkClient.createEphemeral(ZkUtils.toNodePath(url, nodeType), url.toFullStr());
}

private void removeNode(URL url, ZkNodeType nodeType) {
 String nodePath = ZkUtils.toNodePath(url, nodeType);
 if (zkClient.exists(nodePath)) {
 zkClient.delete(nodePath);
 }
}

本文分析了motan的服务发布及注册到zookeeper的流程相关的源码,主要涉及到的知识点:

1.利用相关的配置对象进行信息的存储及传递;

2.利用provider对具体的业务类进行封装代理;

3.利用filter链的结构,来包装实际的provider,把所有的过滤器都处理完毕后,最后调用实际的业务类,大家可以想象一下aop相关的原理,有些类似;

4.代码中大量使用jdk的标准spi技术进行类的加载;

5.支持多个注册中心,也就是同一个服务可以注册到不同的注册中心上,每个registry对应一个具体的zkclient;

6.利用了zookeeper的临时节点来维护服务器的host和port信息;

7.支持多个服务发布到同一个端口,在本文中并没分析netty使用相关的代码,后面会分析到。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表