A Brief Source-Code Analysis of ArgoCD's Core Components

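In the previous chapter, "ArgoCD Design and Features", we introduced the abstract concepts related to ArgoCD (GitOps, IaC, DevOps) and outlined its core components: the Application Controller, the Repository Server, and the API Server.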

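The API Server exposes RPC/REST interfaces and a UI that shows the state of cluster resources. It has little to do with how GitOps itself is implemented, which is why the Core deployment mode drops it. This article therefore analyzes, at the source-code level, how the other two components, the Application Controller and the Repository Server, are implemented.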

Application Controller

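Application (App for short below) is a custom resource (CRD) that ArgoCD registers in Kubernetes, and the App Controller is the Kubernetes controller ArgoCD implements for it. The controller continuously watches App CRs and compares the desired state described in each CR with the actual state of the cluster. If the two differ, it takes corrective action to bring them back in sync. ArgoCD stores the target cluster, the source project, and the manifest rendering parameters in the Application CR. A minimal App looks like this: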

apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: guestbook
  namespace: argocd
spec:
  project: default
  source:
    repoURL: https://github.com/argoproj/argocd-example-apps.git
    targetRevision: HEAD
    path: guestbook
  destination:
    server: https://kubernetes.default.svc
    namespace: guestbook

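Here, source specifies the Git repository and the path of the project to deploy, and destination specifies the target cluster. ArgoCD can manage multiple repositories and clusters, and their connection details are stored in etcd as Kubernetes Secrets: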

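# cluster info
apiVersion: v1
data:
  config: ...  # credentials and the peer CA certificate used for mutual TLS
  name: cluster-1   # cluster name (shown decoded here; Secret data is base64-encoded)
  server: ...  # URL of the target cluster's API server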
kind: Secret
metadata:
  annotations:
    managed-by: argocd.argoproj.io
  creationTimestamp: "2022-08-05T03:14:07Z"
  labels:
    argocd.argoproj.io/secret-type: cluster
  name: cluster-1
  namespace: argocd
  resourceVersion: "443946"
  selfLink: /api/v1/namespaces/argocd/secrets/cluster-1
  uid: b6f3ff89-782d-426e-aeaf-0c431f7f84bb
type: Opaque

# repository info

apiVersion: v1
data:
  name: Zmxhbm5lbA==
  sshPrivateKey: ...  # private key used to pull the repo over SSH
  type: Z2l0
  url: ...  # SSH URL of the repo
kind: Secret
metadata:
  annotations:
    managed-by: argocd.argoproj.io
  creationTimestamp: "2022-09-26T07:25:33Z"
  labels:
    argocd.argoproj.io/secret-type: repository
  name: flannel
  namespace: argocd
  ownerReferences:
  - apiVersion: apps.robocloud.autowise.ai/v1alpha1
    blockOwnerDeletion: true
    controller: true
    kind: Project
    name: flannel
    uid: 6f98130d-8e32-4ddb-9e56-02586e5debbd
  resourceVersion: "121649320"
  selfLink: /api/v1/namespaces/argocd/secrets/flannel
  uid: 06cfa628-c0a0-4a0e-b15e-ec3cca97e281
type: Opaque
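Kubernetes stores the values under a Secret's data field base64-encoded, which is why the repository Secret above shows Zmxhbm5lbA== and Z2l0 rather than flannel and git. The Go sketch below decodes such a map with the standard library. decodeSecretData is a hypothetical helper for illustration, not part of ArgoCD.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"sort"
)

// decodeSecretData decodes the base64-encoded values found under a Secret's
// data field (hypothetical helper, standard library only).
func decodeSecretData(data map[string]string) (map[string]string, error) {
	out := make(map[string]string, len(data))
	for k, v := range data {
		b, err := base64.StdEncoding.DecodeString(v)
		if err != nil {
			return nil, fmt.Errorf("key %q: %w", k, err)
		}
		out[k] = string(b)
	}
	return out, nil
}

func main() {
	// Values copied from the repository Secret shown above.
	decoded, err := decodeSecretData(map[string]string{
		"name": "Zmxhbm5lbA==",
		"type": "Z2l0",
	})
	if err != nil {
		panic(err)
	}
	keys := make([]string, 0, len(decoded))
	for k := range decoded {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map order is random; sort for stable output
	for _, k := range keys {
		fmt.Printf("%s=%s\n", k, decoded[k]) // name=flannel, then type=git
	}
}
```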

The important steps of the Application Controller's Run method are explained in the comments below:

// Run starts the Application CRD controller.
func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int, operationProcessors int) {
	...
	// App refresh queue: producers add the keys of apps to refresh; consumers fetch the latest
	// desired state from the Git repo, diff it against the live cluster state, and update the
	// app's sync status and health status.
	defer ctrl.appRefreshQueue.ShutDown()
	// App operation queue: producers add the keys of apps to sync; consumers execute the sync.
	defer ctrl.appOperationQueue.ShutDown()
	...
	// Run the app informer: watched app add/update/delete events are added to the work queues
	// appRefreshQueue and appOperationQueue, where they wait for workers to process them.
	go ctrl.appInformer.Run(ctx.Done())
	...
	// The --status-processors startup flag sets the number of appRefreshQueue consumer workers.
	for i := 0; i < statusProcessors; i++ {
		go wait.Until(func() {
			for ctrl.processAppRefreshQueueItem() {
			}
		}, time.Second, ctx.Done())
	}

	// Likewise, --operation-processors sets the number of appOperationQueue consumer workers.
	for i := 0; i < operationProcessors; i++ {
		go wait.Until(func() {
			for ctrl.processAppOperationQueueItem() {
			}
		}, time.Second, ctx.Done())
	}
	...
	<-ctx.Done()
}
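Run follows the classic client-go pattern: the informer acts as producer and pushes app keys into work queues, and a configurable number of workers drain them. The sketch below reproduces that shape with the standard library only. keyQueue is a hypothetical, stripped-down stand-in for client-go's workqueue. It keeps deduplication of waiting keys and drain-on-shutdown, but leaves out rate limiting and in-flight tracking.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// keyQueue is a hypothetical, simplified stand-in for client-go's workqueue.
// A key that is already waiting is not queued twice, and Get keeps returning
// queued keys after ShutDown until the queue is drained.
type keyQueue struct {
	mu       sync.Mutex
	cond     *sync.Cond
	items    []string
	pending  map[string]bool
	shutdown bool
}

func newKeyQueue() *keyQueue {
	q := &keyQueue{pending: map[string]bool{}}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add enqueues key unless it is already waiting, so bursts of events collapse.
func (q *keyQueue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.shutdown || q.pending[key] {
		return
	}
	q.pending[key] = true
	q.items = append(q.items, key)
	q.cond.Signal()
}

// Get blocks until a key is available; ok is false once the queue is shut down and empty.
func (q *keyQueue) Get() (key string, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.shutdown {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return "", false
	}
	key, q.items = q.items[0], q.items[1:]
	delete(q.pending, key)
	return key, true
}

// ShutDown stops accepting keys and wakes every blocked worker.
func (q *keyQueue) ShutDown() {
	q.mu.Lock()
	q.shutdown = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

// runWorkers starts n consumers, like the statusProcessors loop in Run. The
// returned WaitGroup completes once the queue is shut down and drained.
func runWorkers(q *keyQueue, n int, process func(key string)) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				key, ok := q.Get()
				if !ok {
					return
				}
				process(key)
			}
		}()
	}
	return &wg
}

func main() {
	q := newKeyQueue()
	// Producer side: repeated events for argocd/guestbook collapse into one item.
	for _, k := range []string{"argocd/guestbook", "argocd/guestbook", "argocd/flannel", "argocd/guestbook"} {
		q.Add(k)
	}
	var mu sync.Mutex
	var processed []string
	wg := runWorkers(q, 2, func(key string) {
		mu.Lock()
		processed = append(processed, key)
		mu.Unlock()
	})
	q.ShutDown()
	wg.Wait()
	sort.Strings(processed)
	fmt.Println(processed) // [argocd/flannel argocd/guestbook]
}
```

The deduplication matters for a controller. Ten informer events for the same app between two worker passes should cause one refresh, not ten.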

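Notably, ArgoCD builds its controller differently from a typical Kubernetes operator. ArgoCD uses client-go directly: it builds a SharedIndexInformer (appInformer) and manages its own workqueues. Most operators are scaffolded with kubebuilder on top of controller-runtime's controller manager and only fill in a Reconcile method. This is probably because the ArgoCD project started before kubebuilder reached GA.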

From the code above, each app is associated with two workqueues, appRefreshQueue and appOperationQueue, with the handlers processAppRefreshQueueItem() and processAppOperationQueueItem() respectively.

Next, let's look at the producer side: which events cause items to be added to these queues?

Reading the code yields the following call chains:

// controller/appcontroller.go

// 1. When an app has an add/update/delete event
NewApplicationController
  ctrl.newApplicationInformerAndLister
    informer.AddEventHandler
      AddFunc
        ctrl.appRefreshQueue.AddRateLimited
        ctrl.appOperationQueue.AddRateLimited
      UpdateFunc
        ctrl.requestAppRefresh
          ctrl.appRefreshQueue.Add
          ctrl.appOperationQueue.Add
        ctrl.appOperationQueue.AddRateLimited
      DeleteFunc
        ctrl.appRefreshQueue.Add

// 2. When the cluster state cache (statecache) that ArgoCD watches changes, the affected app keys are enqueued
NewApplicationController
  statecache.NewLiveStateCache
    ctrl.handleObjectUpdated
      ctrl.requestAppRefresh
        ctrl.appRefreshQueue.Add
        ctrl.appOperationQueue.Add

// 3. When an app with auto-sync enabled goes OutOfSync and a sync is attempted
NewApplicationController
  ctrl.processAppRefreshQueueItem
    ctrl.autoSync
      ctrl.requestAppRefresh
        ctrl.appRefreshQueue.Add
        ctrl.appOperationQueue.Add

// server/application/application.go
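// 4. The ArgoCD API Server implements the gRPC method Get. When upstream methods (ManagedResources, getAppResources) call it, it sends a Patch request directly to the Kubernetes API server of the cluster where ArgoCD runs to update an annotation. This updates the App, which in turn triggers its reconciliation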
server.ManagedResources
server.getAppResources
  server.getCachedAppState
    server.Get
      RefreshApp
        appIf.Patch
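Trigger 2 is the least obvious one. When a watched resource changes, the controller first has to work out which App owns it, and only then can it enqueue a key. The sketch below assumes label-based tracking through the app.kubernetes.io/instance label, ArgoCD's long-standing default. Annotation-based tracking also exists and is not modeled. liveObject, refreshQueue and enqueueOwningApp are hypothetical names, and the real handler covers more cases than this.

```go
package main

import "fmt"

// appInstanceLabel is the label used for label-based resource tracking.
const appInstanceLabel = "app.kubernetes.io/instance"

// liveObject is a hypothetical, trimmed-down view of a watched resource.
type liveObject struct {
	Kind, Namespace, Name string
	Labels                map[string]string
}

// refreshQueue records enqueued app keys; a stand-in for appRefreshQueue.
type refreshQueue struct{ keys []string }

func (q *refreshQueue) Add(key string) { q.keys = append(q.keys, key) }

// enqueueOwningApp finds the owning App through the tracking label and requests
// a refresh of "<controllerNamespace>/<appName>". It reports whether it enqueued.
func enqueueOwningApp(obj liveObject, controllerNamespace string, q *refreshQueue) bool {
	appName, ok := obj.Labels[appInstanceLabel]
	if !ok || appName == "" {
		return false // not managed by any App: nothing to refresh
	}
	q.Add(controllerNamespace + "/" + appName)
	return true
}

func main() {
	q := &refreshQueue{}
	enqueueOwningApp(liveObject{Kind: "Deployment", Namespace: "guestbook", Name: "guestbook-ui",
		Labels: map[string]string{appInstanceLabel: "guestbook"}}, "argocd", q)
	// An unlabeled object is ignored (reading a nil map is safe in Go).
	enqueueOwningApp(liveObject{Kind: "ConfigMap", Namespace: "kube-system", Name: "unrelated"}, "argocd", q)
	fmt.Println(q.keys) // [argocd/guestbook]
}
```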

In summary, the following operations trigger an app refresh and sync:

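  • The ArgoCD UI cannot find cached cluster resources, or a user manually clicks Refresh or Sync
  • Create/Update/Delete events on the App resource
  • Auto-sync retries of an App
  • Updates to the cache of managed cluster resources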

Cluster State Cache

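As mentioned above, updates to the target cluster's resource cache also trigger app refresh and sync. In other words, ArgoCD keeps long-lived connections to each target cluster so it can watch the state of the resources it manages. Interestingly, the ArgoCD maintainers did not implement the cluster cache themselves. They use the cache implementation from the gitops-engine library (pkg/cache). The clusterCache struct behind its ClusterCache interface has too many fields to cover here, so interested readers can look it up directly.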

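Reading through the gitops-engine source shows that the cluster cache keeps resource state in sync mainly through two methods, EnsureSynced and Invalidate. EnsureSynced first fetches the target cluster's API resources and creates a dynamic client for each one. It then calls List to populate the cache and Watch to keep it up to date. Invalidate closes the channel that stops the related goroutines: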

[Figure: clusterCache EnsureSynced / Invalidate flow]

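So ArgoCD runs at least as many goroutines per watched cluster as that cluster has API resources, which is fairly goroutine-heavy. Fortunately, the Kubernetes API server supports HTTP/2, so watches are multiplexed over shared connections and local file descriptor/socket usage stays modest.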
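The structure described above can be sketched with the standard library: one list-then-watch goroutine per API resource, all stopped through a shared cancellation signal. This is a minimal model, not gitops-engine's code. apiResource, event, miniClusterCache and fakeResource are hypothetical types, the fake watch replaces a dynamic client, and re-listing after a dropped watch is not modeled.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// event is a hypothetical watch event: a resource key and its new version.
type event struct{ Key, Version string }

// apiResource is a hypothetical stand-in for one discovered API resource and its
// dynamic client: List returns the initial state, Watch streams later changes.
type apiResource struct {
	Name  string
	List  func() map[string]string
	Watch func(ctx context.Context) <-chan event
}

// miniClusterCache keeps one list+watch goroutine per API resource.
type miniClusterCache struct {
	mu     sync.Mutex
	items  map[string]string
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// EnsureSynced seeds the cache with List, then starts one watch goroutine per
// resource. This sketch assumes it is called once per cache.
func (c *miniClusterCache) EnsureSynced(resources []apiResource) {
	ctx, cancel := context.WithCancel(context.Background())
	c.cancel, c.items = cancel, map[string]string{}
	for _, r := range resources {
		initial := r.List()
		c.mu.Lock() // earlier watch goroutines may already be writing
		for k, v := range initial {
			c.items[r.Name+"/"+k] = v
		}
		c.mu.Unlock()
		c.wg.Add(1)
		go func(r apiResource) {
			defer c.wg.Done()
			events := r.Watch(ctx)
			for {
				select {
				case <-ctx.Done(): // Invalidate was called
					return
				case ev, ok := <-events:
					if !ok { // watch ended; a real cache would re-list and re-watch
						return
					}
					c.mu.Lock()
					c.items[r.Name+"/"+ev.Key] = ev.Version
					c.mu.Unlock()
				}
			}
		}(r)
	}
}

// Invalidate cancels the shared context and waits for every watch goroutine to exit.
func (c *miniClusterCache) Invalidate() {
	if c.cancel != nil {
		c.cancel()
	}
	c.wg.Wait()
}

// Get reads one cached resource version.
func (c *miniClusterCache) Get(key string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.items[key]
	return v, ok
}

// fakeResource replays events on a buffered channel; with keepOpen the channel is
// never closed, like a long-lived watch on a quiet cluster.
func fakeResource(name string, initial map[string]string, events []event, keepOpen bool) apiResource {
	return apiResource{
		Name: name,
		List: func() map[string]string { return initial },
		Watch: func(ctx context.Context) <-chan event {
			ch := make(chan event, len(events))
			for _, ev := range events {
				ch <- ev
			}
			if !keepOpen {
				close(ch)
			}
			return ch
		},
	}
}

func main() {
	c := &miniClusterCache{}
	c.EnsureSynced([]apiResource{
		fakeResource("deployments", map[string]string{"guestbook/ui": "v1"}, []event{{Key: "guestbook/ui", Version: "v2"}}, false),
		fakeResource("services", map[string]string{"guestbook/ui": "v1"}, nil, true),
	})
	c.Invalidate() // returns only after both watch goroutines have exited
	fmt.Println("watch goroutines stopped")
}
```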

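gitops-engine's clusterCache provides Invalidate to stop state synchronization, but ArgoCD, which builds on this mechanism, does not expose any API for stopping a cluster cache's list and watch goroutines. The only related setting is the maximum number of state-sync timeouts for an app, i.e. the refresh retry limit. Once an app's refresh retries reach that limit, the app stops retrying, and someone has to change a field of the Application or trigger a refresh manually in the UI. This is unfriendly to clusters on weak networks. While ArgoCD keeps retrying refreshes for a disconnected cluster, its refresh queue quickly fills up with keys, and refresh requests for clusters that are still reachable cannot be processed in time. In the UI, the Refresh button keeps spinning and resource status stops updating, which hurts DevOps efficiency.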
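The flooding problem comes down to how often a failing key comes back into the queue. A common general technique is per-key exponential backoff with a retry cap. The sketch below follows the shape of client-go's ItemExponentialFailureRateLimiter, where the n-th consecutive failure waits baseDelay*2^n, capped at maxDelay, and adds a hypothetical maxRetries cutoff. It is not how ArgoCD implements refresh retries, and the parameter values are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// backoff: the n-th consecutive failure of a key (0-based) waits baseDelay*2^n,
// capped at maxDelay; after maxRetries failures the key is given up on.
type backoff struct {
	baseDelay, maxDelay time.Duration
	maxRetries          int // hypothetical cutoff
	failures            map[string]int
}

func newBackoff(baseDelay, maxDelay time.Duration, maxRetries int) *backoff {
	return &backoff{baseDelay: baseDelay, maxDelay: maxDelay, maxRetries: maxRetries, failures: map[string]int{}}
}

// next records a failure for key and returns how long to wait before retrying,
// or ok=false once the key has used up its retries.
func (b *backoff) next(key string) (delay time.Duration, ok bool) {
	n := b.failures[key]
	if n >= b.maxRetries {
		return 0, false
	}
	b.failures[key] = n + 1
	delay = b.baseDelay
	for i := 0; i < n && delay < b.maxDelay; i++ {
		delay *= 2 // doubling stops at the cap, so this cannot overflow
	}
	if delay > b.maxDelay {
		delay = b.maxDelay
	}
	return delay, true
}

// forget clears the failure count after a successful refresh.
func (b *backoff) forget(key string) { delete(b.failures, key) }

func main() {
	b := newBackoff(time.Second, 10*time.Second, 6)
	for {
		d, ok := b.next("argocd/app-on-offline-cluster")
		if !ok {
			fmt.Println("retries exhausted: wait for a manual refresh")
			return
		}
		fmt.Println("retry in", d) // 1s, 2s, 4s, 8s, 10s, 10s
	}
}
```

Backoff alone does not fix the problem described above. It only spreads an unreachable cluster's retries out over time, so workers spend more of their time on apps whose clusters respond.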

Desired State of the Cluster

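The core of GitOps is comparing the actual state with the desired state and deciding how to update resources based on the diff, and the desired state has to be read from a Git repository. ArgoCD wraps Git and the manifest rendering tools Helm and Kustomize into methods and exposes them to other components as a gRPC service. That service is the Repository Server, which we analyze later.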

The App Controller is the gRPC client of the Repo Server. It fetches and compares the actual and desired states in the following method:

// controller/state.go

// CompareAppState compares application git state to the live app state, using the specified
// revision and supplied source. If revision or overrides are empty, then compares against
// revision and overrides in the app spec.
func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *v1alpha1.AppProject, revisions []string, sources []v1alpha1.ApplicationSource, noCache bool, noRevisionCache bool, localManifests []string, hasMultipleSources bool) *comparisonResult {
	if len(localManifests) == 0 {
		// If the length of revisions is not same as the length of sources,
		// we take the revisions from the sources directly for all the sources.
		if len(revisions) != len(sources) {
			revisions = make([]string, 0)
			for _, source := range sources {
				revisions = append(revisions, source.TargetRevision)
			}
		}

		targetObjs, manifestInfos, err = m.getRepoObjs(app, sources, appLabelKey, revisions, noCache, noRevisionCache, verifySignature, project)
		...
	} else {
		// Prevent applying local manifests for now when signature verification is enabled
		// This is also enforced on API level, but as a last resort, we also enforce it here
		if gpg.IsGPGEnabled() && verifySignature {
			...
		} else {
			targetObjs, err = unmarshalManifests(localManifests)
			...
		}
		// empty out manifestInfoMap
		manifestInfos = make([]*apiclient.ManifestResponse, 0)
	}
	...

	liveObjByKey, err := m.liveStateCache.GetManagedLiveObjs(app, targetObjs)

	...

	reconciliation := sync.Reconcile(targetObjs, liveObjByKey, app.Spec.Destination.Namespace, infoProvider)
	ts.AddCheckpoint("live_ms")

	...
	diffResults, err := argodiff.StateDiffs(reconciliation.Live, reconciliation.Target, diffConfig)
	if err != nil {
		diffResults = &diff.DiffResultList{}
		failedToLoadObjs = true
		msg := fmt.Sprintf("Failed to compare desired state to live state: %s", err.Error())
		conditions = append(conditions, v1alpha1.ApplicationCondition{Type: v1alpha1.ApplicationConditionComparisonError, Message: msg, LastTransitionTime: &now})
	}
	ts.AddCheckpoint("diff_ms")

	syncCode := v1alpha1.SyncStatusCodeSynced
	managedResources := make([]managedResource, len(reconciliation.Target))
	resourceSummaries := make([]v1alpha1.ResourceStatus, len(reconciliation.Target))
	for i, targetObj := range reconciliation.Target {
		liveObj := reconciliation.Live[i]
		obj := liveObj
		if obj == nil {
			obj = targetObj
		}
		if obj == nil {
			continue
		}
		gvk := obj.GroupVersionKind()

		isSelfReferencedObj := m.isSelfReferencedObj(liveObj, targetObj, app.GetName(), appLabelKey, trackingMethod)

		resState := v1alpha1.ResourceStatus{
			Namespace:       obj.GetNamespace(),
			Name:            obj.GetName(),
			Kind:            gvk.Kind,
			Version:         gvk.Version,
			Group:           gvk.Group,
			Hook:            hookutil.IsHook(obj),
			RequiresPruning: targetObj == nil && liveObj != nil && isSelfReferencedObj,
		}
		if targetObj != nil {
			resState.SyncWave = int64(syncwaves.Wave(targetObj))
		}

		var diffResult diff.DiffResult
		if i < len(diffResults.Diffs) {
			diffResult = diffResults.Diffs[i]
		} else {
			diffResult = diff.DiffResult{Modified: false, NormalizedLive: []byte("{}"), PredictedLive: []byte("{}")}
		}

		// For the case when a namespace is managed with `managedNamespaceMetadata` AND it has resource tracking
		// enabled (e.g. someone manually adds resource tracking labels or annotations), we need to do some
		// bookkeeping in order to ensure that it's not considered `OutOfSync` (since it does not exist in source
		// control).
		//
		// This is in addition to the bookkeeping we do (see `isManagedNamespace` and its references) to prevent said
		// namespace from being pruned.
		isManagedNs := isManagedNamespace(targetObj, app) && liveObj == nil

		if resState.Hook || ignore.Ignore(obj) || (targetObj != nil && hookutil.Skip(targetObj)) || !isSelfReferencedObj {
			// For resource hooks, skipped resources or objects that may have
			// been created by another controller with annotations copied from
			// the source object, don't store sync status, and do not affect
			// overall sync status
		} else if !isManagedNs && (diffResult.Modified || targetObj == nil || liveObj == nil) {
			// Set resource state to OutOfSync since one of the following is true:
			// * target and live resource are different
			// * target resource not defined and live resource is extra
			// * target resource present but live resource is missing
			resState.Status = v1alpha1.SyncStatusCodeOutOfSync
			// we ignore the status if the obj needs pruning AND we have the annotation
			needsPruning := targetObj == nil && liveObj != nil
			if !(needsPruning && resourceutil.HasAnnotationOption(obj, common.AnnotationCompareOptions, "IgnoreExtraneous")) {
				syncCode = v1alpha1.SyncStatusCodeOutOfSync
			}
		} else {
			resState.Status = v1alpha1.SyncStatusCodeSynced
		}
		// set unknown status to all resource that are not permitted in the app project
		isNamespaced, err := m.liveStateCache.IsNamespaced(app.Spec.Destination.Server, gvk.GroupKind())
		if !project.IsGroupKindPermitted(gvk.GroupKind(), isNamespaced && err == nil) {
			resState.Status = v1alpha1.SyncStatusCodeUnknown
		}

		if isNamespaced && obj.GetNamespace() == "" {
			conditions = append(conditions, v1alpha1.ApplicationCondition{Type: v1alpha1.ApplicationConditionInvalidSpecError, Message: fmt.Sprintf("Namespace for %s %s is missing.", obj.GetName(), gvk.String()), LastTransitionTime: &now})
		}

		// we can't say anything about the status if we were unable to get the target objects
		if failedToLoadObjs {
			resState.Status = v1alpha1.SyncStatusCodeUnknown
		}

		resourceVersion := ""
		if liveObj != nil {
			resourceVersion = liveObj.GetResourceVersion()
		}
		managedResources[i] = managedResource{
			Name:            resState.Name,
			Namespace:       resState.Namespace,
			Group:           resState.Group,
			Kind:            resState.Kind,
			Version:         resState.Version,
			Live:            liveObj,
			Target:          targetObj,
			Diff:            diffResult,
			Hook:            resState.Hook,
			ResourceVersion: resourceVersion,
		}
		resourceSummaries[i] = resState
	}

	...

	compRes := comparisonResult{
		syncStatus:           &syncStatus,
		healthStatus:         healthStatus,
		resources:            resourceSummaries,
		managedResources:     managedResources,
		reconciliationResult: reconciliation,
		diffConfig:           diffConfig,
		diffResultList:       diffResults,
	}
	...
	return &compRes
}
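Stripped of hooks, ignore rules, the IgnoreExtraneous option, managed namespaces, project permissions and ArgoCD's normalized diff (argodiff.StateDiffs), the core of CompareAppState pairs target and live objects by identity and classifies each pair. The sketch below models only that core. resourceKey, resourceStatus and compare are hypothetical names, and objects are simplified to flat string maps compared with reflect.DeepEqual.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
)

// resourceKey identifies a resource, the way target and live objects are paired.
type resourceKey struct{ Group, Kind, Namespace, Name string }

// resourceStatus is a trimmed-down version of the per-resource result built above.
type resourceStatus struct {
	Key             resourceKey
	Status          string // "Synced" or "OutOfSync"
	RequiresPruning bool
}

// compare pairs target objects (rendered from Git) with live objects (from the
// cluster cache) by key, classifies each pair, and returns the app-level sync code.
func compare(target, live map[resourceKey]map[string]string) ([]resourceStatus, string) {
	seen := map[resourceKey]bool{}
	var keys []resourceKey
	for _, m := range []map[resourceKey]map[string]string{target, live} {
		for k := range m {
			if !seen[k] {
				seen[k] = true
				keys = append(keys, k)
			}
		}
	}
	// Stable order for readable output (map iteration order is random).
	sort.Slice(keys, func(i, j int) bool { return fmt.Sprint(keys[i]) < fmt.Sprint(keys[j]) })

	syncCode := "Synced"
	results := make([]resourceStatus, 0, len(keys))
	for _, k := range keys {
		t, hasTarget := target[k]
		l, hasLive := live[k]
		st := resourceStatus{Key: k, Status: "Synced", RequiresPruning: !hasTarget && hasLive}
		// The same three OutOfSync cases as CompareAppState: live object missing,
		// extra live object, or target and live differ.
		if !hasTarget || !hasLive || !reflect.DeepEqual(t, l) {
			st.Status = "OutOfSync"
			syncCode = "OutOfSync"
		}
		results = append(results, st)
	}
	return results, syncCode
}

func main() {
	dep := resourceKey{"apps", "Deployment", "guestbook", "guestbook-ui"}
	svc := resourceKey{"", "Service", "guestbook", "guestbook-ui"}
	old := resourceKey{"", "ConfigMap", "guestbook", "legacy-config"}
	results, code := compare(
		map[resourceKey]map[string]string{dep: {"replicas": "3"}, svc: {"port": "80"}},
		map[resourceKey]map[string]string{dep: {"replicas": "1"}, old: {"k": "v"}},
	)
	for _, r := range results {
		fmt.Printf("%s %s/%s: %s (prune=%v)\n", r.Key.Kind, r.Key.Namespace, r.Key.Name, r.Status, r.RequiresPruning)
	}
	fmt.Println("app sync status:", code)
}
```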

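After obtaining the comparison result, the App Controller calls SyncAppState. SyncAppState creates a SyncContext, another gitops-engine type, which applies and deletes resources in the target cluster.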
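The loop in CompareAppState records each target's sync wave (resState.SyncWave, read via syncwaves.Wave from the argocd.argoproj.io/sync-wave annotation), and waves decide the order in which a sync touches resources. The sketch below only shows ordering by wave. It builds a flat plan with hypothetical syncTask, wave and plan names. The real SyncContext handles much more, including hook phases and waiting for health between waves. Running all deletes after all applies is a simplification made only for this sketch.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

const syncWaveAnnotation = "argocd.argoproj.io/sync-wave"

// syncTask is a hypothetical unit of work for this sketch.
type syncTask struct {
	Action      string // "apply" or "delete"
	Kind, Name  string
	Annotations map[string]string
}

// wave parses the sync-wave annotation; a missing or invalid value counts as wave 0.
func wave(t syncTask) int {
	w, err := strconv.Atoi(t.Annotations[syncWaveAnnotation])
	if err != nil {
		return 0
	}
	return w
}

// plan orders applies by ascending wave (then kind and name for stability) and,
// as a simplification, places deletes of pruned resources after all applies.
func plan(tasks []syncTask) []syncTask {
	out := append([]syncTask(nil), tasks...)
	sort.SliceStable(out, func(i, j int) bool {
		a, b := out[i], out[j]
		if (a.Action == "delete") != (b.Action == "delete") {
			return b.Action == "delete"
		}
		if wave(a) != wave(b) {
			return wave(a) < wave(b)
		}
		if a.Kind != b.Kind {
			return a.Kind < b.Kind
		}
		return a.Name < b.Name
	})
	return out
}

func main() {
	tasks := plan([]syncTask{
		{Action: "delete", Kind: "ConfigMap", Name: "legacy-config"},
		{Action: "apply", Kind: "Deployment", Name: "guestbook-ui", Annotations: map[string]string{syncWaveAnnotation: "1"}},
		{Action: "apply", Kind: "Namespace", Name: "guestbook", Annotations: map[string]string{syncWaveAnnotation: "-1"}},
		{Action: "apply", Kind: "Service", Name: "guestbook-ui"},
	})
	for _, t := range tasks {
		fmt.Printf("wave %d: %s %s/%s\n", wave(t), t.Action, t.Kind, t.Name)
	}
}
```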

Repository Server

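The Repo Server is the data source from which ArgoCD obtains the manifests that express the desired state. It wraps the version-control tool Git and a set of templating tools such as Helm and Kustomize, and exposes them to other components as a gRPC service. It implements only a handful of RPCs: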

// reposerver/repository/repository.proto
// ManifestService
service RepoServerService {

    // GenerateManifest generates manifest for application in specified repo name and revision
    rpc GenerateManifest(ManifestRequest) returns (ManifestResponse) {
    }

    // GenerateManifestWithFiles generates manifest for application using provided tarball of files
    rpc GenerateManifestWithFiles(stream ManifestRequestWithFiles) returns (ManifestResponse) {
    }

    // Returns a bool val if the repository is valid and has proper access
    rpc TestRepository(TestRepositoryRequest) returns (TestRepositoryResponse) {
    }

    // Returns a valid revision
    rpc ResolveRevision(ResolveRevisionRequest) returns (ResolveRevisionResponse) {
    }

    // Returns a list of refs (e.g. branches and tags) in the repo
    rpc ListRefs(ListRefsRequest) returns (Refs) {
    }

    // ListApps returns a list of apps in the repo
    rpc ListApps(ListAppsRequest) returns (AppList) {
    }

    // ListPlugins returns a list of cmp v2 plugins running as sidecar to reposerver
    rpc ListPlugins(google.protobuf.Empty) returns (PluginList) {
    }

    // Generate manifest for application in specified repo name and revision
    rpc GetAppDetails(RepoServerAppDetailsQuery) returns (RepoAppDetailsResponse) {
    }

    // Get the meta-data (author, date, tags, message) for a specific revision of the repo
    rpc GetRevisionMetadata(RepoServerRevisionMetadataRequest) returns (github.com.argoproj.argo_cd.v2.pkg.apis.application.v1alpha1.RevisionMetadata) {
    }
    
    // Get the chart details (author, date, tags, message) for a specific revision of the repo
    rpc GetRevisionChartDetails(RepoServerRevisionChartDetailsRequest) returns (github.com.argoproj.argo_cd.v2.pkg.apis.application.v1alpha1.ChartDetails) {
    }

    // GetHelmCharts returns list of helm charts in the specified repository
    rpc GetHelmCharts(HelmChartsRequest) returns (HelmChartsResponse) {
    }

    // GetGitFiles returns a set of file paths and their contents for the given repo
    rpc GetGitFiles(GitFilesRequest) returns (GitFilesResponse) {
    }

    // GetGitDirectories returns a set of directory paths for the given repo
    rpc GetGitDirectories(GitDirectoriesRequest) returns (GitDirectoriesResponse) {
    }
}

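The repository layout I know best stores resources as Helm charts. Once Helm parameters are configured for an App, ArgoCD calls the Repo Server's GenerateManifest method, which uses those Helm settings (parameters, valueFiles, etc.) to render Kubernetes manifests that can be deployed directly. ArgoCD bundles the helm binary and wraps its main command, helm template: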

// util/helm/helm.go
// Helm provides wrapper functionality around the `helm` command.
type Helm interface {
	// Template returns a list of unstructured objects from a `helm template` command
	Template(opts *TemplateOpts) (string, error)
	// GetParameters returns a list of chart parameters taking into account values in provided YAML files.
	GetParameters(valuesFiles []pathutil.ResolvedFilePath, appPath, repoRoot string) (map[string]string, error)
	// DependencyBuild runs `helm dependency build` to download a chart's dependencies
	DependencyBuild() error
	// Init runs `helm init --client-only`
	Init() error
	// Dispose deletes temp resources
	Dispose()
}
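At bottom, a Template wrapper builds a helm template command line and captures its stdout. The sketch below shows that step under stated assumptions. templateOpts, helmTemplateArgs and renderChart are hypothetical names, not ArgoCD's TemplateOpts or its actual argument handling. Only the real helm template flags --namespace, --values and --set are used. Keep in mind that --set values containing commas need escaping under Helm's --set syntax.

```go
package main

import (
	"fmt"
	"os/exec"
	"sort"
)

// templateOpts is a hypothetical subset of what a Template call needs.
type templateOpts struct {
	ReleaseName string
	ChartPath   string
	Namespace   string
	ValuesFiles []string
	Set         map[string]string
}

// helmTemplateArgs builds `helm template <release> <chart> ...` arguments.
// --set keys are sorted so the command line is deterministic.
func helmTemplateArgs(o templateOpts) []string {
	args := []string{"template", o.ReleaseName, o.ChartPath}
	if o.Namespace != "" {
		args = append(args, "--namespace", o.Namespace)
	}
	for _, f := range o.ValuesFiles {
		args = append(args, "--values", f)
	}
	keys := make([]string, 0, len(o.Set))
	for k := range o.Set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		args = append(args, "--set", k+"="+o.Set[k])
	}
	return args
}

// renderChart runs helm and returns the rendered multi-document YAML from stdout.
func renderChart(o templateOpts) (string, error) {
	out, err := exec.Command("helm", helmTemplateArgs(o)...).Output()
	if err != nil {
		return "", fmt.Errorf("helm template failed: %w", err)
	}
	return string(out), nil
}

func main() {
	opts := templateOpts{ReleaseName: "guestbook", ChartPath: "./helm-guestbook", Namespace: "guestbook",
		ValuesFiles: []string{"values-prod.yaml"}, Set: map[string]string{"replicaCount": "2"}}
	fmt.Println(helmTemplateArgs(opts))
	// Only try to render when a helm binary is on PATH.
	if _, err := exec.LookPath("helm"); err == nil {
		if manifests, err := renderChart(opts); err == nil {
			fmt.Print(manifests)
		}
	}
}
```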

For Git, the Repo Server combines the git binary with the go-git/v5 library to manage local repository clones. The Git client interface and its CLI-based implementation:

type Client interface {
	Root() string
	Init() error
	Fetch(revision string) error
	Submodule() error
	Checkout(revision string, submoduleEnabled bool) error
	LsRefs() (*Refs, error)
	LsRemote(revision string) (string, error)
	LsFiles(path string, enableNewGitFileGlobbing bool) ([]string, error)
	LsLargeFiles() ([]string, error)
	CommitSHA() (string, error)
	RevisionMetadata(revision string) (*RevisionMetadata, error)
	VerifyCommitSignature(string) (string, error)
	IsAnnotatedTag(string) bool
}

// nativeGitClient implements Client interface using git CLI
type nativeGitClient struct {
	EventHandlers

	// URL of the repository
	repoURL string
	// Root path of repository
	root string
	// Authenticator credentials for private repositories
	creds Creds
	// Whether to connect insecurely to repository, e.g. don't verify certificate
	insecure bool
	// Whether the repository is LFS enabled
	enableLfs bool
	// gitRefCache knows how to cache git refs
	gitRefCache gitRefCache
	// indicates if client allowed to load refs from cache
	loadRefFromCache bool
	// HTTP/HTTPS proxy used to access repository
	proxy string
}

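Git has four areas: the working directory, the staging area (index), the local repository, and the remote repository. The working directory is where files normally live, and ArgoCD relies on git checkout to switch the working directory to the requested revision. A clone has only one working directory, so for a given repository the Repo Server can extract manifests from only one commit at a time. The coarse-grained solution is a per-repository lock that serializes operations on different commits while still allowing concurrent operations on the same commit. The Repo Server implements this with a per-path state map and sync.Cond: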

func NewRepositoryLock() *repositoryLock {
	return &repositoryLock{stateByKey: map[string]*repositoryState{}}
}

type repositoryLock struct {
	lock       sync.Mutex
	stateByKey map[string]*repositoryState
}

// Lock acquires lock unless lock is already acquired with the same commit and allowConcurrent is set to true
func (r *repositoryLock) Lock(path string, revision string, allowConcurrent bool, init func() (io.Closer, error)) (io.Closer, error) {
	r.lock.Lock()
	state, ok := r.stateByKey[path]
	if !ok {
		state = &repositoryState{cond: &sync.Cond{L: &sync.Mutex{}}}
		r.stateByKey[path] = state
	}
	r.lock.Unlock()

	closer := ioutil.NewCloser(func() error {
		state.cond.L.Lock()
		notify := false
		state.processCount--
		var err error
		if state.processCount == 0 {
			notify = true
			state.revision = ""
			err = state.initCloser.Close()
		}

		state.cond.L.Unlock()
		if notify {
			state.cond.Broadcast()
		}
		if err != nil {
			return fmt.Errorf("init closer failed: %w", err)
		}
		return nil
	})

	for {
		state.cond.L.Lock()
		if state.revision == "" {
			// no in progress operation for that repo. Go ahead.
			initCloser, err := init()
			if err != nil {
				state.cond.L.Unlock()
				return nil, fmt.Errorf("failed to initialize repository resources: %w", err)
			}
			state.initCloser = initCloser
			state.revision = revision
			state.processCount = 1
			state.allowConcurrent = allowConcurrent
			state.cond.L.Unlock()
			return closer, nil
		} else if state.revision == revision && state.allowConcurrent && allowConcurrent {
			// same revision already processing and concurrent processing allowed. Increment process count and go ahead.
			state.processCount++
			state.cond.L.Unlock()
			return closer, nil
		} else {
			state.cond.Wait()
			// wait when all in-flight processes of this revision complete and try again
			state.cond.L.Unlock()
		}
	}
}

type repositoryState struct {
	cond            *sync.Cond
	revision        string
	initCloser      io.Closer
	processCount    int
	allowConcurrent bool
}
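To see these semantics in isolation, the sketch below is a condensed, hypothetical re-implementation of the same idea for a single repository path. revisionLock and newRevisionLock are illustrative names. It returns an unlock function, which must be called exactly once, instead of an io.Closer, and it drops the init hook. Holders of the same revision share the lock when all of them allow concurrency. Any other caller waits on the condition variable until the last holder releases.

```go
package main

import (
	"fmt"
	"sync"
)

// revisionLock condenses the repositoryLock idea above for one repository path.
type revisionLock struct {
	cond            *sync.Cond
	revision        string // "" means the repo is idle
	processCount    int
	allowConcurrent bool
}

func newRevisionLock() *revisionLock {
	return &revisionLock{cond: sync.NewCond(&sync.Mutex{})}
}

// Lock blocks until the repo is idle, or until it is busy with the same revision
// and both sides allow concurrency. Call the returned unlock function exactly once.
func (l *revisionLock) Lock(revision string, allowConcurrent bool) (unlock func()) {
	l.cond.L.Lock()
	defer l.cond.L.Unlock()
	for {
		if l.revision == "" {
			l.revision, l.processCount, l.allowConcurrent = revision, 1, allowConcurrent
			break
		}
		if l.revision == revision && l.allowConcurrent && allowConcurrent {
			l.processCount++
			break
		}
		l.cond.Wait() // releases cond.L while waiting and reacquires it on wake-up
	}
	return func() {
		l.cond.L.Lock()
		l.processCount--
		if l.processCount == 0 {
			l.revision = "" // repo idle again: wake all waiters so they can retry
			l.cond.Broadcast()
		}
		l.cond.L.Unlock()
	}
}

func main() {
	l := newRevisionLock()
	u1 := l.Lock("abc123", true)
	u2 := l.Lock("abc123", true) // same commit: does not block
	fmt.Println("two holders of abc123")

	acquired := make(chan struct{})
	go func() {
		u3 := l.Lock("def456", true) // different commit: waits for both holders
		close(acquired)
		u3()
	}()
	u1()
	u2()
	<-acquired
	fmt.Println("def456 acquired after abc123 was released")
}
```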

Summary

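A while ago, while reading the ArgoCD source, I sketched a rough call graph. I hope it gives a concrete picture of how the functions and components call each other: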

[Figure: rough call graph of ArgoCD's functions and components]