We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Given a nats-server configured with 2 accounts: KV1 and KV2. Then a key-value store is created in KV1 and exported and imported in KV2.
KV2 can perform Get but Watch (and related operations like WatchAll, ListKeys) will block with no messages delivered.
Also, the backing stream can successfully be read with the OrderedConsumer.
KeyValue.Watch should deliver all existing entries, but blocks with no messages delivered.
nats-server v2.10.24 nats.go v1.38.0
Ubuntu v24.04.1 LTS Docker v27.5.0 Docker compose v2.32.4
I have created a repository https://github.com/bredtape/nats_kv_watch_fail to recreate the issue. It includes an extra test TestKVWatchWhatWorks to document that the Get and the OrderedConsumer works.
TestKVWatchWhatWorks
nats server.conf:
jetstream { store_dir: /data/jetstream } accounts: { KV1 { jetstream: enable users = [{user: user1, password: password1}] exports = [ {service: "$JS.API.>"} {stream: "KV_bucket1.>"} ] } KV2 { jetstream: enable users = [{user: user2, password: password2}] imports = [ {service: {account: KV1, subject: "$JS.API.>"}, to: "import.>"} {stream: {account: KV1, subject: "KV_bucket1.>"}} ] } }
Golang unit test:
package nats_kv_watch import ( "context" "testing" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) const natsURL = "nats://nats:4222" func TestKVWatchMinimalFails(t *testing.T) { ctx := context.Background() js1 := getKV1(t) bucket := "bucket1" kvs1, err := js1.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: bucket}) if err != nil { t.Fatal(err) } t.Logf("kv1, created kv: %+v", kvs1) rev, err := kvs1.Put(ctx, "key1", []byte("value1")) if err != nil { t.Fatal(err) } t.Logf("kv1, put key1: %d", rev) t.Log("kv2, get") js2 := getKV2(t) kvs2, err := js2.KeyValue(ctx, bucket) if err != nil { t.Fatal(err) } watcher2, err := kvs2.WatchAll(ctx) if err != nil { t.Fatal(err) } t.Log("kv2, watch all keys") var result2 []jetstream.KeyValueEntry for kve := range watcher2.Updates() { // timeout here if kve == nil { break } result2 = append(result2, kve) t.Logf("kv2, watch received: %+v", kve) } if len(result2) != 1 { t.Fatalf("kvs2 watch all, expected 1 entry, got %d", len(result2)) } } func getKV1(t *testing.T) jetstream.JetStream { t.Helper() conn, err := nats.Connect(natsURL, nats.UserInfo("user1", "password1"), nats.Name("kv1"), nats.RetryOnFailedConnect(true)) if err != nil { t.Fatal("failed to connect", err) } js, err := jetstream.New(conn) if err != nil { t.Fatal(err) } return js } func getKV2(t *testing.T) jetstream.JetStream { t.Helper() conn, err := nats.Connect(natsURL, nats.UserInfo("user2", "password2"), nats.Name("kv2"), nats.RetryOnFailedConnect(true)) if err != nil { t.Fatal(err) } trace := jetstream.WithClientTrace(&jetstream.ClientTrace{ RequestSent: func(subj string, payload []byte) { t.Logf("kv2: request sent, subject=%s: %s", subj, string(payload)) }, ResponseReceived: func(subj string, payload []byte, hdr nats.Header) { t.Logf("kv2: response received, subject=%s, headers=%v: %s", subj, hdr, string(payload)) }}) js, err := jetstream.NewWithAPIPrefix(conn, "import", trace) if err != nil { t.Fatal(err) } return js }
logs.txt
The logs indicates that a message is pending "num_pending:" 1:
"num_pending:" 1
test-1 | kv_test.go:149: kv2: response received, subject=import.CONSUMER.CREATE.KV_bucket1.Vc2dPSpp.$KV.bucket1.>, headers=map[]: {"type":"io.nats.jetstream.api.v1.consumer_create_respo nse","stream_name":"KV_bucket1","name":"Vc2dPSpp","created":"2025-01-22T10:57:44.741509965Z","config":{"name":"Vc2dPSpp","deliver_policy":"last_per_subject","ack_policy":"none","ack_wait":79 200000000000,"max_deliver":1,"filter_subject":"$KV.bucket1.\u003e","replay_policy":"instant","idle_heartbeat":5000000000,"flow_control":true,"deliver_subject":"_INBOX.e1irHlMk1oZzdvw1JiopRe" ,"inactive_threshold":5000000000,"num_replicas":1,"mem_storage":true},"delivered":{"consumer_seq":0,"stream_seq":0},"ack_floor":{"consumer_seq":0,"stream_seq":0},"num_ack_pending":0,"num_red elivered":0,"num_waiting":0,"num_pending":1,"ts":"2025-01-22T10:57:44.741664189Z"}
The text was updated successfully, but these errors were encountered:
Hello @bredtape, thanks for the reproduction, I'll be looking at this.
Sorry, something went wrong.
@piotrpio any luck replicating? Did I configure the exports/imports correctly?
piotrpio
No branches or pull requests
Observed behavior
Given a nats-server configured with 2 accounts: KV1 and KV2. Then a key-value store is created in KV1 and exported and imported in KV2.
KV2 can perform Get but Watch (and related operations like WatchAll, ListKeys) will block with no messages delivered.
Also, the backing stream can successfully be read with the OrderedConsumer.
Expected behavior
KeyValue.Watch should deliver all existing entries, but blocks with no messages delivered.
Server and client version
nats-server v2.10.24
nats.go v1.38.0
Host environment
Ubuntu v24.04.1 LTS
Docker v27.5.0
Docker compose v2.32.4
Steps to reproduce
I have created a repository https://github.com/bredtape/nats_kv_watch_fail to recreate the issue. It includes an extra test
TestKVWatchWhatWorks
to document that the Get and the OrderedConsumer works.nats server.conf:
Golang unit test:
logs.txt
The logs indicates that a message is pending
"num_pending:" 1
:The text was updated successfully, but these errors were encountered: