本文共 7065 字,大约阅读时间需要 23 分钟。
按照原定的计划,我将分三个部分来分析 Eureka 的源码:
今天,我们来研究第三部分的源码。
分析的思路和第二部分的一样,先明确 Eureka Server 需要具备哪些功能,再从源码层面分析如何实现这些功能,最后补充 Eureka Server 的配置解读。
os:win 10
jdk:1.8.0_231
eureka:1.10.11
tomcat:9.0.21
还是来回顾 Eureka 的整个交互过程。
首先,Eureka Server 需要和 Eureka Client 交互,所以它需要能够处理 Eureka Client 的各种请求,这些请求包括:
除此之外,在集群中,它需要和对等节点交互,交互内容主要包括:
其实,一个完整的 Eureka Server 项目本身也包含了 Eureka Client 的部分,也就是说,它可以注册自己和消费包括自己在内的服务,可以在 eureka-client.properties 增加以下配置来关闭掉这两个部分的功能(不建议这么做):
# 当前实例是否注册到Eureka Server。默认trueeureka.registration.enabled=false# 当前实例是否需要从Eureka Server获取服务注册表eureka.shouldFetchRegistry=false
知道了 Eureka Server 需要具备哪些功能,接下来我们就从源码的角度来看看怎样实现这些功能。
和之前一样,我更多的会从设计的层面来分析,而不会顺序地去看每个过程的代码,即重设计、轻实现。
那么,还是从一个 UML 图开始吧。有了它,相信大家看源码时会更轻松一些。
AbstractInstanceRegistry
里放了一张注册表,用来存放所有的实例对象,通过它可以处理 Eureka Client 或者其他 Eureka Server 的请求,包括注册、续约、注销实例以及获取注册表等。
它的子类PeerAwareInstanceRegistryImpl
提供了多节点的支持,这里以续约实例的方法为例,相同的操作还会被同步到其他节点(对等节点的请求除外)。
public boolean renew(final String appName, final String id, final boolean isReplication) { // 先调用父类AbstractInstanceRegistry的方法 if (super.renew(appName, id, isReplication)) { // 再将操作同步到其他节点,最终是调用PeerEurekaNode的方法进行同步 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); return true; } return false; }
除此之外,PeerAwareInstanceRegistryImpl
还启动了三个定时任务:
PeerEurekaNode
列表。例如,当我们使用 DNS 配合 serviceUrl 时,对等节点的地址可能会变化,所以需要及时更新。这个定时任务用于支持集群的故障转移和扩容。# 期望实例多久续约一次eureka.expectedClientRenewalIntervalSeconds=30# 续约实例的阈值,未达到将开启自我保护模式eureka.renewalPercentThreshold=0.85# 是否启用保护模式eureka.enableSelfPreservation=true
Eureka Server 是作为一个 Web 应用运行的,要看源码比较难找到入口。打开 例子里的 web.xml,可以看到配置了一个监听器,这个类就是 Eureka Server 初始化的入口。
com.netflix.eureka.EurekaBootStrap
在这个类里面,我们主要关注这一段代码(代码有删减)。
protected void initEurekaServerContext() throws Exception { // 下面这一段是为了初始化Eureka Client所需要的对象,上一篇博客讲过了 EurekaInstanceConfig instanceConfig = new MyDataCenterInstanceConfig(); ApplicationInfoManager applicationInfoManager = new ApplicationInfoManager( instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()); EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig(); eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig); // 加载eureka-server.properties的配置 EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig(); ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig); // 初始化注册表对象(支持多节点) PeerAwareInstanceRegistry registry = new PeerAwareInstanceRegistryImpl( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); // 初始化PeerEurekaNodes对象 PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes( registry, eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, applicationInfoManager ); // 1. 初始化PeerEurekaNode列表, // 2. 启动定时任务:更新PeerEurekaNode列表 peerEurekaNodes.start(); // 1. 将PeerEurekaNode列表的指针给到PeerEurekaNodes对象对象 // 2. 启动定时任务:更新参数numberOfRenewsPerMinThreshold--每分钟至少要有多少实例续约,它是判断是否开启自我保护模式的依据 registry.init(peerEurekaNodes); // 从其他节点获取实例列表并注册到本地的注册表 int registryCount = registry.syncUp(); // 1. 初始化参数numberOfRenewsPerMinThreshold--每分钟要求多少实例续约 // 2. 开启定时任务:淘汰未能正常续约的实例 registry.openForTraffic(applicationInfoManager, registryCount); }
完成初始化后,Eureka Server 就可以处理 Eureka Client 的请求了。因为 Eureka Server 使用 jersey 作 Web 框架(jersey 和 struts2、springMVC 作用差不多,没接触过也不碍事),所以,只要找到添加了javax.ws.rs.Path
注解的类,就能找到这部分代码的入口。
回顾下的内容,在 Eureka 里,配置分成了三种:
这里我们来讲讲EurekaServerConfig
的配置参数,对应的是 eureka-server.properties 里的配置。
# 期望实例多久续约一次eureka.expectedClientRenewalIntervalSeconds=30# 续约实例的阈值,未达到将开启自我保护模式eureka.renewalPercentThreshold=0.85# 是否启用保护模式eureka.enableSelfPreservation=true# 更新参数numberOfRenewsPerMinThreshold的定时任务多久执行一次renewalThresholdUpdateIntervalM=900000# 更新PeerEurekaNode列表的定时任务多久执行一次peerEurekaNodesUpdateIntervalMs=600000# 淘汰未能正常续约实例的定时任务多久执行一次evictionIntervalTimerInMs=60000# 这几个一般不用,我就不展开了。有需要的话可以#awsAccessId=#awsSecretKey=eipBindRebindRetries=3eipBindRebindRetryIntervalMsWhenUnbound=60000eipBindRebindRetryIntervalMs=300000waitTimeInMsWhenSyncEmpty=300000shouldBatchReplication=falsedisableDelta=falsenumberRegistrySyncRetries=5registrySyncRetryWaitMs=30000enableReplicatedRequestCompression=falseminAvailableInstancesForPeerReplication=-1peerEurekaStatusRefreshTimeIntervalMs=30000peerNodeConnectTimeoutMs=1000peerNodeReadTimeoutMs=5000peerNodeTotalConnections=1000peerNodeTotalConnectionsPerHost=500numberOfReplicationRetries=5maxElementsInPeerReplicationPool=10000maxIdleThreadAgeInMinutesForPeerReplication=15minThreadsForPeerReplication=5maxThreadsForPeerReplication=20maxTimeForReplication=30000primeAwsReplicaConnections=truemaxIdleThreadAgeInMinutesForStatusReplication=10minThreadsForStatusReplication=1maxThreadsForStatusReplication=1maxElementsInStatusReplicationPool=10000disableDeltaForRemoteRegions=falseremoteRegionConnectTimeoutMs=2000remoteRegionReadTimeoutMs=5000remoteRegionTotalConnections=1000remoteRegionTotalConnectionsPerHost=500remoteRegionConnectionIdleTimeoutSeconds=30remoteRegion.gzipContent=true#remoteRegionUrlsWithName=#remoteRegion.appWhiteList=remoteRegion.registryFetchIntervalInSeconds=30remoteRegion.fetchThreadPoolSize=20#remoteRegion.trustStoreFileName=remoteRegion.trustStorePassword=changeitremoteRegion.disable.transparent.fallback=falseshouldUseAwsAsgApi=trueasgQueryTimeoutMs=300asgUpdateIntervalMs=300000asgCacheExpiryTimeoutMs=600000retentionTimeInMSInDeltaQueue=180000deltaRetentionTimerIntervalInMs=30000responseCacheAutoExpirationInSeconds=180responseCacheUpdateIntervalMs=30000shouldUseReadOnlyResponseCache=truesyncWhenTimestampDiffers=trueauth.shouldLogIdentityHeaders=trueroute53BindRebindRetries=3route53BindRebindRetryIntervalMs=300000route53DomainTTL=30initialCapacityOfResponseCache=1000jsonCodecName=com.netflix.discovery.converters.wrappers.CodecWrappers.LegacyJacksonJsonxmlCodecName=com.netflix.discovery.converters.wrappers.CodecWrappers.XStreamXml
以上比较宏观地讲完了 Eureka Server 的源码和配置,具体的细节欢迎私信交流。
最后,感谢您的阅读。
https://github.com/Netflix/eureka/wiki/Eureka-at-a-glance
相关源码请移步:
本文为原创文章,转载请附上原文出处链接: