Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pickfirst: Implement Happy Eyeballs #7725

Merged
merged 26 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
db0dda7
Implement happy eyeballs
arjan-bal Oct 8, 2024
826bb03
Use timeAfterFunc
arjan-bal Oct 11, 2024
4e68e58
Address review comments
arjan-bal Oct 11, 2024
fe69816
Move timer func to internal, improve log statement and address review…
arjan-bal Oct 16, 2024
3022304
Remove env var
arjan-bal Oct 16, 2024
0a3ffd3
Change to e2e style test
arjan-bal Oct 16, 2024
6697267
Fix vet
arjan-bal Oct 16, 2024
67f7a1a
Fix vet
arjan-bal Oct 16, 2024
9712ec5
refactor test
arjan-bal Oct 16, 2024
6c8fb41
Avoid creating a context
arjan-bal Oct 16, 2024
04a912f
address review comments
arjan-bal Oct 17, 2024
0bb745f
Use BlockingDialer instead of implementing a hanging server
arjan-bal Oct 17, 2024
99e2e89
Fix test synchronization
arjan-bal Oct 17, 2024
592ba0d
Test refactorings
arjan-bal Oct 18, 2024
84d6ed4
Cancel timer when processing new resolver update
arjan-bal Oct 19, 2024
8f63d8e
Improve whitespaces and comments
arjan-bal Oct 23, 2024
09f27c6
Merge branch 'master' of github.com:grpc/grpc-go into grpc-go-happy-e…
arjan-bal Oct 23, 2024
6610516
Refactor fake timer
arjan-bal Oct 23, 2024
d6bc007
Don't use expired context
arjan-bal Oct 23, 2024
19a3165
Remove unnecessary timer in test
arjan-bal Oct 23, 2024
598fdd0
Address review comments
arjan-bal Oct 24, 2024
d3bde50
Merge remote-tracking branch 'source/master' into grpc-go-happy-eyeballs
arjan-bal Nov 6, 2024
8b4b28e
Remove stale comment
arjan-bal Nov 6, 2024
6c16943
Use rand/v2
arjan-bal Nov 7, 2024
11fe515
Address review comments
arjan-bal Nov 8, 2024
5c4ff49
Rename to connectionFailedInFirstPass
arjan-bal Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions balancer/pickfirst/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,18 @@
// Package internal contains code internal to the pickfirst package.
package internal

import "math/rand"
import (
rand "math/rand/v2"
"time"
)

// RandShuffle pseudo-randomizes the order of addresses.
var RandShuffle = rand.Shuffle
var (
// RandShuffle pseudo-randomizes the order of addresses.
RandShuffle = rand.Shuffle
// TimeAfterFunc allows mocking the timer for testing connection delay
// related functionality.
TimeAfterFunc = func(d time.Duration, f func()) func() {
timer := time.AfterFunc(d, f)
return func() { timer.Stop() }
}
)
181 changes: 127 additions & 54 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"fmt"
"net"
"sync"
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst/internal"
Expand Down Expand Up @@ -59,8 +60,13 @@
Name = "pick_first_leaf"
)

// TODO: change to pick-first when this becomes the default pick_first policy.
const logPrefix = "[pick-first-leaf-lb %p] "
const (
// TODO: change to pick-first when this becomes the default pick_first policy.
logPrefix = "[pick-first-leaf-lb %p] "
// connectionDelayInterval is the time to wait for during the happy eyeballs
// pass before starting the next connection attempt.
connectionDelayInterval = 250 * time.Millisecond
)

type ipAddrFamily int

Expand All @@ -76,11 +82,12 @@

func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &pickfirstBalancer{
cc: cc,
addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
cc: cc,
addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
cancelConnectionTimer: func() {},
}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
return b
Expand Down Expand Up @@ -115,8 +122,9 @@
subConn balancer.SubConn
addr resolver.Address

state connectivity.State
lastErr error
state connectivity.State
lastErr error
connectionFailedInFirstPass bool
}

func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
Expand Down Expand Up @@ -148,10 +156,11 @@
mu sync.Mutex
state connectivity.State
// scData for active subonns mapped by address.
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
cancelConnectionTimer func()
}

// ResolverError is called by the ClientConn when the name resolver produces
Expand Down Expand Up @@ -186,6 +195,7 @@
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
b.mu.Lock()
defer b.mu.Unlock()
b.cancelConnectionTimer()
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// Cleanup state pertaining to the previous resolver state.
// Treat an empty address list like an error by calling b.ResolverError.
Expand Down Expand Up @@ -239,12 +249,8 @@
// Not de-duplicating would result in attempting to connect to the same
// SubConn multiple times in the same pass. We don't want this.
newAddrs = deDupAddresses(newAddrs)

newAddrs = interleaveAddresses(newAddrs)

// Since we have a new set of addresses, we are again at first pass.
b.firstPass = true

// If the previous ready SubConn exists in new address list,
// keep this connection and don't create new SubConns.
prevAddr := b.addressList.currentAddress()
Expand All @@ -269,11 +275,11 @@
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
b.requestConnectionLocked()
b.startFirstPassLocked()
} else if b.state == connectivity.TransientFailure {
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
// we're READY. See A62.
b.requestConnectionLocked()
b.startFirstPassLocked()
}
return nil
}
Expand All @@ -288,6 +294,7 @@
b.mu.Lock()
defer b.mu.Unlock()
b.closeSubConnsLocked()
b.cancelConnectionTimer()
b.state = connectivity.Shutdown
}

Expand All @@ -297,12 +304,21 @@
func (b *pickfirstBalancer) ExitIdle() {
b.mu.Lock()
defer b.mu.Unlock()
if b.state == connectivity.Idle && b.addressList.currentAddress() == b.addressList.first() {
b.firstPass = true
b.requestConnectionLocked()
if b.state == connectivity.Idle {
b.startFirstPassLocked()
}
}

func (b *pickfirstBalancer) startFirstPassLocked() {
b.firstPass = true
b.numTF = 0
// Reset the connection attempt record for existing SubConns.
for _, sd := range b.subConns.Values() {
sd.(*scData).connectionFailedInFirstPass = false
}
b.requestConnectionLocked()
}

func (b *pickfirstBalancer) closeSubConnsLocked() {
for _, sd := range b.subConns.Values() {
sd.(*scData).subConn.Shutdown()
Expand Down Expand Up @@ -413,6 +429,7 @@
// shutdownRemainingLocked shuts down remaining subConns. Called when a subConn
// becomes ready, which means that all other subConn must be shutdown.
func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
b.cancelConnectionTimer()
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if sd.subConn != selected.subConn {
Expand Down Expand Up @@ -456,30 +473,69 @@
switch scd.state {
case connectivity.Idle:
scd.subConn.Connect()
b.scheduleNextConnectionLocked()
return
case connectivity.TransientFailure:
// Try the next address.
// The SubConn is being re-used and failed during a previous pass
// over the addressList. It has not completed backoff yet.
// Mark it as having failed and try the next address.
scd.connectionFailedInFirstPass = true
lastErr = scd.lastErr
continue
case connectivity.Ready:
// Should never happen.
b.logger.Errorf("Requesting a connection even though we have a READY SubConn")
case connectivity.Shutdown:
// Should never happen.
b.logger.Errorf("SubConn with state SHUTDOWN present in SubConns map")
case connectivity.Connecting:
// Wait for the SubConn to report success or failure.
// Wait for the connection attempt to complete or the timer to fire
// before attempting the next address.
b.scheduleNextConnectionLocked()
return
default:
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.state)
return

Check warning on line 492 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L490-L492

Added lines #L490 - L492 were not covered by tests

}
return
}

// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
// first pass.
b.endFirstPassLocked(lastErr)
// first pass if possible.
b.endFirstPassIfPossibleLocked(lastErr)
}

func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
b.cancelConnectionTimer()
if !b.addressList.hasNext() {
return
}
curAddr := b.addressList.currentAddress()
cancelled := false // Access to this is protected by the balancer's mutex.
closeFn := internal.TimeAfterFunc(connectionDelayInterval, func() {
b.mu.Lock()
defer b.mu.Unlock()
// If the scheduled task is cancelled while acquiring the mutex, return.
if cancelled {
return
}

Check warning on line 515 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L514-L515

Added lines #L514 - L515 were not covered by tests
if b.logger.V(2) {
b.logger.Infof("Happy Eyeballs timer expired while waiting for connection to %q.", curAddr.Addr)
}

Check warning on line 518 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L517-L518

Added lines #L517 - L518 were not covered by tests
if b.addressList.increment() {
b.requestConnectionLocked()
}
})
// Access to the cancellation callback held by the balancer is guarded by
// the balancer's mutex, so it's safe to set the boolean from the callback.
b.cancelConnectionTimer = sync.OnceFunc(func() {
cancelled = true
closeFn()
})
}

func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
oldState := sd.state
// Record a connection attempt when exiting CONNECTING.
if newState.ConnectivityState == connectivity.TransientFailure {
sd.connectionFailedInFirstPass = true
}
sd.state = newState.ConnectivityState
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
Expand Down Expand Up @@ -545,17 +601,20 @@
sd.lastErr = newState.ConnectionError
// Since we're re-using common SubConns while handling resolver
// updates, we could receive an out of turn TRANSIENT_FAILURE from
// a pass over the previous address list. We ignore such updates.

if curAddr := b.addressList.currentAddress(); !equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
return
}
if b.addressList.increment() {
b.requestConnectionLocked()
return
// a pass over the previous address list. Happy Eyeballs will also
// cause out of order updates to arrive.

if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
b.cancelConnectionTimer()
if b.addressList.increment() {
b.requestConnectionLocked()
return
}
}
// End of the first pass.
b.endFirstPassLocked(newState.ConnectionError)

// End the first pass if we've seen a TRANSIENT_FAILURE from all
// SubConns once.
b.endFirstPassIfPossibleLocked(newState.ConnectionError)
}
return
}
Expand All @@ -580,9 +639,22 @@
}
}

func (b *pickfirstBalancer) endFirstPassLocked(lastErr error) {
// endFirstPassIfPossibleLocked ends the first happy-eyeballs pass if all the
// addresses are tried and their SubConns have reported a failure.
func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
// An optimization to avoid iterating over the entire SubConn map.
if b.addressList.isValid() {
return
}
// Connect() has been called on all the SubConns. The first pass can be
// ended if all the SubConns have reported a failure.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if !sd.connectionFailedInFirstPass {
return
}
}
b.firstPass = false
b.numTF = 0
b.state = connectivity.TransientFailure

b.cc.UpdateState(balancer.State{
Expand Down Expand Up @@ -654,15 +726,6 @@
return al.addresses[al.idx]
}

// first returns the first address in the list. If the list is empty, it returns
// an empty address instead.
func (al *addressList) first() resolver.Address {
if len(al.addresses) == 0 {
return resolver.Address{}
}
return al.addresses[0]
}

func (al *addressList) reset() {
al.idx = 0
}
Expand All @@ -685,6 +748,16 @@
return false
}

// hasNext returns whether incrementing the addressList will result in moving
// past the end of the list. If the list has already moved past the end, it
// returns false.
func (al *addressList) hasNext() bool {
if !al.isValid() {
return false
}

Check warning on line 757 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L756-L757

Added lines #L756 - L757 were not covered by tests
return al.idx+1 < len(al.addresses)
}

// equalAddressIgnoringBalAttributes returns true is a and b are considered
// equal. This is different from the Equal method on the resolver.Address type
// which considers all fields to determine equality. Here, we only consider
Expand Down
Loading