分布式計算引擎 Flink/Spark on k8s 的實現對比以及實踐
以 Flink 和 Spark 為代表的分布式流批計算框架的下層資源管理平臺逐漸從 Hadoop 生態的 YARN 轉向 Kubernetes 生態的 k8s 原生 scheduler 以及周邊資源調度器,比如 Volcano 和 Yunikorn 等。這篇文章簡單比較一下兩種計算框架在 Native Kubernetes 的支持和實現上的異同,以及對于應用到生產環境我們還需要做些什么。
1. 什么是 Native
這里的 native 其實就是計算框架直接向 Kubernetes 申請資源。比如很多跑在 YARN 上面的計算框架,需要自己實現一個 AppMaster 來想 YARN 的 ResourceManager 來申請資源。Native K8s 相當于計算框架自己實現一個類似 AppMaster 的角色向 k8s 去申請資源,當然和 AppMaster 還是有差異的 (AppMaster 需要按 YARN 的標準進行實現)。
2. Spark on k8s 使用
提交作業
向 k8s 集群提交作業和往 YARN 上面提交很類似,命令如下,主要區別包括:
--master 參數指定 k8s 集群的 ApiServer
需要通過參數 spark.kubernetes.container.image 指定在 k8s 運行作業的 image,
指定 main jar,需要 driver 進程可訪問:如果 driver 運行在 pod 中,jar 包需要包含在鏡像中;如果 driver 運行在本地,那么 jar 需要在本地。
通過 --name 或者 spark.app.name 指定 app 的名字,作業運行起來之后的 driver 命名會以 app 名字為前綴。當然也可以通過參數 spark.kubernetes.driver.pod.name 直接指定 dirver 的名字
- $ ./bin/spark-submit \ --master k8s://https://: \ --deploy-mode cluster \ --name spark-pi \ --class org.apache.spark.examples.SparkPi \ --conf spark.executor.instances=5 \ --conf spark.kubernetes.container.image= \ local:///path/to/examples.jar
提交完該命令之后,spark-submit 會創建一個 driver pod 和一個對應的 servcie,然后由 driver 創建 executor pod 并運行作業。
deploy-mode
和在 YARN 上面使用 Spark 一樣,在 k8s 上面也支持 cluster 和 client 兩種模式:
cluster mode: driver 在 k8s 集群上面以 pod 形式運行。
client mode: driver 運行在提交作業的地方,然后 driver 在 k8s 集群上面創建 executor。為了保證 executor 能夠注冊到 driver 上面,還需要提交作業的機器可以和 k8s 集群內部的 executor 網絡連通(executor 可以訪問到 driver,需要注冊)。
資源清理
這里的資源指的主要是作業的 driver 和 executor pod。spark 通過 k8s 的 onwer reference 機制將作業的各種資源連接起來,這樣當 driver pod 被刪除的時候,關聯的 executor pod 也會被連帶刪除。但是如果沒有 driver pod,也就是以 client 模式運行作業的話,如下兩種情況涉及到資源清理:
作業運行完成,driver 進程退出,executor pod 運行完自動退出
driver 進程被殺掉,executor pod 連不上 driver 也會自行退出
可以參考:https://kubernetes.io/docs/concepts/architecture/garbage-collection/
依賴管理
前面說到 main jar 包需要在 driver 進程可以訪問到的地方,如果是 cluster 模式就需要將 main jar 打包到 spark 鏡像中。但是在日常開發和調試中,每次重新 build 一個鏡像的 effort 實在是太大了。spark 支持提交的時候使用本地的文件,然后使用 s3 等作為中轉:先上傳上去,然后作業運行的時候再從 s3 上面下載下來。下面是一個實例。
- ...--packages org.apache.hadoop:hadoop-aws:3.2.0--conf spark.kubernetes.file.upload.path=s3a:///path--conf spark.hadoop.fs.s3a.access.key=...--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem--conf spark.hadoop.fs.s3a.fast.upload=true--conf spark.hadoop.fs.s3a.secret.key=....--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmpfile:///full/path/to/app.jar
Pod Template
k8s 的 controller (比如 Deployment,Job)創建 Pod 的時候根據 spec 中的 pod template 來創建。下面是一個 Job 的示例。
- apiVersion: batch/v1kind: Jobmetadata: name: hellospec: template: # 下面的是一個 pod template spec: containers: - name: hello image: busybox command: ['sh', '-c', 'echo "Hello, Kubernetes!" && sleep 3600'] restartPolicy: OnFailure # The pod template ends here
由于我們通過 spark-submit 提交 spark 作業的時候,最終的 k8s 資源(driver/executor pod)是由 spark 內部邏輯構建出來的。但是有的時候我們想要在 driver/executor pod 上做一些額外的工作,比如增加 sidecar 容器做一些日志收集的工作。這種場景下 PodTemplate 就是一個比較好的選擇,同時 PodTemplate 也將 spark 和底層基礎設施(k8s)解耦開。比如 k8s 發布新版本支持一些新的特性,那么我們只要修改我們的 PodTemplate 即可,而不涉及到 spark 的內部改動。
RBAC
RBAC 全稱是 Role-based access control,是 k8s 中的一套權限控制機制。通俗來說:
RBAC 中包含了一系列的權限設置,比如 create/delete/watch/list pod 等,這些權限集合的實體叫 Role 或者 ClusterRole
同時 RBAC 還包含了角色綁定關系(Role Binding),用于將 Role/ClusterRole 賦予一個或者一組用戶,比如 Service Account 或者 UserAccount
為了將 Spark 作業在 k8s 集群中運行起來,我們還需要一套 RBAC 資源:
指定 namespace 下的 serviceaccount
定義了權限規則的 Role 或者 ClusterRole,我們可以使用常見的 ClusterRole "edit"(對幾乎所有資源具有操作權限,比如 create/delete/watch 等)
綁定關系
下面命令在 spark namespace 下為 serviceaccount spark 賦予了操作同 namespace 下其他資源的權限,那么只要 spark 的 driver pod 掛載了該 serviceaccount,它就可以創建 executor pod 了。
- $ kubectl create serviceaccount spark$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark
下面做一個簡單的演示:
通過如下命令提交作業 SparkPiSleep 到 k8s 集群中。
- $ spark-submit --master k8s://https://: --deploy-mode cluster --class org.apache.spark.examples.SparkPiSleep --conf spark.executor.memory=2g --conf spark.driver.memory=2g --conf spark.driver.core=1 --conf spark.app.name=test12 --conf spark.kubernetes.submission.waitAppCompletion=false --conf spark.executor.core=1 --conf spark.kubernetes.container.image= --conf spark.eventLog.enabled=false --conf spark.shuffle.service.enabled=false --conf spark.executor.instances=1 --conf spark.dynamicAllocation.enabled=false --conf sparkspark.kubernetes.namespace=spark --conf sparkspark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.executor.core=1 local:///path/to/main/jar
查看 k8s 集群中的資源
- $ kubectl get po -n sparkNAME READY STATUS RESTARTS AGEspark-pi-5b88a27b576050dd-exec-1 0/1 ContainerCreating 0 2stest12-9fd3c27b576039ae-driver 1/1 Running 0 8s
其中第一個就是 executor pod,第二個是 driver 的 pod。除此之外還創建了一個 service,可以通過該 service 訪問到 driver pod,比如 Spark UI 都可以這樣訪問到。
- $ kubectl get svc -n sparkNAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGEtest12-9fd3c27b576039ae-driver-svc ClusterIP None 7078/TCP,7079/TCP,4040/TCP 110s
下面再看一下 service owner reference,executor pod 也是類似的。
- $ kubectl get svc test12-9fd3c27b576039ae-driver-svc -n spark -oyamlapiVersion: v1kind: Servicemetadata: creationTimestamp: "2021-08-18T03:48:50Z" name: test12-9fd3c27b576039ae-driver-svc namespace: spark # service 的 ownerReference 指向了 driver pod,只要 driver pod 被刪除,該 service 也會被刪除 ownerReferences: - apiVersion: v1 controller: true kind: Pod name: test12-9fd3c27b576039ae-driver uid: 56a50a66-68b5-42a0-b2f6-9a9443665d95 resourceVersion: "9975441" uid: 06c1349f-be52-4133-80d9-07af34419b1f
3. Flink on k8s 使用
Flink on k8s native 的實現支持兩種模式:
application mode:在遠程 k8s 集群中啟動一個 flink 集群(jm 和 tm),driver 運行在 jm 中,也就是只支持 detached 模式,不支持 attached 模式。
session mode:在遠程 k8s 集群啟動一個常駐的 flink 集群(只有 jm),然后向上面提交作業,根據實際情況決定啟動多少個 tm。
在生產上面使用一般不太建議使用 session mode,所以下面主要討論的是 application mode。
Flink 的 native k8s 模式是不需要指定 tm 個數的,jm 會根據用戶的代碼計算需要多少 tm。
提交作業
下面是一個簡單的提交命令,需要包含:
參數 run-application 指定是 application 模式
參數 --target 指定運行在 k8s 上
參數 kubernetes.container.image 指定作業運行使用的 flink 鏡像
最后需要指定 main jar,路徑是鏡像中的路徑
- $ ./bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=custom-image-name \ local:///opt/flink/usrlib/my-flink-job.jar
資源清理
Flink 的 native 模式會先創建一個 JobManager 的 deployment,并將其托管給 k8s。同一個作業所有的相關資源的 owner reference 都指向該 Deployment,也就是說刪除了該 deployment,所有相關的資源都會被清理掉。下面根據作業的運行情況討論一下資源如何清理。
作業運行到終態(SUCCESS,FAILED,CANCELED 等)之后,Flink 會清理掉所有作業
JobManager 進程啟動失?。╬od 中的 jm 容器啟動失?。捎诳刂破魇?Deployment,所以會一直重復拉起
運行過程中,如果 JobManager 的 pod 被刪除,Deployment 會重新拉起
運行過程中,如果 JobManager 的 Deployment 被刪除,那么關聯的所有 k8s 資源都會被刪除
Pod Template
Flink native 模式也支持 Pod Template,類似 Spark。
RBAC
類似 Spark。
依賴文件管理
Flink 暫時只支持 main jar 以及依賴文件在鏡像中。也就是說用戶要提交作業需要自己定制化鏡像,體驗不是很好。一種 workaroud 的方式是結合 PodTemplate:
如果依賴是本地文件,需要 upload 到一個 remote 存儲做中轉,比如各大云廠商的對象存儲。
如果依賴是遠端文件,不需要 upload。
運行時在 template 中使用 initContainer 將用戶的 jar 以及依賴文件下載到 Flink 容器中,并加到 classpath 下運行。
Flink 的作業 demo 就不在演示了。
4. Spark on Kubernetes 實現
Spark on Kubernetes 的實現比較簡單:
Spark Client 創建一個 k8s pod 運行 driver
driver 創建 executor pod,然后開始運行作業
作業運行結束之后 driver pod 進入到 Completed 狀態,executor pod 會被清理掉。作業結束之后通過 driver pod 我們還是可以查看 driver pod 的。
代碼實現
Spark 的 native k8s 實現代碼在 resource-managers/kubernetes module 中。我們可以從 SparkSubmit 的代碼開始分析。我們主要看一下 deploy-mode 為 cluster 模式的代碼邏輯。
- // Set the cluster manager val clusterManager: Int = args.master match { case "yarn" => YARN case m if m.startsWith("spark") => STANDALONE case m if m.startsWith("mesos") => MESOS case m if m.startsWith("k8s") => KUBERNETES case m if m.startsWith("local") => LOCAL case _ => error("Master must either be yarn or start with spark, mesos, k8s, or local") -1 }
首先根據 spark.master 配置中 scheme 來判斷是不是 on k8s。我們上面也看到這個配置的形式為 --master k8s://https://: 。如果是 on k8s 的 cluster 模式,則去加載 Class org.apache.spark.deploy.k8s.submit.KubernetesClientApplication,并運行其中的 start 方法。childArgs 方法的核心邏輯簡單來說就是根據 spark-submit 提交的參數構造出 driver pod 提交到 k8s 運行。
- private[spark] class KubernetesClientApplication extends SparkApplication { override def start(args: Array[String], conf: SparkConf): Unit = { val parsedArguments = ClientArguments.fromCommandLineArgs(args) run(parsedArguments, conf) } private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = { // For constructing the app ID, we can't use the Spark application name, as the app ID is going // to be added as a label to group resources belonging to the same application. Label values are // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate // a unique app ID (captured by spark.app.id) in the format below. val kubernetesAppId = KubernetesConf.getKubernetesAppId() val kubernetesConf = KubernetesConf.createDriverConf( sparkConf, kubernetesAppId, clientArguments.mainAppResource, clientArguments.mainClass, clientArguments.driverArgs, clientArguments.proxyUser) // The master URL has been checked for validity already in SparkSubmit. // We just need to get rid of the "k8s://" prefix here. val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, Some(kubernetesConf.namespace), KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, SparkKubernetesClientFactory.ClientType.Submission, sparkConf, None, None)) { kubernetesClient => val client = new Client( kubernetesConf, new KubernetesDriverBuilder(), kubernetesClient, watcher) client.run() } }}
上面的代碼的核心就是最后創建 Client 并運行。這個 Client 是 Spark 封裝出來的 Client,內置了 k8s client。
- private[spark] class Client( conf: KubernetesDriverConf, builder: KubernetesDriverBuilder, kubernetesClient: KubernetesClient, watcher: LoggingPodStatusWatcher) extends Logging { def run(): Unit = { // 構造 Driver 的 Pod val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient) val configMapName = KubernetesClientUtils.configMapNameDriver val confFilesMap = KubernetesClientUtils.buildSparkConfDirFilesMap(configMapName, conf.sparkConf, resolvedDriverSpec.systemProperties) val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap) // 修改 Pod 的 container spec:增加 SPARK_CONF_DIR val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container) .addNewEnv() .withName(ENV_SPARK_CONF_DIR) .withValue(SPARK_CONF_DIR_INTERNAL) .endEnv() .addNewVolumeMount() .withName(SPARK_CONF_VOLUME_DRIVER) .withMountPath(SPARK_CONF_DIR_INTERNAL) .endVolumeMount() .build() val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod) .editSpec() .addToContainers(resolvedDriverContainer) .addNewVolume() .withName(SPARK_CONF_VOLUME_DRIVER) .withNewConfigMap() .withItems(KubernetesClientUtils.buildKeyToPathObjects(confFilesMap).asJava) .withName(configMapName) .endConfigMap() .endVolume() .endSpec() .build() val driverPodName = resolvedDriverPod.getMetadata.getName var watch: Watch = null var createdDriverPod: Pod = null try { // 通過 k8s client 創建 Driver Pod createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) } catch { case NonFatal(e) => logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.") throw e } try { // 創建其他資源,修改 owner reference 等 val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) addOwnerReference(createdDriverPod, otherKubernetesResources) kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() } catch { case NonFatal(e) => kubernetesClient.pods().delete(createdDriverPod) throw e } val sId = Seq(conf.namespace, driverPodName).mkString(":") // watch pod breakable { while (true) { val podWithName = kubernetesClient .pods() .withName(driverPodName) // Reset resource to old before we start the watch, this is important for race conditions watcher.reset() watch = podWithName.watch(watcher) // Send the latest pod state we know to the watcher to make sure we didn't miss anything watcher.eventReceived(Action.MODIFIED, podWithName.get()) // Break the while loop if the pod is completed or we don't want to wait // 根據參數 "spark.kubernetes.submission.waitAppCompletion" 判斷是否需要退出 if(watcher.watchOrStop(sId)) { watch.close() break } } } }
下面再簡單介紹一下 Driver 如何管理 Executor 的流程。當 Spark Driver 運行 main 函數時,會創建一個 SparkSession,SparkSession 中包含了 SparkContext,SparkContext 需要創建一個 SchedulerBackend 會管理 Executor 的生命周期。對應到 k8s 上的 SchedulerBackend 其實就是 KubernetesClusterSchedulerBackend,下面主要看一下這個 backend 是如何創建出來的。大膽猜想一下,大概率也是根據 spark.master 的 url 的 scheme "k8s" 創建的。
下面是 SparkContext 創建 SchedulerBackend 的核心代碼邏輯。
- private def createTaskScheduler(...) = { case masterUrl => // 創建出 KubernetesClusterManager val cm = getClusterManager(masterUrl) match { case Some(clusterMgr) => clusterMgr case None => throw new SparkException("Could not parse Master URL: '" + master + "'") } try { val scheduler = cm.createTaskScheduler(sc, masterUrl) // 上面創建出來的 KubernetesClusterManager 這里會創建出 KubernetesClusterSchedulerBackend val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) cm.initialize(scheduler, backend) (backend, scheduler) } catch { case se: SparkException => throw se case NonFatal(e) => throw new SparkException("External scheduler cannot be instantiated", e) }}// 方法 getClsuterManager 會通過 ServiceLoader 加載所有實現 ExternalClusterManager 的 ClusterManager (KubernetesClusterManager 和 YarnClusterManager),然后通過 master url 進行 filter,選出 KubernetesClusterManagerprivate def getClusterManager(url: String): Option[ExternalClusterManager] = { val loader = Utils.getContextOrSparkClassLoader val serviceLoaders = ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url)) if (serviceLoaders.size > 1) { throw new SparkException( s"Multiple external cluster managers registered for the url $url: $serviceLoaders") } serviceLoaders.headOption}
后面就是 KubernetesClusterSchedulerBackend 管理 Executor 的邏輯了。
可以簡單看一下創建 Executor 的代碼邏輯。
- private def requestNewExecutors( expected: Int, running: Int, applicationId: String, resourceProfileId: Int, pvcsInUse: Seq[String]): Unit = { val numExecutorsToAllocate = math.min(expected - running, podAllocationSize) logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " + s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.") // Check reusable PVCs for this executor allocation batch val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse) for ( _ <- 0 until numExecutorsToAllocate) { val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet() val executorConf = KubernetesConf.createExecutorConf( conf, newExecutorId.toString, applicationId, driverPod, resourceProfileId) // 構造 Executor 的 Pod Spec val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr, kubernetesClient, rpIdToResourceProfile(resourceProfileId)) val executorPod = resolvedExecutorSpec.pod val podWithAttachedContainer = new PodBuilder(executorPod.pod) .editOrNewSpec() .addToContainers(executorPod.container) .endSpec() .build() val resources = replacePVCsIfNeeded( podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs) // 創建 Executor Pod val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer) try { // 增加 owner reference addOwnerReference(createdExecutorPod, resources) resources .filter(_.getKind == "PersistentVolumeClaim") .foreach { resource => if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) { addOwnerReference(driverPod.get, Seq(resource)) } val pvc = resource.asInstanceOf[PersistentVolumeClaim] logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " + s"StorageClass ${pvc.getSpec.getStorageClassName}") kubernetesClient.persistentVolumeClaims().create(pvc) } newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis()) logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") } catch { case NonFatal(e) => kubernetesClient.pods().delete(createdExecutorPod) throw e } } }
5. Flink on Kubernetes 實現
Flink 的 Native K8s 實現:
Flink Client 創建 JobManager 的 Deployment,然后將 Deployment 托管給 k8s
k8s 的 Deployment Controller 創建 JobManager 的 Pod
JobManager 內的 ResourceManager 負責先 Kubernetes Scheduler 請求資源并創建 TaskManager 等相關資源并創建相關的 TaskManager Pod 并開始運行作業
當作業運行到終態之后所有相關的 k8s 資源都被清理掉
代碼(基于分支 release-1.13)實現主要如下:
CliFrontend 作為 Flink Client 的入口根據命令行參數 run-application 判斷通過方法 runApplication 去創建 ApplicationCluster
KubernetesClusterDescriptor 通過方法 deployApplicationCluster 創建 JobManager 相關的 Deployment 和一些必要的資源
JobManager 的實現類 JobMaster 通過 ResourceManager 調用類 KubernetesResourceManagerDriver 中的方法 requestResource 創建 TaskManager 等資源
其中 KubernetesClusterDescriptor 實現自 interface ClusterDescriptor ,用來描述對 Flink 集群的操作。根據底層的資源使用不同, ClusterDescriptor 有不同的實現,包括 KubernetesClusterDescriptor、YarnClusterDescriptor、StandaloneClusterDescriptor。
- public interface ClusterDescriptor<T> extends AutoCloseable { /* Returns a String containing details about the cluster (NodeManagers, available memory, ...). */ String getClusterDescription(); /* 查詢已存在的 Flink 集群. */ ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException; /** 創建 Flink Session 集群 */ ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException; /** 創建 Flink Application 集群 **/ ClusterClientProvider<T> deployApplicationCluster( final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException; /** 創建 Per-job 集群 **/ ClusterClientProvider<T> deployJobCluster( final ClusterSpecification clusterSpecification, final JobGraph jobGraph, final boolean detached) throws ClusterDeploymentException; /** 刪除集群 **/ void killCluster(T clusterId) throws FlinkException; @Override void close();}
下面簡單看一下 KubernetesClusterDescriptor 的核心邏輯:創建 Application 集群。
- public class KubernetesClusterDescriptor implements ClusterDescriptor<String> { private final Configuration flinkConfig; // 內置 k8s client private final FlinkKubeClient client; private final String clusterId; @Override public ClusterClientProvider<String> deployApplicationCluster( final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException { // 查詢 flink 集群在 k8s 中是否存在 if (client.getRestService(clusterId).isPresent()) { throw new ClusterDeploymentException( "The Flink cluster " + clusterId + " already exists."); } final KubernetesDeploymentTarget deploymentTarget = KubernetesDeploymentTarget.fromConfig(flinkConfig); if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) { throw new ClusterDeploymentException( "Couldn't deploy Kubernetes Application Cluster." + " Expected deployment.target=" + KubernetesDeploymentTarget.APPLICATION.getName() + " but actual one was \"" + deploymentTarget + "\""); } // 設置 application 參數:$internal.application.program-args 和 $internal.application.main applicationConfiguration.applyToConfiguration(flinkConfig); // 創建集群 final ClusterClientProvider<String> clusterClientProvider = deployClusterInternal( KubernetesApplicationClusterEntrypoint.class.getName(), clusterSpecification, false); try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) { LOG.info( "Create flink application cluster {} successfully, JobManager Web Interface: {}", clusterId, clusterClient.getWebInterfaceURL()); } return clusterClientProvider; } // 創建集群邏輯 private ClusterClientProvider<String> deployClusterInternal( String entryPoint, ClusterSpecification clusterSpecification, boolean detached) throws ClusterDeploymentException { final ClusterEntrypoint.ExecutionMode executionMode = detached ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL; flinkConfig.setString( ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString()); flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint); // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values. // 將端口指定為固定值,方便 k8s 的資源構建。因為 pod 的隔離性,所以沒有端口沖突 KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT); KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT); KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT); // HA 配置 if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); KubernetesUtils.checkAndUpdatePortConfigOption( flinkConfig, HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, flinkConfig.get(JobManagerOptions.PORT)); } try { final KubernetesJobManagerParameters kubernetesJobManagerParameters = new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); // 補充 PodTemplate 邏輯 final FlinkPod podTemplate = kubernetesJobManagerParameters .getPodTemplateFilePath() .map( file -> KubernetesUtils.loadPodFromTemplateFile( client, file, Constants.MAIN_CONTAINER_NAME)) .orElse(new FlinkPod.Builder().build()); final KubernetesJobManagerSpecification kubernetesJobManagerSpec = KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( podTemplate, kubernetesJobManagerParameters); // 核心邏輯:在 k8s 中創建包括 JobManager Deployment 在內 k8s 資源,比如 Service 和 ConfigMap client.createJobManagerComponent(kubernetesJobManagerSpec); return createClusterClientProvider(clusterId); } catch (Exception e) { //... } }}
上面代碼中需要說的在構建 JobManager 的時候補充 PodTemplate。簡單來說 PodTemplate 就是一個 Pod 文件。
第三步的 TaskManager 創建就不再贅述了。
7. 生態
這里生態這個詞可能也不太合適,這里主要指的的如果要在生產上面使用該功能還有哪些可以做的。下面主要討論在生產環境上面用來做 trouble-shooting 的兩個功能:日志和監控。
日志
日志收集對于線上系統是非常重要的一環,毫不夸張地說,80% 的故障都可以通過日志查到原因。但是前面也說過,Flink 作業在作業運行到終態之后會清理掉所有資源,Spark 作業運行完只會保留 Driver Pod 的日志,那么我們如何收集到完整的作業日志呢?
有幾種方案可供選擇:
DaemonSet。每個 k8s 的 node 上面以 DaemonSet 形式部署日志收集 agent,對 node 上面運行的所有容器日志進行統一收集,并存儲到類似 ElasticSearch 的統一日志搜索平臺。
SideCar。使用 Flink/Spark 提供的 PodTemplate 功能在主容器側配置一個 SideCar 容器用來進行日志收集,最后存儲到統一的日志服務里面。
這兩種方式都有一個前提是有其他的日志服務提供存儲、甚至搜索的功能,比如 ELK,或者各大云廠商的日志服務。
除此之外還有一種簡易的方式可以考慮:利用 log4j 的擴展機制,自定義 log appender,在 appender 中定制化 append 邏輯,將日志直接收集并存儲到 remote storage,比如 hdfs,對象存儲等。這種方案需要將自定義的 log appender 的 jar 包放到運行作業的 ClassPath 下,而且這種方式有可能會影響作業主流程的運行效率,對性能比較敏感的作業并不太建議使用這種方式。
監控
目前 Prometheus 已經成為 k8s 生態的監控事實標準,下面我們的討論也是討論如何將 Flink/Spark 的作業的指標對接到 Prometheus。下面先看一下 Prometheus 的架構。
其中的核心在于 Prometheus Servier 收集指標的方式是 pull 還是 push:
對于常駐的進程,比如在線服務,一般由 Prometheus Server 主動去進程暴露出來的 api pull 指標。
對于會結束的進程指標收集,比如 batch 作業,一般使用進程主動 push 的方式。詳細流程是進程將指標 push 到常駐的 PushGateway,然后 Prometheus Server 去 PushGateway pull 指標。
上面兩種使用方式也是 Prometheus 官方建議的使用方式,但是看完描述不難發現其實第一種場景也可以使用第二種處理方式。只不過第二種方式由于 PushGateway 是常駐的,對其穩定性要求會比較高。
Flink
Flink 同時提供了 PrometheusReporter (將指標通過 api 暴露,由 Prometheus Server 來主動 pull 數據) 和 PrometheusPushGatewayReporter (將指標主動 push 給 PushGateway,Prometheus Server 不需要感知 Flink 作業)。
這兩種方式中 PrometheusPushGatewayReporter 會更簡單一點,但是 PushGateway 可能會成為瓶頸。如果使用 PrometheusReporter 的方式,需要引入服務發現機制幫助 Prometheus Server 自動發現運行的 Flink 作業的 Endpoint。Prometheus 目前支持的主流的服務發現機制主要有:
基于 Consul。Consul 是基于 etcd 的一套完整的服務注冊與發現解決方案,要使用這種方式,我們需要 Flink 對接 Consul。比如我們在提交作業的時候,將作業對應的 Service 進行捕獲并寫入 Consul。
基于文件。文件也就是 Prometheus 的配置文件,里面配置需要拉取 target 的 endpoint。文件這種方式本來是比較雞肋的,因為它需要 Prometheus Server 和 Flink 作業同時都可以訪問,但是需要文件是 local 的。但是在 k8s 環境中,基于文件反而變的比較簡單,我們可以將 ConfigMap 掛載到 Prometheus Server 的 Pod 上面,Flink 作業修改 ConfigMap 就可以了。
基于 Kubernetes 的服務發現機制。Kubernetes 的服務發現機制簡單來說就是 label select??梢詤⒖?/p>
- https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
關于 Prometheus 支持的更多服務發現機制,可以參考:https://prometheus.io/docs/prometheus/latest/configuration/configuration/ ,簡單羅列包括:
azure
consul
digitalocean
docker
dockerswarm
dns
ec2
eureka
file
gce
hetzner
http
kubernetes
...
Spark
以批計算為代表的 Spark 使用 PushGateway 的方式來對接 Prometheus 是比較好的方式,但是 Spark 官方并沒有提供對 PushGateway 的支持,只支持了 Prometheus 的 Exporter,需要 Prometheus Server 主動去 pull 數據。
這里推薦使用基于 Kubernetes 的服務發現機制。
需要注意的是 Prometheus Server 拉取指標是按固定時間間隔進行拉取的,對于持續時間比較短的批作業,有可能存在還沒有拉取指標,作業就結束的情況。
8. 缺陷
雖然 Spark 和 Flink 都實現了 native k8s 的模式,具體實現略有差異。但是在實際使用上發現兩者的實現在某些場景下還是略有缺陷的。
Spark
pod 不具有容錯性 spark-submit 會先構建一個 k8s 的 driver pod,然后由 driver pod 啟動 executor 的 pod。但是在 k8s 環境中并不太建議直接構建 pod 資源,因為 pod 不具有容錯性,pod 所在節點掛了之后 pod 就掛了。熟悉 k8s scheduler 的同學應該知道 pod 有一個字段叫 podName,scheduler 的核心是為 pod 填充這個字段,也就是為 pod 選擇一個合適的 node。一旦調度完成之后 pod 的該字段就固定下來了。這也是 pod 不具有 node 容錯的原因。
Flink
Deployment 語義。 Deployment 可以認為是 ReplicaSet 的增強版,而 ReplicaSet 的官方定義如下。
A ReplicaSet's purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.
簡單來說,ReplicaSet 的目的是保證幾個相同的 Pod 副本可以不間斷的運行,說是為了線上服務量身定制的也不為過(線上服務最好是無狀態且支持原地重啟,比如 WebService)。但是盡管 Flink 以流式作業為主,但是我們并不能簡單地將流式作業等同于無狀態的 WebService。比如 Flink 作業的 Main Jar 如果寫的有問題,會導致 JobManager 的 Pod 一直啟動失敗,但是由于是 Deployment 語義的問題會不斷被重啟。這個可能是 ByDesign 的,但是感覺并不太好。
Batch 作業處理。 由于 Flink 作業運行完所有資源包括 Deployment 都會被清理掉,拿不到最終的作業狀態,不知道成功有否(流作業的話停止就可以認為是失敗了)。對于這個問題可以利用 Flink 本身的歸檔功能,將結果歸檔到外部的文件系統(兼容 s3 協議,比如阿里云對象存儲 oss)中。涉及到的配置如下:
s3.access-key
s3.secret-key
s3.region
s3.endpoint
jobmanager.archive.fs.dir
如果不想引入外部系統的話,需要改造 Flink 代碼在作業運行完成之后將數據寫到 k8s 的 api object 中,比如 ConfigMap 或者 Secret。
作業日志。 Spark 作業運行結束之后 Executor Pod 被清理掉,Driver Pod 被保留,我們可以通過它查看到 Driver 的日志。Flink 作業結束之后就什么日志都查看不到了。
9. 總結
本文從使用方式、源碼實現以及在生產系統上面如何補足周邊系統地介紹了 Spark 和 Flink 在 k8s 生態上的實現、實踐以及對比。但是限于篇幅,很多內容來不及討論了,比如 shuffle 如何處理。如果你們公司也在做這方面的工作,相信還是有很多參考價值的,也歡迎留言交流。
另外,YARN 的時代已經過去了,以后 on k8s scheduler 將成為大數據計算以及 AI 框架的標配。但是 k8s scheduler 這種天生為在線服務設計的調度器在吞吐上面有很大的不足,并不是很契合大數據作業。k8s 社區的批調度器 kube-batch,以及基于 kube-batch 衍生出來的 Volcano 調度器,基于 YARN 的調度算法實現的 k8s 生態調度器 Yunikorn 也逐漸在大數據 on k8s 場景下嶄露頭角,不過這些都是后話了,后面有時間再專門寫文章進行分析對比。