package db import ( "background/utils" "encoding/json" "reflect" "github.com/pkg/errors" "golang.org/x/net/context" "gopkg.in/olivere/elastic.v7" "qiniupkg.com/x/log.v7" ) const ( ERROR_PTR = "null pointer error" INPUT_TYPE_ERROR = "wrong input parameter: " CREATED_ERROR = "create error" DELETE_ERROR = "delete error" INDEX_EXISTED = "index existed" ) type ElkEngine struct { cli *elastic.Client } func (p *ElkEngine) Create(index string, types string, id string, data interface{}) error { if nil != p { if (reflect.TypeOf(data).Kind() == reflect.String) || (reflect.TypeOf(data).Kind() == reflect.Struct) { resp, err := p.cli.Index(). Index(index). BodyJson(data). Do(context.Background()) if err != nil { log.Print("create error", err) return err } log.Print(resp) } else { log.Print(reflect.TypeOf(data).Kind()) return errors.New(INPUT_TYPE_ERROR) } } else { return errors.New(ERROR_PTR) } return nil } func (p *ElkEngine) Delete(query elastic.Query, index string) error { if nil != p { _, err := p.cli.DeleteByQuery().Index(index).Query(query). Do(context.Background()) if err != nil { log.Print(err) return err } } else { return errors.New(ERROR_PTR) } return nil } func (p *ElkEngine) QueryHighlight(index string, query elastic.Query, v interface{},hightlight *elastic.Highlight, limit int, offset int) ([]string, error) { if reflect.ValueOf(v).Kind() != reflect.Ptr { return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Ptr") } if reflect.ValueOf(v).Elem().Kind() != reflect.Slice { return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Slice") } if reflect.ValueOf(v).Elem().Type().Elem().Kind() != reflect.Struct { return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Struct") } eletype := reflect.ValueOf(v).Elem().Type().Elem() obj := reflect.ValueOf(v).Elem() objAdd := make([]reflect.Value, 0) if nil != p { if limit == 0 { res, err := p.cli.Search().Index(index).Query(query).Highlight(hightlight).Do(context.TODO()) if err != nil { print(err) return nil, err } id := []string{} for _, vs := range res.Hits.Hits { log.Print(vs.Highlight) id = append(id, vs.Id) data, e := vs.Source.MarshalJSON() if nil != e { log.Print(e.Error()) } mapobj := map[string]interface{}{} mapobj["highlight"] = vs.Highlight["content"] e = json.Unmarshal(data, &mapobj) if nil != e { log.Print(e.Error()) } obj, e := utils.UnmarshalJson2StructGen(eletype, mapobj) if nil != e { log.Print(e.Error()) } objAdd = append(objAdd, reflect.ValueOf(obj)) } return id, nil } else { res, err := p.cli.Search(index).Query(query).Highlight(hightlight).Size(limit).From(limit * offset).Do(context.TODO()) if err != nil { print(err) return nil, err } id := []string{} for _, vs := range res.Hits.Hits { log.Print(vs.Highlight) id = append(id, vs.Id) data, e := vs.Source.MarshalJSON() // log.Print(string(data)) if nil != e { log.Print(e.Error()) } mapobj := map[string]interface{}{} e = json.Unmarshal(data, &mapobj) if nil != e { log.Print(e.Error()) } mapobj["content"] = "" mapobj["highlight"] = vs.Highlight["content"] obj, e := utils.UnmarshalJson2StructGen(eletype, mapobj) if nil != e { log.Print(e.Error()) } objAdd = append(objAdd, reflect.ValueOf(obj)) } addOp := reflect.Append(obj, objAdd...) obj.Set(addOp) return id, nil } } else { return nil, errors.New(ERROR_PTR) } } func (p *ElkEngine) Query(index string, query elastic.Query, v interface{}, limit int, offset int) ([]string, error) { if reflect.ValueOf(v).Kind() != reflect.Ptr { return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Ptr") } if reflect.ValueOf(v).Elem().Kind() != reflect.Slice { return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Slice") } if reflect.ValueOf(v).Elem().Type().Elem().Kind() != reflect.Struct { return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Struct") } eletype := reflect.ValueOf(v).Elem().Type().Elem() obj := reflect.ValueOf(v).Elem() objAdd := make([]reflect.Value, 0) if nil != p { if limit == 0 { res, err := p.cli.Search(index).Query(query).Do(context.Background()) if err != nil { print(err) return nil, err } id := []string{} for _, vs := range res.Hits.Hits { id = append(id, vs.Id) data, e := vs.Source.MarshalJSON() if nil != e { log.Print(e.Error()) } mapobj := map[string]interface{}{} e = json.Unmarshal(data, &mapobj) if nil != e { log.Print(e.Error()) } obj, e := utils.UnmarshalJson2StructGen(eletype, mapobj) if nil != e { log.Print(e.Error()) } objAdd = append(objAdd, reflect.ValueOf(obj)) } return id, nil } else { res, err := p.cli.Search(index).Query(query).Size(limit).From(limit * offset).Do(context.Background()) if err != nil { print(err) return nil, err } id := []string{} for _, vs := range res.Hits.Hits { id = append(id, vs.Id) data, e := vs.Source.MarshalJSON() log.Print(string(data)) if nil != e { log.Print(e.Error()) } mapobj := map[string]interface{}{} e = json.Unmarshal(data, &mapobj) if nil != e { log.Print(e.Error()) } obj, e := utils.UnmarshalJson2StructGen(eletype, mapobj) if nil != e { log.Print(e.Error()) } objAdd = append(objAdd, reflect.ValueOf(obj)) } addOp := reflect.Append(obj, objAdd...) obj.Set(addOp) return id, nil } } else { return nil, errors.New(ERROR_PTR) } } func (p *ElkEngine) QueryGen(index string, query elastic.Query, typ reflect.Type, limit int, offset int) ([]interface{}, []string, error) { rets := []interface{}{} if nil != p { if limit == 0 { res, err := p.cli.Search(index).Query(query).Do(context.Background()) if err != nil { print(err) return nil, nil, err } id := []string{} for _, vs := range res.Hits.Hits { id = append(id, vs.Id) } return rets, id, nil } else { res, err := p.cli.Search(index).Query(query).Size(limit).From(limit * offset).Do(context.Background()) if err != nil { print(err) return nil, nil, err } id := []string{} for _, vs := range res.Hits.Hits { id = append(id, vs.Id) data, e := vs.Source.MarshalJSON() if nil != e { log.Print(e.Error()) } mapobj := map[string]interface{}{} e = json.Unmarshal(data, &mapobj) if nil != e { log.Print(e.Error()) } obj, e := utils.UnmarshalJson2StructGen(typ, mapobj) if nil != e { log.Print(e.Error()) } rets = append(rets, obj) } return rets, id, nil } } else { return nil, nil, errors.New(ERROR_PTR) } } func (p *ElkEngine) Update(index string, types string, id string, data map[string]interface{}) error { if nil != p { _, err := p.cli.Update(). Index(index). Id(id). Doc(data). Do(context.Background()) if err != nil { println(err.Error()) return err } } return errors.New(ERROR_PTR) } func (p *ElkEngine) CreateIndex(index string, typemaping string) error { if nil != p { exists, err := p.cli.IndexExists(index).Do(context.Background()) if err != nil { // Handle error log.Print(err) return err } if exists { return errors.New(INDEX_EXISTED) } createIndex, err := p.cli.CreateIndex(index).Body(typemaping).Do(context.Background()) if err != nil { log.Print(err) return err } if !createIndex.Acknowledged { return errors.New("create index error") // Not acknowledged } return nil } return errors.New(ERROR_PTR) } func (p *ElkEngine) IndexExisted(index string) (bool, error) { if nil != p { exists, err := p.cli.IndexExists(index).Do(context.Background()) if exists { return true, nil } if err != nil { // Handle error log.Print(err) return false, err } return false, nil } return false, nil }