默认同步时间
--min-resync-period duration The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod. (default 12h0m0s) https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/apis/config/v1alpha1/defaults.go#L120
判断 pod状态
// pkg/contoller/statefulset/stateful_set_utils.go
// isRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady.
func isRunningAndReady(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod)
}
// isCreated returns true if pod has been created and is maintained by the API server
func isCreated(pod *v1.Pod) bool {
return pod.Status.Phase != ""
}
// isFailed returns true if pod has a Phase of PodFailed
func isFailed(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed
}
// isTerminating returns true if pod's DeletionTimestamp has been set
func isTerminating(pod *v1.Pod) bool {
return pod.DeletionTimestamp != nil
}
// isHealthy returns true if pod is running and ready and has not been terminated
func isHealthy(pod *v1.Pod) bool {
return isRunningAndReady(pod) && !isTerminating(pod)
}
podutil 在pkg/api/v1/pod/util.go
// IsPodAvailable returns true if a pod is available; false otherwise.
// Precondition for an available pod is that it must be ready. On top
// of that, there are two cases when a pod can be considered available:
// 1. minReadySeconds == 0, or
// 2. LastTransitionTime (is set) + minReadySeconds < current time
func IsPodAvailable(pod *v1.Pod, minReadySeconds int32, now v1.Time) bool {}
// IsPodReady returns true if a pod is ready; false otherwise.
func IsPodReady(pod *v1.Pod) bool {
return IsPodReadyConditionTrue(pod.Status)
}判断 job 状态
// pkg/controller/cronjob/utils.go
func getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) {
for _, c := range j.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
return true, c.Type
}
}
return false, ""
}
// IsJobFinished returns whether or not a job has completed successfully or failed.
func IsJobFinished(j *batchv1.Job) bool {
isFinished, _ := getFinishedStatus(j)
return isFinished
}判断 node 状态
// k8s.io/kubernetes/pkg/api/v1/node/util.go
func IsNodeReady(node *v1.Node) bool {
for _, c := range node.Status.Conditions {
if c.Type == v1.NodeReady {
return c.Status == v1.ConditionTrue
}
}
return false
}kubelet 是如何找到 docker secrect的
// 1. if len(pod.Spec.ImagePullSecrets) == 0 注入 serviceaccount 的默认 secret
// plugin/pkg/admission/serviceaccount/admission.go
func (s *serviceAccount) Admit(a admission.Attributes) (err error)
// 2. 从 ImagePullSecrets 指向到 secret中 中提取 Secret
// pkg/kubelet/kubelet_pods.go
func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret
// 3. pkg/credentialprovider/secrets/secrets.go
func MakeDockerKeyring(...){
...
// secret 里面的 docker config 会提取到 basicKeyring 中
return credentialprovider.UnionDockerKeyring{basicKeyring, defaultKeyring}
}
// 4. RegisterCredentialProvider 自动注入支持的 CredentialProvider
// 比如 .dockercfg 读文件, azure, aws
// pkg/credentialprovider/provider.go
func init() {
RegisterCredentialProvider(".dockercfg",
&CachingDockerConfigProvider{
Provider: &defaultDockerConfigProvider{},
Lifetime: 5 * time.Minute,
})
}
// 5. 使用 keyring 找 auth 信息
// pkg/kubelet/kuberuntime/kuberuntime_image.go
keyring.Lookup(repoToPull)
// 6. 根据镜像地址找 LazyAuthConfiguration BasicDockerKeyring Lookup
// LazyAuthConfiguration 里面有 provider 和 username,password (provider 注入)
// pkg/credentialprovider/keyring.go
func (dk *BasicDockerKeyring) Lookup(image string) ([]LazyAuthConfiguration, bool)
// 7. 使用 credentialprovider + LazyAuthConfiguration => Auth 信息
// pkg/kubelet/kuberuntime/kuberuntime_image.go
authConfig := credentialprovider.LazyProvide(currentCreds)
// 8. 使用 auth 拉镜像
// pkg/kubelet/kuberuntime/kuberuntime_image.go
m.imageService.PullImage(imgSpec, auth)给 plugin 注入信息
定义了一种 plugin interface,怎么支持给 plugin 设置必要的参数(各种 plugin 需要的参数可能不同)?
- 可以定义一系列 wants interface, 即 set 函数,有 set 函数的,表示需要这种参数,给他设置
// pkg/kubeapiserver/admission/initializer.go
// WantsInternalKubeClientSet defines a function which sets ClientSet for admission plugins that need it
type WantsInternalKubeClientSet interface {
SetInternalKubeClientSet(internalclientset.Interface)
admission.InitializationValidator
}
// WantsInternalKubeInformerFactory defines a function which sets InformerFactory for admission plugins that need it
type WantsInternalKubeInformerFactory interface {
SetInternalKubeInformerFactory(informers.SharedInformerFactory)
admission.InitializationValidator
}
// WantsCloudConfig defines a function which sets CloudConfig for admission plugins that need it.
type WantsCloudConfig interface {
SetCloudConfig([]byte)
}
// WantsRESTMapper defines a function which sets RESTMapper for admission plugins that need it.
type WantsRESTMapper interface {
SetRESTMapper( .RESTMapper)
}
// WantsQuotaConfiguration defines a function which sets quota configuration for admission plugins that need it.
type WantsQuotaConfiguration interface {
SetQuotaConfiguration(quota.Configuration)
admission.InitializationValidator
}
// Initialize checks the initialization interfaces implemented by each plugin
// and provide the appropriate initialization data
func (i *PluginInitializer) Initialize(plugin admission.Interface) {
if wants, ok := plugin.(WantsInternalKubeClientSet); ok {
wants.SetInternalKubeClientSet(i.internalClient)
}
if wants, ok := plugin.(WantsInternalKubeInformerFactory); ok {
wants.SetInternalKubeInformerFactory(i.informers)
}
if wants, ok := plugin.(WantsCloudConfig); ok {
wants.SetCloudConfig(i.cloudConfig)
}
if wants, ok := plugin.(WantsRESTMapper); ok {
wants.SetRESTMapper(i.restMapper)
}
if wants, ok := plugin.(WantsQuotaConfiguration); ok {
wants.SetQuotaConfiguration(i.quotaConfiguration)
}
}cronjob contoller 效率很低
还在使用定期 relist 方法的 contoller, 很多 job 会使 apiserver 压力很大
// pkg/controller/cronjob/cronjob_controller.go
go wait.Until(jm.syncAll, 10*time.Second, stopCh)
func (jm *CronJobController) syncAll() {
// ...
jl, err := jm.kubeClient.BatchV1().Jobs( v1.NamespaceAll).List( v1.ListOptions{})
// ...
// 里面有大量使用 KubeClient 去apiserver 拿数据,而一般的 contoller 实现
// 获取数据会尽可能的使用 Lister 等去获取数据,本质是从 Indexer (cache) 中获取vistor 模式
vistor 模式 在 kubernetes 里面很常用, 给外部函数 遍历内部状态的入口, 下面是几个例子
// 例子1: each
// staging/src/k8s.io/apimachinery/pkg/apis/ /v1/unstructured/unstructured.go
func (obj *Unstructured) EachListItem(fn func(runtime. ) error) error {
field, ok := obj. ["items"]
if !ok {
return errors.New("content is not a list")
}
// ... 略
for _, item := range items {
child, ok := item.(map[string]interface{})
// ... 略
if err := fn(&Unstructured{ : child}); err != nil {
// 调用外部函数,外部函数如果 err,则 break each 流程
return err
}
}
return nil
}
// 例子2: visitor
// pkg/api/pod/util.go 允许外部访问 Pod 内部的所有引用的 configMapName
func VisitPodConfigmapNames(pod *api.Pod, visitor Visitor) bool {
VisitContainers(&pod.Spec, func(c *api.Container) bool {
return visitContainerConfigmapNames(c, visitor)
})
var source *api.VolumeSource
for i := range pod.Spec.Volumes {
source = &pod.Spec.Volumes[i].VolumeSource
switch {
case source.Projected != nil:
// .. 略
if !visitor(source.Projected.Sources[j].ConfigMap.Name) {}
//.. 略
case source.ConfigMap != nil:
if !visitor(source.ConfigMap.Name) {}
}
}
return true
}context 使用
kubernetes 里面 context 用得很多,在定义 interface 给 plugin, 或者其他地方实现的时候第一个参数经常是 context.
// 一个关联知识点
// 1. 对 nil channel中读写数据会一直被block。
// 2. close的channel 读立即返回零值,写会panic,无论读写都不会阻塞。
// 使用说明: https://blog.golang.org/context
// Incoming requests to a server should create a Context, and outgoing
// calls to servers should accept a Context. The chain of function
// calls between them must propagate the Context, optionally replacing
// it with a derived Context created using WithCancel, WithDeadline,
// WithTimeout, or WithValue. When a Context is canceled, all
// Contexts derived from it are also canceled.
// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
// Done returns a channel that is closed when this Context is canceled
// or times out.
Done() <-chan struct{}
// Err indicates why this context was canceled, after the Done channel
// is closed.
Err() error
// Deadline returns the time when this Context will be canceled, if any.
Deadline() (deadline time.Time, ok bool)
// Value returns the value associated with key or nil if none.
Value(key interface{}) interface{}
}
// 常用的使用模式 break if cancel
func Stream(ctx context.Context, out chan<- Value) error {
for {
v, err := DoSomething(ctx)
if err != nil { return err }
select {
case <-ctx.Done():
return ctx.Err()
case out <- v:
}
}
}
// 简化的 google search 例子
func handleSearch(w http.ResponseWriter, req *http.Request) {
timeout, err := time.ParseDuration(req.FormValue("timeout"))
if err == nil {
ctx, cancel = context.WithTimeout(context.Background(), timeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel() // Cancel ctx as soon as handleSearch returns.
results, err := search(ctx, query)
// ...略
}
// Search sends query to Google search and returns the results.
func Search(ctx context.Context, query string) (Results, error) {
// ...略
err = httpDo(ctx, req, func(resp *http.Response, err error) error {
// 略
results = xxx
return nil
})
// httpDo waits for the closure we provided to return, so it's safe to read results here.
return results, err
}
// httpDo issues the HTTP request and calls f with the response. If ctx.Done is
// closed while the request or f is running, httpDo cancels the request, waits
// for f to exit, and returns ctx.Err. Otherwise, httpDo returns f's error.
func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
// Run the HTTP request in a goroutine and pass the response to f.
c := make(chan error, 1)
req = req.WithContext(ctx)
go func() { c <- f(http.DefaultClient.Do(req)) }()
select {
case <-ctx.Done():// 被取消了 (可能是 timeout 触发的)
<-c // Wait for f to return.
return ctx.Err()
case err := <-c:
return err
}
}
// 另一个例子 这是一个收到一个信息 stop 两个信号退出的函数
func SetupSignalHandler(parent context.Context) context.Context {
close(onlyOneSignalHandler) // panics when called twice
ctx, cancel := context.WithCancel(parent)
c := make(chan os.Signal, 2)
signal.Notify(c, shutdownSignals...)
go func() {
<-c
cancel() // 收到信号,取消 ctx, 后面使用这个 ctx 的任务都会 done
<-c
os.Exit(1) // second signal. Exit directly.
}()
return ctx
}
// 另一个例子 来自kubernetes scheduler
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
// ...略
// Prepare a reusable run function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}
ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
defer cancel()
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()
// If leader election is enabled, run via LeaderElector until done and exit.
if c.LeaderElection != nil {
// ...略
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so run inline until done.
run(ctx)
return fmt.Errorf("finished without leader elect")
}
// 再来最后一个例子,如何实现这样一个函数, retry f,直到 f 成功或者 timeout
// 对于这个例子 更通用的实现见 k8s.io/apimachinery/pkg/util/wait/wait.go
// 不过 wait 中的 timeout 并不准确, 它在 重复的时候才会检查 timeout
func Retry(fn func(ctx context.Context) error, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
c := make(chan error, 1)
for {
go func() { c <- fn(ctx) }()
select {
case <-ctx.Done():
return ctx.Err() // timeout error
case err := <-c:
if err == nil {
return nil
}
}
}
}如何让一个 pod 运行到一个 node 上
由于 node 可能有 taint,需要 设置 Toleration, 可以参考 deamonsetcontroller 加了哪些
// pkg/controller/daemon/util/daemonset_util.go
func AddOrUpdateDaemonPodTolerations(spec *v1.PodSpec) {
TaintNodeNotReady
TaintNodeUnreachable
TaintNodeDiskPressure
TaintNodeMemoryPressure
TaintNodePIDPressure
TaintNodeUnschedulable
TaintNodeNetworkUnavailable
}获取某个资源的 controller
// staging/src/k8s.io/apimachinery/pkg/apis/ /v1/controller_ref.go func IsControlledBy(obj , owner ) bool func GetControllerOf(controllee ) *OwnerReference func NewControllerRef(owner , gvk schema.GroupVersionKind) *OwnerReference
List 相关的工具
// staging/src/k8s.io/apimachinery/pkg/api/ /help.go func IsListType(obj runtime. ) bool func EachListItem(obj runtime. , fn func(runtime. ) error) error func ExtractList(obj runtime. ) ([]runtime. , error) func SetList(list runtime. , s []runtime. ) error
Accessor
Accessor 是用来获取设置 kubernetes api 中的部分信息的工具
// staging/src/k8s.io/apimachinery/pkg/api/ / .go
func CommonAccessor(obj interface{}) ( v1.Common, error)
func ListAccessor(obj interface{}) (List, error)
func Accessor(obj interface{}) ( v1. , error)
func TypeAccessor(obj interface{}) (Type, error)
func (resourceAccessor) SetKind(obj runtime. , kind string) error //....
func (a genericAccessor) SetNamespace(namespace string) //....collisionCount 是什么
很多 kubernetes 资源都有一个 collisionCount 字段,主要是为了预防 hash 冲突,不同的 spec template 一般会计算出一个 hash, 作为 name 的一部分,比如 deployment 使用这个作为生成的 replica 的 name 的一部分 (1.12 之前不会直接加,会encode一下,1.12之后就是直接加的了) (也会加到 label/selector 里面 key 为 pod-template-hash), 如果只用 spec template,那么就有可能 name 冲突,使用 collisionCount 就能避免这种冲突的出现.
garbagecollector 是怎么实现的
如何删除所有/一种资源
// get all deletable resources
resources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)
groupVersionResources, err := discovery.GroupVersionResources(deletableResources)
// each gvr and delete collection
for gvr := range groupVersionResources {
dataClient.Resource(gvr).Namespace(namespace).DeleteCollection(opts, v1.ListOptions{})
}pod hpa 是怎么工作的
基本计算公式
# 比如 AverageValue 的情况
utilization=0
for _, val := range metrics {
utilization = utilization + val
}
replicaCount = statusReplicas
usageRatio := float64(utilization) / (float64(targetUtilizationPerPod) * float64(replicaCount))
if math.Abs(1.0-usageRatio) > c.tolerance {
// update number of replicas if the change is large enough
replicaCount = int32(math.Ceil(float64(utilization) / float64(targetUtilizationPerPod)))
}GVK 转成 GR
GV + Kind -> GK -> RESTMapping() -> GR
schema.ParseGroupVersion
api .RESTMapper.RESTMappings
mapping.Resource.GroupResource()
type RESTMapping struct {
// Resource is the GroupVersionResource (location) for this endpoint
Resource schema.GroupVersionResource
// GroupVersionKind is the GroupVersionKind (data format) to submit to this endpoint
GroupVersionKind schema.GroupVersionKind
// Scope contains the information needed to deal with REST Resources that are in a resource hierarchy
Scope RESTScope
} 继续阅读与本文标签相同的文章
上一篇 :
详解Java实现单例的五种方式
下一篇 :
Java中的字节流文件读取教程(一)
-
有看头了!Showtime正将Uber故事拍成一部电视剧
2026-05-15栏目: 教程
-
卡死?你的BI速度慢吗?试试这个工具
2026-05-15栏目: 教程
-
印度电信运营商宣布使用爱立信设备建设5G核心网
2026-05-15栏目: 教程
-
微信这样发语音才能让人更喜欢听,看了这些,网友:还真的是!
2026-05-15栏目: 教程
-
任正非:6G或将10年内问世
2026-05-15栏目: 教程
