Spring Cloud Eureka服务续约(Renew)源码分析

× 文章目录
  1. 1. Renew(服务续约)
    1. 1.1. 概述
    2. 1.2. 服务续约配置
  2. 2. Renew源码分析
    1. 2.1. 服务提供者实现细节
    2. 2.2. Netflix中的Eureka Core实现细节
    3. 2.3. 源码分析链接

摘要:在本篇文章中主要对Eureka的Renew(服务续约),从服务提供者发起续约请求开始分析,通过阅读源码和画时序图的方式,展示Eureka服务续约的整个生命周期。服务续约主要是把服务续约的信息更新到自身的Eureka Server中,然后再同步到其它Eureka Server中。

Renew(服务续约)

概述

Renew(服务续约)操作由Service Provider定期调用,类似于heartbeat。目的是隔一段时间Service Provider调用接口,告诉Eureka Server它还活着没挂,不要把它T了。通俗的说就是它们两之间的心跳检测,避免服务提供者被剔除掉。
请参考:Spring Cloud Eureka名词解释

服务续约配置

Renew操作会在Service Provider定时发起,用来通知Eureka Server自己还活着。 这里有两个比较重要的配置需要如下,可以在Run之前配置。

1
eureka.instance.leaseRenewalIntervalInSeconds

Renew频率。默认是30秒,也就是每30秒会向Eureka Server发起Renew操作。

1
eureka.instance.leaseExpirationDurationInSeconds

服务失效时间。默认是90秒,也就是如果Eureka Server在90秒内没有接收到来自Service Provider的Renew操作,就会把Service Provider剔除

Renew源码分析

服务提供者实现细节

服务提供者发发起服务续约的时序图,如下图所示,大家先直观的看一下时序图,等阅读完源码再回顾一下。
服务提供者发起续约时序图

  1. 在com.netflix.discovery.DiscoveryClient.initScheduledTasks()中的1272行,TimedSupervisorTask会定时发起服务续约,代码如下所示:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // Heartbeat timer
    scheduler.schedule(
    new TimedSupervisorTask(
    "heartbeat",
    scheduler,
    heartbeatExecutor,
    renewalIntervalInSecs,
    TimeUnit.SECONDS,
    expBackOffBound,
    new HeartbeatThread()
    ),
    renewalIntervalInSecs, TimeUnit.SECONDS);

2.在com.netflix.discovery.DiscoveryClient中的1393行,有一个HeartbeatThread线程发起续约操作

1
2
3
4
5
6
7
8
9
private class HeartbeatThread implements Runnable {
public void run() {
//调用eureka-client中的renew
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}

renew()调用eureka-client-1.4.11.jarcom.netflix.discovery.DiscoveryClient中829行renew()发起PUT Reset请求,调用com.netflix.eureka.resources.InstanceResource中的renewLease()续约。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}

Netflix中的Eureka Core实现细节

NetFlix中Eureka Core中的服务续约时序图,如下图所示。
服务续约时序图

  1. 打开com.netflix.eureka.resources.InstanceResource中的106行的renewLease()方法,代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private final PeerAwareInstanceRegistry registry
    @PUT
    public Response renewLease(
    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
    @QueryParam("overriddenstatus") String overriddenStatus,
    @QueryParam("status") String status,
    @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    //调用
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    //其余省略
    }
  2. 点开registry.renew(app.getName(), id, isFromReplicaNode);我们可以看到,调用了org.springframework.cloud.netflix.eureka.server.InstanceRegistry中的renew()方法,代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Override
    public boolean renew(final String appName, final String serverId,
    boolean isReplication) {
    log("renew " + appName + " serverId " + serverId + ", isReplication {}"
    + isReplication);
    List<Application> applications = getSortedApplications();
    for (Application input : applications) {
    if (input.getName().equals(appName)) {
    InstanceInfo instance = null;
    for (InstanceInfo info : input.getInstances()) {
    if (info.getHostName().equals(serverId)) {
    instance = info;
    break;
    }
    }
    publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
    instance, isReplication));
    break;
    }
    }
    //调用com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl中的renew方法
    return super.renew(appName, serverId, isReplication);
    }

3.从super.renew()看到调用了父类中的com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl420行的renew()方法,代码如下:

1
2
3
4
5
6
7
8
9
public boolean renew(final String appName, final String id, final boolean isReplication) {
//服务续约成功,
if (super.renew(appName, id, isReplication)) {
//然后replicateToPeers同步其它Eureka Server中的数据
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}

3.1 从上面代码中super.renew(appName, id, isReplication)可以看出调用的是com.netflix.eureka.registry.AbstractInstanceRegistry中345行的renew()方法,代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", args);
instanceInfo.setStatus(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
leaseToRenew.renew();
return true;
}
}

其中 leaseToRenew.renew()是调用com.netflix.eureka.lease.Lease中的62行的renew()方法

1
2
3
4
5
6
7
8
9
/**
* Renew the lease, use renewal duration if it was specified by the
* associated {@link T} during registration, otherwise default duration is
* {@link #DEFAULT_DURATION_IN_SECS}.
*/
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

3.2 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);调用自身的replicateToPeers()方法,在com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl中的618行,主要接口实现方式和register基本一致:首先更新自身Eureka Server中服务的状态,再同步到其它Eureka Server中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// 同步把续约信息同步到其它的Eureka Server中
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
//根据action做相应操作的同步
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}

至此,Eureka服务续约源码分析结束,大家有兴趣可自行阅读。

源码分析链接

其它源码分析链接:
Spring Cloud中@EnableEurekaClient源码分析:
http://blog.xujin.org/sc/sc-enableEurekaClient-annonation/
Spring Cloud Eureka服务注册源码分析:
http://blog.xujin.org/sc/sc-eureka-register/

如果您觉得文章不错,可以打赏我喝一杯咖啡!