tencent cloud

文档反馈

通过客户端访问集群

最后更新时间:2020-08-21 17:31:42

    官方和社区推出了各个语言版本的 SDK 方便用户使用。

    • Elasticsearch 提供了各种开发语言的客户端,例如 Java、Python 等,以满足不同开发者的需要。详情请参见 Elasticsearch Client
    • Elasticsearch 从5.6.0版本开始,发布了官方的 Java High Level REST Client,可用于执行 search、index、delete、update and bulk 操作,同 TransportClient 使用的核心 Java classes 一致,并且是为了替代 TransportClient。详情请参见 Java High Level REST Client 说明

    版本兼容性上,建议客户端版本同服务端版本保持一致,详情请参见 兼容性说明,腾讯云 ES 目前提供了多个可选的 Elasticsearch 版本,请注意选择客户端版本。

    ES 在用户的 VPC 下提供了一个访问 ES 集群的 VIP,通过负载均衡的方式挂载了 ES 集群内的所有节点。这样做的目的,一方面是为了适配集群的弹性伸缩并保证其高可用性,当 ES 集群的节点发生变化时,VIP 会自动更新节点信息;另一方面,简化用户的操作,用户不用再关注集群节点 IP、port 等信息的变更。

    不推荐通过 TransportClient 直接连接 Client 和 ES 节点。

    ES 的 SDK 支持在创建连接时只配置一个节点的地址,并通过“节点嗅探”的功能嗅探出所有的节点来连接使用。这种使用方式有悖于我们推出 VIP 功能的初衷,增加了使用 ES 集群的复杂度。下面我们会简单介绍各个编程语言 SDK 包的使用方法,并说明如何关闭“节点嗅探”功能。

    Java 客户端

    ES 官方推荐使用 Java REST 客户端连接集群并进行数据操作。Java REST Client 有 Low Level 和 High Level 两种:

    • Java Low Level REST Client:使用该客户端需要将 HTTP 请求的 body 手动拼成 JSON 格式,HTTP 响应也必须将返回的 JSON 数据手动封装成对象。
    • Java High Level REST Client:该客户端基于 Low Level 客户端实现,提供 API 解决 Low Level 客户端需要手动转换数据格式的问题。

    使用 Java High Level REST Client 访问集群,示例步骤与代码如下:

    添加 Maven 依赖

    建议 Java REST 客户端 API 版本,同 ES 集群服务端版本保持一致,腾讯云目前提供了不同版本的 ES,请注意选择相应版本的客户端。更多客户端 API 信息,可参考官方 ES API 版本兼容性说明

    <dependency>
           <groupId>org.elasticsearch.client</groupId>
           <artifactId>elasticsearch-rest-high-level-client</artifactId>
           <version>6.4.3</version>
    </dependency>
    • Client 版本需要与 ES 集群版本保持一致,否则可能会出现兼容性问题。此处的 Demo 适用于腾讯云 ES 6.4.3版本,其他版本的使用方法请参考 Java High Level REST Client
    • Java High REST Client 构建于 Java Low REST Client 之上,两种 Client 都是基于 HTTP 协议连接 ES 集群,Java High REST Client 可提供的 API 种类随着版本升级会越来越多,如果当前版本的 Java High REST Client 提供的 API 不满足需求,可以通过升级 ES 集群版本和 Client 版本解决。
    • 使用 TCP 协议连接 ES 集群的 Transport Client 官方已经不再维护,建议使用 HTTP 协议连接集群的 Java High REST Client 或者 Java Low REST Client 。详请参考 elastic 官方文档 由 TransportClient 迁移至 Java High REST Client

    示例代码

    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.apache.log4j.BasicConfigurator;
    import org.apache.log4j.Logger;
    import org.elasticsearch.ElasticsearchException;
    import org.elasticsearch.action.DocWriteResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.get.GetRequest;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.rest.RestStatus;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    
    public class test_es_sdk {
        private static Logger logger = Logger.getLogger(test_es_sdk.class);
    
        public  static void main(String[]args){
            BasicConfigurator.configure();
            // 设置验证信息,填写账号及密码
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials("user", "passwd"));
            // 初始化 RestClient, hostName 和 port 填写集群的内网 VIP 地址与端口
            RestClientBuilder builder = RestClient.builder(new HttpHost("xx.xx.xx.xx", 9200, "http"));
            // 设置认证信息
            builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                        @Override
                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                            return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                        }
                    });
            // 设置超时时间
            builder.setMaxRetryTimeoutMillis(10000);
            // 由Low Level Client构造High Level Client
            RestHighLevelClient client = new RestHighLevelClient(builder);
    
            // 索引文档
            Map<String, Object> jsonMap = new HashMap<String, Object>();
            jsonMap.put("user", "bellen");
            jsonMap.put("name", new Date());
            jsonMap.put("message", "trying out Elasticsearch");
            IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
                    .source(jsonMap);
    
            try {
                // 获取响应结果
                IndexResponse indexResponse = client.index(indexRequest);
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
    
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    logger.info("doc indexed, index: "+ index +", type:"+ type +",id:"+ id+",version:"+version);
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    logger.info("doc updated, index: "+ index +", type:"+ type +",id:"+ id+",version:"+version);
                }
            }catch(ElasticsearchException e) {
                if (e.status() == RestStatus.CONFLICT) {
                    logger.error("version conflict");
    
                }
            }catch(Exception e){
                logger.error("execute index api failed, "+ e.toString());
            }
    
    
            // 查询文档
            GetRequest getRequest = new GetRequest(
                    "posts",
                    "doc",
                    "1");
            try {
                // 获取响应结果
                GetResponse getResponse = client.get(getRequest);
                String index = getResponse.getIndex();
                String type = getResponse.getType();
                String id = getResponse.getId();
                if (getResponse.isExists()) {
                    long version = getResponse.getVersion();
                    String sourceAsString = getResponse.getSourceAsString();
                    logger.info("get doc, index: "+ index +", type:"+ type +",id:"+ id+",version:"+version +", source:"+ sourceAsString);
                }
            }catch (ElasticsearchException e) {
                if (e.status() == RestStatus.NOT_FOUND) {
                    logger.warn("doc not found");
                }
            }
            catch(Exception e){
                logger.error("execute get api failed, "+ e.toString());
            }
    
            // 关闭客户端
            try {
                client.close();
            }catch (Exception e){
                logger.error("close rest client exception:"+ e.toString());
            }
        }
    }
    

    Python 客户端

    pip 安装

    pip install elasticsearch

    示例代码

    Elasticsearch函数的参数中设置如下3个参数关闭节点嗅探:

    • sniff_on_start=False
    • sniff_on_connection_fail=False
    • sniffer_timeout=None
    from elasticsearch import Elasticsearch
    
    es = Elasticsearch(["http://xx.xx.xx.xx:9200"],
          http_auth=('user', 'passwd'),
              sniff_on_start=False,
              sniff_on_connection_fail=False,
              sniffer_timeout=None)
    
    res = es.index(index="my_index", doc_type="my_type", id=1, body={"title": "One", "tags": ["ruby"]})
    print(res)
    res = es.get(index="my_index", doc_type="my_type", id=1)
    print(res['_source'])

    PHP 客户端

    为了避免节点的嗅探,您可以设置显示的连接池类为StaticConnectionPool不要使用节点嗅探连接池

    示例代码

    $client = ClientBuilder::create()
        ->setConnectionPool('\Elasticsearch\ConnectionPool\StaticConnectionPool', ["http://user:passwd@xx.xx.xx.xx:9200"])
        ->build();

    Go 客户端

    gopkg.in/olivere/elastic是在 Go 语言中广泛使用的一个由社区贡献的 SDK。此处的 Demo 适用于腾讯云 ES 6.4.3版本,其他版本的使用方法请参考 Elastic

    安装 elastic

    go get github.com/olivere/elastic

    示例代码

    NewClient函数的参数中设置elastic.SetSniff(false)关闭节点嗅探,设置elastic.SetHealthcheck(false)关闭对节点的健康检查。

    import (
        "context"
        "fmt"
    
        "github.com/olivere/elastic"
    )
    
    func main() {
        client, err := elastic.NewClient(elastic.SetURL("http://user:passwd@xx.xx.xx.xx:9200"),
            elastic.SetSniff(false),elastic.SetHealthcheck(false))
        if err != nil {
            panic(err)
        }
        exists, err := client.IndexExists("twitter").Do(context.Background())
        if err != nil {
            panic(err)
        }
        fmt.Println(exists)
    }
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持