Background

This post looks at how kube-proxy keeps its rules consistent and where its performance can be improved, both of which contribute to running Kubernetes stably.

How kube-proxy implements CRUD

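kube-proxy has the same architecture as the other built-in controllers and is in effect a controller itself: it handles Service and Endpoints objects (watching them and writing the resulting rules to the node) and only reads Node objects. For CRUD, its design comes down to two operations, add and update, as the following code from pkg/proxy/ipvs/proxier.go shows.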

go
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
	proxier.OnServiceUpdate(nil, service)
}

// OnServiceUpdate is called whenever modification of an existing service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
	if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
		proxier.Sync()
	}
}

// OnServiceDelete is called whenever deletion of an existing service object is observed.
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
	proxier.OnServiceUpdate(service, nil)
}

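As the code shows, every handler ends up in OnServiceUpdate, which in turn calls proxier.Sync(). Sync() runs the function that was injected when the Proxier was initialized. Under the hood it relies on an asynchronous, bounded-frequency function runner that triggers that function in two ways: (1) periodically, on a timer; (2) on demand, when a condition is met. Sync() is case 2.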
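
The way add and delete both collapse into a single update path can be sketched in isolation. This is a minimal illustration, not kube-proxy's code: `Service`, `changeTracker`, and `proxier` here are hypothetical stand-ins, and `Sync()` only counts requests instead of handing off to a runner.

```go
package main

import "fmt"

// Service is a hypothetical, stripped-down stand-in for *v1.Service.
type Service struct {
	Name      string
	ClusterIP string
}

// changeTracker is a hypothetical stand-in for proxier.serviceChanges.
// It keeps the latest observed state per Service; nil marks a deletion.
type changeTracker struct {
	pending map[string]*Service
}

// Update records one change and reports whether any change is pending.
func (c *changeTracker) Update(old, cur *Service) bool {
	switch {
	case cur != nil:
		c.pending[cur.Name] = cur // add (old == nil) or modify
	case old != nil:
		c.pending[old.Name] = nil // delete
	}
	return len(c.pending) > 0
}

// proxier counts sync requests instead of programming IPVS.
type proxier struct {
	changes   *changeTracker
	syncCalls int
}

func newProxier() *proxier {
	return &proxier{changes: &changeTracker{pending: map[string]*Service{}}}
}

// Add and delete are expressed as updates with one side set to nil.
func (p *proxier) OnServiceAdd(s *Service)    { p.OnServiceUpdate(nil, s) }
func (p *proxier) OnServiceDelete(s *Service) { p.OnServiceUpdate(s, nil) }

func (p *proxier) OnServiceUpdate(old, cur *Service) {
	if p.changes.Update(old, cur) {
		p.Sync()
	}
}

// Sync only records that a sync was requested.
func (p *proxier) Sync() { p.syncCalls++ }

func main() {
	p := newProxier()
	web := &Service{Name: "web", ClusterIP: "10.0.0.1"}
	p.OnServiceAdd(web)
	p.OnServiceDelete(web)
	fmt.Println("sync requests:", p.syncCalls)
}
```

Because the handlers only record a change and request a sync, the actual work can be deferred and batched by whatever executes the sync function.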

The injected function is proxier.syncProxyRules, as the following code shows.

go
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

In other words, kube-proxy implements all create, read, update, and delete handling for Services and Endpoints through syncProxyRules.

Performance improvement point 1

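The bounded-frequency runner above is the first place to tune. It is initialized with two parameters, minSyncPeriod and syncPeriod. minSyncPeriod is the minimum interval the runner enforces between two calls, and syncPeriod is the interval of the periodic run. In a large cluster with frequent Service changes, every change is throttled by minSyncPeriod, so you can lower it moderately to have changes applied sooner; the trade-off is that syncProxyRules then runs more often.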

How a single function performs CRUD

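For add and update there are three kinds of addresses: ClusterIP, NodePort, and Ingress. When any of them is processed, the Service and its Endpoints are synced, as this excerpt from syncProxyRules shows.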

go
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
    activeIPVSServices[serv.String()] = true
    activeBindAddrs[serv.Address.String()] = true
    if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
        klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
    }
} else {
    klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
}

So every Service or Endpoints update triggers Sync(), and Sync() executes syncProxyRules(); when a Service has changed, syncService and syncEndpoint bring the node in line with it.

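For deletion, kube-proxy provides the cleanLegacyService function, which runs after the changes have been applied and removes leftover Service rules, as the following code shows.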

go
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)

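Deletion is tracked mainly through two maps, activeIPVSServices and currentIPVSServices: whatever is in currentIPVSServices but not in activeIPVSServices is treated as legacy.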
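
The comparison between the two maps amounts to a key difference. The sketch below shows only that idea; `legacyKeys` is a hypothetical helper, and the map values are plain strings rather than *utilipvs.VirtualServer.

```go
package main

import (
	"fmt"
	"sort"
)

// legacyKeys returns the keys found on the node (current) that were not
// marked active during this sync pass, sorted for stable output.
func legacyKeys(active map[string]bool, current map[string]string) []string {
	legacy := []string{}
	for key := range current {
		if _, ok := active[key]; !ok {
			legacy = append(legacy, key)
		}
	}
	sort.Strings(legacy)
	return legacy
}

func main() {
	// current: what is programmed on the node (hypothetical sample data).
	current := map[string]string{
		"10.0.0.1:80/TCP": "web",
		"10.0.0.2:53/UDP": "dns",
	}
	// active: what the sync pass just confirmed should exist.
	active := map[string]bool{"10.0.0.1:80/TCP": true}

	fmt.Println(legacyKeys(active, current))
}
```

Note that the whole current map is walked on every call, even when nothing was deleted.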

The actual CRUD implementation

Now that the add and delete logic is clear, the following analyzes how each step is carried out.

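An add triggers proxier.syncService(). It first checks whether the virtual server already exists in the node's IPVS rules: if it does not, it is created; if it exists but differs, it is updated. It then returns an error value, and that value decides whether the endpoints are synced, as the following code shows.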

go
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, bindedAddresses sets.String) error {
	appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
	if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
		if appliedVirtualServer == nil {
			// IPVS service is not found, create a new service
			klog.V(3).Infof("Adding new service %q %s:%d/%s", svcName, vs.Address, vs.Port, vs.Protocol)
			if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
				klog.Errorf("Failed to add IPVS service %q: %v", svcName, err)
				return err
			}
		} else {
			// IPVS service was changed, update the existing one
			// During updates, service VIP will not go down
			klog.V(3).Infof("IPVS service %s was changed", svcName)
			if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
				klog.Errorf("Failed to update IPVS service, err:%v", err)
				return err
			}
		}
	}

	// bind service address to dummy interface
	if bindAddr {
		// always attempt to bind if bindedAddresses is nil,
		// otherwise check if it's already binded and return early
		if bindedAddresses != nil && bindedAddresses.Has(vs.Address.String()) {
			return nil
		}

		klog.V(4).Infof("Bind addr %s", vs.Address.String())
		_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
		if err != nil {
			klog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err)
			return err
		}
	}

	return nil
}
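
The add-or-update decision in syncService can be reduced to a small "ensure" routine. The sketch below runs against an in-memory map instead of the kernel's IPVS table; `virtualServer`, `fakeIPVS`, and `ensure` are hypothetical names, and address binding to the dummy device is left out.

```go
package main

import "fmt"

// virtualServer is a hypothetical, comparable stand-in for
// utilipvs.VirtualServer.
type virtualServer struct {
	Address   string
	Port      uint16
	Protocol  string
	Scheduler string
}

// key identifies a virtual server by address, port, and protocol.
func (v virtualServer) key() string {
	return fmt.Sprintf("%s:%d/%s", v.Address, v.Port, v.Protocol)
}

// fakeIPVS keeps virtual servers in memory instead of in the kernel.
type fakeIPVS struct {
	servers map[string]virtualServer
}

// ensure creates the server if it is missing, updates it if it differs,
// and otherwise leaves it alone. It reports which action was taken.
func (f *fakeIPVS) ensure(vs virtualServer) string {
	applied, found := f.servers[vs.key()]
	switch {
	case !found:
		f.servers[vs.key()] = vs
		return "added"
	case applied != vs:
		f.servers[vs.key()] = vs
		return "updated"
	default:
		return "unchanged"
	}
}

func main() {
	ipvs := &fakeIPVS{servers: map[string]virtualServer{}}
	vs := virtualServer{Address: "10.0.0.1", Port: 80, Protocol: "TCP", Scheduler: "rr"}

	fmt.Println(ipvs.ensure(vs)) // first call creates the server
	fmt.Println(ipvs.ensure(vs)) // identical state, nothing to do
	vs.Scheduler = "wrr"
	fmt.Println(ipvs.ensure(vs)) // same key, different settings
}
```

Updating in place rather than deleting and re-creating is what keeps the service VIP reachable during changes, as the comment in the original code points out.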

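If syncService succeeds, proxier.syncEndpoint() is called with the current service. The naming follows IPVS concepts, which distinguish VirtualServers and RealServers. The function first fetches all RealServers under this VirtualServer on the node, then adds the new endpoints, and then compares the Endpoints list passed in with the RealServers on the node; RealServers that are no longer in the list are deleted. The deletion is asynchronous and is managed by gracefuldeleteManager.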

go
func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNodeLocalEndpoints bool, vs *utilipvs.VirtualServer) error {
	appliedVirtualServer, err := proxier.ipvs.GetVirtualServer(vs)
	if err != nil || appliedVirtualServer == nil {
		klog.Errorf("Failed to get IPVS service, error: %v", err)
		return err
	}

	// curEndpoints represents IPVS destinations listed from current system.
	curEndpoints := sets.NewString()
	// newEndpoints represents Endpoints watched from API Server.
	newEndpoints := sets.NewString()

	curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
	if err != nil {
		klog.Errorf("Failed to list IPVS destinations, error: %v", err)
		return err
	}
	for _, des := range curDests {
		curEndpoints.Insert(des.String())
	}

	endpoints := proxier.endpointsMap[svcPortName]

	// Service Topology will not be enabled in the following cases:
	// 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
	// 2. ServiceTopology is not enabled.
	// 3. EndpointSlice is not enabled (service topology depends on endpoint slice
	// to get topology information).
	if !onlyNodeLocalEndpoints && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
		endpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, proxier.serviceMap[svcPortName].TopologyKeys(), endpoints)
	}

	for _, epInfo := range endpoints {
		if onlyNodeLocalEndpoints && !epInfo.GetIsLocal() {
			continue
		}
		newEndpoints.Insert(epInfo.String())
	}

	// Create new endpoints
	for _, ep := range newEndpoints.List() {
		ip, port, err := net.SplitHostPort(ep)
		if err != nil {
			klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
			continue
		}
		portNum, err := strconv.Atoi(port)
		if err != nil {
			klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
			continue
		}

		newDest := &utilipvs.RealServer{
			Address: net.ParseIP(ip),
			Port:    uint16(portNum),
			Weight:  1,
		}

		if curEndpoints.Has(ep) {
			// check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately
			uniqueRS := GetUniqueRSName(vs, newDest)
			if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
				continue
			}
			klog.V(5).Infof("new ep %q is in graceful delete list", uniqueRS)
			err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
			if err != nil {
				klog.Errorf("Failed to delete endpoint: %v in gracefulDeleteQueue, error: %v", ep, err)
				continue
			}
		}
		err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
		if err != nil {
			klog.Errorf("Failed to add destination: %v, error: %v", newDest, err)
			continue
		}
	}
	// Delete old endpoints
	for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
		// if curEndpoint is in gracefulDelete, skip
		uniqueRS := vs.String() + "/" + ep
		if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
			continue
		}
		ip, port, err := net.SplitHostPort(ep)
		if err != nil {
			klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
			continue
		}
		portNum, err := strconv.Atoi(port)
		if err != nil {
			klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
			continue
		}

		delDest := &utilipvs.RealServer{
			Address: net.ParseIP(ip),
			Port:    uint16(portNum),
		}

		klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS)
		err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
		if err != nil {
			klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err)
			continue
		}
	}
	return nil
}
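
Stripped of parsing and graceful-termination handling, the core of syncEndpoint is a two-way set difference. The sketch below shows that reduced form; `diffEndpoints` is a hypothetical helper, and unlike the real function it does not consult a termination list before deciding what to add or delete.

```go
package main

import (
	"fmt"
	"sort"
)

// diffEndpoints compares the destinations currently programmed on the node
// with the desired endpoints and returns what to add and what to delete.
func diffEndpoints(current, desired []string) (toAdd, toDelete []string) {
	cur := make(map[string]struct{}, len(current))
	for _, ep := range current {
		cur[ep] = struct{}{}
	}
	want := make(map[string]struct{}, len(desired))
	for _, ep := range desired {
		want[ep] = struct{}{}
	}

	// Desired but not yet present: add.
	for ep := range want {
		if _, ok := cur[ep]; !ok {
			toAdd = append(toAdd, ep)
		}
	}
	// Present but no longer desired: delete.
	for ep := range cur {
		if _, ok := want[ep]; !ok {
			toDelete = append(toDelete, ep)
		}
	}

	sort.Strings(toAdd)
	sort.Strings(toDelete)
	return toAdd, toDelete
}

func main() {
	// Hypothetical sample data in ip:port form.
	current := []string{"10.1.0.1:80", "10.1.0.2:80"}
	desired := []string{"10.1.0.2:80", "10.1.0.3:80"}

	toAdd, toDelete := diffEndpoints(current, desired)
	fmt.Println("add:", toAdd)
	fmt.Println("delete:", toDelete)
}
```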

Deleting a Service deletes all of its RealServers. As mentioned above, kube-proxy does this through cleanLegacyService, as the following code shows.

go
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) {
	isIPv6 := utilnet.IsIPv6(proxier.nodeIP)
	for cs := range currentServices {
		svc := currentServices[cs]
		if proxier.isIPInExcludeCIDRs(svc.Address) {
			continue
		}
		if utilnet.IsIPv6(svc.Address) != isIPv6 {
			// Not our family
			continue
		}
		if _, ok := activeServices[cs]; !ok {
			klog.V(4).Infof("Delete service %s", svc.String())
			if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
				klog.Errorf("Failed to delete service %s, error: %v", svc.String(), err)
			}
			addr := svc.Address.String()
			if _, ok := legacyBindAddrs[addr]; ok {
				klog.V(4).Infof("Unbinding address %s", addr)
				if err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice); err != nil {
					klog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
				} else {
					// In case we delete a multi-port service, avoid trying to unbind multiple times
					delete(legacyBindAddrs, addr)
				}
			}
		}
	}
}

Performance improvement point 2

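As explained above, the whole CRUD pass runs every time an event triggers syncProxyRules, and the delete step involves several loops (building the two maps, then looping to delete). This means every Endpoints change also triggers a large number of loops over Services just to check whether any legacy Service remains. This behavior remained in place until Kubernetes 1.26.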

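Suppose your cluster has 5,000 nodes and 20,000 Services. Updating a single Service then causes at least tens of thousands of loop iterations (over Service, EndpointSlice, currentServices, NodeIP, Ingress). Of these, the currentServices loops are wasted work, because they only matter when a Service itself is deleted (with 20,000 Services, building and comparing currentServices alone takes more than 40,000 iterations).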
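
The 40,000 figure can be reproduced with a rough count. The sketch below assumes one virtual server per Service and counts only two passes, one to build the current map and one to compare it against the active map; `makeServices` and `cleanupIterations` are hypothetical helpers, and a real sync pass has further loops on top of these.

```go
package main

import (
	"fmt"
	"strconv"
)

// makeServices builds n service keys and marks all of them active, which
// models a sync pass in which no Service was deleted.
func makeServices(n int) ([]string, map[string]bool) {
	keys := make([]string, 0, n)
	active := make(map[string]bool, n)
	for i := 0; i < n; i++ {
		key := "svc-" + strconv.Itoa(i)
		keys = append(keys, key)
		active[key] = true
	}
	return keys, active
}

// cleanupIterations counts the loop iterations of a simplified legacy
// check: one pass to build the current map, one pass to compare it.
func cleanupIterations(services []string, active map[string]bool) (iterations, deleted int) {
	current := make(map[string]bool, len(services))
	for _, s := range services {
		current[s] = true
		iterations++
	}
	for s := range current {
		iterations++
		if _, ok := active[s]; !ok {
			deleted++
		}
	}
	return iterations, deleted
}

func main() {
	services, active := makeServices(20000)
	iterations, deleted := cleanupIterations(services, active)
	fmt.Println("iterations:", iterations, "deleted:", deleted)
}
```

With every Service still active, the two passes do their full work and delete nothing, which is the wasted effort described above.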