Elastic Search java客户端封装使用

× 文章目录
  1. 1. 使用TransportClient连接ES
  2. 2. 封装ES Client工具类
  3. 3. 设计ESClientHelper
    1. 3.1. ESClientHelper.java
    2. 3.2. 使用ESClientHelper

摘要:ES所提供的Http服务适合用作集群状态和数据的监控,而不适合直接用于数据操作。ES提供了多种语言(包括Java、Python、PHP、Ruby等)版本的Client API,可以使用这些Client API编程实现数据操作功能。在这里主要介绍使用Java版本的Client来操作数据。ES中所有的Java API调用都要使用Client对象,ES为API调用者提供了两类Client对象:NodeClient和TransportClient。TransportClient适合用于生产环境中,本文主要介绍TransportClient。

使用TransportClient连接ES

使用elastic search Client 为5.2.2版本,引入如下依赖。

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.2.2</version>
</dependency>

Tips: 建议API的版本与ES集群所使用的版本保持一致,以免出现因版本不一致而导致的冲突。由于org.elasticsearch.client依赖Log4j,因此还需要配置如下依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.7</version>
</dependency>

Tips https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.2/_maven_repository.html

封装ES Client工具类

1.在实际使用中,我们会把ES的相关配置信息,抽取到elasticsearch.yaml中

1
2
3
4
5
port: 9300
#address: "192.168.67.200,192.168.67.149,192.168.67.215,192.168.67.156,192.168.67.178,192.168.67.240,192.168.67.153,192.168.67.90,192.168.67.228,192.168.67.125"
address: "192.168.113.250"
cluster: "elasticsearch"
index: "mt-apm-*"
  1. 根据Java SDK的写法,需要做写如下硬编码操作的Code。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    String deviceId = "23566d22-6a30-30a9-a874-e1bf75ab688b";
    Settings settings =Settings.builder().put("cluster.name","vpc-ops-elk-elastic-cluster").build();
    Client client = new PreBuiltTransportClient(settings)
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.67.200"), 9300))
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.67.149"), 9300))
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.67.215"), 9300))
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.67.156"), 9300))
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.67.178"), 9300))
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.67.240"), 9300))
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.67.153"), 9300))
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.67.90"), 9300))
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.67.228"), 9300))
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.67.125"), 9300));
    QueryBuilder qb = matchQuery(
    "@message.deviceInfo.deviceId",
    deviceId
    );
    SearchResponse response = client.prepareSearch("mt-apm-*")
    .setQuery(qb)
    .execute()
    .actionGet();
    System.out.println(response);
    client.close();

上述写法存在很多硬编码,无法做到配置和程序代码分离,因此需要设计一个ES工具类用于生成调用Client,下面ESClientHelper工具类如下。

设计ESClientHelper

ESClientHelper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
*
* ESClientHelper 按照Elasticsearch API,在Java端使用是ES服务需要创建Java
* Client,但是每一次连接都实例化一个client,对系统的消耗很大, 即使在使用完毕之后将client
* close掉,由于服务器不能及时回收socket资源,极端情况下会导致服务器达到最大连接数。为了解决上述问题并提高client利用率,可以参考使用池化技术复用client。
*
* @author xujin
*
*/
public class ESClientHelper {
private static final Log logger = LogFactory.getLog(ESClientHelper.class);
private static ESClientHelper instance;
private Settings setting;
private Map<String, Client> clientMap = new ConcurrentHashMap<String, Client>();
// HostName与Port
private Map<String, Integer> ips = new HashMap<String, Integer>();
private String clusterName = Configs.ES_CONFIG.getCluster();
public static synchronized ESClientHelper getInstance() {
if (instance == null) {
instance = new ESClientHelper();
}
return instance;
}
private ESClientHelper() {
init();
}
/**
* 初始化默认的client
*/
public void init() {
String address = Configs.ES_CONFIG.getAddress();
if (StringUtils.isNotEmpty(address)) {
StringTokenizer stokenizer = new StringTokenizer(address, ",");
while (stokenizer.hasMoreTokens()) {
String ip = stokenizer.nextToken();
ips.put(ip, Configs.ES_CONFIG.getPort());
}
}
setting = Settings.builder().put("cluster.name", Configs.ES_CONFIG.getCluster()).build();
addClient(setting, getAllAddress(ips));
}
/**
* 获得所有的地址端口
*
* @return
*/
public List<InetSocketTransportAddress> getAllAddress(Map<String, Integer> ips) {
List<InetSocketTransportAddress> addressList = new ArrayList<InetSocketTransportAddress>();
for (String ip : ips.keySet()) {
try {
addressList.add(new InetSocketTransportAddress(InetAddress.getByName(ip), ips.get(ip)));
} catch (UnknownHostException e) {
logger.error(" add InetSocketTransportAddress exception:[{}],ip:[{}]", e.getMessage(), ip);
}
}
return addressList;
}
public Client getClient() {
return getClient(clusterName);
}
public Client getClient(String clusterName) {
return clientMap.get(clusterName);
}
public void addClient(Settings setting, List<InetSocketTransportAddress> transportAddress) {
Client client = new PreBuiltTransportClient(setting).addTransportAddresses(
transportAddress.toArray(new InetSocketTransportAddress[transportAddress.size()]));
clientMap.put(setting.get("cluster.name"), client);
}
}

相对于 java API 2.4.4 版本来说,升级依赖之后,需要做如下改动。将下面的代码修改上述中的代码。

1
2
Client client = TransportClient.builder().settings(setting).build().addTransportAddresses(
transportAddress.toArray(new InetSocketTransportAddress[transportAddress.size()]));

使用ESClientHelper

使用ESClientHelper.getInstance()获取ESClient

1
Client client = ESClientHelper.getInstance().getClient(Configs.ES_CONFIG.getCluster());

1.匹配查询

1
2
3
4
5
6
7
8
9
Client client = ESClientHelper.getInstance().getClient(Configs.ES_CONFIG.getCluster());
String index = Configs.ES_CONFIG.getIndex();
QueryBuilder qb = matchQuery("@message.deviceInfo.deviceId", deviceId);
SearchResponse response = client
.prepareSearch(index).setQuery(qb)
.setPostFilter(
QueryBuilders.rangeQuery("@message.apmLog.network.time.beginTime")
.gte(DateUtils.StringToDate(beginTime)).lte(DateUtils.StringToDate(endTime)))
.execute().actionGet();

2.根据时间段聚合查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
public AppReqHistoryInfo getAppReqHistoryInfoByDeviceIdAndTime(String deviceId, int hourNum)
throws ServiceException {
String day = "now-" + String.valueOf(hourNum) + "h";
String index = Configs.ES_CONFIG.getIndex();
Client client = ESClientHelper.getInstance().getClient(Configs.ES_CONFIG.getCluster());
QueryBuilder qb = matchQuery("@message.deviceInfo.deviceId", deviceId);
MultiSearchRequestBuilder req = client.prepareMultiSearch().add(client.prepareSearch(index)
.setPostFilter(boolQuery().must(qb).must(QueryBuilders.rangeQuery("@timestamp").gt(day).lt("now")))
.addAggregation(AggregationBuilders.sum("REQ_SUM").field("@message.apmLog.network.reqLength"))
.addAggregation(AggregationBuilders.sum("RSP_SUM").field("@message.apmLog.network.complete.respLength"))
.setSize(0))
.add(client.prepareSearch(index)
.setPostFilter(
boolQuery().must(qb).must(QueryBuilders.rangeQuery("@timestamp").gt(day).lt("now"))
.must(QueryBuilders.rangeQuery("@message.apmLog.network.complete.statusCode")
.gt(399).lt(999)))
.setSize(0))
.add(client.prepareSearch(index).setPostFilter(
boolQuery().must(qb).must(QueryBuilders.rangeQuery("@timestamp").gt(day).lt("now"))
.must(QueryBuilders.existsQuery("@message.apmLog.network.netError.errorDomain")))
.setSize(0));
AppReqHistoryInfo appReqHisInfo = new AppReqHistoryInfo();
MultiSearchResponse rsp;
try {
rsp = req.execute().get();
MultiSearchResponse.Item[] items = rsp.getResponses();
MultiSearchResponse.Item totalItem = items[0];
long total = totalItem.getResponse().getHits().getTotalHits();
appReqHisInfo.setRequestNum(total);
Double REQ_SUM = (Double) (totalItem.getResponse().getAggregations().get("REQ_SUM").getProperty("value"));
Double RSP_SUM = (Double) (totalItem.getResponse().getAggregations().get("RSP_SUM").getProperty("value"));
Double totalFlow = REQ_SUM + RSP_SUM;
appReqHisInfo.setTotalFlow(totalFlow.longValue());
MultiSearchResponse.Item statusCodeItem = items[1];
appReqHisInfo.setHttpError(statusCodeItem.getResponse().getHits().getTotalHits());
MultiSearchResponse.Item errorDomainItem = items[2];
appReqHisInfo.setNetError(errorDomainItem.getResponse().getHits().getTotalHits());
} catch (InterruptedException | ExecutionException e) {
logger.info("getAppReqHistoryInfoByDeviceIdAndTime exception:[{}]", e.getMessage());
throw new ServiceException("500", "getAppVisitTraceInfo exception", e.getCause());
}
return appReqHisInfo;
}

如果您觉得文章不错,可以打赏我喝一杯咖啡!