Skip to content

Commit

Permalink
xds/internal/xdsclient: Add least request support in xDS (#6517)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored Aug 16, 2023
1 parent e5d8eac commit aca07ce
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 12 deletions.
2 changes: 1 addition & 1 deletion balancer/leastrequest/leastrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"google.golang.org/grpc/serviceconfig"
)

// Global to stub out in tests.
// grpcranduint32 is a global to stub out in tests.
var grpcranduint32 = grpcrand.Uint32

// Name is the name of the least request balancer.
Expand Down
4 changes: 4 additions & 0 deletions internal/envconfig/envconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ var (
// PickFirstLBConfig is set if we should support configuration of the
// pick_first LB policy.
PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", true)
// LeastRequestLB is set if we should support the least_request_experimental
// LB policy, which can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST" to "true".
LeastRequestLB = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST", false)
// ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS
// handshakes that can be performed.
ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100)
Expand Down
30 changes: 29 additions & 1 deletion test/xds/xds_client_custom_lb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"google.golang.org/grpc"
_ "google.golang.org/grpc/balancer/leastrequest" // To register least_request
_ "google.golang.org/grpc/balancer/weightedroundrobin" // To register weighted_round_robin
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/envconfig"
Expand All @@ -41,6 +42,7 @@ import (
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3"
v3leastrequestpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/least_request/v3"
v3roundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/round_robin/v3"
v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -96,6 +98,11 @@ func (s) TestWrrLocality(t *testing.T) {
defer func() {
envconfig.XDSCustomLBPolicy = oldCustomLBSupport
}()
oldLeastRequestLBSupport := envconfig.LeastRequestLB
envconfig.LeastRequestLB = true
defer func() {
envconfig.LeastRequestLB = oldLeastRequestLBSupport
}()

backend1 := stubserver.StartTestService(t, nil)
port1 := testutils.ParsePort(t, backend1.Address)
Expand Down Expand Up @@ -194,12 +201,33 @@ func (s) TestWrrLocality(t *testing.T) {
{addr: backend5.Address, count: 8},
},
},
{
name: "custom_lb_least_request",
wrrLocalityConfiguration: wrrLocality(&v3leastrequestpb.LeastRequest{
ChoiceCount: wrapperspb.UInt32(2),
}),
// The test performs a Unary RPC, and blocks until the RPC returns,
// and then makes the next Unary RPC. Thus, over iterations, no RPC
// counts are present. This causes least request's randomness of
// indexes to sample to converge onto a round robin distribution per
// locality. Thus, expect the same distribution as round robin
// above.
addressDistributionWant: []struct {
addr string
count int
}{
{addr: backend1.Address, count: 6},
{addr: backend2.Address, count: 6},
{addr: backend3.Address, count: 8},
{addr: backend4.Address, count: 8},
{addr: backend5.Address, count: 8},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
managementServer, nodeID, _, r, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()

routeConfigName := "route-" + serviceName
clusterName := "cluster-" + serviceName
endpointsName := "endpoints-" + serviceName
Expand Down
1 change: 1 addition & 0 deletions xds/internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package balancer

import (
_ "google.golang.org/grpc/balancer/leastrequest" // Register the least_request_experimental balancer
_ "google.golang.org/grpc/balancer/weightedtarget" // Register the weighted_target balancer
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the CDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/clusterimpl" // Register the xds_cluster_impl balancer
Expand Down
31 changes: 29 additions & 2 deletions xds/internal/xdsclient/xdslbregistry/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/leastrequest"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/internal/envconfig"
Expand All @@ -41,6 +42,7 @@ import (
v1xdsudpatypepb "github.com/cncf/xds/go/udpa/type/v1"
v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3"
v3leastrequestpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/least_request/v3"
v3pickfirstpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/pick_first/v3"
v3ringhashpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/ring_hash/v3"
v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
Expand All @@ -53,13 +55,15 @@ func init() {
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.pick_first.v3.PickFirst", convertPickFirstProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin", convertRoundRobinProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality", convertWRRLocalityProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/envoy.extensions.load_balancing_policies.least_request.v3.LeastRequest", convertLeastRequestProtoToServiceConfig)
xdslbregistry.Register("type.googleapis.com/udpa.type.v1.TypedStruct", convertV1TypedStructToServiceConfig)
xdslbregistry.Register("type.googleapis.com/xds.type.v3.TypedStruct", convertV3TypedStructToServiceConfig)
}

const (
defaultRingHashMinSize = 1024
defaultRingHashMaxSize = 8 * 1024 * 1024 // 8M
defaultRingHashMinSize = 1024
defaultRingHashMaxSize = 8 * 1024 * 1024 // 8M
defaultLeastRequestChoiceCount = 2
)

func convertRingHashProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
Expand Down Expand Up @@ -177,6 +181,29 @@ func convertWeightedRoundRobinProtoToServiceConfig(rawProto []byte, _ int) (json
return makeBalancerConfigJSON(weightedroundrobin.Name, lbCfgJSON), nil
}

func convertLeastRequestProtoToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
if !envconfig.LeastRequestLB {
return nil, nil
}
lrProto := &v3leastrequestpb.LeastRequest{}
if err := proto.Unmarshal(rawProto, lrProto); err != nil {
return nil, fmt.Errorf("failed to unmarshal resource: %v", err)
}
// "The configuration for the Least Request LB policy is the
// least_request_lb_config field. The field is optional; if not present,
// defaults will be assumed for all of its values." - A48
choiceCount := uint32(defaultLeastRequestChoiceCount)
if cc := lrProto.GetChoiceCount(); cc != nil {
choiceCount = cc.GetValue()
}
lrCfg := &leastrequest.LBConfig{ChoiceCount: choiceCount}
js, err := json.Marshal(lrCfg)
if err != nil {
return nil, fmt.Errorf("error marshaling JSON for type %T: %v", lrCfg, err)
}
return makeBalancerConfigJSON(leastrequest.Name, js), nil
}

func convertV1TypedStructToServiceConfig(rawProto []byte, _ int) (json.RawMessage, error) {
tsProto := &v1xdsudpatypepb.TypedStruct{}
if err := proto.Unmarshal(rawProto, tsProto); err != nil {
Expand Down
37 changes: 30 additions & 7 deletions xds/internal/xdsclient/xdslbregistry/xdslbregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func wrrLocalityBalancerConfig(childPolicy *internalserviceconfig.BalancerConfig
}

func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
defer func(old bool) { envconfig.LeastRequestLB = old }(envconfig.LeastRequestLB)
envconfig.LeastRequestLB = false

const customLBPolicyName = "myorg.MyCustomLeastRequestPolicy"
stub.Register(customLBPolicyName, stub.BalancerFuncs{})

Expand All @@ -78,6 +81,7 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
wantConfig string // JSON config
rhDisabled bool
pfDisabled bool
lrEnabled bool
}{
{
name: "ring_hash",
Expand All @@ -96,6 +100,22 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
},
wantConfig: `[{"ring_hash_experimental": { "minRingSize": 10, "maxRingSize": 100 }}]`,
},
{
name: "least_request",
policy: &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
TypedConfig: testutils.MarshalAny(&v3leastrequestpb.LeastRequest{
ChoiceCount: wrapperspb.UInt32(3),
}),
},
},
},
},
wantConfig: `[{"least_request_experimental": { "choiceCount": 3 }}]`,
lrEnabled: true,
},
{
name: "pick_first_shuffle",
policy: &v3clusterpb.LoadBalancingPolicy{
Expand Down Expand Up @@ -183,7 +203,7 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
rhDisabled: true,
},
{
name: "pick_first_disabled_pf_rr_use_first_supported",
name: "pick_first_enabled_pf_rr_use_pick_first",
policy: &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
Expand All @@ -200,17 +220,16 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
},
},
},
wantConfig: `[{"round_robin": {}}]`,
pfDisabled: true,
wantConfig: `[{"pick_first": { "shuffleAddressList": true }}]`,
},
{
name: "pick_first_enabled_pf_rr_use_pick_first",
name: "least_request_disabled_pf_rr_use_first_supported",
policy: &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
TypedConfig: testutils.MarshalAny(&v3pickfirstpb.PickFirst{
ShuffleAddressList: true,
TypedConfig: testutils.MarshalAny(&v3leastrequestpb.LeastRequest{
ChoiceCount: wrapperspb.UInt32(32),
}),
},
},
Expand All @@ -221,7 +240,7 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
},
},
},
wantConfig: `[{"pick_first": { "shuffleAddressList": true }}]`,
wantConfig: `[{"round_robin": {}}]`,
},
{
name: "custom_lb_type_v3_struct",
Expand Down Expand Up @@ -317,6 +336,10 @@ func (s) TestConvertToServiceConfigSuccess(t *testing.T) {
defer func(old bool) { envconfig.XDSRingHash = old }(envconfig.XDSRingHash)
envconfig.XDSRingHash = false
}
if test.lrEnabled {
defer func(old bool) { envconfig.LeastRequestLB = old }(envconfig.LeastRequestLB)
envconfig.LeastRequestLB = true
}
if test.pfDisabled {
defer func(old bool) { envconfig.PickFirstLBConfig = old }(envconfig.PickFirstLBConfig)
envconfig.PickFirstLBConfig = false
Expand Down
58 changes: 58 additions & 0 deletions xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer/leastrequest"
_ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer.
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/envconfig"
Expand Down Expand Up @@ -103,6 +104,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
defer func() {
envconfig.XDSCustomLBPolicy = origCustomLBSupport
}()
defer func(old bool) { envconfig.LeastRequestLB = old }(envconfig.LeastRequestLB)
envconfig.LeastRequestLB = true
tests := []struct {
name string
cluster *v3clusterpb.Cluster
Expand Down Expand Up @@ -330,6 +333,31 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
},
{
name: "happiest-case-with-least-request-lb-policy-with-default-config",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName,
},
LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
Config: &leastrequest.LBConfig{
ChoiceCount: 2,
},
},
},
{
name: "happiest-case-with-ring-hash-lb-policy-with-none-default-config",
cluster: &v3clusterpb.Cluster{
Expand Down Expand Up @@ -367,6 +395,36 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
},
{
name: "happiest-case-with-least-request-lb-policy-with-none-default-config",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName,
},
LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
LbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig_{
LeastRequestLbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig{
ChoiceCount: wrapperspb.UInt32(3),
},
},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
Config: &leastrequest.LBConfig{
ChoiceCount: 3,
},
},
},
{
name: "happiest-case-with-ring-hash-lb-policy-configured-through-LoadBalancingPolicy",
cluster: &v3clusterpb.Cluster{
Expand Down
22 changes: 22 additions & 0 deletions xds/internal/xdsclient/xdsresource/unmarshal_cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ const (
defaultRingHashMinSize = 1024
defaultRingHashMaxSize = 8 * 1024 * 1024 // 8M
ringHashSizeUpperBound = 8 * 1024 * 1024 // 8M

defaultLeastRequestChoiceCount = 2
)

func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
Expand Down Expand Up @@ -104,6 +106,26 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu

rhLBCfg := []byte(fmt.Sprintf("{\"minRingSize\": %d, \"maxRingSize\": %d}", minSize, maxSize))
lbPolicy = []byte(fmt.Sprintf(`[{"ring_hash_experimental": %s}]`, rhLBCfg))
case v3clusterpb.Cluster_LEAST_REQUEST:
if !envconfig.LeastRequestLB {
return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}

// "The configuration for the Least Request LB policy is the
// least_request_lb_config field. The field is optional; if not present,
// defaults will be assumed for all of its values." - A48
lr := cluster.GetLeastRequestLbConfig()
var choiceCount uint32 = defaultLeastRequestChoiceCount
if cc := lr.GetChoiceCount(); cc != nil {
choiceCount = cc.GetValue()
}
// "If choice_count < 2, the config will be rejected." - A48
if choiceCount < 2 {
return ClusterUpdate{}, fmt.Errorf("Cluster_LeastRequestLbConfig.ChoiceCount must be >= 2, got: %v", choiceCount)
}

lrLBCfg := []byte(fmt.Sprintf("{\"choiceCount\": %d}", choiceCount))
lbPolicy = []byte(fmt.Sprintf(`[{"least_request_experimental": %s}]`, lrLBCfg))
default:
return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}
Expand Down
15 changes: 14 additions & 1 deletion xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ func (s) TestValidateCluster_Failure(t *testing.T) {
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "least-request-choice-count-less-than-two",
cluster: &v3clusterpb.Cluster{
LbPolicy: v3clusterpb.Cluster_RING_HASH,
LbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig_{
LeastRequestLbConfig: &v3clusterpb.Cluster_LeastRequestLbConfig{
ChoiceCount: wrapperspb.UInt32(1),
},
},
},
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "ring-hash-max-bound-greater-than-upper-bound",
cluster: &v3clusterpb.Cluster{
Expand Down Expand Up @@ -205,7 +218,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) {
wantErr: true,
},
{
name: "least-request-unsupported-in-converter",
name: "least-request-unsupported-in-converter-since-env-var-unset",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
Expand Down

0 comments on commit aca07ce

Please sign in to comment.