package db import ( "github.com/pkg/errors" "golang.org/x/net/context" "gopkg.in/olivere/elastic.v7" "qiniupkg.com/x/log.v7" "reflect" ) const( ERROR_PTR = "null pointer error" INPUT_TYPE_ERROR = "wrong input parameter" CREATED_ERROR = "create error" DELETE_ERROR = "delete error" INDEX_EXISTED = "index existed" ) type ElkEngine struct { cli *elastic.Client } func (p *ElkEngine)Create(index string,types string,id string,data interface{}) (error) { if nil != p{ if (reflect.TypeOf(data).Kind() == reflect.String) || (reflect.TypeOf(data).Kind() == reflect.Struct){ resp, err := p.cli.Index(). Index(index). BodyJson(data). Do(context.Background()) if err != nil { log.Print("create error",err) return err } log.Print(resp) }else{ log.Print(reflect.TypeOf(data).Kind()) return errors.New(INPUT_TYPE_ERROR) } }else{ return errors.New(ERROR_PTR) } return nil } func (p *ElkEngine)Delete(query elastic.Query,index string) error{ if nil != p{ _, err := p.cli.DeleteByQuery().Index(index).Query(query). Do(context.Background()) if err != nil { log.Print(err) return err } }else{ return errors.New(ERROR_PTR) } return nil } /* */ func (p *ElkEngine)Query(index string,query elastic.Query,data interface{}, limit int,offset int) ([]interface{},[]string,error) { if nil != p{ if(limit == 0){ res, err := p.cli.Search(index).Query(query).Do(context.Background()) if err != nil { print(err) return nil,nil,err } //var typ Employee typ := reflect.TypeOf(data) rets := res.Each(typ) id := []string{} for _,vs := range res.Hits.Hits{ id = append(id,vs.Id) } return rets,id,nil }else{ res, err := p.cli.Search(index).Query(query).Size(limit).From(limit*offset).Do(context.Background()) if err != nil { print(err) return nil,nil,err } //var typ Employee typ := reflect.TypeOf(data) rets := res.Each(typ) id := []string{} for _,vs := range res.Hits.Hits{ id = append(id,vs.Id) } return rets,id,nil } }else{ return nil,nil,errors.New(ERROR_PTR) } } func (p *ElkEngine)Update(index string,types string,id string,data map[string]interface{}) error { if nil != p { _, err := p.cli.Update(). Index(index). Id(id). Doc(data). Do(context.Background()) if err != nil { println(err.Error()) return err } } return errors.New(ERROR_PTR) } func (p *ElkEngine)CreateIndex(index string,typemaping string) error{ if nil != p { exists, err := p.cli.IndexExists(index).Do(context.Background()) if err != nil { // Handle error log.Print(err) return err } if exists{ return errors.New(INDEX_EXISTED) } createIndex, err := p.cli.CreateIndex(index).Body(typemaping).Do(context.Background()) if err != nil { log.Print(err) return err } if !createIndex.Acknowledged { return errors.New("create index error") // Not acknowledged } return nil } return errors.New(ERROR_PTR) } func (p *ElkEngine)IndexExisted(index string) (bool,error ){ if nil != p { exists, err := p.cli.IndexExists(index).Do(context.Background()) if exists{ return true,nil } if err != nil { // Handle error log.Print(err) return false,err } return false,nil } return false,nil }