Skip to content

Commit

Permalink
[NET-6429] Program ProxyStateTemplate to route cross-partition traffi… (
Browse files Browse the repository at this point in the history
hashicorp#20410)

[NET-6429] Program ProxyStateTemplate to route cross-partition traffic to the correct destination mesh gateway

* Program mesh port to route wildcarded gateway SNI to the appropriate remote partition's mesh gateway

* Update target + route ports in service endpoint refs when building PST

* Use proper name of local datacenter when constructing SNI for gateway target

* Use destination identities for TLS when routing L4 traffic through the mesh gateway

* Use new constants, move comment to correct location

* Use new constants for port names

* Update test assertions

* Undo debug logging change
  • Loading branch information
nathancoleman authored Jan 31, 2024
1 parent c82b78b commit 74e4200
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 101 deletions.
203 changes: 174 additions & 29 deletions internal/mesh/internal/controllers/gatewayproxy/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"google.golang.org/protobuf/types/known/durationpb"

"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/fetcher"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/meshgateways"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
Expand All @@ -24,7 +24,9 @@ import (
"github.com/hashicorp/consul/proto-public/pbresource"
)

const nullRouteClusterName = "null_route_cluster"
const (
nullRouteClusterName = "null_route_cluster"
)

type proxyStateTemplateBuilder struct {
workload *types.DecodedWorkload
Expand All @@ -33,16 +35,18 @@ type proxyStateTemplateBuilder struct {
exportedServices []*pbmulticluster.ComputedExportedService
logger hclog.Logger
trustDomain string
remoteGatewayIDs []*pbresource.ID
}

func NewProxyStateTemplateBuilder(workload *types.DecodedWorkload, exportedServices []*pbmulticluster.ComputedExportedService, logger hclog.Logger, dataFetcher *fetcher.Fetcher, dc, trustDomain string) *proxyStateTemplateBuilder {
func NewProxyStateTemplateBuilder(workload *types.DecodedWorkload, exportedServices []*pbmulticluster.ComputedExportedService, logger hclog.Logger, dataFetcher *fetcher.Fetcher, dc, trustDomain string, remoteGatewayIDs []*pbresource.ID) *proxyStateTemplateBuilder {
return &proxyStateTemplateBuilder{
workload: workload,
dataFetcher: dataFetcher,
dc: dc,
exportedServices: exportedServices,
logger: logger,
trustDomain: trustDomain,
remoteGatewayIDs: remoteGatewayIDs,
}
}

Expand All @@ -67,8 +71,15 @@ func (b *proxyStateTemplateBuilder) listeners() []*pbproxystate.Listener {
// if the address defines no ports we assume the intention is to bind to all
// ports on the workload
if len(address.Ports) == 0 {
for _, workloadPort := range b.workload.Data.Ports {
listeners = append(listeners, b.buildListener(address, workloadPort.Port))
for portName, workloadPort := range b.workload.Data.Ports {
switch portName {
case meshgateways.LANPortName:
listeners = append(listeners, b.meshListener(address, workloadPort.Port))
case meshgateways.WANPortName:
listeners = append(listeners, b.wanListener(address, workloadPort.Port))
default:
b.logger.Warn("encountered unexpected port on mesh gateway workload", "port", portName)
}
}
return listeners
}
Expand All @@ -80,16 +91,38 @@ func (b *proxyStateTemplateBuilder) listeners() []*pbproxystate.Listener {
continue
}

listeners = append(listeners, b.buildListener(address, workloadPort.Port))
switch portName {
case meshgateways.LANPortName:
listeners = append(listeners, b.meshListener(address, workloadPort.Port))
case meshgateways.WANPortName:
listeners = append(listeners, b.wanListener(address, workloadPort.Port))
default:
b.logger.Warn("encountered unexpected port on mesh gateway workload", "port", portName)
}
}

return listeners
}

func (b *proxyStateTemplateBuilder) buildListener(address *pbcatalog.WorkloadAddress, port uint32) *pbproxystate.Listener {
// meshListener constructs a pbproxystate.Listener that receives outgoing
// traffic from the local partition where the mesh gateway mode is "local". This
// traffic will be sent to a mesh gateway in a remote partition.
func (b *proxyStateTemplateBuilder) meshListener(address *pbcatalog.WorkloadAddress, port uint32) *pbproxystate.Listener {
return b.listener("mesh_listener", address, port, pbproxystate.Direction_DIRECTION_OUTBOUND, b.meshRouters())
}

// wanListener constructs a pbproxystate.Listener that receives incoming
// traffic from the public internet, either from a mesh gateway in a remote partition
// where the mesh gateway mode is "local" or from a service in a remote partition
// where the mesh gateway mode is "remote".
func (b *proxyStateTemplateBuilder) wanListener(address *pbcatalog.WorkloadAddress, port uint32) *pbproxystate.Listener {
return b.listener("wan_listener", address, port, pbproxystate.Direction_DIRECTION_INBOUND, b.wanRouters())
}

func (b *proxyStateTemplateBuilder) listener(name string, address *pbcatalog.WorkloadAddress, port uint32, direction pbproxystate.Direction, routers []*pbproxystate.Router) *pbproxystate.Listener {
return &pbproxystate.Listener{
Name: xdscommon.PublicListenerName,
Direction: pbproxystate.Direction_DIRECTION_INBOUND,
Name: name,
Direction: direction,
BindAddress: &pbproxystate.Listener_HostPort{
HostPort: &pbproxystate.HostPortAddress{
Host: address.Host,
Expand All @@ -111,14 +144,55 @@ func (b *proxyStateTemplateBuilder) buildListener(address *pbcatalog.WorkloadAdd
},
},
},
Routers: b.routers(),
Routers: routers,
}
}

// routers loops through the ports and consumers for each exported service and generates
// a pbproxystate.Router matching the SNI to the target cluster. The target port name
// will be included in the ALPN. The targeted cluster will marry this port name with the SNI.
func (b *proxyStateTemplateBuilder) routers() []*pbproxystate.Router {
// meshRouters loops through the list of mesh gateways in other partitions and generates
// a pbproxystate.Router matching the partition + datacenter of the SNI to the target
// cluster. Traffic flowing through this router originates in the local partition where
// the mesh gateway mode is "local".
func (b *proxyStateTemplateBuilder) meshRouters() []*pbproxystate.Router {
var routers []*pbproxystate.Router

for _, remoteGatewayID := range b.remoteGatewayIDs {
serviceID := resource.ReplaceType(pbcatalog.ServiceType, remoteGatewayID)
service, err := b.dataFetcher.FetchService(context.Background(), serviceID)
if err != nil {
b.logger.Trace("error reading exported service", "error", err)
continue
} else if service == nil {
b.logger.Trace("service does not exist, skipping router", "service", serviceID)
continue
}

routers = append(routers, &pbproxystate.Router{
Match: &pbproxystate.Match{
ServerNames: []string{
fmt.Sprintf("*.%s", b.clusterNameForRemoteGateway(remoteGatewayID)),
},
},
Destination: &pbproxystate.Router_L4{
L4: &pbproxystate.L4Destination{
Destination: &pbproxystate.L4Destination_Cluster{
Cluster: &pbproxystate.DestinationCluster{
Name: b.clusterNameForRemoteGateway(remoteGatewayID),
},
},
StatPrefix: "prefix",
},
},
})
}

return routers
}

// wanRouters loops through the ports and consumers for each exported service and generates
// a pbproxystate.Router matching the SNI to the target cluster. Traffic flowing through this
// router originates from a mesh gateway in a remote partition where the mesh gateway mode is
// "local" or from a service in a remote partition where the mesh gateway mode is "remote".
func (b *proxyStateTemplateBuilder) wanRouters() []*pbproxystate.Router {
var routers []*pbproxystate.Router

for _, exportedService := range b.exportedServices {
Expand All @@ -133,17 +207,21 @@ func (b *proxyStateTemplateBuilder) routers() []*pbproxystate.Router {
}

for _, port := range service.Data.Ports {
if port.Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
continue
}

for _, consumer := range exportedService.Consumers {
routers = append(routers, &pbproxystate.Router{
Match: &pbproxystate.Match{
AlpnProtocols: []string{alpnProtocol(port.TargetPort)},
ServerNames: []string{b.sni(exportedService.TargetRef, consumer)},
ServerNames: []string{b.sniForExportedService(exportedService.TargetRef, consumer)},
},
Destination: &pbproxystate.Router_L4{
L4: &pbproxystate.L4Destination{
Destination: &pbproxystate.L4Destination_Cluster{
Cluster: &pbproxystate.DestinationCluster{
Name: b.clusterName(exportedService.TargetRef, consumer, port.TargetPort),
Name: b.clusterNameForExportedService(exportedService.TargetRef, consumer, port.TargetPort),
},
},
StatPrefix: "prefix",
Expand All @@ -160,6 +238,7 @@ func (b *proxyStateTemplateBuilder) routers() []*pbproxystate.Router {
func (b *proxyStateTemplateBuilder) clusters() map[string]*pbproxystate.Cluster {
clusters := map[string]*pbproxystate.Cluster{}

// Clusters handling incoming traffic from a remote partition
for _, exportedService := range b.exportedServices {
serviceID := resource.IDFromReference(exportedService.TargetRef)
service, err := b.dataFetcher.FetchService(context.Background(), serviceID)
Expand All @@ -172,8 +251,12 @@ func (b *proxyStateTemplateBuilder) clusters() map[string]*pbproxystate.Cluster
}

for _, port := range service.Data.Ports {
if port.Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
continue
}

for _, consumer := range exportedService.Consumers {
clusterName := b.clusterName(exportedService.TargetRef, consumer, port.TargetPort)
clusterName := b.clusterNameForExportedService(exportedService.TargetRef, consumer, port.TargetPort)
clusters[clusterName] = &pbproxystate.Cluster{
Name: clusterName,
Protocol: pbproxystate.Protocol_PROTOCOL_TCP, // TODO
Expand All @@ -188,6 +271,31 @@ func (b *proxyStateTemplateBuilder) clusters() map[string]*pbproxystate.Cluster
}
}

// Clusters handling outgoing traffic from the local partition
for _, remoteGatewayID := range b.remoteGatewayIDs {
serviceID := resource.ReplaceType(pbcatalog.ServiceType, remoteGatewayID)
service, err := b.dataFetcher.FetchService(context.Background(), serviceID)
if err != nil {
b.logger.Trace("error reading exported service", "error", err)
continue
} else if service == nil {
b.logger.Trace("service does not exist, skipping router", "service", serviceID)
continue
}

clusterName := b.clusterNameForRemoteGateway(remoteGatewayID)
clusters[clusterName] = &pbproxystate.Cluster{
Name: clusterName,
Protocol: pbproxystate.Protocol_PROTOCOL_TCP, // TODO
Group: &pbproxystate.Cluster_EndpointGroup{
EndpointGroup: &pbproxystate.EndpointGroup{
Group: &pbproxystate.EndpointGroup_Dynamic{},
},
},
AltStatName: "prefix",
}
}

// Add null route cluster for any unmatched traffic
clusters[nullRouteClusterName] = &pbproxystate.Cluster{
Name: nullRouteClusterName,
Expand Down Expand Up @@ -232,6 +340,7 @@ func (b *proxyStateTemplateBuilder) Build() *meshv2beta1.ProxyStateTemplate {
func (b *proxyStateTemplateBuilder) requiredEndpoints() map[string]*pbproxystate.EndpointRef {
requiredEndpoints := make(map[string]*pbproxystate.EndpointRef)

// Endpoints for clusters handling incoming traffic from another partition
for _, exportedService := range b.exportedServices {
serviceID := resource.IDFromReference(exportedService.TargetRef)
service, err := b.dataFetcher.FetchService(context.Background(), serviceID)
Expand All @@ -244,38 +353,74 @@ func (b *proxyStateTemplateBuilder) requiredEndpoints() map[string]*pbproxystate
}

for _, port := range service.Data.Ports {
if port.Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
continue
}

for _, consumer := range exportedService.Consumers {
clusterName := b.clusterName(exportedService.TargetRef, consumer, port.TargetPort)
clusterName := b.clusterNameForExportedService(exportedService.TargetRef, consumer, port.TargetPort)

requiredEndpoints[clusterName] = &pbproxystate.EndpointRef{
Id: resource.ReplaceType(pbcatalog.ServiceEndpointsType, serviceID),
// In the case of a mesh gateway, the route port and mesh port are the same, since you are always
// routing to same port that you add in the endpoint. This is different from a sidecar proxy, where
// the receiving proxy listens on the mesh port and forwards to a different workload port.
Id: resource.ReplaceType(pbcatalog.ServiceEndpointsType, serviceID),
RoutePort: port.TargetPort,
MeshPort: port.TargetPort,
MeshPort: "mesh",
}
}
}
}

// Endpoints for clusters handling outgoing traffic from the local partition
for _, remoteGatewayID := range b.remoteGatewayIDs {
serviceID := resource.ReplaceType(pbcatalog.ServiceType, remoteGatewayID)
service, err := b.dataFetcher.FetchService(context.Background(), serviceID)
if err != nil {
b.logger.Trace("error reading exported service", "error", err)
continue
} else if service == nil {
b.logger.Trace("service does not exist, skipping router", "service", serviceID)
continue
}

clusterName := b.clusterNameForRemoteGateway(remoteGatewayID)

// In the case of a mesh gateway, the route port and mesh port are the same, since you are always
// routing to same port that you add in the endpoint. This is different from a sidecar proxy, where
// the receiving proxy listens on the mesh port and forwards to a different workload port.
requiredEndpoints[clusterName] = &pbproxystate.EndpointRef{
Id: resource.ReplaceType(pbcatalog.ServiceEndpointsType, serviceID),
MeshPort: meshgateways.WANPortName,
RoutePort: meshgateways.WANPortName,
}
}

return requiredEndpoints
}

func (b *proxyStateTemplateBuilder) clusterName(serviceRef *pbresource.Reference, consumer *pbmulticluster.ComputedExportedServiceConsumer, port string) string {
return fmt.Sprintf("%s.%s", port, b.sni(serviceRef, consumer))
// clusterNameForExportedService generates a cluster name for a given service
// that is being exported from the local partition to a remote partition. This
// partition may reside in the same datacenter or in a remote datacenter.
func (b *proxyStateTemplateBuilder) clusterNameForExportedService(serviceRef *pbresource.Reference, consumer *pbmulticluster.ComputedExportedServiceConsumer, port string) string {
return fmt.Sprintf("%s.%s", port, b.sniForExportedService(serviceRef, consumer))
}

func (b *proxyStateTemplateBuilder) sni(serviceRef *pbresource.Reference, consumer *pbmulticluster.ComputedExportedServiceConsumer) string {
switch tConsumer := consumer.Tenancy.(type) {
func (b *proxyStateTemplateBuilder) sniForExportedService(serviceRef *pbresource.Reference, consumer *pbmulticluster.ComputedExportedServiceConsumer) string {
switch consumer.Tenancy.(type) {
case *pbmulticluster.ComputedExportedServiceConsumer_Partition:
return connect.ServiceSNI(serviceRef.Name, "", serviceRef.Tenancy.Namespace, tConsumer.Partition, b.dc, b.trustDomain)
return connect.ServiceSNI(serviceRef.Name, "", serviceRef.Tenancy.Namespace, serviceRef.Tenancy.Partition, b.dc, b.trustDomain)
case *pbmulticluster.ComputedExportedServiceConsumer_Peer:
return connect.PeeredServiceSNI(serviceRef.Name, serviceRef.Tenancy.Namespace, serviceRef.Tenancy.Partition, tConsumer.Peer, b.trustDomain)
return connect.PeeredServiceSNI(serviceRef.Name, serviceRef.Tenancy.Namespace, serviceRef.Tenancy.Partition, b.dc, b.trustDomain)
default:
return ""
}
}

// clusterNameForRemoteGateway generates a cluster name for a given remote mesh
// gateway. This will be used to route traffic from the local partition to the mesh
// gateway for a remote partition.
func (b *proxyStateTemplateBuilder) clusterNameForRemoteGateway(remoteGatewayID *pbresource.ID) string {
return connect.GatewaySNI(b.dc, remoteGatewayID.Tenancy.Partition, b.trustDomain)
}

func alpnProtocol(portName string) string {
return fmt.Sprintf("consul~%s", portName)
}
Loading

0 comments on commit 74e4200

Please sign in to comment.