package db import ( "github.com/pkg/errors" "gopkg.in/olivere/elastic.v3" "qiniupkg.com/x/log.v7" "reflect" ) const( ERROR_PTR = "nullpointer error" INPUT_TYPE_ERROR = "wrong input parameter" CREATED_ERROR = "create error" DELETE_ERROR = "delete error" ) type ElkEngine struct { cli *elastic.Client } func (p *ElkEngine)Create(index string,types string,id string,data interface{}) (error) { if nil != p{ if (reflect.TypeOf(data).Kind() == reflect.String) || (reflect.TypeOf(data).Kind() == reflect.Struct){ resp, err := p.cli.Index(). Index(index). Type(types). BodyJson(data). Do() if !resp.Created{ return errors.New(CREATED_ERROR) } if err != nil { log.Print(err) return err } }else{ log.Print(reflect.TypeOf(data).Kind()) return errors.New(INPUT_TYPE_ERROR) } }else{ return errors.New(ERROR_PTR) } return nil } func (p *ElkEngine)Delete(index string,types string,id string) error{ if nil != p{ res, err := p.cli.Delete().Index(index). Type(types). Id(id). Do() if err != nil { print(err) return err } if !res.Found{ return errors.New(DELETE_ERROR) } }else{ return errors.New(ERROR_PTR) } return nil } /* */ func (p *ElkEngine)Query(index string, types string,query elastic.Query,data interface{}, limit int,offset int) ([]interface{},error) { if nil != p{ res, err := p.cli. Search(index). Type(types). Query(query).Size(limit).From(limit*offset).Do() if err != nil { print(err) return nil,err } //var typ Employee typ := reflect.TypeOf(data) return res.Each(typ),nil }else{ return nil,errors.New(ERROR_PTR) } } func (p *ElkEngine)Update(index string,types string,id string,data map[string]interface{}) error { if nil != p { _, err := p.cli.Update(). Index(index). Type(types). Id(id). Doc(data). Do() if err != nil { println(err.Error()) return err } } return errors.New(ERROR_PTR) } // 创建 elasticSearch 的 Mapping func (p *ElkEngine)InitMapping(esIndexName string, esTypeName string, typeMapping string) error{ var err error exists, err := p.cli.IndexExists(esIndexName).Do() if err != nil { log.Println("IndexExists" + err.Error()) return err } //log.Println("es index: " + esIndexName) //log.Println("es type: " + esTypeName) //log.Println("es index mapping: " + indexMapping) //log.Println("es type mapping: " + typeMapping) if !exists { log.Println("es index not exists: " + esIndexName) // Create a new index. createIndex, err := p.cli.CreateIndex(esIndexName).Body(typeMapping).Do() if err != nil { log.Println("CreateIndex" + err.Error()) return err } if !createIndex.Acknowledged { // Not acknowledged return errors.New("create index:" + esIndexName + ", not Ack nowledged") } } /** * 判断 type 是否存在 exists, err = client.TypeExists().Index(esIndexName).Type(esTypeName).Do(ctx) if err != nil { return err } if !exists { } */ // PutMapping() *IndicesPutMappingService putresp, err := p.cli.PutMapping().Index(esIndexName).Type(esTypeName).BodyString(typeMapping).Do() // 新建 mapping //indicesCreateResult, err := elastic.NewIndicesCreateService(client).Index(esIndexName).BodyString(mapping).Do(ctx) if err != nil { log.Println("NewIndicesCreateService" + err.Error()) return err } if !putresp.Acknowledged { // Not acknowledged return errors.New("create mapping fail, esIndexName:" + esIndexName + ", esTypeName:" + esTypeName + ", not Ack nowledged") } return err }