kubeproxy源码分析.docx

上传人:b****2 文档编号:298604 上传时间:2023-04-28 格式:DOCX 页数:40 大小:72.76KB
下载 相关 举报
kubeproxy源码分析.docx_第1页
第1页 / 共40页
kubeproxy源码分析.docx_第2页
第2页 / 共40页
kubeproxy源码分析.docx_第3页
第3页 / 共40页
kubeproxy源码分析.docx_第4页
第4页 / 共40页
kubeproxy源码分析.docx_第5页
第5页 / 共40页
kubeproxy源码分析.docx_第6页
第6页 / 共40页
kubeproxy源码分析.docx_第7页
第7页 / 共40页
kubeproxy源码分析.docx_第8页
第8页 / 共40页
kubeproxy源码分析.docx_第9页
第9页 / 共40页
kubeproxy源码分析.docx_第10页
第10页 / 共40页
kubeproxy源码分析.docx_第11页
第11页 / 共40页
kubeproxy源码分析.docx_第12页
第12页 / 共40页
kubeproxy源码分析.docx_第13页
第13页 / 共40页
kubeproxy源码分析.docx_第14页
第14页 / 共40页
kubeproxy源码分析.docx_第15页
第15页 / 共40页
kubeproxy源码分析.docx_第16页
第16页 / 共40页
kubeproxy源码分析.docx_第17页
第17页 / 共40页
kubeproxy源码分析.docx_第18页
第18页 / 共40页
kubeproxy源码分析.docx_第19页
第19页 / 共40页
kubeproxy源码分析.docx_第20页
第20页 / 共40页
亲,该文档总共40页,到这儿已超出免费预览范围,如果喜欢就下载吧!
下载资源
资源描述

kubeproxy源码分析.docx

《kubeproxy源码分析.docx》由会员分享,可在线阅读,更多相关《kubeproxy源码分析.docx(40页珍藏版)》请在冰点文库上搜索。

kubeproxy源码分析.docx

kubeproxy源码分析

kube-proxy源码分析

源码目录结构分析

cmd/kube-proxy//负责kube-proxy的创建,启动的入口

.

├──app

│├──conntrack.go//linuxkernel的nf_conntrack-sysctl的interface定义,更多关于conntracker的定义请看https:

//www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt

│├──options

││└──options.go//kube-proxy的参数定义ProxyServerConfig及相关方法

│├──server.go//ProxyServer结构定义及其创建(NewProxyServerDefault)和运行(Run)的方法。

│└──server_test.go

└──proxy.go//kube-proxy的main方法

pkg/proxy

.

├──OWNERS

├──config

│├──api.go//给proxy配置Service和Endpoint的Reflectors和Cache.Store

│├──api_test.go

│├──config.go//定义ServiceUpdate,EndpointUpdate结构体以及ServiceConfigHandler,EndpointConfigHandler来处理Service和Endpoint的Update

│├──config_test.go

│└──doc.go

├──doc.go

├──healthcheck//负责servicelistener和endpoint的healthcheck,add/delete请求。

│├──api.go

│├──doc.go

│├──healthcheck.go

│├──healthcheck_test.go

│├──http.go

│├──listener.go

│└──worker.go

├──iptables//proxymode为iptables的实现

│├──proxier.go

│└──proxier_test.go

├──types.go

├──userspace//proxymode为userspace的实现

│├──loadbalancer.go

│├──port_allocator.go

│├──port_allocator_test.go

│├──proxier.go

│├──proxier_test.go

│├──proxysocket.go

│├──rlimit.go

│├──rlimit_windows.go

│├──roundrobin.go

│├──roundrobin_test.go

│└──udp_server.go

└──winuserspace//windowsOS时,proxymode为userspace的实现

├──loadbalancer.go

├──port_allocator.go

├──port_allocator_test.go

├──proxier.go

├──proxier_test.go

├──proxysocket.go

├──roundrobin.go

├──roundrobin_test.go

└──udp_server.go

源码分析

main

kube-proxy的main入口在:

cmd/kube-proxy/proxy.Go:

39

funcmain(){

//创建kube-proxy的默认config对象

config:

=options.NewProxyConfig()

//用kube-proxy命令行的参数替换默认参数

config.AddFlags(pflag.CommandLine)

flag.InitFlags()

logs.InitLogs()

deferlogs.FlushLogs()

verflag.PrintAndExitIfRequested()

//根据config创建ProxyServer

s,err:

=app.NewProxyServerDefault(config)

iferr!

=nil{

fmt.Fprintf(os.Stderr,"%v\n",err)

os.Exit

(1)

}

//执行Run方法让kube-proxy开始干活了

iferr=s.Run();err!

=nil{

fmt.Fprintf(os.Stderr,"%v\n",err)

os.Exit

(1)

}

}

main方法中,我们重点关注app.NewProxyServerDefault(config)创建ProxyServer和Run方法。

创建ProxyServer

NewProxyServerDefault负责根据提供的config参数创建一个新的ProxyServer对象,其代码比较长,逻辑相对复杂,下面会挑重点说一下。

cmd/kube-proxy/app/server.go:

131

funcNewProxyServerDefault(config*options.ProxyServerConfig)(*ProxyServer,error){

...

//Createaiptablesutils.

execer:

=exec.New()

ifruntime.GOOS=="windows"{

netshInterface=utilnetsh.New(execer)

}else{

dbus=utildbus.New()

iptInterface=utiliptables.New(execer,dbus,protocol)

}

...

//设置OOM_SCORE_ADJ

varoomAdjuster*oom.OOMAdjuster

ifconfig.OOMScoreAdj!

=nil{

oomAdjuster=oom.NewOOMAdjuster()

iferr:

=oomAdjuster.ApplyOOMScoreAdj(0,int(*config.OOMScoreAdj));err!

=nil{

glog.V

(2).Info(err)

}

}

...

//CreateaKubeClient

...

//创建eventBroadcaster和eventrecorder

hostname:

=nodeutil.GetHostname(config.HostnameOverride)

eventBroadcaster:

=record.NewBroadcaster()

recorder:

=eventBroadcaster.NewRecorder(v1.EventSource{Component:

"kube-proxy",Host:

hostname})

//定义proxier和endpointsHandler,分别用于处理services和endpoints的updateevent。

varproxierproxy.ProxyProvider

varendpointsHandlerproxyconfig.EndpointsConfigHandler

//从config中获取proxymode

proxyMode:

=getProxyMode(string(config.Mode),client.Core().Nodes(),hostname,iptInterface,iptables.LinuxKernelCompatTester{})

//proxymode为iptables场景

ifproxyMode==proxyModeIPTables{

glog.V(0).Info("UsingiptablesProxier.")

ifconfig.IPTablesMasqueradeBit==nil{

//IPTablesMasqueradeBitmustbespecifiedordefaulted.

returnnil,fmt.Errorf("UnabletoreadIPTablesMasqueradeBitfromconfig")

}

//调用pkg/proxy/iptables/proxier.go:

222中的iptables.NewProxier来创建proxier,赋值给前面定义的proxier和endpointsHandler,表示由该proxier同时负责service和endpoint的event处理。

proxierIPTables,err:

=iptables.NewProxier(iptInterface,utilsysctl.New(),execer,config.IPTablesSyncPeriod.Duration,config.IPTablesMinSyncPeriod.Duration,config.MasqueradeAll,int(*config.IPTablesMasqueradeBit),config.ClusterCIDR,hostname,getNodeIP(client,hostname))

iferr!

=nil{

glog.Fatalf("Unabletocreateproxier:

%v",err)

}

proxier=proxierIPTables

endpointsHandler=proxierIPTables

//Noturningback.RemoveartifactsthatmightstillexistfromtheuserspaceProxier.

glog.V(0).Info("Tearingdownuserspacerules.")

userspace.CleanupLeftovers(iptInterface)

}

//proxymode为userspace场景

else{

glog.V(0).Info("UsinguserspaceProxier.")

//Thisisaproxy.LoadBalancerwhichNewProxierneedsbuthasmethodswedon'tneedfor

//ourconfig.EndpointsConfigHandler.

loadBalancer:

=userspace.NewLoadBalancerRR()

//setEndpointsConfigHandlertoourloadBalancer

endpointsHandler=loadBalancer

varproxierUserspaceproxy.ProxyProvider

//windowsOS场景下,调用pkg/proxy/winuserspace/proxier.go:

146的winuserspace.NewProxier来创建proxier。

ifruntime.GOOS=="windows"{

proxierUserspace,err=winuserspace.NewProxier(

loadBalancer,

net.ParseIP(config.BindAddress),

netshInterface,

*utilnet.ParsePortRangeOrDie(config.PortRange),

//TODO@piresreplacebelowwithdefaultvalues,ifapplicable

config.IPTablesSyncPeriod.Duration,

config.UDPIdleTimeout.Duration,

}

//linuxOS场景下,调用pkg/proxy/userspace/proxier.go:

143的userspace.NewProxier来创建proxier。

else{

proxierUserspace,err=userspace.NewProxier(

loadBalancer,

net.ParseIP(config.BindAddress),

iptInterface,

*utilnet.ParsePortRangeOrDie(config.PortRange),

config.IPTablesSyncPeriod.Duration,

config.IPTablesMinSyncPeriod.Duration,

config.UDPIdleTimeout.Duration,

}

iferr!

=nil{

glog.Fatalf("Unabletocreateproxier:

%v",err)

}

proxier=proxierUserspace

//Removeartifactsfromthepure-iptablesProxier,ifnotonWindows.

ifruntime.GOOS!

="windows"{

glog.V(0).Info("Tearingdownpure-iptablesproxyrules.")

iptables.CleanupLeftovers(iptInterface)

}

}

//Addiptablesreloadfunction,ifnotonWindows.

ifruntime.GOOS!

="windows"{

iptInterface.AddReloadFunc(proxier.Sync)

}

//Createconfigs(i.e.WatchesforServicesandEndpoints)

//创建serviceConfig负责service的watchforUpdates

serviceConfig:

=proxyconfig.NewServiceConfig()

//给serviceConfig注册proxier,既添加对应的listener用来处理serviceupdate时逻辑。

serviceConfig.RegisterHandler(proxier)

//创建endpointsConfig负责endpoint的watchforUpdates

endpointsConfig:

=proxyconfig.NewEndpointsConfig()

//给endpointsConfig注册endpointsHandler,既添加对应的listener用来处理endpointupdate时的逻辑。

endpointsConfig.RegisterHandler(endpointsHandler)

//NewSourceAPIcreatesconfigsourcethatwatchesforchangestotheservicesandendpoints.

//NewSourceAPI通过ListWatchapiserver的Service和endpoint,并周期性的维护serviceStore和endpointStore的更新

proxyconfig.NewSourceAPI(

client.Core().RESTClient(),

config.ConfigSyncPeriod,

serviceConfig.Channel("api"),//ServiceUpdateChannel

endpointsConfig.Channel("api"),//endpointupdatechannel

...

//把前面创建的对象作为参数,构造出ProxyServer对象。

returnNewProxyServer(client,config,iptInterface,proxier,eventBroadcaster,recorder,conntracker,proxyMode)

}

NewProxyServerDefault中的核心逻辑我都已经在上述代码中添加了注释,其中有几个地方需要我们再深入跟进去看看:

proxyconfig.NewServiceConfig,proxyconfig.NewEndpointsConfig,serviceConfig.RegisterHandler,endpointsConfig.RegisterHandler,proxyconfig.NewSourceAPI。

proxyconfig.NewServiceConfig

我们对ServiceConfig的代码分析一遍,EndpointsConfig的代码则类似。

pkg/proxy/config/config.go:

192

funcNewServiceConfig()*ServiceConfig{

//创建updateschannel

updates:

=make(chanstruct{},1)

//构建serviceStore对象

store:

=&serviceStore{updates:

updates,services:

make(map[string]map[types.NamespacedName]api.Service)}

mux:

=config.NewMux(store)

//新建Broadcaster,在后续的serviceConfig.RegisterHandler会注册该Broadcaster的listener。

bcaster:

=config.NewBroadcaster()

//启动协程,马上开始watchupdateschannel

gowatchForUpdates(bcaster,store,updates)

return&ServiceConfig{mux,bcaster,store}

}

下面我们再跟进watchForUpdates去看看。

pkg/proxy/config/config.go:

292

funcwatchForUpdates(bcaster*config.Broadcaster,accessorconfig.Accessor,updates<-chanstruct{}){

fortrue{

<-updates

bcaster.Notify(accessor.MergedState())

}

}

watchForUpdates就是一直在watchupdateschannel,如果有数据,则立刻由该BroadcasterNotify到注册的listeners。

Notify的代码如下,可见,它负责将数据通知给所有的listener,并调用各个listener的OnUpdate方法。

pkg/util/config/config.go:

133

//Notifynotifiesalllisteners.

func(b*Broadcaster)Notify(instanceinterface{}){

b.listenerLock.RLock()

listeners:

=b.listeners

b.listenerLock.RUnlock()

for_,listener:

=rangelisteners{

listener.OnUpdate(instance)

}

}

func(fListenerFunc)OnUpdate(instanceinterface{}){

f(instance)

}

serviceConfig.RegisterHandler

上面分析的proxyconfig.NewServiceConfig负责创建ServiceConfig,开始watchupdateschannel了,当从channel中取到值的时候,Broadcaster就会通知listener进行处理。

serviceConfig.RegisterHandler正是负责给Broadcaster注册listener的,其代码如下。

pkg/proxy/config/config.go:

205

func(c*ServiceConfig)RegisterHandler(handlerServiceConfigHandler){

//给ServiceConfig的Broadcaster注册listener。

c.bcaster.Add(config.ListenerFunc(func(instanceinterface{}){

glog.V(3).Infof("Callinghandler.OnServiceUpdate()")

handler.OnServiceUpdate(instance.([]api.Service))

}))

}

上面分析proxyconfig.NewServiceConfig时可知,当从updateschannel中取到值的时候,最终会调用对应的ListenerFunc(instance)进行处理,在这里,也就是调用:

glog.V(3).Infof("Callinghandler.OnServiceUpdate()")

handler.OnServiceUpdate(instance.([]api.Service))

}

即调用到handler.OnServiceUpdate。

每种proxymode对应的proxier都有对应的handler.OnServiceUpdate接口实现,我们以iptablesmode为例,看看handler.OnServiceUpdate的实现:

pkg/proxy/iptables/proxier.go:

428

func(proxier*Proxier)OnServiceUpdate(allServices[]api.Service){

...

proxier.syncProxyRules()

proxier.deleteServiceConnections(staleUDPServices.List())

}

因此,最终关键的逻辑都转向了proxier.syncProxyRules(),从我们上面给出的内部模块交互图

展开阅读全文
相关资源
猜你喜欢
相关搜索
资源标签

当前位置:首页 > 法律文书 > 调解书

copyright@ 2008-2023 冰点文库 网站版权所有

经营许可证编号:鄂ICP备19020893号-2