Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: update watcher API as per gRFC A88 #7977

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 10 additions & 25 deletions xds/csds/csds_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,49 +70,37 @@ func Test(t *testing.T) {

type nopListenerWatcher struct{}

func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopRouteConfigWatcher struct{}

func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopClusterWatcher struct{}

func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopEndpointsWatcher struct{}

func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

Expand All @@ -137,13 +125,10 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa
}
}

func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}

Expand Down
17 changes: 9 additions & 8 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@
if b.lbCfg != nil {
root = b.lbCfg.ClusterName
}
b.onClusterError(root, err)
b.onClusterAmbientError(root, err)
})
}

Expand Down Expand Up @@ -428,20 +428,20 @@
// If the security config is invalid, for example, if the provider
// instance is not found in the bootstrap config, we need to put the
// channel in transient failure.
b.onClusterError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err))
b.onClusterAmbientError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err))
return
}
}

clustersSeen := make(map[string]bool)
dms, ok, err := b.generateDMsForCluster(b.lbCfg.ClusterName, 0, nil, clustersSeen)
if err != nil {
b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err))
b.onClusterAmbientError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err))
return
}
if ok {
if len(dms) == 0 {
b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters"))
b.onClusterAmbientError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters"))
return
}
// Child policy is built the first time we resolve the cluster graph.
Expand Down Expand Up @@ -501,7 +501,7 @@
// TRANSIENT_FAILURE.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterError(name string, err error) {
func (b *cdsBalancer) onClusterAmbientError(name string, err error) {
b.logger.Warningf("Cluster resource %q received error update: %v", name, err)

if b.childLB != nil {
Expand All @@ -525,15 +525,16 @@
// TRANSIENT_FAILURE.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterResourceNotFound(name string) {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", name)
func (b *cdsBalancer) onClusterResourceChangedError(name string, err error) {
b.logger.Warningf("Cluster resource %q received error update: %v", name, err)

if b.childLB != nil {
b.childLB.ResolverError(err)
} else {
// If child balancer was never created, fail the RPCs with errors.
b.ccw.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(err),
Picker: base.NewErrPicker(fmt.Errorf("%q: %v", name, err)),

Check warning on line 537 in xds/internal/balancer/cdsbalancer/cdsbalancer.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/cdsbalancer/cdsbalancer.go#L537

Added line #L537 was not covered by tests
})
}
}
Expand Down
19 changes: 10 additions & 9 deletions xds/internal/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,22 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if u.Err != nil {
handleError := func(context.Context) { cw.parent.onClusterResourceChangedError(cw.name, u.Err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
return
}
update := u.Data.(*xdsresource.ClusterResourceData)
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, update.Resource); onDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
func (cw *clusterWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { cw.parent.onClusterAmbientError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
}

func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

// watcherState groups the state associated with a clusterWatcher.
type watcherState struct {
watcher *clusterWatcher // The underlying watcher.
Expand Down
48 changes: 22 additions & 26 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,39 @@
}

// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
}

if update.Err != nil {
if er.logger.V(2) {
er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, update.Err)
}

Check warning on line 88 in xds/internal/balancer/clusterresolver/resource_resolver_eds.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/balancer/clusterresolver/resource_resolver_eds.go#L87-L88

Added lines #L87 - L88 were not covered by tests
// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
return
}

er.mu.Lock()
er.update = &update.Resource
u := update.Data.(*xdsresource.EndpointsResourceData)
er.update = &u.Resource
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
Expand Down Expand Up @@ -119,26 +138,3 @@

er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
}

if er.logger.V(2) {
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
}

// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
}
38 changes: 20 additions & 18 deletions xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,22 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
return lw
}

func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if update.Err != nil {
handleError := func(context.Context) { l.parent.onListenerResourceChangedError(update.Err); onDone() }
l.parent.serializer.ScheduleOr(handleError, onDone)
return
}
u := update.Data.(*xdsresource.ListenerResourceData)
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(u.Resource); onDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (l *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() }
func (l *listenerWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { l.parent.onListenerResourceAmbientError(err); onDone() }
l.parent.serializer.ScheduleOr(handleError, onDone)
}

func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() }
l.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

func (l *listenerWatcher) stop() {
l.cancel()
l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
Expand All @@ -68,24 +69,25 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
return rw
}

func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.ResourceDataOrError, onDone xdsresource.OnDoneFunc) {
if u.Err != nil {
handleError := func(context.Context) { r.parent.onRouteConfigResourceChangedError(r.resourceName, u.Err); onDone() }
r.parent.serializer.ScheduleOr(handleError, onDone)
return
}
handleUpdate := func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
update := u.Data.(*xdsresource.RouteConfigResourceData)
r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
onDone()
}
r.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() }
func (r *routeConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceAmbientError(r.resourceName, err); onDone() }
r.parent.serializer.ScheduleOr(handleError, onDone)
}

func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() }
r.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

func (r *routeConfigWatcher) stop() {
r.cancel()
r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
Expand Down
12 changes: 6 additions & 6 deletions xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,17 +518,17 @@
r.routeConfigWatcher = newRouteConfigWatcher(r.rdsResourceName, r)
}

func (r *xdsResolver) onListenerResourceError(err error) {
func (r *xdsResolver) onListenerResourceAmbientError(err error) {
if r.logger.V(2) {
r.logger.Infof("Received error for Listener resource %q: %v", r.ldsResourceName, err)
}
r.onError(err)
}

// Only executed in the context of a serializer callback.
func (r *xdsResolver) onListenerResourceNotFound() {
func (r *xdsResolver) onListenerResourceChangedError(err error) {
if r.logger.V(2) {
r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
r.logger.Infof("Received on-resource-changed error for Listener resource %q: %v", r.ldsResourceName, err)

Check warning on line 531 in xds/internal/resolver/xds_resolver.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/resolver/xds_resolver.go#L531

Added line #L531 was not covered by tests
}

r.listenerUpdateRecvd = false
Expand Down Expand Up @@ -559,17 +559,17 @@
}

// Only executed in the context of a serializer callback.
func (r *xdsResolver) onRouteConfigResourceError(name string, err error) {
func (r *xdsResolver) onRouteConfigResourceAmbientError(name string, err error) {
if r.logger.V(2) {
r.logger.Infof("Received error for RouteConfiguration resource %q: %v", name, err)
}
r.onError(err)
}

// Only executed in the context of a serializer callback.
func (r *xdsResolver) onRouteConfigResourceNotFound(name string) {
func (r *xdsResolver) onRouteConfigResourceChangedError(name string, err error) {
if r.logger.V(2) {
r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name)
r.logger.Infof("Received on-resource-changed error for RouteConfiguration resource %q: %v", name, err)

Check warning on line 572 in xds/internal/resolver/xds_resolver.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/resolver/xds_resolver.go#L572

Added line #L572 was not covered by tests
}

if r.rdsResourceName != name {
Expand Down
Loading
Loading