基于Spring Cloud的几个自研微服务组件

摘要:本文由宜信-技术研发中心-高级架构师-梁鑫投稿分享。投稿请加微信Software_King,之前分享了一下我在公司项目中搭建Spring Cloud框架的经验,在此基础之上,为了更好的满足业务功能需求和便捷运维的需要,我们开发了几个基于springcloud的微服务组件,在此做个总结跟大家共同探讨一下。

基于SpringCloud的几个自研微服务组件

一.应用管理中心

采用微服务架构以后,把原先单一的节点拆解成了多个微服务节点。公司虽然有一键发布平台,但是是针对每一个节点采取单独的发布,启动,停止操作,没有全局化统一管理功能。上线运维的工作量就变的非常庞大,因此我们开发了基于springcloud的应用管理中心来方便我们的工作。

1.1 实现机制

  • 每个微服务启动时,将自身的进程ID,当前路径,JDK路径,jar名称,系统用户,IP地址,端口号注册到zookeeper;
  • 应用管理中心从zookeeper中查询到所有的微服务进程信息;
  • 获取信息后构造启动命令,停止命令;
  • 在数据库保存系统的用户名和密码;
  • 在部署时指定git地址,分支,IP等调用jenkins,编译生成最新jar包拷贝到目标服务器指定位置;
  • 通过远程jsch远程执行shell命令对微服务进程进行操作。

1.2 功能原理

功能原理

1.3 效果图

Image text
Image text
Image text

二.微服务健康检测中心

基于actuator我们可以很好的针对每个微服务节点进行监控,当出现问题时及时报警。

2.1 实现机制

  • 保证所有的微服务节点都加载了actuator;
  • 从eureka中获取所有的微服务注册信息;
  • 定时任务轮询请求每个微服务的health信息;
  • Health返回status树壮结构信息;
  • 如果status状态为down发送报警并包含health完整信息。

2.2 功能原理

Image text

2.3 效果图

我们在健康检测中心中同时加载了spring boot admin,可以随时查看微服务节点的所有运行信息。

Image text

2.4 部分源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Scheduled(initialDelay = 10000L, fixedRate = 60000L)
public void scheduledTaskHandler() {
long begin = System.currentTimeMillis();
List<String> items = getIPAndPorts();
for (String IPAndPort : items) {
LOGGER.info("prepare checking " + IPAndPort);
if (IPAndPort.isEmpty()) {
LOGGER.warn("checking " + IPAndPort + " is empty");
continue;
}
try {
String[] item = IPAndPort.split(":");
String info = healthService.getHealthInfo(item[0], item[1]);
String prettyjson = getPrettyJSON(info);
ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("rawtypes")
Map<?, ?> infoMap = mapper.readValue(info, new TypeReference<Map>() {
});
if (infoMap.containsKey(STATUS)) {
String status = infoMap.get(STATUS).toString();
if (status.equals(UP)) {
LOGGER.info("checking " + IPAndPort + " is UP:\n" + prettyjson);
continue;
}
}
emailService.sendLimitedEmail(prettyjson, IPAndPort, IPAndPort, 10 * 60 * 1000L);
LOGGER.info("checking " + IPAndPort + " is DOWN:\n" + prettyjson);
}
catch (feign.RetryableException ex) {
String content = exception2String(ex);
emailService.sendEmail(content, "程序运行异常");
LOGGER.info("checking " + IPAndPort + " is DOWN");
LOGGER.error("", ex);
}
catch (FeignException fex) {
String body = getBodyFromFeignException(fex);
String prettybody = getPrettyJSON(body);
emailService.sendLimitedEmail(prettybody, IPAndPort, IPAndPort, 10 * 60 * 1000L);
LOGGER.info("checking " + IPAndPort + " is DOWN:\n" + prettybody);
LOGGER.error("", fex);
}
catch (Exception ex) {
String content = exception2String(ex);
emailService.sendEmail(content, "程序运行异常");
LOGGER.info("checking " + IPAndPort + " is DOWN");
LOGGER.error("", ex);
}
}
LOGGER.info("SkytrainHealthChecking cost: " + (System.currentTimeMillis() - begin) + " Millis");
}

三.定时任务管理中心

我们需要建立定时任务全局视图,并希望对每一个定时任务具备启停的能力。我们采用了通过继承统一的AbstractScheduledTask,暴露定时任务的启动,停止接口。让每一个定时任务自动具备的启停功能。

3.1 实现机制

  • 通过BeanPostProcessor在bean初始完成后拦截所有的@Scheduled注解,获取ip,port,applicationName,className,beanName,scheduled等信息注册到zookeeper;
  • 定时任务实现继承AbstractScheduledTaskInter接口,暴露startScheduledTask和stopScheduledTask,可以针对定时任务进行启动停止;
  • 实现基准TaskScheduledController类,通过beanName获取定时任务类,调用该类的启动停止方法;
  • 定时任务管理中心查询zookeeper展示全局注册的定时任务,提供启动停止操作对定时任务进行控制;
  • 定时任务分为全局唯一定时任务和非唯一定时任务。

3.2 功能原理

Image text

3.3 效果图

Image text
Image text

3.4 部分源码

  • 获取定时任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public Object postProcessAfter(Object bean, String beanName) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
new MethodIntrospector.MetadataLookup<Set<Scheduled>>() {
public Set<Scheduled> inspect(Method method) {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method,
Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
}
});
if (!annotatedMethods.isEmpty()) {
String className = targetClass.getName();
className = SpringBeanUtil.getNormalClassName(className);
for (Map.Entry<Method, Set<Scheduled>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (Scheduled scheduled : entry.getValue()) {
String key = className + ":" + method.getName();
Map<String, String> scheduledMap = new HashMap<String, String>();
scheduledMap.put("className", className);
scheduledMap.put("methodName", method.getName());
scheduledMap.put("beanName", beanName);
scheduledMap.put("scheduled", scheduled.toString());
taskInfos.put(key, scheduledMap);
}
}
}
return null;
}
  • AbstractScheduledTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public abstract class AbstractScheduledTask implements AbstractScheduledTaskInter {
...
public boolean scheduledTaskStatus = true;
public Object startScheduledTask() {
scheduledTaskStatus = true;
updateRegisterTaskStatus(scheduledTaskStatus);
return String.valueOf(scheduledTaskStatus);
}
public Object stopScheduledTask() {
scheduledTaskStatus = false;
updateRegisterTaskStatus(scheduledTaskStatus);
return String.valueOf(scheduledTaskStatus);
}
public void updateRegisterTaskStatus(boolean status) {
try {
String className = this.getClass().getName();
className=SpringBeanUtil.getNormalClassName(className);
...
}
catch (JSONException e) {
LOGGER.error(e.getMessage(), e);
}
}
public boolean isRegister = false;
public boolean isRegister() {
if (isRegister) {
return true;
}
...
}
public void scheduledTaskController() {
if (isRegister()) {
if (scheduledTaskStatus) {
scheduledTaskHandler();
}
}
}
public abstract void scheduledTaskHandler();
}
  • 具备启停能力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Controller
public class TaskScheduledController {
@RequestMapping(value = "/start_scheduled_task", produces = "application/json;charset=UTF-8")
@ResponseBody
public Object start(@RequestParam() String beanName) {
AbstractScheduledTaskInter scheduledTask = SpringBeanUtil.getBean(beanName);
return scheduledTask.startScheduledTask();
}
@RequestMapping(value = "/stop_scheduled_task", produces = "application/json;charset=UTF-8")
@ResponseBody
public Object stop(@RequestParam() String beanName) {
AbstractScheduledTaskInter scheduledTask = SpringBeanUtil.getBean(beanName);
return scheduledTask.stopScheduledTask();
}
}

四.全局热备锁

无论定时任务还是普通跑批任务,我们需要对这些任务实现热备,以便在单点故障时任务依然可以顺利的执行。

4.1 实现机制

  • 全局热备锁包括普通任务(例如监听rabbitmq消息)和定时任务;
  • 任务以applicationName和className标识唯一,任务启动时把相关信息注册到zookeeper;
  • 其他节点的任务启动时发现已经有任务运行,则监听zookeeper;
  • 运行任务停止后,其它节点根据监听状态启动自身任务;
  • 定时任务和普通任务不同,定时任务需要注册非运行节点,并对子节点数目变化和子节点数据变化都做监听。

4.2 功能原理

Image text

4.3 效果图

Image text

4.4 部分源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Component
public abstract class AbstractGlobalLockTask implements CommandLineRunner {
...
public void register(final String... strings) {
String className = getClass().getName();
className = SpringBeanUtil.getNormalClassName(className);
final String alarmConfigPath = SKYTRAIN_GlOBAL_LOCK_TASK_PREFIX + applicationName + ":" + className;
boolean exists = zooKeeperExecutor.isExists(alarmConfigPath);
if (!exists) {
JSONObject info = new JSONObject();
try {
info.put("applicationName", applicationName);
info.put("className", className);
info.put("ip", ipAddress);
info.put("port", serverPort);
info.put("date", new Date());
}
catch (JSONException e) {
LOGGER.error(e.getMessage(), e);
}
createGlobalLockNode(alarmConfigPath, info.toString(), strings);
}
else {
try {
zooKeeperExecutor.getZooKeeper().exists(alarmConfigPath, new Watcher() {
public void process(WatchedEvent watchedEvent) {
LOGGER.info("事件类型" + watchedEvent.getType() + ",路径" + watchedEvent.getPath());
try {
register(strings);
}
catch (Exception e) {
LOGGER.error("", e);
}
}
});
}
catch (KeeperException e) {
LOGGER.error(e.getMessage(), e);
}
catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}
}
public void createGlobalLockNode(String path, String value, String... strings) {
zooKeeperExecutor.createZKNode(path, value);
handler(strings);
}
public void run(String... strings) throws Exception {
register(strings);
}
public abstract void handler(String... strings);
}

作者:宜信-技术研发中心-高级架构师-梁鑫

如果您觉得文章不错,可以打赏我喝一杯咖啡!