本文转载自: https://www.cnblogs.com/wenbronk/p/6386043.html 作者:wenbronk 转载请注明该声明。

/**

* 系统环境: vm12 下的centos 7.2

* 当前安装版本: elasticsearch-2.4.0.tar.gz

*/

es 查询共有4种查询类型

QUERY_AND_FETCH:

主节点将查询请求分发到所有的分片中,各个分片按照自己的查询规则即词频文档频率进行打分排序,然后将结果返回给主节点,主节点对所有数据进行汇总排序然后再返回给客户端,此种方式只需要和es交互一次。

这种查询方式存在数据量和排序问题,主节点会汇总所有分片返回的数据这样数据量会比较大,二是各个分片上的规则可能不一致。

QUERY_THEN_FETCH:

主节点将请求分发给所有分片,各个分片打分排序后将数据的id和分值返回给主节点,主节点收到后进行汇总排序再根据排序后的id到对应的节点读取对应的数据再返回给客户端,此种方式需要和es交互两次。

这种方式解决了数据量问题但是排序问题依然存在而且是es的默认查询方式

DEF_QUERY_AND_FETCH: 和 DFS_QUERY_THEN_FETCH:

将各个分片的规则统一起来进行打分。解决了排序问题但是DFS_QUERY_AND_FETCH仍然存在数据量问题,DFS_QUERY_THEN_FETCH两种噢乖你问题都解决但是效率是最差的。

1, 获取client, 两种方式获取

@Beforepublic void before() throws Exception {Map<String, String> map = new HashMap<String, String>();  map.put("cluster.name", "elasticsearch_wenbronk");  Settings.Builder settings = Settings.builder().put(map);  client = TransportClient.builder().settings(settings).build()  .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300"))); }
@Beforepublic void before11() throws Exception {// 创建客户端, 使用的默认集群名, "elasticSearch"
//        client = TransportClient.builder().build()
//                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300));// 通过setting对象指定集群配置信息, 配置的集群名Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch_wenbronk") // 设置集群名
//                .put("client.transport.sniff", true) // 开启嗅探 , 开启后会一直连接不上, 原因未知
//                .put("network.host", "192.168.50.37").put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上
//                .put("client.transport.nodes_sampler_interval", 5) //报错,
//                .put("client.transport.ping_timeout", 5) // 报错, ping等待时间,
                .build();client = TransportClient.builder().settings(settings).build().addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));// 默认5s// 多久打开连接, 默认5sSystem.out.println("success connect");}

PS: 官网给的2种方式都不能用, 需要合起来才能用, 浪费老子一下午...

其他参数的意义:

代码:

package com.wenbronk.javaes;import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkProcessor.Listener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import org.junit.Before;
import org.junit.Test;import com.alibaba.fastjson.JSONObject;/*** 使用java API操作elasticSearch* * @author 231**/
public class JavaESTest {private TransportClient client;private IndexRequest source;/*** 获取连接, 第一种方式* @throws Exception*/
//    @Beforepublic void before() throws Exception {Map<String, String> map = new HashMap<String, String>();  map.put("cluster.name", "elasticsearch_wenbronk");  Settings.Builder settings = Settings.builder().put(map);  client = TransportClient.builder().settings(settings).build()  .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300"))); }

/*** 查看集群信息*/@Testpublic void testInfo() {List<DiscoveryNode> nodes = client.connectedNodes();for (DiscoveryNode node : nodes) {System.out.println(node.getHostAddress());}}/*** 组织json串, 方式1,直接拼接*/public String createJson1() {String json = "{" +"\"user\":\"kimchy\"," +"\"postDate\":\"2013-01-30\"," +"\"message\":\"trying out Elasticsearch\"" +"}";return json;}/*** 使用map创建json*/public Map<String, Object> createJson2() {Map<String,Object> json = new HashMap<String, Object>();json.put("user", "kimchy");json.put("postDate", new Date());json.put("message", "trying out elasticsearch");return json;}/*** 使用fastjson创建*/public JSONObject createJson3() {JSONObject json = new JSONObject();json.put("user", "kimchy");json.put("postDate", new Date());json.put("message", "trying out elasticsearch");return json;}/*** 使用es的帮助类*/public XContentBuilder createJson4() throws Exception {// 创建json对象, 其中一个创建json的方式XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("user", "kimchy").field("postDate", new Date()).field("message", "trying to out ElasticSearch").endObject();return source;}/*** 存入索引中* @throws Exception*/@Testpublic void test1() throws Exception {XContentBuilder source = createJson4();// 存json入索引中IndexResponse response = client.prepareIndex("twitter", "tweet", "1").setSource(source).get();
//        // 结果获取String index = response.getIndex();String type = response.getType();String id = response.getId();long version = response.getVersion();boolean created = response.isCreated();System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created);}/*** get API 获取指定文档信息*/@Testpublic void testGet() {
//        GetResponse response = client.prepareGet("twitter", "tweet", "1")
//                                .get();GetResponse response = client.prepareGet("twitter", "tweet", "1").setOperationThreaded(false)    // 线程安全.get();System.out.println(response.getSourceAsString());}/*** 测试 delete api*/@Testpublic void testDelete() {DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();String index = response.getIndex();String type = response.getType();String id = response.getId();long version = response.getVersion();System.out.println(index + " : " + type + ": " + id + ": " + version);}/*** 测试更新 update API* 使用 updateRequest 对象* @throws Exception */@Testpublic void testUpdate() throws Exception {UpdateRequest updateRequest = new UpdateRequest();updateRequest.index("twitter");updateRequest.type("tweet");updateRequest.id("1");updateRequest.doc(XContentFactory.jsonBuilder().startObject()// 对没有的字段添加, 对已有的字段替换.field("gender", "male").field("message", "hello").endObject());UpdateResponse response = client.update(updateRequest).get();// 打印String index = response.getIndex();String type = response.getType();String id = response.getId();long version = response.getVersion();System.out.println(index + " : " + type + ": " + id + ": " + version);}/*** 测试update api, 使用client* @throws Exception */@Testpublic void testUpdate2() throws Exception {// 使用Script对象进行更新
//        UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
//                .setScript(new Script("hits._source.gender = \"male\""))
//                .get();// 使用XContFactory.jsonBuilder() 进行更新
//        UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
//                .setDoc(XContentFactory.jsonBuilder()
//                        .startObject()
//                            .field("gender", "malelelele")
//                        .endObject()).get();// 使用updateRequest对象及script
//        UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
//                .script(new Script("ctx._source.gender=\"male\""));
//        UpdateResponse response = client.update(updateRequest).get();// 使用updateRequest对象及documents进行更新UpdateResponse response = client.update(new UpdateRequest("twitter", "tweet", "1").doc(XContentFactory.jsonBuilder().startObject().field("gender", "male").endObject())).get();System.out.println(response.getIndex());}/*** 测试update* 使用updateRequest* @throws Exception * @throws InterruptedException */@Testpublic void testUpdate3() throws InterruptedException, Exception {UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1").script(new Script("ctx._source.gender=\"male\""));UpdateResponse response = client.update(updateRequest).get();}/*** 测试upsert方法* @throws Exception * */@Testpublic void testUpsert() throws Exception {// 设置查询条件, 查找不到则添加生效IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "2").source(XContentFactory.jsonBuilder().startObject().field("name", "214").field("gender", "gfrerq").endObject());// 设置更新, 查找到更新下面的设置UpdateRequest upsert = new UpdateRequest("twitter", "tweet", "2").doc(XContentFactory.jsonBuilder().startObject().field("user", "wenbronk").endObject()).upsert(indexRequest);client.update(upsert).get();}/*** 测试multi get api* 从不同的index, type, 和id中获取*/@Testpublic void testMultiGet() {MultiGetResponse multiGetResponse = client.prepareMultiGet().add("twitter", "tweet", "1").add("twitter", "tweet", "2", "3", "4").add("anothoer", "type", "foo").get();for (MultiGetItemResponse itemResponse : multiGetResponse) {GetResponse response = itemResponse.getResponse();if (response.isExists()) {String sourceAsString = response.getSourceAsString();System.out.println(sourceAsString);}}}/*** bulk 批量执行* 一次查询可以update 或 delete多个document*/@Testpublic void testBulk() throws Exception {BulkRequestBuilder bulkRequest = client.prepareBulk();bulkRequest.add(client.prepareIndex("twitter", "tweet", "1").setSource(XContentFactory.jsonBuilder().startObject().field("user", "kimchy").field("postDate", new Date()).field("message", "trying out Elasticsearch").endObject()));bulkRequest.add(client.prepareIndex("twitter", "tweet", "2").setSource(XContentFactory.jsonBuilder().startObject().field("user", "kimchy").field("postDate", new Date()).field("message", "another post").endObject()));BulkResponse response = bulkRequest.get();System.out.println(response.getHeaders());}/*** 使用bulk processor* @throws Exception */@Testpublic void testBulkProcessor() throws Exception {// 创建BulkPorcessor对象BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() {public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {// TODO Auto-generated method stub
            }// 执行出错时执行public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {// TODO Auto-generated method stub
            }public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {// TODO Auto-generated method stub
            }})// 1w次请求执行一次bulk.setBulkActions(10000)// 1gb的数据刷新一次bulk.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 固定5s必须刷新一次.setFlushInterval(TimeValue.timeValueSeconds(5))// 并发请求数量, 0不并发, 1并发允许执行.setConcurrentRequests(1)// 设置退避, 100ms后执行, 最大请求3次
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();// 添加单次请求bulkProcessor.add(new IndexRequest("twitter", "tweet", "1"));bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));// 关闭bulkProcessor.awaitClose(10, TimeUnit.MINUTES);// 或者
        bulkProcessor.close();}
}

tes2代码:

package com.wenbronk.javaes;import java.net.InetSocketAddress;import org.apache.lucene.queryparser.xml.FilterBuilderFactory;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.sort.SortParseElement;
import org.junit.Before;
import org.junit.Test;/*** 使用java API操作elasticSearch* search API* @author 231**/
public class JavaESTest2 {private TransportClient client;/*** 获取client对象*/@Beforepublic void testBefore() {Builder builder = Settings.settingsBuilder();builder.put("cluster.name", "wenbronk_escluster");
//                .put("client.transport.ignore_cluster_name", true);Settings settings = builder.build();org.elasticsearch.client.transport.TransportClient.Builder transportBuild = TransportClient.builder();TransportClient client1 = transportBuild.settings(settings).build();client = client1.addTransportAddress((new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300))));System.out.println("success connect to escluster");}/*** 测试查询*/@Testpublic void testSearch() {
//        SearchRequestBuilder searchRequestBuilder = client.prepareSearch("twitter", "tweet", "1");
//        SearchResponse response = searchRequestBuilder.setTypes("type1", "type2")
//                            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
//                            .setQuery(QueryBuilders.termQuery("user", "test"))
//                            .setPostFilter(QueryBuilders.rangeQuery("age").from(0).to(1))
//                            .setFrom(0).setSize(2).setExplain(true)
//                            .execute().actionGet();SearchResponse response = client.prepareSearch().execute().actionGet();
//        SearchHits hits = response.getHits();
//        for (SearchHit searchHit : hits) {
//            for(Iterator<SearchHitField> iterator = searchHit.iterator(); iterator.hasNext(); ) {
//                SearchHitField next = iterator.next();
//                System.out.println(next.getValues());
//            }
//        }System.out.println(response);}/*** 测试scroll api* 对大量数据的处理更有效*/@Testpublic void testScrolls() {QueryBuilder queryBuilder = QueryBuilders.termQuery("twitter", "tweet");SearchResponse response = client.prepareSearch("twitter").addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC).setScroll(new TimeValue(60000)).setQuery(queryBuilder).setSize(100).execute().actionGet();while(true) {for (SearchHit hit : response.getHits().getHits()) {System.out.println("i am coming");}SearchResponse response2 = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();if (response2.getHits().getHits().length == 0) {System.out.println("oh no=====");break;}}}/*** 测试multiSearch*/@Testpublic void testMultiSearch() {QueryBuilder qb1 = QueryBuilders.queryStringQuery("elasticsearch");SearchRequestBuilder requestBuilder1 = client.prepareSearch().setQuery(qb1).setSize(1);QueryBuilder qb2 = QueryBuilders.matchQuery("user", "kimchy");SearchRequestBuilder requestBuilder2 = client.prepareSearch().setQuery(qb2).setSize(1);MultiSearchResponse multiResponse = client.prepareMultiSearch().add(requestBuilder1).add(requestBuilder2).execute().actionGet();long nbHits = 0;for (MultiSearchResponse.Item item : multiResponse.getResponses()) {SearchResponse response = item.getResponse();nbHits = response.getHits().getTotalHits();SearchHit[] hits = response.getHits().getHits();System.out.println(nbHits);}}/*** 测试聚合查询*/@Testpublic void testAggregation() {SearchResponse response = client.prepareSearch().setQuery(QueryBuilders.matchAllQuery()) // 先使用query过滤掉一部分.addAggregation(AggregationBuilders.terms("term").field("user")).addAggregation(AggregationBuilders.dateHistogram("agg2").field("birth").interval(DateHistogramInterval.YEAR)).execute().actionGet();Aggregation aggregation2 = response.getAggregations().get("term");Aggregation aggregation = response.getAggregations().get("agg2");
//        SearchResponse response2 = client.search(new SearchRequest().searchType(SearchType.QUERY_AND_FETCH)).actionGet();
    }/*** 测试terminate*/@Testpublic void testTerminateAfter() {SearchResponse response = client.prepareSearch("twitter").setTerminateAfter(1000).get();if (response.isTerminatedEarly()) {System.out.println("ternimate");}}/*** 过滤查询: 大于gt, 小于lt, 小于等于lte, 大于等于gte*/@Testpublic void testFilter() {SearchResponse response = client.prepareSearch("twitter")  .setTypes("")  .setQuery(QueryBuilders.matchAllQuery()) //查询所有
                .setSearchType(SearchType.QUERY_THEN_FETCH)
//              .setPostFilter(FilterBuilders.rangeFilter("age").from(0).to(19)
//                      .includeLower(true).includeUpper(true))
//                .setPostFilter(FilterBuilderFactory .rangeFilter("age").gte(18).lte(22))  .setExplain(true) //explain为true表示根据数据相关度排序,和关键字匹配最高的排在前面  .get();  }/*** 分组查询*/@Testpublic void testGroupBy() {client.prepareSearch("twitter").setTypes("tweet").setQuery(QueryBuilders.matchAllQuery()).setSearchType(SearchType.QUERY_THEN_FETCH).addAggregation(AggregationBuilders.terms("user").field("user").size(0)        // 根据user进行分组// size(0) 也是10).get();}}

elasticsearch基本操作之--使用java操作elasticsearch相关推荐

  1. java操作elasticsearch实现query String

    1.CommonTersQuery: 指定字段进行模糊查询 //commonTermsQuery @Test public void test35() throws UnknownHostExcept ...

  2. java操作elasticsearch实现前缀查询、wildcard、fuzzy模糊查询、ids查询

    1.前缀查询(prefix) //prefix前缀查询 @Testpublic void test15() throws UnknownHostException {//1.指定es集群 cluste ...

  3. java操作elasticsearch实现批量添加数据(bulk)

    java操作elasticsearch实现批量添加主要使用了bulk 代码如下: //bulk批量操作(批量添加) @Testpublic void test7() throws IOExceptio ...

  4. Java操作Elasticsearch的所有方法

    使用Java操作Elasticsearch的所有方法 13.1 Elasticsearch简介 Elasticsearch是基于Lucene开发的一个分布式全文检索框架,向Elasticsearch中 ...

  5. (六)Java操作elasticSearch(2)

    Java操作elasticSearch(2) 一.DSL查询文档: 0.DSL: 1.DSL查询分类 2.全文检索查询 3.精准查询 4.地理坐标查询 5.组合查询 二.搜索结果的处理: 0.搜索结果 ...

  6. elasticsearch基本操作 --- 使用java操作elasticsearch

    随着大数据的兴起,面对越来越多的数据和越来越复杂的业务场景,系统对后端也提出了更高的要求,尤其是用户体验上,低延迟.快速响应已经成为检验后端程序是否高效很重要的标准,在后端的数据存储框架中,elast ...

  7. java操作ElasticSearch(es)进行增删查改操作

    ElasticSearch(名称太长,后面简称ES)作为一个搜索引擎,目前可谓是如日中天,几乎和solr齐驾并驱.关于他能做什么,跟云计算有什么关系,在此不再描述.但是ES的官方文档,特别是关于jav ...

  8. java操作elasticsearch出现:NoNodeAvailableException[None of the configured nodes are available

    使用java练习操作elasticsearch创建索引的时候报了个这个异常 抛出错误 :NoNodeAvailableException[None of the configured nodes ar ...

  9. Elasticsearch笔记五之java操作es

    Java操作es集群步骤1:配置集群对象信息:2:创建客户端:3:查看集群信息 1:集群名称 默认集群名为elasticsearch,如果集群名称和指定的不一致则在使用节点资源时会报错. 2:嗅探功能 ...

最新文章

  1. git中的fork应该这样使用
  2. 新上映的电影不在影院也一样能看到
  3. Python 面向对象编程 day7
  4. ASP.NET MVC 4中如何为不同的浏览器自适应布局和视图
  5. Ant Design入门之介绍
  6. 【视频】如何正确焊接贴片、直插元件?
  7. abstract class和interface有什么区别
  8. GridView网格布局
  9. 恭喜宿主获得鸿蒙,第四十章大殿讲道,十连抽获得鸿蒙至宝!
  10. 按下回车键自动切换焦点到下个控件
  11. asp.net电子影像相册_大连孕妈看过来 | 290元=孕中期四维排畸+孕妇写真+胎宝电子影集...
  12. 还不了解小程序?看这一篇文章就够了
  13. 人生感悟-是留丰碑还是墓碑
  14. EOS Utility无法安装的问题解决
  15. 魔改一波合成大西瓜!代码已开源~
  16. intel无线网络管理服务器,配置Intel无线网卡连接到管理帧保护(MFP) -启用网络
  17. python 12306登录_基于Python3的12306登录实现
  18. 自从用了这个良心浏览器后,我卸载了UC,太香了
  19. VS2019离线安装包下载方法
  20. 17 -> 详解 openWRT 的 gpio 配置关系说明

热门文章

  1. C语言之字符串函数一
  2. 假设mysql数据表t1有字段_Mysql 索引及优化
  3. 《动手学深度学习》(PyTorch版)代码注释 - 25 【NiN_Learning】
  4. SUV 中国造,救不了吉利的英国超跑LOTUS路特斯莲花
  5. java如何造假数据_给Prometheus造假数据的方法
  6. Win7 快捷方式变成浏览器图标解决
  7. Keil点击Project闪退
  8. 第055篇:陆地观测卫星数据服务平台高级检索方法
  9. 2020.12.15 ps临摹调色
  10. linux内核视频 网易,网易视频云技术分享:UML调试Linux内核