141 lines
3.0 KiB
Go
141 lines
3.0 KiB
Go
package redis
|
|
|
|
import (
|
|
"gopkg.in/redis.v3/internal/hashtag"
|
|
"gopkg.in/redis.v3/internal/pool"
|
|
)
|
|
|
|
// ClusterPipeline is not thread-safe.
|
|
type ClusterPipeline struct {
|
|
commandable
|
|
|
|
cluster *ClusterClient
|
|
|
|
cmds []Cmder
|
|
closed bool
|
|
}
|
|
|
|
// Pipeline creates a new pipeline which is able to execute commands
|
|
// against multiple shards. It's NOT safe for concurrent use by
|
|
// multiple goroutines.
|
|
func (c *ClusterClient) Pipeline() *ClusterPipeline {
|
|
pipe := &ClusterPipeline{
|
|
cluster: c,
|
|
cmds: make([]Cmder, 0, 10),
|
|
}
|
|
pipe.commandable.process = pipe.process
|
|
return pipe
|
|
}
|
|
|
|
func (c *ClusterClient) Pipelined(fn func(*ClusterPipeline) error) ([]Cmder, error) {
|
|
pipe := c.Pipeline()
|
|
if err := fn(pipe); err != nil {
|
|
return nil, err
|
|
}
|
|
cmds, err := pipe.Exec()
|
|
_ = pipe.Close()
|
|
return cmds, err
|
|
}
|
|
|
|
func (pipe *ClusterPipeline) process(cmd Cmder) {
|
|
pipe.cmds = append(pipe.cmds, cmd)
|
|
}
|
|
|
|
// Discard resets the pipeline and discards queued commands.
|
|
func (pipe *ClusterPipeline) Discard() error {
|
|
if pipe.closed {
|
|
return pool.ErrClosed
|
|
}
|
|
pipe.cmds = pipe.cmds[:0]
|
|
return nil
|
|
}
|
|
|
|
func (pipe *ClusterPipeline) Exec() (cmds []Cmder, retErr error) {
|
|
if pipe.closed {
|
|
return nil, pool.ErrClosed
|
|
}
|
|
if len(pipe.cmds) == 0 {
|
|
return []Cmder{}, nil
|
|
}
|
|
|
|
cmds = pipe.cmds
|
|
pipe.cmds = make([]Cmder, 0, 10)
|
|
|
|
cmdsMap := make(map[string][]Cmder)
|
|
for _, cmd := range cmds {
|
|
slot := hashtag.Slot(cmd.clusterKey())
|
|
addr := pipe.cluster.slotMasterAddr(slot)
|
|
cmdsMap[addr] = append(cmdsMap[addr], cmd)
|
|
}
|
|
|
|
for attempt := 0; attempt <= pipe.cluster.opt.getMaxRedirects(); attempt++ {
|
|
failedCmds := make(map[string][]Cmder)
|
|
|
|
for addr, cmds := range cmdsMap {
|
|
client, err := pipe.cluster.getClient(addr)
|
|
if err != nil {
|
|
setCmdsErr(cmds, err)
|
|
retErr = err
|
|
continue
|
|
}
|
|
|
|
cn, err := client.conn()
|
|
if err != nil {
|
|
setCmdsErr(cmds, err)
|
|
retErr = err
|
|
continue
|
|
}
|
|
|
|
failedCmds, err = pipe.execClusterCmds(cn, cmds, failedCmds)
|
|
if err != nil {
|
|
retErr = err
|
|
}
|
|
client.putConn(cn, err, false)
|
|
}
|
|
|
|
cmdsMap = failedCmds
|
|
}
|
|
|
|
return cmds, retErr
|
|
}
|
|
|
|
// Close closes the pipeline, releasing any open resources.
|
|
func (pipe *ClusterPipeline) Close() error {
|
|
pipe.Discard()
|
|
pipe.closed = true
|
|
return nil
|
|
}
|
|
|
|
func (pipe *ClusterPipeline) execClusterCmds(
|
|
cn *pool.Conn, cmds []Cmder, failedCmds map[string][]Cmder,
|
|
) (map[string][]Cmder, error) {
|
|
if err := writeCmd(cn, cmds...); err != nil {
|
|
setCmdsErr(cmds, err)
|
|
return failedCmds, err
|
|
}
|
|
|
|
var firstCmdErr error
|
|
for i, cmd := range cmds {
|
|
err := cmd.readReply(cn)
|
|
if err == nil {
|
|
continue
|
|
}
|
|
if isNetworkError(err) {
|
|
cmd.reset()
|
|
failedCmds[""] = append(failedCmds[""], cmds[i:]...)
|
|
break
|
|
} else if moved, ask, addr := isMovedError(err); moved {
|
|
pipe.cluster.lazyReloadSlots()
|
|
cmd.reset()
|
|
failedCmds[addr] = append(failedCmds[addr], cmd)
|
|
} else if ask {
|
|
cmd.reset()
|
|
failedCmds[addr] = append(failedCmds[addr], NewCmd("ASKING"), cmd)
|
|
} else if firstCmdErr == nil {
|
|
firstCmdErr = err
|
|
}
|
|
}
|
|
|
|
return failedCmds, firstCmdErr
|
|
}
|