180 lines
4.3 KiB
Go
180 lines
4.3 KiB
Go
package handlers
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/keepalive"
|
|
|
|
"github.com/winc-link/hummingbird/internal/pkg/container"
|
|
"github.com/winc-link/hummingbird/internal/pkg/di"
|
|
"github.com/winc-link/hummingbird/internal/pkg/limit"
|
|
//"gitlab.com/tedge/edgex/internal/pkg/container"
|
|
//"gitlab.com/tedge/edgex/internal/pkg/di"
|
|
//"gitlab.com/tedge/edgex/internal/pkg/limit"
|
|
)
|
|
|
|
var basepath string
|
|
|
|
func init() {
|
|
_, currentFile, _, _ := runtime.Caller(0)
|
|
basepath = filepath.Dir(currentFile)
|
|
}
|
|
|
|
// Path returns the absolute path the given relative file or directory path,
|
|
// relative to the google.golang.org/grpc/examples/data directory in the
|
|
// user's GOPATH. If rel is already absolute, it is returned unmodified.
|
|
func path(rel string) string {
|
|
if filepath.IsAbs(rel) {
|
|
return rel
|
|
}
|
|
|
|
return filepath.Join(basepath, rel)
|
|
}
|
|
|
|
var kaep = keepalive.EnforcementPolicy{
|
|
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
|
|
PermitWithoutStream: true, // Allow pings even when there are no active streams
|
|
}
|
|
|
|
type RegisterFunc func(serve *grpc.Server)
|
|
|
|
type RPCServer struct {
|
|
S *grpc.Server
|
|
isRunning bool
|
|
}
|
|
|
|
var (
|
|
limitService limit.LimitService
|
|
methodLimitMap sync.Map
|
|
serverRpcLog = os.Getenv(SERVER_RPC_LOG_ENABLE)
|
|
serverTimeout time.Duration
|
|
)
|
|
|
|
func methodLimit(method string, lmc limit.LimitMethodConf) bool {
|
|
_, exist := lmc.GetLimitMethods()[method]
|
|
return exist
|
|
}
|
|
|
|
func requestLimit(ctx context.Context, method string) error {
|
|
value, exist := methodLimitMap.Load(method)
|
|
if !exist {
|
|
value = limitService.Clone()
|
|
}
|
|
var srv = value.(limit.LimitService)
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithTimeout(ctx, serverTimeout)
|
|
defer cancel()
|
|
if err := srv.ConsumeWithContext(ctx); err != nil {
|
|
return err
|
|
}
|
|
methodLimitMap.Store(method, value)
|
|
return nil
|
|
}
|
|
|
|
const (
|
|
// 默认每个rpc接口100ms放入5个请求
|
|
DefaultLimitTime = 100
|
|
DefaultBurst = 5
|
|
// 默认服务端超时时间为5s
|
|
DefaultServiceTimeout = 5 * time.Second
|
|
SERVER_RPC_LOG_ENABLE = "SERVER_RPC_LOG_ENABLE"
|
|
)
|
|
|
|
func NewRPCServer(ctx context.Context, wg *sync.WaitGroup, dic *di.Container, register RegisterFunc) (*RPCServer, error) {
|
|
var rs RPCServer
|
|
lc := container.LoggingClientFrom(dic.Get)
|
|
rpcConfig := container.ConfigurationFrom(dic.Get).GetBootstrap().RpcServer
|
|
if rpcConfig.Address == "" {
|
|
lc.Error("required rpc address")
|
|
return nil, errors.New("required rpc address")
|
|
}
|
|
|
|
lis, err := net.Listen("tcp", rpcConfig.Address)
|
|
if err != nil {
|
|
lc.Errorf("failed to listen: %v", err)
|
|
return nil, err
|
|
}
|
|
|
|
var (
|
|
//limitTime int64 = DefaultLimitTime
|
|
//burst = DefaultBurst
|
|
)
|
|
//if rpcConfig.LimitSetting.LimitTime > 0 {
|
|
// limitTime = rpcConfig.LimitSetting.LimitTime
|
|
//}
|
|
//if rpcConfig.LimitSetting.Burst > 0 {
|
|
// burst = rpcConfig.LimitSetting.Burst
|
|
//}
|
|
if rpcConfig.Timeout > 0 {
|
|
serverTimeout = time.Duration(rpcConfig.Timeout) * time.Millisecond
|
|
} else {
|
|
serverTimeout = DefaultServiceTimeout
|
|
}
|
|
|
|
limitService = limit.NewLimitService(limit.LimitOption{
|
|
//LimitMillisecond: limitTime,
|
|
//Burst: burst,
|
|
})
|
|
lmc := container.LimitMethodConfFrom(dic.Get)
|
|
if rpcConfig.UseTLS {
|
|
creds, err := credentials.NewServerTLSFromFile(path(rpcConfig.CertFile), path(rpcConfig.KeyFile))
|
|
if err != nil {
|
|
lc.Errorf("failed to create credentials: %v", err)
|
|
return nil, err
|
|
}
|
|
rs.S = grpc.NewServer(
|
|
grpc.Creds(creds),
|
|
grpc.KeepaliveEnforcementPolicy(kaep),
|
|
withServerInterceptor(lc, lmc, dic),
|
|
)
|
|
|
|
} else {
|
|
rs.S = grpc.NewServer(
|
|
grpc.KeepaliveEnforcementPolicy(kaep),
|
|
withServerInterceptor(lc, lmc, dic),
|
|
)
|
|
}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
<-ctx.Done()
|
|
lc.Info("RPC server shutting down")
|
|
rs.S.Stop()
|
|
lc.Info("RPC server shut down")
|
|
}()
|
|
|
|
// registry Server
|
|
register(rs.S)
|
|
|
|
lc.Infof("RPC server starting ( %s )", rpcConfig.Address)
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer func() {
|
|
wg.Done()
|
|
rs.isRunning = false
|
|
}()
|
|
rs.isRunning = true
|
|
err = rs.S.Serve(lis)
|
|
if err != nil {
|
|
lc.Errorf("RPC server failed: %v", err)
|
|
cancel := container.CancelFuncFrom(dic.Get)
|
|
cancel()
|
|
} else {
|
|
lc.Info("RPC server stopped")
|
|
}
|
|
}()
|
|
return &rs, nil
|
|
}
|