Skip to content

Commit

Permalink
dns: stop polling for updates; use UpdateState API
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Nov 6, 2019
1 parent 76e6ad3 commit c7f58d9
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 203 deletions.
121 changes: 29 additions & 92 deletions internal/resolver/dns/dns_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ import (
"sync"
"time"

"google.golang.org/grpc/backoff"
"google.golang.org/grpc/grpclog"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
)
Expand All @@ -45,7 +43,6 @@ func init() {

const (
defaultPort = "443"
defaultFreq = time.Minute * 30
defaultDNSSvrPort = "53"
golang = "GO"
// txtPrefix is the prefix string to be prepended to the host name for txt record lookup.
Expand Down Expand Up @@ -95,13 +92,10 @@ var customAuthorityResolver = func(authority string) (netResolver, error) {

// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder {
return &dnsBuilder{minFreq: defaultFreq}
return &dnsBuilder{}
}

type dnsBuilder struct {
// minimum frequency of polling the DNS server.
minFreq time.Duration
}
type dnsBuilder struct{}

// Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
Expand All @@ -111,33 +105,20 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
}

// IP address.
if net.ParseIP(host) != nil {
host, _ = formatIP(host)
addr := []resolver.Address{{Addr: host + ":" + port}}
i := &ipResolver{
cc: cc,
ip: addr,
rn: make(chan struct{}, 1),
q: make(chan struct{}),
}
cc.NewAddress(addr)
go i.watcher()
return i, nil
if ipAddr, ok := formatIP(host); ok {
addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
cc.UpdateState(resolver.State{Addresses: addr})
return deadResolver{}, nil
}

// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
bc := backoff.DefaultConfig
bc.MaxDelay = b.minFreq
d := &dnsResolver{
freq: b.minFreq,
backoff: internalbackoff.Exponential{Config: bc},
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
t: time.NewTimer(0),
rn: make(chan struct{}, 1),
disableServiceConfig: opts.DisableServiceConfig,
}
Expand All @@ -153,6 +134,7 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts

d.wg.Add(1)
go d.watcher()
d.ResolveNow(resolver.ResolveNowOption{})
return d, nil
}

Expand All @@ -167,53 +149,23 @@ type netResolver interface {
LookupTXT(ctx context.Context, name string) (txts []string, err error)
}

// ipResolver watches for the name resolution update for an IP address.
type ipResolver struct {
cc resolver.ClientConn
ip []resolver.Address
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
q chan struct{}
}

// ResolveNow resend the address it stores, no resolution is needed.
func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOption) {
select {
case i.rn <- struct{}{}:
default:
}
}
// deadResolver is a resolver that does nothing.
type deadResolver struct{}

// Close closes the ipResolver.
func (i *ipResolver) Close() {
close(i.q)
}
func (deadResolver) ResolveNow(_ resolver.ResolveNowOption) {}

func (i *ipResolver) watcher() {
for {
select {
case <-i.rn:
i.cc.NewAddress(i.ip)
case <-i.q:
return
}
}
}
func (deadResolver) Close() {}

// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
freq time.Duration
backoff internalbackoff.Exponential
retryCount int
host string
port string
resolver netResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
host string
port string
resolver netResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
t *time.Timer
// wg is used to enforce Close() to return after the watcher() goroutine has finished.
// Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
// replace the real lookup functions with mocked ones to facilitate testing.
Expand All @@ -225,7 +177,7 @@ type dnsResolver struct {
}

// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches.
func (d *dnsResolver) ResolveNow(opt resolver.ResolveNowOption) {
func (d *dnsResolver) ResolveNow(resolver.ResolveNowOption) {
select {
case d.rn <- struct{}{}:
default:
Expand All @@ -236,7 +188,6 @@ func (d *dnsResolver) ResolveNow(opt resolver.ResolveNowOption) {
func (d *dnsResolver) Close() {
d.cancel()
d.wg.Wait()
d.t.Stop()
}

func (d *dnsResolver) watcher() {
Expand All @@ -245,27 +196,11 @@ func (d *dnsResolver) watcher() {
select {
case <-d.ctx.Done():
return
case <-d.t.C:
case <-d.rn:
if !d.t.Stop() {
// Before resetting a timer, it should be stopped to prevent racing with
// reads on it's channel.
<-d.t.C
}
}

result, sc := d.lookup()
// Next lookup should happen within an interval defined by d.freq. It may be
// more often due to exponential retry on empty address list.
if len(result) == 0 {
d.retryCount++
d.t.Reset(d.backoff.Backoff(d.retryCount))
} else {
d.retryCount = 0
d.t.Reset(d.freq)
}
d.cc.NewServiceConfig(sc)
d.cc.NewAddress(result)
state := d.lookup()
d.cc.UpdateState(*state)

// Sleep to prevent excessive re-resolutions. Incoming resolution requests
// will be queued in d.rn.
Expand Down Expand Up @@ -343,15 +278,17 @@ func (d *dnsResolver) lookupHost() []resolver.Address {
return newAddrs
}

func (d *dnsResolver) lookup() ([]resolver.Address, string) {
newAddrs := d.lookupSRV()
func (d *dnsResolver) lookup() *resolver.State {
srv := d.lookupSRV()
state := &resolver.State{
Addresses: append(d.lookupHost(), srv...),
}
// Support fallback to non-balancer address.
newAddrs = append(newAddrs, d.lookupHost()...)
if d.disableServiceConfig {
return newAddrs, ""
if !d.disableServiceConfig {
sc := canaryingSC(d.lookupTXT())
state.ServiceConfig = d.cc.ParseServiceConfig(sc)
}
sc := d.lookupTXT()
return newAddrs, canaryingSC(sc)
return state
}

// formatIP returns ok = false if addr is not a valid textual representation of an IP address.
Expand Down
Loading

0 comments on commit c7f58d9

Please sign in to comment.