Skip to content

Commit

Permalink
Merge branch 'master' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
ndyakov authored Mar 6, 2025
2 parents 50b6d55 + 162a154 commit f55a990
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 1 deletion.
9 changes: 9 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ type Error interface {

var _ Error = proto.RedisError("")

func isContextError(err error) bool {
switch err {
case context.Canceled, context.DeadlineExceeded:
return true
default:
return false
}
}

func shouldRetry(err error, retryTimeout bool) bool {
switch err {
case io.EOF, io.ErrUnexpectedEOF:
Expand Down
4 changes: 3 additions & 1 deletion osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,9 @@ func (c *ClusterClient) processPipelineNode(
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
cn, err := node.Client.getConn(ctx)
if err != nil {
node.MarkAsFailing()
if !isContextError(err) {
node.MarkAsFailing()
}
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
setCmdsErr(cmds, err)
return err
Expand Down
33 changes: 33 additions & 0 deletions osscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,39 @@ var _ = Describe("ClusterClient", func() {
AfterEach(func() {})

assertPipeline()

It("doesn't fail node with context.Canceled error", func() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
pipe.Set(ctx, "A", "A_value", 0)
_, err := pipe.Exec(ctx)

Expect(err).To(HaveOccurred())
Expect(errors.Is(err, context.Canceled)).To(BeTrue())

clientNodes, _ := client.Nodes(ctx, "A")

for _, node := range clientNodes {
Expect(node.Failing()).To(BeFalse())
}
})

It("doesn't fail node with context.DeadlineExceeded error", func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()

pipe.Set(ctx, "A", "A_value", 0)
_, err := pipe.Exec(ctx)

Expect(err).To(HaveOccurred())
Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue())

clientNodes, _ := client.Nodes(ctx, "A")

for _, node := range clientNodes {
Expect(node.Failing()).To(BeFalse())
}
})
})

Describe("with TxPipeline", func() {
Expand Down

0 comments on commit f55a990

Please sign in to comment.