基于 commons-pool2 的 Elasticsearch 客户端连接池实现

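在使用 Elasticsearch 官方提供的 Java High Level REST Client 操作 ES 时,发现客户端 API 中没有连接池的概念,每次都要创建一个新的客户端实例,这在高并发场景下会明显影响性能,因此这里准备为客户端 API 引入池化机制。(需要注意的是,RestHighLevelClient 本身是线程安全的,其底层的 Apache HttpAsyncClient 已经维护了 HTTP 连接池;本文池化的是客户端实例本身。)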

幸运的是,我们不需要自己从头开始编写连接池,因为 Apache 提供了通用的对象池框架 commons-pool2,我们只需按照框架的设计实现少量逻辑即可。Redis 客户端 Jedis 中的 JedisPool 就是基于 commons-pool2 实现的。

下面我们就看看怎么实现。

首先创建一个通用的池类,它通过组合(委托)的方式持有 commons-pool2 中的 GenericObjectPool。在这个类中,我们定义了如何从池中借出对象以及归还对象。

/**
 * Thin wrapper around a commons-pool2 {@link GenericObjectPool} that translates checked pool
 * exceptions into the project's ElasticSearch exception types.
 *
 * <p>Borrow failures are reported as {@code ElasticSearchConnectException}; close and return
 * failures as {@code ElasticSearchException}. The metric getters return {@code -1} when the pool
 * has not been initialised yet or has already been closed.
 *
 * <p>NOTE(review): implements {@link Cloneable} but does not override {@code clone()}.
 *
 * @param <T> type of the pooled resource
 */
public class Pool<T> implements Cloneable {

  /** Underlying commons-pool2 pool; {@code null} until {@link #initPool} has been called. */
  protected GenericObjectPool<T> internalPool;

  /** Creates an uninitialised pool; {@link #initPool} must be called before borrowing. */
  public Pool() {
    super();
  }

  /**
   * Creates and initialises the pool.
   *
   * @param poolConfig commons-pool2 sizing / eviction configuration
   * @param factory factory that creates, validates and destroys pooled objects
   */
  public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
    initPool(poolConfig, factory);
  }

  /**
   * (Re)initialises the pool. Any previously created internal pool is closed first.
   *
   * @param poolConfig commons-pool2 sizing / eviction configuration
   * @param factory factory that creates, validates and destroys pooled objects
   */
  public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
    if (this.internalPool != null) {
      try {
        closeInternalPool();
      } catch (Exception ignored) {
        // Deliberate best effort: failing to close the old pool must not prevent
        // the replacement pool from being created.
      }
    }
    this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
  }

  /**
   * Closes the internal pool. Does nothing if the pool was never initialised.
   *
   * @throws ElasticSearchException if the underlying pool fails to close
   */
  protected void closeInternalPool() {
    if (internalPool == null) {
      // Nothing to close; previously this surfaced as a wrapped NullPointerException.
      return;
    }
    try {
      internalPool.close();
    } catch (Exception e) {
      throw new ElasticSearchException("Could not destroy the pool", e);
    }
  }

  /**
   * Borrows a resource from the pool, blocking according to the pool configuration.
   *
   * @return a borrowed resource; must later be passed to {@link #returnResource} or
   *     {@link #returnBrokenResource}
   * @throws ElasticSearchConnectException if no resource could be obtained
   */
  public T getResource() {
    try {
      return internalPool.borrowObject();
    } catch (Exception e) {
      throw new ElasticSearchConnectException("Could not get a resource from the pool", e);
    }
  }

  /**
   * Returns a healthy resource to the pool. {@code null} is ignored.
   *
   * @param resource resource previously obtained from {@link #getResource}
   * @throws ElasticSearchException if the pool rejects the returned object
   */
  public void returnResource(final T resource) {
    if (resource != null) {
      returnResourceObject(resource);
    }
  }

  /** Hands a non-null resource back to the internal pool, wrapping any failure. */
  private void returnResourceObject(final T resource) {
    try {
      internalPool.returnObject(resource);
    } catch (Exception e) {
      throw new ElasticSearchException("Could not return the resource to the pool", e);
    }
  }

  /**
   * Invalidates a resource that is known to be broken so the pool destroys it instead of
   * reusing it. {@code null} is ignored.
   *
   * @param resource resource previously obtained from {@link #getResource}
   * @throws ElasticSearchException if the pool fails to invalidate the object
   */
  public void returnBrokenResource(final T resource) {
    if (resource != null) {
      returnBrokenResourceObject(resource);
    }
  }

  /** Invalidates a non-null resource in the internal pool, wrapping any failure. */
  private void returnBrokenResourceObject(T resource) {
    try {
      internalPool.invalidateObject(resource);
    } catch (Exception e) {
      throw new ElasticSearchException("Could not return the resource to the pool", e);
    }
  }

  /** Closes the pool; a no-op if the pool was never initialised. */
  public void destroy() {
    closeInternalPool();
  }

  /** @return number of borrowed objects, or {@code -1} if the pool is not active */
  public int getNumActive() {
    if (poolInactive()) {
      return -1;
    }
    return this.internalPool.getNumActive();
  }

  /** @return number of idle objects, or {@code -1} if the pool is not active */
  public int getNumIdle() {
    if (poolInactive()) {
      return -1;
    }
    return this.internalPool.getNumIdle();
  }

  /** @return number of threads waiting to borrow, or {@code -1} if the pool is not active */
  public int getNumWaiters() {
    if (poolInactive()) {
      return -1;
    }
    return this.internalPool.getNumWaiters();
  }

  /** @return mean borrow wait time in ms, or {@code -1} if the pool is not active */
  public long getMeanBorrowWaitTimeMillis() {
    if (poolInactive()) {
      return -1;
    }
    return this.internalPool.getMeanBorrowWaitTimeMillis();
  }

  /** @return maximum borrow wait time in ms, or {@code -1} if the pool is not active */
  public long getMaxBorrowWaitTimeMillis() {
    if (poolInactive()) {
      return -1;
    }
    return this.internalPool.getMaxBorrowWaitTimeMillis();
  }

  /** True when the pool has not been initialised or has been closed. */
  private boolean poolInactive() {
    return this.internalPool == null || this.internalPool.isClosed();
  }

  /**
   * Eagerly creates {@code count} idle objects via the internal pool.
   *
   * @param count number of objects to add
   * @throws Exception wrapping the first failure from the underlying pool
   */
  public void addObjects(int count) throws Exception {
    try {
      for (int i = 0; i < count; i++) {
        this.internalPool.addObject();
      }
    } catch (Exception e) {
      throw new Exception("Error trying to add idle objects", e);
    }
  }
}

然后基于上面的池类创建 ES 连接池:

/**
 * Pool of {@code RestHighLevelClient} instances for a single Elasticsearch cluster.
 *
 * <p>The client factory is built once, in the constructor, from the cluster name and nodes held
 * in the supplied {@link ElasticSearchPoolConfig}.
 */
public class ElasticSearchPool extends Pool<RestHighLevelClient> {

  private String clusterName;
  private Set<HostAndPort> clusterNodes;

  /**
   * Creates the pool and records the cluster description it was built from.
   *
   * @param config pool settings plus cluster name and nodes
   */
  public ElasticSearchPool(ElasticSearchPoolConfig config){
    super(config, new ElasticSearchClientFactory(config.getClusterName(), config.getNodes()));
    // Previously these assigned each field to itself (always null); read them from config.
    this.clusterName = config.getClusterName();
    this.clusterNodes = config.getNodes();
  }

  /** @return the cluster name taken from the config at construction time */
  public String getClusterName() {
    return clusterName;
  }

  /**
   * Updates the recorded cluster name. This does NOT reconfigure the client factory, which was
   * created in the constructor.
   */
  public void setClusterName(String clusterName) {
    this.clusterName = clusterName;
  }

  /** @return the cluster nodes taken from the config at construction time */
  public Set<HostAndPort> getClusterNodes() {
    return clusterNodes;
  }

  /**
   * Updates the recorded cluster nodes. This does NOT reconfigure the client factory, which was
   * created in the constructor.
   */
  public void setClusterNodes(Set<HostAndPort> clusterNodes) {
    this.clusterNodes = clusterNodes;
  }

}

接着,我们需要为连接池配置相关属性,因此定义一个 ES 连接池配置类。Apache commons-pool2 已经提供了用于配置池基本属性的类 GenericObjectPoolConfig,我们直接继承它即可。

/**
 * Configuration for the Elasticsearch client pool.
 *
 * <p>Inherits the commons-pool2 pool settings from {@link GenericObjectPoolConfig} and adds the
 * cluster description (name and nodes) plus a connect timeout value.
 */
public class ElasticSearchPoolConfig extends GenericObjectPoolConfig {

  /** Name of the target cluster. */
  private String clusterName;

  /**
   * Connect timeout in milliseconds. NOTE(review): no code visible alongside this class reads it
   * when building clients — confirm it is applied somewhere.
   */
  private long connectTimeMillis;

  /** Cluster nodes; starts out as an empty mutable set. Package-private field. */
  Set<HostAndPort> nodes = new HashSet<HostAndPort>();

  // ---- cluster name ----

  public String getClusterName() {
    return this.clusterName;
  }

  public void setClusterName(String clusterName) {
    this.clusterName = clusterName;
  }

  // ---- connect timeout ----

  public long getConnectTimeMillis() {
    return this.connectTimeMillis;
  }

  public void setConnectTimeMillis(long connectTimeMillis) {
    this.connectTimeMillis = connectTimeMillis;
  }

  // ---- nodes ----

  /** @return the live node set (not a copy) */
  public Set<HostAndPort> getNodes() {
    return this.nodes;
  }

  /** Replaces the node set reference (the argument is stored as-is, not copied). */
  public void setNodes(Set<HostAndPort> nodes) {
    this.nodes = nodes;
  }
}

最后,还需要为这个池提供一个工厂类,负责创建、校验和销毁池中的对象;我们只需实现 PooledObjectFactory 接口中的几个方法即可。

public class ElasticSearchClientFactory implements PooledObjectFactory<RestHighLevelClient> {

  private AtomicReference<Set<HostAndPort>> nodesReference = new AtomicReference<Set<HostAndPort>>();

  private String clusterName;

  public ElasticSearchClientFactory(String clusterName, Set<HostAndPort> clusterNodes){
    this.clusterName = clusterName;
    this.nodesReference.set(clusterNodes);
  }

  public PooledObject<RestHighLevelClient> makeObject() throws Exception {
    HttpHost[] nodes = new HttpHost[nodesReference.get().size()];
    List<HttpHost> nodeList = new ArrayList<HttpHost>();
    for(HostAndPort each: nodesReference.get()){
      nodeList.add(new HttpHost(each.getHost(),each.getPort(),each.getSchema()));
    }
    nodes = nodeList.toArray(nodes);
    RestClientBuilder clientBuilder = RestClient.builder(nodes);
    RestHighLevelClient client = new RestHighLevelClient(clientBuilder);
    return new DefaultPooledObject(client);
  }

  public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
    RestHighLevelClient client = pooledObject.getObject();
    if(client!=null&&client.ping()){
      try {
        client.close();
      }catch (Exception e){
        //ignore
      }
    }
  }

  public boolean validateObject(PooledObject<RestHighLevelClient> pooledObject) {
    RestHighLevelClient client = pooledObject.getObject();
    try {
      return client.ping();
    }catch(Exception e){
      return false;
    }
  }

  public void activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
    RestHighLevelClient client = pooledObject.getObject();
    boolean response = client.ping();
  }

  public void passivateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
    //nothing
  }


}

至此,一个简单的 Elasticsearch REST Client 连接池便完成了,测试代码如下:

/**
 * Manual smoke test: borrows a client 1000 times, pings once per borrow, and prints the elapsed
 * time. Requires a reachable Elasticsearch node at the configured address.
 */
public class ElasticSearchPoolTest {

  public static void main(String[] args) throws Exception {
    Set<HostAndPort> nodes = new HashSet<HostAndPort>();
    nodes.add(new HostAndPort("172.31.4.14:9200","172.31.4.14",9200,"http"));
    ElasticSearchPoolConfig config = new ElasticSearchPoolConfig();
    config.setConnectTimeMillis(8000);
    config.setMaxTotal(100);
    config.setClusterName("elasticsearch");
    config.setNodes(nodes);
    ElasticSearchPool pool = new ElasticSearchPool(config);

    try {
      // nanoTime is monotonic, unlike currentTimeMillis, so it is the right clock for elapsed time.
      long start = System.nanoTime();
      for (int i = 0; i < 1000; i++) {
        RestHighLevelClient client = pool.getResource();
        boolean healthy = false;
        try {
          client.ping();
          healthy = true;
        } finally {
          // Always hand the client back; invalidate it if the ping blew up.
          if (healthy) {
            pool.returnResource(client);
          } else {
            pool.returnBrokenResource(client);
          }
        }
      }
      long elapsedMillis = (System.nanoTime() - start) / 1000000L;
      System.out.println("耗时(ms):" + elapsedMillis);
    } finally {
      // Close pooled clients so their I/O resources are released.
      pool.destroy();
    }
  }

}

备注:上面实现的只是一个简单的连接池。使用时每次都需要手动获取和释放资源,不够方便,因此一般还会在此基础上进一步封装,把获取和释放资源的逻辑隐藏起来。具体可以参考我提交到 GitHub 上的项目:https://github.com/andamajing/elasticsearch-pool