Skip to content

Commit

Permalink
Interleave addresses for happy eyeballs
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Oct 15, 2024
1 parent ad81c20 commit f7dcbc9
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 3 deletions.
81 changes: 78 additions & 3 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"sync"

"google.golang.org/grpc/balancer"
Expand Down Expand Up @@ -61,6 +62,14 @@ var (
// TODO: change to pick-first when this becomes the default pick_first policy.
const logPrefix = "[pick-first-leaf-lb %p] "

type ipAddrType int

const (
ipTypeUnknown ipAddrType = iota
ipv4
ipv6
)

type pickfirstBuilder struct{}

func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
Expand Down Expand Up @@ -206,9 +215,6 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
// "Flatten the list by concatenating the ordered list of addresses for
// each of the endpoints, in order." - A61
for _, endpoint := range endpoints {
// "In the flattened list, interleave addresses from the two address
// families, as per RFC-8305 section 4." - A61
// TODO: support the above language.
newAddrs = append(newAddrs, endpoint.Addresses...)
}
} else {
Expand All @@ -232,6 +238,10 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
// SubConn multiple times in the same pass. We don't want this.
newAddrs = deDupAddresses(newAddrs)

// Interleave addresses of both families (IPv4 and IPv6) as per RFC-8305
// section 4.
newAddrs = interleaveAddresses(newAddrs)

// Since we have a new set of addresses, we are again at first pass.
b.firstPass = true

Expand Down Expand Up @@ -314,6 +324,71 @@ func deDupAddresses(addrs []resolver.Address) []resolver.Address {
return retAddrs
}

func interleaveAddresses(addrs []resolver.Address) []resolver.Address {
// Group the addresses by their type and determine the order in which to
// interleave the address families. The order of interleaving the families
// is the order in which the first address of a particular type appears in
// the address list.
familyAddrsMap := map[ipAddrType][]resolver.Address{}
interleavingOrder := []ipAddrType{}
for _, addr := range addrs {
typ := addressType(addr.Addr)
if _, found := familyAddrsMap[typ]; !found {
interleavingOrder = append(interleavingOrder, typ)
}
familyAddrsMap[typ] = append(familyAddrsMap[typ], addr)
}

interleavedAddrs := make([]resolver.Address, 0, len(addrs))
curTypeIndex := 0
for i := 0; i < len(addrs); i++ {
// Some IP types may have fewer addresses than others, so we look for
// the next type that has a remaining member to add to the interleaved
// list.
for {
curType := interleavingOrder[curTypeIndex]
remainingMembers := familyAddrsMap[curType]
if len(remainingMembers) > 0 {
break
}
curTypeIndex = (curTypeIndex + 1) % len(interleavingOrder)
}
curType := interleavingOrder[curTypeIndex]
remainingMembers := familyAddrsMap[curType]
interleavedAddrs = append(interleavedAddrs, remainingMembers[0])
familyAddrsMap[curType] = remainingMembers[1:]
curTypeIndex = (curTypeIndex + 1) % len(interleavingOrder)
}

return interleavedAddrs
}

func addressType(address string) ipAddrType {
// Try parsing addresses without a port specified.
ip := net.ParseIP(address)
if ip == nil {
// Try to parse the IP after removing the port.
host, _, err := net.SplitHostPort(address)
if err != nil {
return ipTypeUnknown
}
ip = net.ParseIP(host)
}

// If using the passthrough resolver, the hostnames would be unresolved
// and therefore not valid IP addresses.
if ip == nil {
return ipTypeUnknown
}

if ip.To4() != nil {
return ipv4
} else if ip.To16() != nil {
return ipv6
}
return ipTypeUnknown

Check warning on line 389 in balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

View check run for this annotation

Codecov / codecov/patch

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go#L389

Added line #L389 was not covered by tests
}

// reconcileSubConnsLocked updates the active subchannels based on a new address
// list from the resolver. It does this by:
// - closing subchannels: any existing subchannels associated with addresses
Expand Down
109 changes: 109 additions & 0 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
Expand Down Expand Up @@ -257,3 +258,111 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {
t.Fatalf("cc.WaitForPickerWithErr(%v) returned error: %v", newTfErr, err)
}
}

func (s) TestPickFirstLeaf_InterleavingIPV4Preffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}, // no port
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
{Addresses: []resolver.Address{{Addr: "3.3.3.3:3"}}},
{Addresses: []resolver.Address{{Addr: "4.4.4.4:4"}}},
{Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}}, // ipv6 with port
{Addresses: []resolver.Address{{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"}}},
{Addresses: []resolver.Address{{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"}}},
{Addresses: []resolver.Address{{Addr: "grpc.io:80"}}}, // not an IP.
},
},
}
if err := bal.UpdateClientConnState(ccState); err != nil {
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
}

wantAddrs := []resolver.Address{
{Addr: "1.1.1.1"},
{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"},
{Addr: "grpc.io:80"},
{Addr: "2.2.2.2:2"},
{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"},
{Addr: "3.3.3.3:3"},
{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"},
{Addr: "4.4.4.4:4"},
}

gotAddrs, err := subConnAddresses(ctx, cc, 8)
if err != nil {
t.Fatalf("%v", err)
}
if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" {
t.Errorf("subconn creation order mismatch (-want +got):\n%s", diff)
}
}

func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}}, // ipv6 with port
{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}, // no port
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
{Addresses: []resolver.Address{{Addr: "3.3.3.3:3"}}},
{Addresses: []resolver.Address{{Addr: "4.4.4.4:4"}}},
{Addresses: []resolver.Address{{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"}}},
{Addresses: []resolver.Address{{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"}}},
{Addresses: []resolver.Address{{Addr: "grpc.io:80"}}}, // not an IP.
},
},
}
if err := bal.UpdateClientConnState(ccState); err != nil {
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
}

wantAddrs := []resolver.Address{
{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"},
{Addr: "1.1.1.1"},
{Addr: "grpc.io:80"},
{Addr: "0002:0002:0002:0002:0002:0002:0002:0002"},
{Addr: "2.2.2.2:2"},
{Addr: "0003:0003:0003:0003:0003:0003:0003:0003"},
{Addr: "3.3.3.3:3"},
{Addr: "4.4.4.4:4"},
}

gotAddrs, err := subConnAddresses(ctx, cc, 8)
if err != nil {
t.Fatalf("%v", err)
}
if diff := cmp.Diff(wantAddrs, gotAddrs); diff != "" {
t.Errorf("subconn creation order mismatch (-want +got):\n%s", diff)
}
}
func subConnAddresses(ctx context.Context, cc *testutils.BalancerClientConn, subConnCount int) ([]resolver.Address, error) {
addresses := []resolver.Address{}
for i := 0; i < subConnCount; i++ {
select {
case <-ctx.Done():
return nil, fmt.Errorf("Context timed out waiting for SubConn")
case sc := <-cc.NewSubConnCh:
if len(sc.Addresses) != 1 {
return nil, fmt.Errorf("len(SubConn.Addresses) = %d, want 1", len(sc.Addresses))
}
addresses = append(addresses, sc.Addresses[0])
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: fmt.Errorf("test error"),
})
}
}
return addresses, nil
}
2 changes: 2 additions & 0 deletions internal/testutils/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type TestSubConn struct {
ConnectCh chan struct{}
stateListener func(balancer.SubConnState)
connectCalled *grpcsync.Event
Addresses []resolver.Address
}

// NewTestSubConn returns a newly initialized SubConn. Typically, subconns
Expand Down Expand Up @@ -131,6 +132,7 @@ func (tcc *BalancerClientConn) NewSubConn(a []resolver.Address, o balancer.NewSu
ConnectCh: make(chan struct{}, 1),
stateListener: o.StateListener,
connectCalled: grpcsync.NewEvent(),
Addresses: a,
}
tcc.subConnIdx++
tcc.logger.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc)
Expand Down

0 comments on commit f7dcbc9

Please sign in to comment.