Deadlock on PublishMsgAsync #1801

Open
Zach-Johnson opened this issue Feb 13, 2025 · 0 comments · May be fixed by #1812
Observed behavior

I have code like this:

	futures := make([]jetstream.PubAckFuture, len(b))

	for i, msg := range b {
		jsMsg := nats.NewMsg(subj)

		msgBytes, err := msg.AsBytes()
		if err != nil {
			return err
		}

		jsMsg.Data = msgBytes

		future, err := js.PublishMsgAsync(jsMsg)
		if err != nil {
			return err
		}

		futures[i] = future
	}

	timeOut := time.After(5 * time.Second)

	select {
	case <-js.PublishAsyncComplete():
		for _, future := range futures {
			select {
			case <-future.Ok():
			case err := <-future.Err():
				return err
			case <-timeOut: // This will deadlock without this case here
				return errors.New("async publish complete, but futures did not return")
			}
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-timeOut:
		return errors.New("did not receive async publish complete signal")
	}

If you remove the inner timeout case, it deadlocks while reading the results of the futures.

This happens because there are ID collisions in the map that tracks acks. To confirm this, I added a panic to the function that registers futures in that map:

func (js *jetStream) registerPAF(id string, paf *pubAckFuture) (int, int) {
	js.publisher.Lock()
	if js.publisher.acks == nil {
		js.publisher.acks = make(map[string]*pubAckFuture)
	}
	if _, ok := js.publisher.acks[id]; ok {
		panic(fmt.Sprintf("collision occurred on id: %s", id))
	}
	js.publisher.acks[id] = paf
	np := len(js.publisher.acks)
	maxpa := js.publisher.asyncPublisherOpts.maxpa
	js.publisher.Unlock()
	return np, maxpa
}

and I hit the panic occasionally.

I'm able to consistently reproduce this, although it takes a long time (up to an hour sometimes). I am using a batch size of 10k messages.

Expected behavior

No deadlock, or an error that indicates a misconfiguration in the producer client.

Server and client version

server 2.10.25
go sdk v1.39.0

Host environment

No response

Steps to reproduce

No response

@Zach-Johnson Zach-Johnson added the defect Suspected defect such as a bug or regression label Feb 13, 2025
@piotrpio piotrpio self-assigned this Feb 13, 2025
@piotrpio piotrpio linked a pull request Mar 3, 2025 that will close this issue