Preface
This article looks at how kube-proxy keeps its rules consistent and where its performance can be improved, both of which are part of what makes Kubernetes stable to operate.
How kube-proxy implements CRUD
kube-proxy shares the same architecture as the other built-in controllers and is, in effect, a controller itself: a read/write controller for Service and Endpoints objects, and a read-only controller for Node objects. In terms of CRUD, kube-proxy's design collapses everything into two operations: add and update. This can be seen in the following code from pkg/proxy/ipvs/proxier.go:
// OnServiceAdd is called whenever creation of a new service object is observed.
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
    proxier.OnServiceUpdate(nil, service)
}

// OnServiceUpdate is called whenever modification of an existing service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.Sync()
    }
}

// OnServiceDelete is called whenever deletion of an existing service object is observed.
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
    proxier.OnServiceUpdate(service, nil)
}
As the code shows, all three handlers ultimately call OnServiceUpdate, which in turn calls proxier.Sync(). Sync() drives the function that was injected when the Proxier was initialized; the machinery behind it is an asynchronous, bounded-frequency function runner that fires the injected function under two conditions: one, periodically on a timer; two, when a caller explicitly requests a run. Sync() is trigger condition 2.
The injected function is proxier.syncProxyRules, as the following code shows:
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
In other words, kube-proxy implements the entire create/update/delete lifecycle of Services and Endpoints through the single function syncProxyRules.
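To make the two trigger conditions concrete, here is a minimal, self-contained Go sketch of the same idea. It is not kube-proxy's actual BoundedFrequencyRunner (which lives in k8s.io/kubernetes/pkg/util/async); the names boundedRunner, Sync, and Loop below are illustrative only:

package main

import (
    "fmt"
    "time"
)

// boundedRunner is a simplified stand-in for kube-proxy's
// async.BoundedFrequencyRunner: fn runs at most once per minInterval
// (bursts of Sync() calls coalesce) and at least once per maxInterval
// (the periodic full resync).
type boundedRunner struct {
    fn          func()
    minInterval time.Duration
    maxInterval time.Duration
    kick        chan struct{} // buffered; acts as a "run pending" flag
}

func newBoundedRunner(fn func(), minInterval, maxInterval time.Duration) *boundedRunner {
    return &boundedRunner{fn: fn, minInterval: minInterval, maxInterval: maxInterval, kick: make(chan struct{}, 1)}
}

// Sync requests a run without blocking; if one is already pending,
// the request coalesces. This is "condition 2" in the text above.
func (r *boundedRunner) Sync() {
    select {
    case r.kick <- struct{}{}:
    default: // a run is already pending; coalesce
    }
}

// Loop runs until stop is closed, firing fn on the timer ("condition 1")
// or on an explicit Sync, then enforcing the minimum interval.
func (r *boundedRunner) Loop(stop <-chan struct{}) {
    timer := time.NewTimer(r.maxInterval)
    defer timer.Stop()
    for {
        select {
        case <-stop:
            return
        case <-timer.C: // periodic resync
        case <-r.kick: // explicit Sync()
        }
        r.fn()
        time.Sleep(r.minInterval) // crude rate limit between two runs
        if !timer.Stop() {
            select {
            case <-timer.C:
            default:
            }
        }
        timer.Reset(r.maxInterval)
    }
}

func main() {
    r := newBoundedRunner(func() { fmt.Println("syncProxyRules ran") }, time.Second, 30*time.Second)
    stop := make(chan struct{})
    defer close(stop)
    go r.Loop(stop)
    for i := 0; i < 100; i++ {
        r.Sync() // a burst of 100 updates coalesces into very few runs
    }
    time.Sleep(2500 * time.Millisecond) // observe ~2 runs, not 100
}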
Performance tuning point 1
The bounded-frequency runner above is the first tuning point. When it is created it receives two duration parameters, minSyncPeriod and syncPeriod: minSyncPeriod is the shortest interval the runner allows between two invocations of the injected function, and syncPeriod is the longest it will wait before running it anyway. If your cluster is large, minSyncPeriod is worth tuning (it is exposed as kube-proxy's --ipvs-min-sync-period flag and the ipvs.minSyncPeriod config field), because frequent Service changes are rate-limited by it: raising it coalesces more churn into a single, cheaper sync.
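For reference, this is what those knobs look like in a KubeProxyConfiguration file (the equivalent flags are --ipvs-min-sync-period and --ipvs-sync-period). The values here are purely illustrative, not recommendations:

apiVersion: kubeproxy.config.k8s.io/v1alpha1
kind: KubeProxyConfiguration
mode: ipvs
ipvs:
  minSyncPeriod: 5s    # floor: at most one syncProxyRules run per 5s under churn
  syncPeriod: 30s      # ceiling: a full resync happens at least every 30s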
How CRUD is done with a single function
For create and update there are three address types to program: ClusterIP, NodePort, and Ingress (load-balancer) IPs. When any of these is touched, the Service and its Endpoints are synced together, as this excerpt from syncProxyRules shows:
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
    activeIPVSServices[serv.String()] = true
    activeBindAddrs[serv.Address.String()] = true
    if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
        klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
    }
} else {
    klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
}
As this code shows, every Service or Endpoints update triggers Sync(), Sync() executes syncProxyRules(), and whenever a Service has changed it is synchronized through syncService/syncEndpoint.
For deletion, kube-proxy provides the cleanLegacyService function, which runs after the changes have been applied and cleans up any leftover service rules, as shown below:
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
Deletion is bookkept with two collections, activeIPVSServices (the services that should exist after this sync) and currentIPVSServices (the virtual servers actually programmed in IPVS); whatever appears in the latter but not the former is what gets deleted.
How the CRUD is actually carried out
Having seen the add and delete logic at a high level, let's analyze how each step actually works.
When an add is triggered, proxier.syncService() runs. It first checks whether the corresponding IPVS rule already exists on the node: if not, it is created; if it exists but differs, it is updated. The function returns an error, and that error determines whether the Endpoints get synced afterwards (syncEndpoint only runs when syncService succeeds), as the following code shows:
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, bindedAddresses sets.String) error {
    appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
    if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
        if appliedVirtualServer == nil {
            // IPVS service is not found, create a new service
            klog.V(3).Infof("Adding new service %q %s:%d/%s", svcName, vs.Address, vs.Port, vs.Protocol)
            if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
                klog.Errorf("Failed to add IPVS service %q: %v", svcName, err)
                return err
            }
        } else {
            // IPVS service was changed, update the existing one
            // During updates, service VIP will not go down
            klog.V(3).Infof("IPVS service %s was changed", svcName)
            if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
                klog.Errorf("Failed to update IPVS service, err:%v", err)
                return err
            }
        }
    }

    // bind service address to dummy interface
    if bindAddr {
        // always attempt to bind if bindedAddresses is nil,
        // otherwise check if it's already binded and return early
        if bindedAddresses != nil && bindedAddresses.Has(vs.Address.String()) {
            return nil
        }
        klog.V(4).Infof("Bind addr %s", vs.Address.String())
        _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
        if err != nil {
            klog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err)
            return err
        }
    }
    return nil
}
After that, proxier.syncEndpoint() is invoked with the current service. The vocabulary here matches IPVS concepts: a Service corresponds to a VirtualServer and its Endpoints to RealServers. The function first lists all RealServers attached to this VirtualServer on the node, then adds any that are missing, and finally diffs the Endpoints list passed in against the node's RealServers: anything present locally but absent from the desired set is deleted. The deletion is an asynchronous operation maintained by gracefuldeleteManager, which removes a RealServer gracefully (letting existing connections drain) rather than cutting traffic off immediately.
func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
    appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
    if err != nil || appliedVirtualServer == nil {
        klog.Errorf("Failed to get IPVS service, error: %v", err)
        return err
    }

    // curEndpoints represents IPVS destinations listed from current system.
    curEndpoints := sets.NewString()
    // newEndpoints represents Endpoints watched from API Server.
    newEndpoints := sets.NewString()

    curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
    if err != nil {
        klog.Errorf("Failed to list IPVS destinations, error: %v", err)
        return err
    }
    for _, des := range curDests {
        curEndpoints.Insert(des.String())
    }

    endpoints := proxier.endpointsMap[svcPortName]

    // Service Topology will not be enabled in the following cases:
    // 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
    // 2. ServiceTopology is not enabled.
    // 3. EndpointSlice is not enabled (service topology depends on endpoint slice
    // to get topology information).
    if !onlyNodeLocalEndpoints && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
        endpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, proxier.serviceMap[svcPortName].TopologyKeys(), endpoints)
    }

    for _, epInfo := range endpoints {
        if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() {
            continue
        }
        newEndpoints.Insert(epInfo.String())
    }

    // Create new endpoints
    for _, ep := range newEndpoints.List() {
        ip, port, err := net.SplitHostPort(ep)
        if err != nil {
            klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
            continue
        }
        portNum, err := strconv.Atoi(port)
        if err != nil {
            klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
            continue
        }

        newDest := &utilipvs.RealServer{
            Address: net.ParseIP(ip),
            Port:    uint16(portNum),
            Weight:  1,
        }

        if curEndpoints.Has(ep) {
            // check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately
            uniqueRS := GetUniqueRSName(vs, newDest)
            if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
                continue
            }
            klog.V(5).Infof("new ep %q is in graceful delete list", uniqueRS)
            err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
            if err != nil {
                klog.Errorf("Failed to delete endpoint: %v in gracefulDeleteQueue, error: %v", ep, err)
                continue
            }
        }
        err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
        if err != nil {
            klog.Errorf("Failed to add destination: %v, error: %v", newDest, err)
            continue
        }
    }

    // Delete old endpoints
    for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
        // if curEndpoint is in gracefulDelete, skip
        uniqueRS := vs.String() + "/" + ep
        if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
            continue
        }
        ip, port, err := net.SplitHostPort(ep)
        if err != nil {
            klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
            continue
        }
        portNum, err := strconv.Atoi(port)
        if err != nil {
            klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
            continue
        }

        delDest := &utilipvs.RealServer{
            Address: net.ParseIP(ip),
            Port:    uint16(portNum),
        }

        klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS)
        err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
        if err != nil {
            klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err)
            continue
        }
    }
    return nil
}
Deleting a Service removes all of its RealServers. As mentioned above, kube-proxy does this through cleanLegacyService, shown below:
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) {
    isIPv6 := utilnet.IsIPv6(proxier.nodeIP)
    for cs := range currentServices {
        svc := currentServices[cs]
        if proxier.isIPInExcludeCIDRs(svc.Address) {
            continue
        }
        if utilnet.IsIPv6(svc.Address) != isIPv6 {
            // Not our family
            continue
        }
        if _, ok := activeServices[cs]; !ok {
            klog.V(4).Infof("Delete service %s", svc.String())
            if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
                klog.Errorf("Failed to delete service %s, error: %v", svc.String(), err)
            }
            addr := svc.Address.String()
            if _, ok := legacyBindAddrs[addr]; ok {
                klog.V(4).Infof("Unbinding address %s", addr)
                if err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice); err != nil {
                    klog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
                } else {
                    // In case we delete a multi-port service, avoid trying to unbind multiple times
                    delete(legacyBindAddrs, addr)
                }
            }
        }
    }
}
Performance tuning point 2
From the walkthrough above we know that this CRUD work is executed every time an event triggers syncProxyRules, and that the deletion path contains several rounds of looping (building the two bookkeeping collections, then looping again to delete). In other words, every Endpoints change also triggers a loop over a huge number of Services just to check whether any leftover Service rules exist, and this behavior persisted through Kubernetes 1.26.
Suppose your cluster has 5,000 nodes and 20,000 Service resources. Updating a single Service then costs at least tens of thousands of loop iterations per sync (over Services, EndpointSlices, currentServices, node IPs, and Ingress IPs). The wasted portion is currentServices, which is only useful when a Service is actually being deleted: with 20,000 Services, building currentServices and comparing it against the active set alone accounts for more than 40,000 iterations.
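A back-of-the-envelope Go sketch makes that number tangible. This is not kube-proxy code; the map names and key format below are stand-ins for the real bookkeeping:

package main

import "fmt"

func main() {
    const numServices = 20000

    // Stand-ins for the real bookkeeping: activeServices is what should
    // exist after this sync, currentServices is what IPVS actually has.
    activeServices := make(map[string]bool, numServices)
    currentServices := make(map[string]bool, numServices)

    iterations := 0

    // Pass 1: list the node's virtual servers to build currentServices.
    for i := 0; i < numServices; i++ {
        key := fmt.Sprintf("10.96.%d.%d:80/TCP", i/250, i%250)
        currentServices[key] = true
        activeServices[key] = true // nothing is being deleted in this sync
        iterations++
    }

    // Pass 2: cleanLegacyService's diff, looking for leftovers.
    for cs := range currentServices {
        if !activeServices[cs] {
            // would call DeleteVirtualServer here; never hit in this run
        }
        iterations++
    }

    // Prints 40000: that many iterations per sync even though no
    // Service was actually deleted.
    fmt.Printf("%d iterations for %d services\n", iterations, numServices)
}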
This article was published on Cylon's Collection. Please credit the original link when reposting.
Link: https://www.oomkill.com/2023/03/ch24-kube-proxy-performance/
License: This work is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.