Skip to content

Commit

Permalink
[ADDED] FetchChan method to utilize non-blocking pull subscription re…
Browse files Browse the repository at this point in the history
…quests
  • Loading branch information
piotrpio committed Feb 14, 2023
1 parent 2805753 commit 8c67be1
Show file tree
Hide file tree
Showing 2 changed files with 452 additions and 7 deletions.
215 changes: 208 additions & 7 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -2676,12 +2676,6 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// Use the deadline of the context to base the expire times.
deadline, _ := ctx.Deadline()
ttl = time.Until(deadline)
checkCtxErr := func(err error) error {
if o.ctx == nil && err == context.DeadlineExceeded {
return ErrTimeout
}
return err
}

var (
msgs = make([]*Msg, 0, batch)
Expand Down Expand Up @@ -2766,11 +2760,218 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}
// If there is at least a message added to msgs, then need to return OK and no error
if err != nil && len(msgs) == 0 {
return nil, checkCtxErr(err)
return nil, o.checkCtxErr(err)
}
return msgs, nil
}

type FetchResult interface {
Messages() <-chan *Msg
Error() error
}

type fetchResult struct {
msgs chan *Msg
err error
}

func (fr *fetchResult) Messages() <-chan *Msg {
return fr.msgs
}

func (fr *fetchResult) Error() error {
return fr.err
}

// FetchChan pulls a batch of messages from a stream for a pull consumer.
// Unlike [Subscription.Fetch], it is non blocking and returns [FetchResult],
// allowing to retrieve incoming messages from a channel.
// The returned channel is always close - it is safe to iterate over it using range.
//
// To avoid using default JetStream timeout as fetch expiry time, use [nats.MaxWait]
// or [nats.Context] (with deadline set).
//
// This method will not return error in case of pull request expiry (even if there are no messages).
// Any other error encountered when receiving messages will cause FetchChan to stop receiving new messages.
func (sub *Subscription) FetchChan(batch int, opts ...PullOpt) (FetchResult, error) {
if sub == nil {
return nil, ErrBadSubscription
}
if batch < 1 {
return nil, ErrInvalidArg
}

var o pullOpts
for _, opt := range opts {
if err := opt.configurePull(&o); err != nil {
return nil, err
}
}
if o.ctx != nil && o.ttl != 0 {
return nil, ErrContextAndTimeout
}
sub.mu.Lock()
jsi := sub.jsi
// Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription,
// so check for jsi.pull boolean instead.
if jsi == nil || !jsi.pull {
sub.mu.Unlock()
return nil, ErrTypeSubscription
}

nc := sub.conn
nms := sub.jsi.nms
rply := sub.jsi.deliver
js := sub.jsi.js
pmc := len(sub.mch) > 0

// All fetch requests have an expiration, in case of no explicit expiration
// then the default timeout of the JetStream context is used.
ttl := o.ttl
if ttl == 0 {
ttl = js.opts.wait
}
sub.mu.Unlock()

// Use the given context or setup a default one for the span
// of the pull batch request.
var (
ctx = o.ctx
cancel context.CancelFunc
cancelContext = true
)
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), ttl)
} else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
// Prevent from passing the background context which will just block
// and cannot be canceled either.
if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() {
return nil, ErrNoDeadlineContext
}

// If the context did not have a deadline, then create a new child context
// that will use the default timeout from the JS context.
ctx, cancel = context.WithTimeout(ctx, ttl)
}
defer func() {
// only cancel the context here if we are sure the fetching goroutine has not been started yet
if cancel != nil && cancelContext {
cancel()
}
}()

// Check if context not done already before making the request.
select {
case <-ctx.Done():
if o.ctx != nil { // Timeout or Cancel triggered by context object option
return nil, ctx.Err()
} else { // Timeout triggered by timeout option
return nil, ErrTimeout
}
default:
}

// Use the deadline of the context to base the expire times.
deadline, _ := ctx.Deadline()
ttl = time.Until(deadline)

result := &fetchResult{
msgs: make(chan *Msg, batch),
}
var msg *Msg
for pmc && len(result.msgs) < batch {
// Check next msg with booleans that say that this is an internal call
// for a pull subscribe (so don't reject it) and don't wait if there
// are no messages.
msg, err := sub.nextMsgWithContext(ctx, true, false)
if err != nil {
if err == errNoMessages {
err = nil
}
result.err = err
break
}
// Check msg but just to determine if this is a user message
// or status message, however, we don't care about values of status
// messages at this point in the Fetch() call, so checkMsg can't
// return an error.
if usrMsg, _ := checkMsg(msg, false, false); usrMsg {
result.msgs <- msg
}
}
if len(result.msgs) == batch || result.err != nil {
close(result.msgs)
return result, nil
}

deadline, _ = ctx.Deadline()
ttl = time.Until(deadline)

// Make our request expiration a bit shorter than the current timeout.
expires := ttl
if ttl >= 20*time.Millisecond {
expires = ttl - 10*time.Millisecond
}

requestBatch := batch - len(result.msgs)
req := nextRequest{
Expires: expires,
Batch: requestBatch,
MaxBytes: o.maxBytes,
}
reqJSON, err := json.Marshal(req)
if err != nil {
close(result.msgs)
result.err = err
return result, nil
}
if err := nc.PublishRequest(nms, rply, reqJSON); err != nil {
close(result.msgs)
result.err = err
return result, nil
}
cancelContext = false
go func() {
if cancel != nil {
defer cancel()
}
var requestMsgs int
for requestMsgs < requestBatch {
// Ask for next message and wait if there are no messages
msg, err = sub.nextMsgWithContext(ctx, true, true)
if err != nil {
break
}
var usrMsg bool

usrMsg, err = checkMsg(msg, true, false)
if err != nil {
if err == ErrTimeout {
err = nil
}
break
}
if usrMsg {
result.msgs <- msg
requestMsgs++
}
}
if err != nil {
result.err = o.checkCtxErr(err)
}
close(result.msgs)
}()
return result, nil
}

// checkCtxErr is used to determine whether ErrTimeout should be returned in case of context timeout
func (o *pullOpts) checkCtxErr(err error) error {
if o.ctx == nil && err == context.DeadlineExceeded {
return ErrTimeout
}
return err
}

func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), js.opts.wait)
defer cancel()
Expand Down
Loading

0 comments on commit 8c67be1

Please sign in to comment.