添加mysql 到elasticsearch的自动迁移功能

master
a7458969 2020-04-30 17:33:47 +08:00
parent 9fea690418
commit 77896b9f65
3 changed files with 162 additions and 31 deletions

61
main.go
View File

@ -19,36 +19,7 @@ var (
fileController = controller.FileController{} fileController = controller.FileController{}
) )
func InitConfig() {
e := config.Init("user.yaml")
if nil != e {
log.Println(e.Error())
}
}
func InitMysql() {
c := config.GetMysqlConfig()
if c == nil {
logs.Error("cannnot connect mysql server")
} else {
db.Init()
}
}
func InitRedis() {
e := config.InitRedis()
if nil != e {
logs.Error(e.Error())
return
}
}
func InitElasticSearch(){
e := db.GetElastic().CreateIndex("hardware",model.HardwareTypeMapping())
if nil != e{
}
}
func InitLogs() {
logs.Init(config.GetLogConfig().Dir, config.GetLogConfig().File, config.GetLogConfig().Level, config.GetLogConfig().SaveFile)
}
func CORSMiddleware(c *gin.Context) { func CORSMiddleware(c *gin.Context) {
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE, UPDATE") c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE, UPDATE")
if config.ApiConfig().RunMode == "release"{ if config.ApiConfig().RunMode == "release"{
@ -75,10 +46,40 @@ func CORSMiddleware(c *gin.Context) {
} }
} }
func InitConfig() {
e := config.Init("user.yaml")
if nil != e {
log.Println(e.Error())
}
}
func InitMysql() {
c := config.GetMysqlConfig()
if c == nil {
logs.Error("cannnot connect mysql server")
} else {
db.Init()
}
}
func InitRedisConfig() {
e := config.InitRedis()
if nil != e {
logs.Error(e.Error())
return
}
}
func InitElasticSearch(){
e := db.GetElastic().CreateIndex("hardware",model.HardwareTypeMapping())
if nil != e{
}
}
func InitLogs() {
logs.Init(config.GetLogConfig().Dir, config.GetLogConfig().File, config.GetLogConfig().Level, config.GetLogConfig().SaveFile)
}
func main() { func main() {
InitConfig() InitConfig()
InitLogs() InitLogs()
InitRedis() InitRedisConfig()
InitMysql() InitMysql()
InitElasticSearch() InitElasticSearch()

View File

@ -1,4 +1,82 @@
package model package model
import (
"background/db"
"github.com/go-openapi/errors"
json "github.com/json-iterator/go"
"qiniupkg.com/x/log.v7"
"strings"
"ubntgo/logger"
)
type Field struct {
Field string `sql:"Field"`
Type string `sql:"Type"`
Key string `sql:"Key"`
}
func MysqlToElasticSearchMapping(types string,Key string) string{
if Key == "PRI"{
return "keyword"
}
if(strings.Contains(types,"int(")){
return "interger"
}
if(strings.Contains(types,"longblob")){
return "text"
}
if(strings.Contains(types,"varchar")){
return "text"
}
if(strings.Contains(types,"datetime")){
return "date"
}
return ""
}
/*
`"mappings":{
"hardware":{
"properties":{
"id":{"type":"keyword"},
"name":{"type":"keyword"},
"desc":{"type":"text"},
"pic":{"type":"doc"},
"doc":{"type":"doc"}
}
}
}`
*/
// 不同类型db之间进行缓存 // 不同类型db之间进行缓存
func PortDocumentToElasticsearch(tblname string ) error{
columns := []Field{}
e := db.GetMysqlClient().Query2("describe " + tblname,&columns)
if nil != e{
logger.Debug(e.Error())
return e
}
if existed,_ := db.GetElastic().IndexExisted("doc");existed{
return errors.New(203,"data existed")
}
mapping := map[string]interface{}{}
indexprops := map[string]interface{}{}
mapping["mapping"] = map[string]interface{}{}
mapping["mapping"].(map[string]interface{})[tblname] = map[string]interface{}{}
mapping["mapping"].(map[string]interface{})[tblname].(map[string]interface{})["properties"] = indexprops
for _,v := range columns{
indexprops[v.Field] = map[string]string{};
indexprops[v.Field].(map[string]string)["type"] = MysqlToElasticSearchMapping(v.Type,v.Key)
}
dat,e := json.Marshal(mapping)
if nil != e{
log.Print(e.Error())
}
e = db.GetElastic().CreateIndex(tblname,string(dat))
if nil != e{
log.Print(e.Error())
}
return nil
}

52
test/portData_test.go Normal file
View File

@ -0,0 +1,52 @@
package test
import (
"background/config"
"background/db"
"background/logs"
"background/model"
"log"
"testing"
)
func InitConfig() {
e := config.Init("user.yaml")
if nil != e {
log.Println(e.Error())
}
}
func InitMysql() {
c := config.GetMysqlConfig()
if c == nil {
logs.Error("cannnot connect mysql server")
} else {
db.Init()
}
}
func InitRedisConfig() {
e := config.InitRedis()
if nil != e {
logs.Error(e.Error())
return
}
}
func InitElasticSearch(){
e := db.GetElastic().CreateIndex("hardware",model.HardwareTypeMapping())
if nil != e{
}
}
func InitLogs() {
logs.Init(config.GetLogConfig().Dir, config.GetLogConfig().File, config.GetLogConfig().Level, config.GetLogConfig().SaveFile)
}
func TestPortDocToElastic(t *testing.T) {
InitConfig()
InitLogs()
InitRedisConfig()
InitMysql()
InitElasticSearch()
db.InitELK()
e := model.PortDocumentToElasticsearch("doc")
if nil != e{
t.Error(e)
}
}