background/db/elasticEngine.go

321 lines
7.8 KiB
Go

package db
import (
"background/utils"
"encoding/json"
"reflect"
"github.com/pkg/errors"
"golang.org/x/net/context"
"gopkg.in/olivere/elastic.v7"
"qiniupkg.com/x/log.v7"
)
const (
ERROR_PTR = "null pointer error"
INPUT_TYPE_ERROR = "wrong input parameter: "
CREATED_ERROR = "create error"
DELETE_ERROR = "delete error"
INDEX_EXISTED = "index existed"
)
type ElkEngine struct {
cli *elastic.Client
}
func (p *ElkEngine) Create(index string, types string, id string, data interface{}) error {
if nil != p {
if (reflect.TypeOf(data).Kind() == reflect.String) || (reflect.TypeOf(data).Kind() == reflect.Struct) {
resp, err := p.cli.Index().
Index(index).
BodyJson(data).
Do(context.Background())
if err != nil {
log.Print("create error", err)
return err
}
log.Print(resp)
} else {
log.Print(reflect.TypeOf(data).Kind())
return errors.New(INPUT_TYPE_ERROR)
}
} else {
return errors.New(ERROR_PTR)
}
return nil
}
func (p *ElkEngine) Delete(query elastic.Query, index string) error {
if nil != p {
_, err := p.cli.DeleteByQuery().Index(index).Query(query).
Do(context.Background())
if err != nil {
log.Print(err)
return err
}
} else {
return errors.New(ERROR_PTR)
}
return nil
}
func (p *ElkEngine) QueryHighlight(index string, query elastic.Query, v interface{}, hightlight *elastic.Highlight,
limit int, offset int) ([]string, error) {
if reflect.ValueOf(v).Kind() != reflect.Ptr {
return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Ptr")
}
if reflect.ValueOf(v).Elem().Kind() != reflect.Slice {
return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Slice")
}
if reflect.ValueOf(v).Elem().Type().Elem().Kind() != reflect.Struct {
return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Struct")
}
eletype := reflect.ValueOf(v).Elem().Type().Elem()
obj := reflect.ValueOf(v).Elem()
objAdd := make([]reflect.Value, 0)
if nil != p {
if limit == 0 {
res, err := p.cli.Search().Index(index).Query(query).Highlight(hightlight).Do(context.TODO())
if err != nil {
print(err)
return nil, err
}
id := []string{}
for _, vs := range res.Hits.Hits {
id = append(id, vs.Id)
data, e := vs.Source.MarshalJSON()
if nil != e {
log.Print(e.Error())
}
mapobj := map[string]interface{}{}
mapobj["highlight"] = vs.Highlight["content"]
e = json.Unmarshal(data, &mapobj)
if nil != e {
log.Print(e.Error())
}
obj, e := utils.UnmarshalJson2StructGen(eletype, mapobj)
if nil != e {
log.Print(e.Error())
}
objAdd = append(objAdd, reflect.ValueOf(obj))
}
return id, nil
} else {
res, err := p.cli.Search(index).Query(query).Highlight(hightlight).Size(limit).From(limit * offset).Do(context.TODO())
if err != nil {
print(err)
return nil, err
}
id := []string{}
for _, vs := range res.Hits.Hits {
id = append(id, vs.Id)
data, e := vs.Source.MarshalJSON()
// log.Print(string(data))
if nil != e {
log.Print(e.Error())
}
mapobj := map[string]interface{}{}
e = json.Unmarshal(data, &mapobj)
if nil != e {
log.Print(e.Error())
}
mapobj["content"] = ""
mapobj["highlight"] = vs.Highlight["content"]
obj, e := utils.UnmarshalJson2StructGen(eletype, mapobj)
if nil != e {
log.Print(e.Error())
}
objAdd = append(objAdd, reflect.ValueOf(obj))
}
addOp := reflect.Append(obj, objAdd...)
obj.Set(addOp)
return id, nil
}
} else {
return nil, errors.New(ERROR_PTR)
}
}
func (p *ElkEngine) Query(index string, query elastic.Query, v interface{},
limit int, offset int) ([]string, error) {
if reflect.ValueOf(v).Kind() != reflect.Ptr {
return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Ptr")
}
if reflect.ValueOf(v).Elem().Kind() != reflect.Slice {
return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Slice")
}
if reflect.ValueOf(v).Elem().Type().Elem().Kind() != reflect.Struct {
return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Struct")
}
eletype := reflect.ValueOf(v).Elem().Type().Elem()
obj := reflect.ValueOf(v).Elem()
objAdd := make([]reflect.Value, 0)
if nil != p {
if limit == 0 {
res, err := p.cli.Search(index).Query(query).Do(context.Background())
if err != nil {
print(err)
return nil, err
}
id := []string{}
for _, vs := range res.Hits.Hits {
id = append(id, vs.Id)
data, e := vs.Source.MarshalJSON()
if nil != e {
log.Print(e.Error())
}
mapobj := map[string]interface{}{}
e = json.Unmarshal(data, &mapobj)
if nil != e {
log.Print(e.Error())
}
obj, e := utils.UnmarshalJson2StructGen(eletype, mapobj)
if nil != e {
log.Print(e.Error())
}
objAdd = append(objAdd, reflect.ValueOf(obj))
}
return id, nil
} else {
res, err := p.cli.Search(index).Query(query).Size(limit).From(limit * offset).Do(context.Background())
if err != nil {
log.Print(err.Error())
return nil, err
}
id := []string{}
for _, vs := range res.Hits.Hits {
id = append(id, vs.Id)
data, e := vs.Source.MarshalJSON()
log.Print(string(data))
if nil != e {
log.Print(e.Error())
}
mapobj := map[string]interface{}{}
e = json.Unmarshal(data, &mapobj)
if nil != e {
log.Print(e.Error())
}
obj, e := utils.UnmarshalJson2StructGen(eletype, mapobj)
if nil != e {
log.Print(e.Error())
}
objAdd = append(objAdd, reflect.ValueOf(obj))
}
addOp := reflect.Append(obj, objAdd...)
obj.Set(addOp)
return id, nil
}
} else {
return nil, errors.New(ERROR_PTR)
}
}
func (p *ElkEngine) QueryGen(index string, query elastic.Query, typ reflect.Type,
limit int, offset int) ([]interface{}, []string, error) {
rets := []interface{}{}
if nil != p {
if limit == 0 {
res, err := p.cli.Search(index).Query(query).Do(context.Background())
if err != nil {
print(err)
return nil, nil, err
}
id := []string{}
for _, vs := range res.Hits.Hits {
id = append(id, vs.Id)
}
return rets, id, nil
} else {
res, err := p.cli.Search(index).Query(query).Size(limit).From(limit * offset).Do(context.Background())
if err != nil {
print(err)
return nil, nil, err
}
id := []string{}
for _, vs := range res.Hits.Hits {
id = append(id, vs.Id)
data, e := vs.Source.MarshalJSON()
if nil != e {
log.Print(e.Error())
}
mapobj := map[string]interface{}{}
e = json.Unmarshal(data, &mapobj)
if nil != e {
log.Print(e.Error())
}
obj, e := utils.UnmarshalJson2StructGen(typ, mapobj)
if nil != e {
log.Print(e.Error())
}
rets = append(rets, obj)
}
return rets, id, nil
}
} else {
return nil, nil, errors.New(ERROR_PTR)
}
}
func (p *ElkEngine) Update(index string, types string, id string, data map[string]interface{}) error {
if nil != p {
_, err := p.cli.Update().
Index(index).
Id(id).
Doc(data).
Do(context.Background())
if err != nil {
println(err.Error())
return err
}
}
return errors.New(ERROR_PTR)
}
func (p *ElkEngine) CreateIndex(index string, typemaping string) error {
if nil != p {
exists, err := p.cli.IndexExists(index).Do(context.Background())
if err != nil {
// Handle error
log.Print(err)
return err
}
if exists {
return errors.New(INDEX_EXISTED)
}
createIndex, err := p.cli.CreateIndex(index).Body(typemaping).Do(context.Background())
if err != nil {
log.Print(err)
return err
}
if !createIndex.Acknowledged {
return errors.New("create index error")
// Not acknowledged
}
return nil
}
return errors.New(ERROR_PTR)
}
func (p *ElkEngine) IndexExisted(index string) (bool, error) {
if nil != p {
exists, err := p.cli.IndexExists(index).Do(context.Background())
if exists {
return true, nil
}
if err != nil {
// Handle error
log.Print(err)
return false, err
}
return false, nil
}
return false, nil
}