kubeproxy源码分析.docx
《kubeproxy源码分析.docx》由会员分享,可在线阅读,更多相关《kubeproxy源码分析.docx(40页珍藏版)》请在冰点文库上搜索。
![kubeproxy源码分析.docx](https://file1.bingdoc.com/fileroot1/2023-4/28/8b184a8f-d476-49eb-9e95-1b738b4224c3/8b184a8f-d476-49eb-9e95-1b738b4224c31.gif)
kubeproxy源码分析
kube-proxy源码分析
源码目录结构分析
cmd/kube-proxy//负责kube-proxy的创建,启动的入口
.
├──app
│├──conntrack.go//linuxkernel的nf_conntrack-sysctl的interface定义,更多关于conntracker的定义请看https:
//www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
│├──options
││└──options.go//kube-proxy的参数定义ProxyServerConfig及相关方法
│├──server.go//ProxyServer结构定义及其创建(NewProxyServerDefault)和运行(Run)的方法。
│└──server_test.go
└──proxy.go//kube-proxy的main方法
pkg/proxy
.
├──OWNERS
├──config
│├──api.go//给proxy配置Service和Endpoint的Reflectors和Cache.Store
│├──api_test.go
│├──config.go//定义ServiceUpdate,EndpointUpdate结构体以及ServiceConfigHandler,EndpointConfigHandler来处理Service和Endpoint的Update
│├──config_test.go
│└──doc.go
├──doc.go
├──healthcheck//负责servicelistener和endpoint的healthcheck,add/delete请求。
│├──api.go
│├──doc.go
│├──healthcheck.go
│├──healthcheck_test.go
│├──http.go
│├──listener.go
│└──worker.go
├──iptables//proxymode为iptables的实现
│├──proxier.go
│└──proxier_test.go
├──types.go
├──userspace//proxymode为userspace的实现
│├──loadbalancer.go
│├──port_allocator.go
│├──port_allocator_test.go
│├──proxier.go
│├──proxier_test.go
│├──proxysocket.go
│├──rlimit.go
│├──rlimit_windows.go
│├──roundrobin.go
│├──roundrobin_test.go
│└──udp_server.go
└──winuserspace//windowsOS时,proxymode为userspace的实现
├──loadbalancer.go
├──port_allocator.go
├──port_allocator_test.go
├──proxier.go
├──proxier_test.go
├──proxysocket.go
├──roundrobin.go
├──roundrobin_test.go
└──udp_server.go
源码分析
main
kube-proxy的main入口在:
cmd/kube-proxy/proxy.Go:
39
funcmain(){
//创建kube-proxy的默认config对象
config:
=options.NewProxyConfig()
//用kube-proxy命令行的参数替换默认参数
config.AddFlags(pflag.CommandLine)
flag.InitFlags()
logs.InitLogs()
deferlogs.FlushLogs()
verflag.PrintAndExitIfRequested()
//根据config创建ProxyServer
s,err:
=app.NewProxyServerDefault(config)
iferr!
=nil{
fmt.Fprintf(os.Stderr,"%v\n",err)
os.Exit
(1)
}
//执行Run方法让kube-proxy开始干活了
iferr=s.Run();err!
=nil{
fmt.Fprintf(os.Stderr,"%v\n",err)
os.Exit
(1)
}
}
main方法中,我们重点关注app.NewProxyServerDefault(config)创建ProxyServer和Run方法。
创建ProxyServer
NewProxyServerDefault负责根据提供的config参数创建一个新的ProxyServer对象,其代码比较长,逻辑相对复杂,下面会挑重点说一下。
cmd/kube-proxy/app/server.go:
131
funcNewProxyServerDefault(config*options.ProxyServerConfig)(*ProxyServer,error){
...
//Createaiptablesutils.
execer:
=exec.New()
ifruntime.GOOS=="windows"{
netshInterface=utilnetsh.New(execer)
}else{
dbus=utildbus.New()
iptInterface=utiliptables.New(execer,dbus,protocol)
}
...
//设置OOM_SCORE_ADJ
varoomAdjuster*oom.OOMAdjuster
ifconfig.OOMScoreAdj!
=nil{
oomAdjuster=oom.NewOOMAdjuster()
iferr:
=oomAdjuster.ApplyOOMScoreAdj(0,int(*config.OOMScoreAdj));err!
=nil{
glog.V
(2).Info(err)
}
}
...
//CreateaKubeClient
...
//创建eventBroadcaster和eventrecorder
hostname:
=nodeutil.GetHostname(config.HostnameOverride)
eventBroadcaster:
=record.NewBroadcaster()
recorder:
=eventBroadcaster.NewRecorder(v1.EventSource{Component:
"kube-proxy",Host:
hostname})
//定义proxier和endpointsHandler,分别用于处理services和endpoints的updateevent。
varproxierproxy.ProxyProvider
varendpointsHandlerproxyconfig.EndpointsConfigHandler
//从config中获取proxymode
proxyMode:
=getProxyMode(string(config.Mode),client.Core().Nodes(),hostname,iptInterface,iptables.LinuxKernelCompatTester{})
//proxymode为iptables场景
ifproxyMode==proxyModeIPTables{
glog.V(0).Info("UsingiptablesProxier.")
ifconfig.IPTablesMasqueradeBit==nil{
//IPTablesMasqueradeBitmustbespecifiedordefaulted.
returnnil,fmt.Errorf("UnabletoreadIPTablesMasqueradeBitfromconfig")
}
//调用pkg/proxy/iptables/proxier.go:
222中的iptables.NewProxier来创建proxier,赋值给前面定义的proxier和endpointsHandler,表示由该proxier同时负责service和endpoint的event处理。
proxierIPTables,err:
=iptables.NewProxier(iptInterface,utilsysctl.New(),execer,config.IPTablesSyncPeriod.Duration,config.IPTablesMinSyncPeriod.Duration,config.MasqueradeAll,int(*config.IPTablesMasqueradeBit),config.ClusterCIDR,hostname,getNodeIP(client,hostname))
iferr!
=nil{
glog.Fatalf("Unabletocreateproxier:
%v",err)
}
proxier=proxierIPTables
endpointsHandler=proxierIPTables
//Noturningback.RemoveartifactsthatmightstillexistfromtheuserspaceProxier.
glog.V(0).Info("Tearingdownuserspacerules.")
userspace.CleanupLeftovers(iptInterface)
}
//proxymode为userspace场景
else{
glog.V(0).Info("UsinguserspaceProxier.")
//Thisisaproxy.LoadBalancerwhichNewProxierneedsbuthasmethodswedon'tneedfor
//ourconfig.EndpointsConfigHandler.
loadBalancer:
=userspace.NewLoadBalancerRR()
//setEndpointsConfigHandlertoourloadBalancer
endpointsHandler=loadBalancer
varproxierUserspaceproxy.ProxyProvider
//windowsOS场景下,调用pkg/proxy/winuserspace/proxier.go:
146的winuserspace.NewProxier来创建proxier。
ifruntime.GOOS=="windows"{
proxierUserspace,err=winuserspace.NewProxier(
loadBalancer,
net.ParseIP(config.BindAddress),
netshInterface,
*utilnet.ParsePortRangeOrDie(config.PortRange),
//TODO@piresreplacebelowwithdefaultvalues,ifapplicable
config.IPTablesSyncPeriod.Duration,
config.UDPIdleTimeout.Duration,
)
}
//linuxOS场景下,调用pkg/proxy/userspace/proxier.go:
143的userspace.NewProxier来创建proxier。
else{
proxierUserspace,err=userspace.NewProxier(
loadBalancer,
net.ParseIP(config.BindAddress),
iptInterface,
*utilnet.ParsePortRangeOrDie(config.PortRange),
config.IPTablesSyncPeriod.Duration,
config.IPTablesMinSyncPeriod.Duration,
config.UDPIdleTimeout.Duration,
)
}
iferr!
=nil{
glog.Fatalf("Unabletocreateproxier:
%v",err)
}
proxier=proxierUserspace
//Removeartifactsfromthepure-iptablesProxier,ifnotonWindows.
ifruntime.GOOS!
="windows"{
glog.V(0).Info("Tearingdownpure-iptablesproxyrules.")
iptables.CleanupLeftovers(iptInterface)
}
}
//Addiptablesreloadfunction,ifnotonWindows.
ifruntime.GOOS!
="windows"{
iptInterface.AddReloadFunc(proxier.Sync)
}
//Createconfigs(i.e.WatchesforServicesandEndpoints)
//创建serviceConfig负责service的watchforUpdates
serviceConfig:
=proxyconfig.NewServiceConfig()
//给serviceConfig注册proxier,既添加对应的listener用来处理serviceupdate时逻辑。
serviceConfig.RegisterHandler(proxier)
//创建endpointsConfig负责endpoint的watchforUpdates
endpointsConfig:
=proxyconfig.NewEndpointsConfig()
//给endpointsConfig注册endpointsHandler,既添加对应的listener用来处理endpointupdate时的逻辑。
endpointsConfig.RegisterHandler(endpointsHandler)
//NewSourceAPIcreatesconfigsourcethatwatchesforchangestotheservicesandendpoints.
//NewSourceAPI通过ListWatchapiserver的Service和endpoint,并周期性的维护serviceStore和endpointStore的更新
proxyconfig.NewSourceAPI(
client.Core().RESTClient(),
config.ConfigSyncPeriod,
serviceConfig.Channel("api"),//ServiceUpdateChannel
endpointsConfig.Channel("api"),//endpointupdatechannel
)
...
//把前面创建的对象作为参数,构造出ProxyServer对象。
returnNewProxyServer(client,config,iptInterface,proxier,eventBroadcaster,recorder,conntracker,proxyMode)
}
NewProxyServerDefault中的核心逻辑我都已经在上述代码中添加了注释,其中有几个地方需要我们再深入跟进去看看:
proxyconfig.NewServiceConfig,proxyconfig.NewEndpointsConfig,serviceConfig.RegisterHandler,endpointsConfig.RegisterHandler,proxyconfig.NewSourceAPI。
proxyconfig.NewServiceConfig
我们对ServiceConfig的代码分析一遍,EndpointsConfig的代码则类似。
pkg/proxy/config/config.go:
192
funcNewServiceConfig()*ServiceConfig{
//创建updateschannel
updates:
=make(chanstruct{},1)
//构建serviceStore对象
store:
=&serviceStore{updates:
updates,services:
make(map[string]map[types.NamespacedName]api.Service)}
mux:
=config.NewMux(store)
//新建Broadcaster,在后续的serviceConfig.RegisterHandler会注册该Broadcaster的listener。
bcaster:
=config.NewBroadcaster()
//启动协程,马上开始watchupdateschannel
gowatchForUpdates(bcaster,store,updates)
return&ServiceConfig{mux,bcaster,store}
}
下面我们再跟进watchForUpdates去看看。
pkg/proxy/config/config.go:
292
funcwatchForUpdates(bcaster*config.Broadcaster,accessorconfig.Accessor,updates<-chanstruct{}){
fortrue{
<-updates
bcaster.Notify(accessor.MergedState())
}
}
watchForUpdates就是一直在watchupdateschannel,如果有数据,则立刻由该BroadcasterNotify到注册的listeners。
Notify的代码如下,可见,它负责将数据通知给所有的listener,并调用各个listener的OnUpdate方法。
pkg/util/config/config.go:
133
//Notifynotifiesalllisteners.
func(b*Broadcaster)Notify(instanceinterface{}){
b.listenerLock.RLock()
listeners:
=b.listeners
b.listenerLock.RUnlock()
for_,listener:
=rangelisteners{
listener.OnUpdate(instance)
}
}
func(fListenerFunc)OnUpdate(instanceinterface{}){
f(instance)
}
serviceConfig.RegisterHandler
上面分析的proxyconfig.NewServiceConfig负责创建ServiceConfig,开始watchupdateschannel了,当从channel中取到值的时候,Broadcaster就会通知listener进行处理。
serviceConfig.RegisterHandler正是负责给Broadcaster注册listener的,其代码如下。
pkg/proxy/config/config.go:
205
func(c*ServiceConfig)RegisterHandler(handlerServiceConfigHandler){
//给ServiceConfig的Broadcaster注册listener。
c.bcaster.Add(config.ListenerFunc(func(instanceinterface{}){
glog.V(3).Infof("Callinghandler.OnServiceUpdate()")
handler.OnServiceUpdate(instance.([]api.Service))
}))
}
上面分析proxyconfig.NewServiceConfig时可知,当从updateschannel中取到值的时候,最终会调用对应的ListenerFunc(instance)进行处理,在这里,也就是调用:
glog.V(3).Infof("Callinghandler.OnServiceUpdate()")
handler.OnServiceUpdate(instance.([]api.Service))
}
即调用到handler.OnServiceUpdate。
每种proxymode对应的proxier都有对应的handler.OnServiceUpdate接口实现,我们以iptablesmode为例,看看handler.OnServiceUpdate的实现:
pkg/proxy/iptables/proxier.go:
428
func(proxier*Proxier)OnServiceUpdate(allServices[]api.Service){
...
proxier.syncProxyRules()
proxier.deleteServiceConnections(staleUDPServices.List())
}
因此,最终关键的逻辑都转向了proxier.syncProxyRules(),从我们上面给出的内部模块交互图