Spring Cloud Eureka Service Registration: A Source Code Analysis

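Abstract: The previous article introduced Eureka and explained why it is well suited to service discovery and registration. This article walks through the source code to show how Eureka actually works, focusing on service registration: how does a Eureka client send the registration information of a local service to the remote registry, the Eureka server? The analysis below shows that a scheduled task in the Eureka client calls a REST endpoint exposed by the Eureka server, and that the server, on receiving the request, registers the service and synchronizes the data across the Eureka server nodes.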

Service Registration

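Service registration is probably familiar to most readers: when a service provider starts, it registers information about the service it offers (service name, IP address, port, version, and so on) with a registry, for example ZooKeeper. So how does a Eureka client send a local service's registration information to the remote Eureka server? The source code below shows that registration is handled entirely by the Eureka client; the service itself does not need to care about it.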

The Eureka client's scheduled task calls the endpoint exposed by the Eureka server

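The idea is fairly simple. When com.netflix.discovery.DiscoveryClient starts, it initializes a scheduled task that periodically pushes the local service configuration, that is, the service information to be registered remotely, to the registry server.
Start with the Eureka code. The spring-cloud-netflix-eureka-server project depends on eureka-client-1.4.11.jar. In that jar,
com.netflix.discovery.DiscoveryClient.java contains the "Initializes all scheduled tasks" method at around line 1240, and the InstanceInfoReplicator scheduled task is set up at around line 1277.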

  1. DiscoveryClient initializes an InstanceInfoReplicator, which wraps a scheduled task.
    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
            // InstanceInfo replicator
            /************************** wraps the scheduled task **************************/
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // see the start method in the next step
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
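
The expBackOffBound arguments passed to TimedSupervisorTask above limit how far a supervised task's delay may grow. The following is only a rough sketch of that idea, assuming the delay doubles after each timeout and is capped at interval × bound; the class BackoffSketch and its method names are hypothetical and are not part of eureka-client.

```java
/** Hypothetical helper (not from eureka-client): a doubling delay with an upper cap. */
class BackoffSketch {
    private final long baseDelaySeconds;
    private final long maxDelaySeconds;
    private long currentDelaySeconds;

    BackoffSketch(long baseDelaySeconds, int expBackOffBound) {
        this.baseDelaySeconds = baseDelaySeconds;
        // Assumption: the cap is the base interval multiplied by the bound.
        this.maxDelaySeconds = baseDelaySeconds * expBackOffBound;
        this.currentDelaySeconds = baseDelaySeconds;
    }

    /** After a timed-out run: double the delay, but never exceed the cap. */
    long onTimeout() {
        currentDelaySeconds = Math.min(maxDelaySeconds, currentDelaySeconds * 2);
        return currentDelaySeconds;
    }

    /** After a successful run: fall back to the base delay. */
    long onSuccess() {
        currentDelaySeconds = baseDelaySeconds;
        return currentDelaySeconds;
    }

    public static void main(String[] args) {
        BackoffSketch backoff = new BackoffSketch(30, 10);
        for (int i = 0; i < 5; i++) {
            System.out.println("delay after timeout: " + backoff.onTimeout());
        }
        System.out.println("delay after success: " + backoff.onSuccess());
    }
}
```

With a base interval of 30 seconds and a bound of 10, the delay in this sketch can never exceed 300 seconds, however many consecutive timeouts occur.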

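2. start() schedules the first run after an initial delay of initialDelayMs. Note that, despite the parameter name, the value is passed to the scheduler with TimeUnit.SECONDS.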

    public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty(); // for initial register
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
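
The started.compareAndSet(false, true) check in start() ensures that the task is scheduled only once, even if start() is called repeatedly or from several threads. A minimal sketch of this guard follows; the class StartOnceSketch and its counter are hypothetical and only stand in for the real scheduling call.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical component whose start() only takes effect the first time. */
class StartOnceSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger scheduleCount = new AtomicInteger();

    /** Returns true only for the single call that actually performed the start. */
    boolean start() {
        // compareAndSet flips false -> true atomically, so exactly one caller wins.
        if (started.compareAndSet(false, true)) {
            scheduleCount.incrementAndGet(); // stands in for scheduler.schedule(...)
            return true;
        }
        return false;
    }

    int scheduleCount() {
        return scheduleCount.get();
    }

    public static void main(String[] args) {
        StartOnceSketch sketch = new StartOnceSketch();
        System.out.println("first start:  " + sketch.start());
        System.out.println("second start: " + sketch.start());
        System.out.println("times scheduled: " + sketch.scheduleCount());
    }
}
```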

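3. The task body executed by the ScheduledExecutorService is defined in com.netflix.discovery.InstanceInfoReplicator.run(), lines 98-113 of InstanceInfoReplicator. It calls the client's register method and, in the finally block, schedules its own next run after replicationIntervalSeconds.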

    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                // the real entry point where the client sends the HTTP registration request
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
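
The run() method above uses one-shot scheduling and re-schedules itself in finally, so a failed run does not stop later runs. The pattern can be sketched in isolation as below. SelfReschedulingTask is a hypothetical class, the failure on every even countdown value is simulated, and millisecond delays are used only to keep the demo short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical task that re-schedules itself after every run, even a failed one. */
class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final long intervalMillis;
    private final CountDownLatch remainingRuns;
    private final AtomicInteger failures = new AtomicInteger();

    SelfReschedulingTask(ScheduledExecutorService scheduler, long intervalMillis, int runs) {
        this.scheduler = scheduler;
        this.intervalMillis = intervalMillis;
        this.remainingRuns = new CountDownLatch(runs);
    }

    @Override
    public void run() {
        try {
            // Simulated work: fail whenever the remaining-run count is even.
            if (remainingRuns.getCount() % 2 == 0) {
                throw new IllegalStateException("simulated failure");
            }
        } catch (Throwable t) {
            failures.incrementAndGet(); // swallow the error, as run() above does
        } finally {
            remainingRuns.countDown();
            if (remainingRuns.getCount() > 0) {
                // One-shot schedule of the next run; this is what keeps the task alive.
                scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    /** Runs the task the given number of times and returns how many runs failed. */
    static int demo(int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        SelfReschedulingTask task = new SelfReschedulingTask(scheduler, 10, runs);
        scheduler.schedule(task, 10, TimeUnit.MILLISECONDS);
        try {
            if (!task.remainingRuns.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return task.failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failed runs out of 4: " + demo(4));
    }
}
```

Compared with scheduleAtFixedRate, where an uncaught exception suppresses all subsequent executions, this style leaves the decision to continue in the task's own hands.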

4. The register() method in com.netflix.discovery.DiscoveryClient, at around line 811.

    /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            // entry point where the Eureka client calls the Eureka server
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }
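
To make the REST call behind register() more concrete, here is a sketch that only builds (and does not send) a registration request with the JDK 11 HTTP client. Several things are assumptions for illustration: the base URL http://localhost:8761/eureka/, the apps/{appName} path layout, and the JSON body, which is a placeholder rather than a complete InstanceInfo payload. Only the POST method and the 204 success check are taken from the code in this article.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical sketch of the registration request; nothing is sent over the network. */
class RegisterRequestSketch {

    /** Builds a POST request to {serviceUrl}apps/{appName} carrying the instance as JSON. */
    static HttpRequest buildRegisterRequest(String serviceUrl, String appName, String instanceJson) {
        URI uri = URI.create(serviceUrl + "apps/" + appName); // assumed path layout
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    /** Mirrors the check at the end of register(): only 204 counts as success. */
    static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }

    public static void main(String[] args) {
        // Placeholder body: a real payload would carry the full instance information.
        String body = "{\"instance\":{\"app\":\"DEMO-SERVICE\"}}";
        HttpRequest request = buildRegisterRequest("http://localhost:8761/eureka/", "DEMO-SERVICE", body);
        System.out.println(request.method() + " " + request.uri());
    }
}
```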

How the Eureka server handles the request

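Open the spring-cloud-netflix-eureka-server or spring-cloud-netflix-eureka-client project and locate the corresponding Maven dependency jar, as shown in the figure below.
Figure: The register entry point in Eureka
1. The request entry point on the Eureka server
Line 183 of ApplicationResource.java, shown below, reveals that Eureka registers services via HTTP POST.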

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }
        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
        // register at line 88 of InstanceRegistry.java, which in turn calls
        // register at line 405 of PeerAwareInstanceRegistryImpl
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build(); // 204 to be backwards compatible
    }
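
The first half of addInstance is a chain of required-field checks that answer 400 before anything reaches the registry, and 204 otherwise. A reduced sketch of that validation is shown below. RegistrationValidationSketch is a hypothetical class that takes plain strings instead of an InstanceInfo and leaves out the DataCenterInfo checks.

```java
/** Hypothetical, reduced version of the required-field checks in addInstance. */
class RegistrationValidationSketch {

    /** Returns the HTTP status the endpoint would answer with: 400 on bad input, 204 on success. */
    static int validate(String expectedAppName, String id, String hostName, String appName) {
        if (isBlank(id)) {
            return 400; // missing instanceId
        } else if (isBlank(hostName)) {
            return 400; // missing hostname
        } else if (isBlank(appName)) {
            return 400; // missing appName
        } else if (!expectedAppName.equals(appName)) {
            return 400; // appName in the body does not match the one in the URL
        }
        return 204; // accepted; the real method registers the instance at this point
    }

    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(validate("DEMO", "i-1", "host-a", "DEMO"));
        System.out.println(validate("DEMO", "", "host-a", "DEMO"));
    }
}
```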

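2. The figure below shows how the call gets from ApplicationResource.java into the register method of PeerAwareInstanceRegistryImpl.
Figure: Calling register in PeerAwareInstanceRegistryImpl
At line 88 of InstanceRegistry.java you can see the call to the register method at line 405 of PeerAwareInstanceRegistryImpl.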

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
        // calls the register method at line 405 of PeerAwareInstanceRegistryImpl
        super.register(info, isReplication);
    }

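3. The register method at line 405 of PeerAwareInstanceRegistryImpl is shown below. Its Javadoc makes the purpose clear: it registers the instance information and replicates it to the peer Eureka server nodes. The registration is carried out in com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.register(InstanceInfo info, boolean isReplication):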

    /**
     * Registers the information about the {@link InstanceInfo} and replicates
     * this information to all peer eureka nodes. If this is replication event
     * from other replica nodes then it is not replicated.
     *
     * @param info
     *            the {@link InstanceInfo} to be registered and replicated.
     * @param isReplication
     *            true if this is a replication event from other replica nodes,
     *            false otherwise.
     */
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // register via the parent class method
        super.register(info, leaseDuration, isReplication);
        // replicate the service information to the peer Eureka nodes
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
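
The Javadoc above states that a registration arriving as a replication event is not replicated again, which is what prevents peers from forwarding the same registration to each other forever. A small in-memory sketch of that rule follows. PeerNodeSketch, its fields, and the fully connected cluster helper are hypothetical and involve no networking.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory peer node illustrating the isReplication flag. */
class PeerNodeSketch {
    final Set<String> instances = new HashSet<>();
    final List<PeerNodeSketch> peers = new ArrayList<>();
    int registerCalls = 0;

    void register(String instanceId, boolean isReplication) {
        registerCalls++;
        instances.add(instanceId); // local registration always happens
        if (isReplication) {
            return; // came from a peer: do not forward again
        }
        for (PeerNodeSketch peer : peers) {
            peer.register(instanceId, true); // forward, marked as replication
        }
    }

    /** Builds a cluster in which every node lists all other nodes as peers. */
    static PeerNodeSketch[] cluster(int size) {
        PeerNodeSketch[] nodes = new PeerNodeSketch[size];
        for (int i = 0; i < size; i++) {
            nodes[i] = new PeerNodeSketch();
        }
        for (PeerNodeSketch node : nodes) {
            for (PeerNodeSketch other : nodes) {
                if (other != node) {
                    node.peers.add(other);
                }
            }
        }
        return nodes;
    }

    public static void main(String[] args) {
        PeerNodeSketch[] nodes = cluster(3);
        nodes[0].register("demo-instance", false); // request from a client
        for (PeerNodeSketch node : nodes) {
            System.out.println(node.instances + " after " + node.registerCalls + " call(s)");
        }
    }
}
```

Without the early return for replication events, every node in this sketch would forward the registration back to its peers and the calls would recurse without end.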

4. Line 192 of AbstractInstanceRegistry.java contains the code that actually implements service registration in Eureka.

    /**
     * Registers a new instance with a given duration.
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1 for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                        + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
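
For a new registration, the method above adds 2 to expectedNumberOfRenewsPerMin (two heartbeats per minute at one per 30 seconds) and recomputes numberOfRenewsPerMinThreshold by multiplying with the renewal percent threshold and truncating to int. The arithmetic is worked through in the sketch below. RenewThresholdSketch is a hypothetical class, and the value 0.85 is assumed here purely for illustration of serverConfig.getRenewalPercentThreshold().

```java
/** Hypothetical model of the renewal-threshold bookkeeping on new registrations. */
class RenewThresholdSketch {
    int expectedRenewsPerMin;
    int renewsPerMinThreshold;
    private final double renewalPercentThreshold;

    RenewThresholdSketch(int initialExpectedRenewsPerMin, double renewalPercentThreshold) {
        this.expectedRenewsPerMin = initialExpectedRenewsPerMin;
        this.renewalPercentThreshold = renewalPercentThreshold;
        this.renewsPerMinThreshold = (int) (initialExpectedRenewsPerMin * renewalPercentThreshold);
    }

    /** Same update as in register(): only applied while the expected count is positive. */
    void onNewRegistration() {
        if (expectedRenewsPerMin > 0) {
            expectedRenewsPerMin = expectedRenewsPerMin + 2;
            // The cast truncates toward zero, e.g. 3.4 becomes 3.
            renewsPerMinThreshold = (int) (expectedRenewsPerMin * renewalPercentThreshold);
        }
    }

    public static void main(String[] args) {
        RenewThresholdSketch sketch = new RenewThresholdSketch(2, 0.85); // 0.85 is an assumed value
        sketch.onNewRegistration();
        System.out.println(sketch.expectedRenewsPerMin + " expected, threshold " + sketch.renewsPerMinThreshold);
    }
}
```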

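Note: the registration information is stored in a field named registry of type ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>. As the code above shows, the outer key is the application name and the inner key is the instance id.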

    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
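
The two-level map, together with the rule that an existing entry with a newer dirty timestamp wins over an older incoming one, can be sketched as follows. RegistrySketch and LeaseEntry are hypothetical simplifications: a LeaseEntry holds only an instance id and a timestamp, and computeIfAbsent is used here in place of the get/putIfAbsent sequence of the original.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical two-level registry: application name -> (instance id -> entry). */
class RegistrySketch {

    /** Simplified stand-in for Lease<InstanceInfo>. */
    static final class LeaseEntry {
        final String instanceId;
        final long lastDirtyTimestamp;

        LeaseEntry(String instanceId, long lastDirtyTimestamp) {
            this.instanceId = instanceId;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    private final ConcurrentHashMap<String, Map<String, LeaseEntry>> registry =
            new ConcurrentHashMap<>();

    /** Stores the registrant and returns the entry that ended up in the registry. */
    LeaseEntry register(String appName, LeaseEntry registrant) {
        // Create the per-application map atomically on first use.
        Map<String, LeaseEntry> instances =
                registry.computeIfAbsent(appName, key -> new ConcurrentHashMap<>());
        LeaseEntry existing = instances.get(registrant.instanceId);
        if (existing != null && existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing; // the stored data is newer, keep it
        }
        instances.put(registrant.instanceId, registrant);
        return registrant;
    }

    /** Returns the stored entry, or null if the application or instance is unknown. */
    LeaseEntry lookup(String appName, String instanceId) {
        Map<String, LeaseEntry> instances = registry.get(appName);
        return instances == null ? null : instances.get(instanceId);
    }

    public static void main(String[] args) {
        RegistrySketch sketch = new RegistrySketch();
        sketch.register("DEMO", new LeaseEntry("i-1", 200L));
        LeaseEntry kept = sketch.register("DEMO", new LeaseEntry("i-1", 100L));
        System.out.println("timestamp kept: " + kept.lastDirtyTimestamp);
    }
}
```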

Readers can explore the remaining details of the source code on their own.

Summary

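The ApplicationResource class receives the HTTP request and calls the register method of PeerAwareInstanceRegistryImpl. Once PeerAwareInstanceRegistryImpl has completed the registration, it calls replicateToPeers to synchronize the state to the other Eureka server nodes (peers), as shown in the figure below.
Figure: Eureka server registration sequence diagram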
