请检查下面的代码,检查是否有逻辑错误
package lws
import (
"context"
"fmt"
leaderworkersetv1 "gitlab.basemind.com/basemind/add-on-controller/pkg/api/lws/leaderworkerset.x-k8s.io/v1"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
"sigs.k8s.io/yaml"
"strings"
)
type PodInformationMap map[string]OneWorkerPodInformation
type WorkerInformation struct {
PodInformationMap `yaml:",inline"`
}
type OneWorkerPodInformation struct {
NodeName string `yaml:"nodeName"`
PodIP string `yaml:"podIP"`
NICs []NICMountedInformation `yaml:"nics"`
}
type NICMountedInformation struct {
Index string `yaml:"index"`
NICIP string `yaml:"nicIP"`
Name string `yaml:"name"`
}
func (c *LeaderWorkerSetController) workerForSts(ctx context.Context) {
for c.processNextSts(ctx) {
}
}
func (c *LeaderWorkerSetController) processNextSts(ctx context.Context) bool {
item, quit := c.stsQueue.Get()
if quit {
return false
}
defer c.stsQueue.Done(item)
parts := strings.Split(item.(string), "/")
if len(parts) != 2 {
klog.Errorf("invalid key %s", item)
c.stsQueue.Forget(item)
return true
}
ns, name := parts[0], parts[1]
if err := c.syncLeaderWorkerSet(ctx, ns, name); err != nil {
klog.Errorf("syncLeaderWorkerSet %s/%s failed %v", ns, name, err)
c.stsQueue.AddRateLimited(item)
return true
} else {
c.stsQueue.Forget(item)
}
return true
}
func (c *LeaderWorkerSetController) syncStatefulSet(ctx context.Context, namespace, name string) error {
sts, err := c.stsLister.StatefulSets(namespace).Get(name)
if err != nil {
klog.Errorf("lws controller: Failed to get StatefulSet %s/%s: %v", namespace, name, err)
return err
}
if sts.Labels == nil || sts.Labels[leaderworkersetv1.GroupIndexLabelKey] == "" {
klog.V(5).Infof("lws controller: StatefulSet %s/%s has no labels", namespace, name)
}
if _, ok := sts.Labels[v1alpha1.PodGroupLabel]; !ok {
klog.V(5).Infof("lws controller: StatefulSet %s/%s does not need a PodGroup", namespace, name)
return nil
}
klog.V(4).Infof("lws controller: syncing gang StatefulSet %s/%s", namespace, name)
if sts.DeletionTimestamp != nil {
// delete the configmap if the statefulset is being deleted
if err = c.clientSet.CoreV1().ConfigMaps(namespace).Delete(ctx, sts.Name, metav1.DeleteOptions{}); err != nil {
klog.Errorf("lws controller: Failed to delete ConfigMap for StatefulSet %s/%s: %v", namespace, name, err)
}
// delete the pod group if the statefulset is being deleted
if err = c.podGroupClientSet.SchedulingV1alpha1().PodGroups(namespace).Delete(ctx, sts.Name, metav1.DeleteOptions{}); err != nil {
klog.Errorf("lws controller: Failed to delete PodGroup for StatefulSet %s/%s: %v", namespace, name, err)
}
return nil
}
if !isStatefulSetReady(sts) {
klog.V(4).Infof("lws controller: StatefulSet %s/%s is not ready, skipping", namespace, name)
return nil
}
pods, err := c.GetAllPodsAllocatedNodeForStatefulSet(sts)
if err != nil || len(pods) == 0 {
klog.Errorf("lws controller: Failed to ensure all pods for StatefulSet %s/%s ia allocated on node: %v", namespace, name, err)
return err
}
klog.V(4).Infof("lws controller: StatefulSet %s/%s is ready, syncing pods", namespace, name)
configMap, err := c.manifestConfigmapOfWorkerSetToLeader(pods)
if err != nil {
klog.Errorf("lws controller: Failed to create ConfigMap for StatefulSet %s/%s: %v", namespace, name,err)
}
configMap.Name = sts.Name
configMap.Namespace = sts.Namespace
// Create or update the ConfigMap for the StatefulSet
_, err = c.clientSet.CoreV1().ConfigMaps(namespace).Get(ctx, sts.Name, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("lws controller: Failed to get ConfigMap for StatefulSet %s/%s: %v", namespace, name, err)
return err
}
// Create the ConfigMap if it does not exist
_, err = c.clientSet.CoreV1().ConfigMaps(namespace).Create(ctx, &configMap, metav1.CreateOptions{})
if err != nil {
klog.Errorf("lws controller: Failed to create ConfigMap for StatefulSet %s/%s: %v", namespace, name, err)
return err
}
klog.V(4).Infof("lws controller: Created ConfigMap for StatefulSet %s/%s", namespace, name)
return nil
} else {
// Update the ConfigMap if it exists
_, err = c.clientSet.CoreV1().ConfigMaps(namespace).Update(ctx, &configMap, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("lws controller: Failed to update ConfigMap for StatefulSet %s/%s: %v", namespace, name, err)
return err
}
klog.V(4).Infof("lws controller: Updated ConfigMap for StatefulSet %s/%s", namespace, name)
return nil
}
}
func isStatefulSetReady(sts *v1.StatefulSet) bool {
if sts == nil || sts.Status.ReadyReplicas == 0 {
return false
}
// Check if the number of ready replicas matches the desired replicas
if sts.Status.CurrentRevision != sts.Status.UpdateRevision {
return false
}
desiredReplica := sts.Spec.Replicas
// Check if the number of available replicas matches the desired replicas
if sts.Status.AvailableReplicas == *desiredReplica &&
sts.Status.CurrentReplicas == *desiredReplica &&
sts.Status.ReadyReplicas == *desiredReplica &&
sts.Status.UpdatedReplicas == *desiredReplica {
return true
}
return false
}
func (c *LeaderWorkerSetController) manifestConfigmapOfWorkerSetToLeader(pods []*corev1.Pod) (corev1.ConfigMap,error) {
data := make(map[string]string)
workerInfo := WorkerInformation{
PodInformationMap: make(map[string]OneWorkerPodInformation),
}
for _, pod := range pods {
nicInfo := make([]NICMountedInformation, 0, 8)
podInfo := OneWorkerPodInformation{
NodeName: pod.Spec.NodeName,
PodIP: pod.Status.PodIP,
}
nodeName := pod.Spec.NodeName
node, err := c.nodeLister.Get(nodeName)
if err != nil {
klog.Errorf("lws controller: Failed to get node %s for pod %s/%s: %v", nodeName, pod.Namespace, pod.Name, err)
continue
}
if node.Labels == nil {
klog.V(4).Infof("lws controller: Node %s has no labels", nodeName)
continue
}
for labelKey, labelValue := range node.Labels {
if strings.HasPrefix(labelKey, "pf.brainpp.cn/") {
index := strings.TrimPrefix(labelKey, "pf.brainpp.cn/")
nicInfo = append(nicInfo, NICMountedInformation{
Index: index,
NICIP: labelValue,
Name: labelKey,
})
}
}
workerInfo.PodInformationMap[pod.Name] = podInfo
}
raw, err := yaml.Marshal(workerInfo)
if err != nil {
klog.Errorf("lws controller: Failed to marshal worker information to YAML: %v", err)
return corev1.ConfigMap{}, err
}
data["worker-info.yaml"] = string(raw)
manifests := corev1.ConfigMap{
Data: data,
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
}
return manifests, nil
}
func (c *LeaderWorkerSetController) GetAllPodsAllocatedNodeForStatefulSet(sts *v1.StatefulSet) ([]*corev1.Pod, error) {
if sts == nil || sts.Spec.Selector == nil {
return nil, nil
}
pods := make([]*corev1.Pod, 0, *sts.Spec.Replicas)
for i := int32(0); i < *sts.Spec.Replicas; i++ {
podName := fmt.Sprintf("%s-%d", sts.Name, i)
pod, err := c.podLister.Pods(sts.Namespace).Get(podName)
if err != nil {
klog.Errorf("lws controller: Failed to get pod %s/%s for statefulset", sts.Namespace, podName)
return nil, err
}
if pod.Spec.NodeName != "" {
klog.V(2).Infof("lws controller: Pod %s/%s is scheduled on node %s", pod.Namespace, pod.Name, pod.Spec.NodeName)
pods = append(pods, pod)
} else {
// the lines below SHOULD NOT happen
klog.Errorf("lws controller: Pod %s/%s is not scheduled on any node", pod.Namespace, pod.Name)
}
}
return pods, nil
}