K8s Controller Code Logic Error Check
2025/07/25
Please review the following code and check whether it contains any logic errors:

```go
package lws

import (
	"context"
	"fmt"

	leaderworkersetv1 "gitlab.basemind.com/basemind/add-on-controller/pkg/api/lws/leaderworkerset.x-k8s.io/v1"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
	"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
	"sigs.k8s.io/yaml"

	"strings"
)

type PodInformationMap map[string]OneWorkerPodInformation

type WorkerInformation struct {
	PodInformationMap `yaml:",inline"`
}

type OneWorkerPodInformation struct {
	NodeName string                  `yaml:"nodeName"`
	PodIP    string                  `yaml:"podIP"`
	NICs     []NICMountedInformation `yaml:"nics"`
}

type NICMountedInformation struct {
	Index string `yaml:"index"`
	NICIP string `yaml:"nicIP"`
	Name  string `yaml:"name"`
}

func (c *LeaderWorkerSetController) workerForSts(ctx context.Context) {
	for c.processNextSts(ctx) {
	}
}

func (c *LeaderWorkerSetController) processNextSts(ctx context.Context) bool {
	item, quit := c.stsQueue.Get()
	if quit {
		return false
	}
	defer c.stsQueue.Done(item)
	parts := strings.Split(item.(string), "/")
	if len(parts) != 2 {
		klog.Errorf("invalid key %s", item)
		c.stsQueue.Forget(item)
		return true
	}
	ns, name := parts[0], parts[1]
	if err := c.syncLeaderWorkerSet(ctx, ns, name); err != nil {
		klog.Errorf("syncLeaderWorkerSet %s/%s failed %v", ns, name, err)
		c.stsQueue.AddRateLimited(item)
		return true
	} else {
		c.stsQueue.Forget(item)
	}
	return true
}

func (c *LeaderWorkerSetController) syncStatefulSet(ctx context.Context, namespace, name string) error {
	sts, err := c.stsLister.StatefulSets(namespace).Get(name)
	if err != nil {
		klog.Errorf("lws controller: Failed to get StatefulSet %s/%s: %v", namespace, name, err)
		return err
	}
	if sts.Labels == nil || sts.Labels[leaderworkersetv1.GroupIndexLabelKey] == "" {
		klog.V(5).Infof("lws controller: StatefulSet %s/%s has no labels", namespace, name)
	}
	if _, ok := sts.Labels[v1alpha1.PodGroupLabel]; !ok {
		klog.V(5).Infof("lws controller: StatefulSet %s/%s does not need a PodGroup", namespace, name)
		return nil
	}
	klog.V(4).Infof("lws controller: syncing gang StatefulSet %s/%s", namespace, name)
	if sts.DeletionTimestamp != nil {
		// delete the configmap if the statefulset is being deleted
		if err = c.clientSet.CoreV1().ConfigMaps(namespace).Delete(ctx, sts.Name, metav1.DeleteOptions{}); err != nil {
			klog.Errorf("lws controller: Failed to delete ConfigMap for StatefulSet %s/%s: %v", namespace, name, err)
		}
		// delete the pod group if the statefulset is being deleted
		if err = c.podGroupClientSet.SchedulingV1alpha1().PodGroups(namespace).Delete(ctx, sts.Name, metav1.DeleteOptions{}); err != nil {
			klog.Errorf("lws controller: Failed to delete PodGroup for StatefulSet %s/%s: %v", namespace, name, err)
		}
		return nil
	}
	if !isStatefulSetReady(sts) {
		klog.V(4).Infof("lws controller: StatefulSet %s/%s is not ready, skipping", namespace, name)
		return nil
	}
	pods, err := c.GetAllPodsAllocatedNodeForStatefulSet(sts)
	if err != nil || len(pods) == 0 {
		klog.Errorf("lws controller: Failed to ensure all pods for StatefulSet %s/%s ia allocated on node: %v", namespace, name, err)
		return err
	}
	klog.V(4).Infof("lws controller: StatefulSet %s/%s is ready, syncing pods", namespace, name)
	configMap, err := c.manifestConfigmapOfWorkerSetToLeader(pods)
	if err != nil {
		klog.Errorf("lws controller: Failed to create ConfigMap for StatefulSet %s/%s: %v", namespace, name, err)
	}
	configMap.Name = sts.Name
	configMap.Namespace = sts.Namespace
	// Create or update the ConfigMap for the StatefulSet
	_, err = c.clientSet.CoreV1().ConfigMaps(namespace).Get(ctx, sts.Name, metav1.GetOptions{})
	if err != nil {
		if !apierrors.IsNotFound(err) {
			klog.Errorf("lws controller: Failed to get ConfigMap for StatefulSet %s/%s: %v", namespace, name, err)
			return err
		}
		// Create the ConfigMap if it does not exist
		_, err = c.clientSet.CoreV1().ConfigMaps(namespace).Create(ctx, &configMap, metav1.CreateOptions{})
		if err != nil {
			klog.Errorf("lws controller: Failed to create ConfigMap for StatefulSet %s/%s: %v", namespace, name, err)
			return err
		}
		klog.V(4).Infof("lws controller: Created ConfigMap for StatefulSet %s/%s", namespace, name)
		return nil
	} else {
		// Update the ConfigMap if it exists
		_, err = c.clientSet.CoreV1().ConfigMaps(namespace).Update(ctx, &configMap, metav1.UpdateOptions{})
		if err != nil {
			klog.Errorf("lws controller: Failed to update ConfigMap for StatefulSet %s/%s: %v", namespace, name, err)
			return err
		}
		klog.V(4).Infof("lws controller: Updated ConfigMap for StatefulSet %s/%s", namespace, name)
		return nil
	}
}

func isStatefulSetReady(sts *v1.StatefulSet) bool {
	if sts == nil || sts.Status.ReadyReplicas == 0 {
		return false
	}
	// Check if the number of ready replicas matches the desired replicas
	if sts.Status.CurrentRevision != sts.Status.UpdateRevision {
		return false
	}
	desiredReplica := sts.Spec.Replicas
	// Check if the number of available replicas matches the desired replicas
	if sts.Status.AvailableReplicas == *desiredReplica &&
		sts.Status.CurrentReplicas == *desiredReplica &&
		sts.Status.ReadyReplicas == *desiredReplica &&
		sts.Status.UpdatedReplicas == *desiredReplica {
		return true
	}
	return false
}

func (c *LeaderWorkerSetController) manifestConfigmapOfWorkerSetToLeader(pods []*corev1.Pod) (corev1.ConfigMap, error) {
	data := make(map[string]string)
	workerInfo := WorkerInformation{
		PodInformationMap: make(map[string]OneWorkerPodInformation),
	}
	for _, pod := range pods {
		nicInfo := make([]NICMountedInformation, 0, 8)
		podInfo := OneWorkerPodInformation{
			NodeName: pod.Spec.NodeName,
			PodIP:    pod.Status.PodIP,
		}
		nodeName := pod.Spec.NodeName
		node, err := c.nodeLister.Get(nodeName)
		if err != nil {
			klog.Errorf("lws controller: Failed to get node %s for pod %s/%s: %v", nodeName, pod.Namespace, pod.Name, err)
			continue
		}
		if node.Labels == nil {
			klog.V(4).Infof("lws controller: Node %s has no labels", nodeName)
			continue
		}
		for labelKey, labelValue := range node.Labels {
			if strings.HasPrefix(labelKey, "pf.brainpp.cn/") {
				index := strings.TrimPrefix(labelKey, "pf.brainpp.cn/")
				nicInfo = append(nicInfo, NICMountedInformation{
					Index: index,
					NICIP: labelValue,
					Name:  labelKey,
				})
			}
		}
		workerInfo.PodInformationMap[pod.Name] = podInfo
	}
	raw, err := yaml.Marshal(workerInfo)
	if err != nil {
		klog.Errorf("lws controller: Failed to marshal worker information to YAML: %v", err)
		return corev1.ConfigMap{}, err
	}
	data["worker-info.yaml"] = string(raw)
	manifests := corev1.ConfigMap{
		Data: data,
		TypeMeta: metav1.TypeMeta{
			Kind:       "ConfigMap",
			APIVersion: "v1",
		},
	}
	return manifests, nil
}

func (c *LeaderWorkerSetController) GetAllPodsAllocatedNodeForStatefulSet(sts *v1.StatefulSet) ([]*corev1.Pod, error) {
	if sts == nil || sts.Spec.Selector == nil {
		return nil, nil
	}
	pods := make([]*corev1.Pod, 0, *sts.Spec.Replicas)
	for i := int32(0); i < *sts.Spec.Replicas; i++ {
		podName := fmt.Sprintf("%s-%d", sts.Name, i)
		pod, err := c.podLister.Pods(sts.Namespace).Get(podName)
		if err != nil {
			klog.Errorf("lws controller: Failed to get pod %s/%s for statefulset", sts.Namespace, podName)
			return nil, err
		}
		if pod.Spec.NodeName != "" {
			klog.V(2).Infof("lws controller: Pod %s/%s is scheduled on node %s", pod.Namespace, pod.Name, pod.Spec.NodeName)
			pods = append(pods, pod)
		} else {
			// the lines below SHOULD NOT happen
			klog.Errorf("lws controller: Pod %s/%s is not scheduled on any node", pod.Namespace, pod.Name)
		}
	}
	return pods, nil
}
```
Thinking process

Let's go through this code section by section and check for logic errors:

  1. func (c *LeaderWorkerSetController) workerForSts(ctx context.Context) { ... }: this function starts a loop that keeps processing items from the stsQueue. No obvious logic error.

  2. func (c *LeaderWorkerSetController) processNextSts(ctx context.Context) bool { ... }: this function takes items from the stsQueue and processes them, calling syncLeaderWorkerSet for each one. No obvious logic error.

  3. func (c *LeaderWorkerSetController) syncStatefulSet(ctx context.Context, namespace, name string) error { ... }: this function synchronizes the state of a StatefulSet. It first fetches the StatefulSet and checks its labels. If the StatefulSet is being deleted, it deletes the associated ConfigMap and PodGroup. If the StatefulSet is ready, it fetches all Pods that have been assigned to nodes and creates a ConfigMap that stores their information, checking whether the ConfigMap already exists before creating or updating it. No obvious logic error.

  4. func isStatefulSetReady(sts *v1.StatefulSet) bool { ... }: this function checks whether a StatefulSet is ready. It inspects the ReadyReplicas, CurrentRevision, UpdateRevision, AvailableReplicas, CurrentReplicas, and UpdatedReplicas fields. There is a potential problem, however: if desiredReplica is nil, dereferencing it when comparing AvailableReplicas, CurrentReplicas, ReadyReplicas, and UpdatedReplicas against it can cause a panic. The code should check whether desiredReplica is nil before dereferencing it.

  5. func (c *LeaderWorkerSetController) manifestConfigmapOfWorkerSetToLeader(pods []*corev1.Pod) (corev1.ConfigMap, error) { ... }: this function builds a ConfigMap from Pod information. It iterates over the Pod list, records each Pod's node name and PodIP, and looks up the NIC information associated with the node. It then stores everything in a WorkerInformation struct and serializes it to YAML. No obvious logic error.

  6. func (c *LeaderWorkerSetController) GetAllPodsAllocatedNodeForStatefulSet(sts *v1.StatefulSet) ([]*corev1.Pod, error) { ... }: this function fetches all Pods belonging to the StatefulSet, deriving each Pod name from the StatefulSet's name and replica count. If a Pod has not been assigned to a node, it only logs an error and skips that Pod. There is a potential problem here: the caller never learns that some Pods were skipped, so the information for unscheduled Pods can be silently missing when the ConfigMap is created. The caller should handle this case explicitly.

In summary, this code contains two potential logic errors:

  1. isStatefulSetReady函数中,如果没有检查desiredReplica是否为nil,那么在比较AvailableReplicasCurrentReplicasReadyReplicasUpdatedReplicasdesiredReplica时,可能会导致 panic。

  2. GetAllPodsAllocatedNodeForStatefulSet函数中,如果在获取所有Pod之后,有些Pod没有被分配到节点上,那么这个函数会返回一个错误。然而,在调用者中,这个错误被忽略,并且只记录了Pod的节点名称。这可能导致在创建ConfigMap时,那些没有被分配到节点上的Pod的信息被遗漏。

Here are the results of the review for potential logic errors in the code:

Main logic errors:

  1. Nil pointer dereference risk (critical):

    • Problem: dereferencing sts.Spec.Replicas when it is nil causes a panic
    • Location: the isStatefulSetReady function
    • Suggested fix:
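A minimal sketch of the fix, keeping the rest of the original function unchanged (here a nil Replicas is treated as not ready; the API server normally defaults it to 1):

```go
func isStatefulSetReady(sts *v1.StatefulSet) bool {
	if sts == nil || sts.Status.ReadyReplicas == 0 {
		return false
	}
	if sts.Status.CurrentRevision != sts.Status.UpdateRevision {
		return false
	}
	// Guard against a nil Replicas pointer before dereferencing it.
	desiredReplica := sts.Spec.Replicas
	if desiredReplica == nil {
		return false
	}
	return sts.Status.AvailableReplicas == *desiredReplica &&
		sts.Status.CurrentReplicas == *desiredReplica &&
		sts.Status.ReadyReplicas == *desiredReplica &&
		sts.Status.UpdatedReplicas == *desiredReplica
}
```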
  2. Unhandled error return value (critical):

    • Problem: in syncStatefulSet, the error returned by manifestConfigmapOfWorkerSetToLeader is only logged; execution continues with a possibly incomplete ConfigMap instead of returning the error
    • Location: the syncStatefulSet function
    • Suggested fix:
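A sketch of the change, shown as an excerpt from syncStatefulSet with the surrounding code left as posted:

```go
	configMap, err := c.manifestConfigmapOfWorkerSetToLeader(pods)
	if err != nil {
		klog.Errorf("lws controller: Failed to build ConfigMap for StatefulSet %s/%s: %v", namespace, name, err)
		// Return the error so the work item is retried via AddRateLimited
		// instead of continuing with a half-built ConfigMap.
		return err
	}
	configMap.Name = sts.Name
	configMap.Namespace = sts.Namespace
```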
  3. Data overwrite risk (moderate):

    • Problem: if several Pods ever shared the same name (an extreme case), their entries in the map would overwrite each other; StatefulSet Pod names are deterministic (sts-0, sts-1, ...), though
    • Location: the manifestConfigmapOfWorkerSetToLeader function
    • Explanation: no real risk in practice, but logging unexpected Pod names is recommended
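A small defensive sketch (purely illustrative, since StatefulSet Pod names are already unique), placed inside the loop of manifestConfigmapOfWorkerSetToLeader:

```go
	// Should never happen for a StatefulSet, but log rather than overwrite silently.
	if _, exists := workerInfo.PodInformationMap[pod.Name]; exists {
		klog.Warningf("lws controller: duplicate pod name %s/%s in worker info, overwriting", pod.Namespace, pod.Name)
	}
	workerInfo.PodInformationMap[pod.Name] = podInfo
```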
  4. Label-check logic flaw:

    • Problem: the deletion state is handled only after the label checks; the deletion check does exist, but the ordering duplicates logic and lets the early returns above it skip cleanup
    • Location: the syncStatefulSet function
    • Suggestion: move the sts.DeletionTimestamp != nil check to the beginning of the function
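A sketch of the reordering, as an excerpt from the top of syncStatefulSet (unchanged branches are elided as comments):

```go
func (c *LeaderWorkerSetController) syncStatefulSet(ctx context.Context, namespace, name string) error {
	sts, err := c.stsLister.StatefulSets(namespace).Get(name)
	if err != nil {
		klog.Errorf("lws controller: Failed to get StatefulSet %s/%s: %v", namespace, name, err)
		return err
	}
	// Handle deletion first, before the label and readiness checks,
	// so cleanup is never skipped by one of the early returns below.
	if sts.DeletionTimestamp != nil {
		// delete the ConfigMap and PodGroup exactly as in the original code
		return nil
	}
	// Label checks, readiness check, and the create-or-update logic follow unchanged.
	return nil // placeholder for the remainder of the original function
}
```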

Minor issues:

  1. Redundant type assertion

    • Problem: the result of the type assertion is not stored in a variable and reused, but this is not a logic error
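A small tidier sketch for processNextSts (a comma-ok assertion also avoids a panic if a non-string item ever lands in the queue):

```go
	key, ok := item.(string)
	if !ok {
		klog.Errorf("lws controller: unexpected item type %T in the queue", item)
		c.stsQueue.Forget(item)
		return true
	}
	parts := strings.Split(key, "/")
```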
  2. Missing Pod status check

    • Problem: unscheduled Pods are not handled, which is acceptable in the current context
    • Suggestion: add a monitoring metric for unscheduled Pods
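A sketch of such a metric, assuming the project already uses Prometheus client_golang; the metric name and labels are illustrative, not taken from the original code:

```go
import "github.com/prometheus/client_golang/prometheus"

// unscheduledPods counts Pods of a ready StatefulSet that were found without a node assignment.
var unscheduledPods = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "lws_controller_unscheduled_pods_total",
		Help: "Pods of a ready StatefulSet found without a node assignment.",
	},
	[]string{"namespace", "statefulset"},
)

func init() {
	prometheus.MustRegister(unscheduledPods)
}

// In GetAllPodsAllocatedNodeForStatefulSet, next to the existing klog.Errorf call:
//     unscheduledPods.WithLabelValues(sts.Namespace, sts.Name).Inc()
```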
  3. Inefficient YAML serialization

    • Optimization suggestion: the serialized result could be cached, but the current approach is acceptable
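A related option, sketched as an excerpt from syncStatefulSet: instead of caching, skip the Update call when the serialized payload has not changed (this reuses the Get that the function already performs):

```go
	existing, err := c.clientSet.CoreV1().ConfigMaps(namespace).Get(ctx, sts.Name, metav1.GetOptions{})
	if err == nil && existing.Data["worker-info.yaml"] == configMap.Data["worker-info.yaml"] {
		// Payload unchanged: skip the redundant Update call.
		klog.V(4).Infof("lws controller: ConfigMap %s/%s unchanged, skipping update", namespace, sts.Name)
		return nil
	}
```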

Fix summary:
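A consolidated sketch of the key changes (excerpts; anything not shown stays as in the original code):

```go
func (c *LeaderWorkerSetController) syncStatefulSet(ctx context.Context, namespace, name string) error {
	sts, err := c.stsLister.StatefulSets(namespace).Get(name)
	if err != nil {
		klog.Errorf("lws controller: Failed to get StatefulSet %s/%s: %v", namespace, name, err)
		return err
	}
	// Fix 4: handle deletion before any label or readiness checks.
	if sts.DeletionTimestamp != nil {
		// delete the ConfigMap and PodGroup as in the original code
		return nil
	}
	// Label checks and readiness check unchanged; isStatefulSetReady now
	// returns false when sts.Spec.Replicas is nil (fix 1).
	pods, err := c.GetAllPodsAllocatedNodeForStatefulSet(sts)
	if err != nil || len(pods) == 0 {
		klog.Errorf("lws controller: Failed to list scheduled pods for StatefulSet %s/%s: %v", namespace, name, err)
		return err
	}
	// Fix 2: propagate the manifest error instead of logging and continuing.
	configMap, err := c.manifestConfigmapOfWorkerSetToLeader(pods)
	if err != nil {
		return err
	}
	configMap.Name = sts.Name
	configMap.Namespace = sts.Namespace
	// Create-or-update logic unchanged from the original.
	return nil
}
```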

These fixes address the most critical runtime error (the nil pointer dereference) and the error-handling defect, while also streamlining the flow of the business logic.