成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

騰訊面試:什么是 Spark 寬依賴、窄依賴,如何進行性能調(diào)優(yōu)?

大數(shù)據(jù)
本文將從RDD依賴關系的本質(zhì)出發(fā),深入剖析窄依賴與寬依賴的區(qū)別,詳解Spark如何基于依賴關系進行DAG劃分和Stage優(yōu)化,并結合Spark 3.x的最新特性,提供一套完整的依賴關系調(diào)整與性能調(diào)優(yōu)實踐方案。

Apache Spark作為分布式計算框架的標桿,其性能優(yōu)勢很大程度上源于對計算流程的精細化控制。RDD(彈性分布式數(shù)據(jù)集) 作為Spark的核心抽象,不僅提供了分布式數(shù)據(jù)存儲能力,更通過依賴關系(Dependency) 描述了數(shù)據(jù)轉(zhuǎn)換的血緣關系(Lineage)。而DAG(有向無環(huán)圖) 作為任務執(zhí)行的邏輯表示,其劃分與優(yōu)化直接決定了Spark作業(yè)的并行效率。在實際生產(chǎn)中,開發(fā)者常面臨任務延遲、資源利用率低、Shuffle開銷過大等問題,其根源往往在于對依賴關系的理解不足或DAG優(yōu)化策略的缺失。

本文將從RDD依賴關系的本質(zhì)出發(fā),深入剖析窄依賴與寬依賴的區(qū)別,詳解Spark如何基于依賴關系進行DAG劃分和Stage優(yōu)化,并結合Spark 3.x的最新特性(如自適應查詢執(zhí)行AQE、動態(tài)分區(qū)裁剪DPP),提供一套完整的依賴關系調(diào)整與性能調(diào)優(yōu)實踐方案。

一、RDD依賴關系:從數(shù)據(jù)血緣到計算效率的關鍵

1. 依賴關系的本質(zhì)與分類

RDD的依賴關系描述了子RDD如何從父RDD轉(zhuǎn)換而來,是Spark實現(xiàn)容錯和并行計算的基礎。根據(jù)父RDD分區(qū)被子RDD分區(qū)引用的方式,依賴關系分為窄依賴(Narrow Dependency) 和寬依賴(Wide Dependency),二者的核心區(qū)別在于是否觸發(fā)Shuffle操作。

(1) 窄依賴:無Shuffle的高效并行

窄依賴指父RDD的每個分區(qū)最多被子RDD的一個分區(qū)引用,數(shù)據(jù)無需跨節(jié)點傳輸,可在單個Executor內(nèi)完成轉(zhuǎn)換。Spark將窄依賴操作合并為流水線(Pipeline) 執(zhí)行,顯著提升效率。窄依賴主要包括以下三種形式:

  • 一對一依賴(OneToOneDependency):子RDD分區(qū)與父RDD分區(qū)一一對應,如map、filter算子。例如,對RDD執(zhí)行map(x => x*2)時,子RDD的每個分區(qū)僅依賴父RDD相同索引的分區(qū)。
  • 范圍依賴(RangeDependency):父RDD的連續(xù)分區(qū)被子RDD的一個分區(qū)引用,典型場景為Union操作。例如,兩個RDD通過union合并時,子RDD的分區(qū)0依賴第一個父RDD的分區(qū)0,分區(qū)1依賴第一個父RDD的分區(qū)1,以此類推。
  • 協(xié)同分區(qū)Join(Co-partitioned Join):若兩個RDD具有相同的分區(qū)器(Partitioner)和分區(qū)數(shù),其Join操作可通過窄依賴實現(xiàn)。例如,RDD A和RDD B均按user_id分區(qū),且分區(qū)數(shù)均為100,則Join時子RDD的分區(qū)i僅需關聯(lián)A和B的分區(qū)i,無需Shuffle。

代碼示例:窄依賴轉(zhuǎn)換

// 一對一依賴:map操作
val rdd1 = sc.parallelize(Array(1,2,3,4),2)// 2個分區(qū)
val rdd2 = rdd1.map(_ *2)// rdd2的每個分區(qū)依賴rdd1的對應分區(qū)(窄依賴)

// 范圍依賴:union操作
val rdd3 = sc.parallelize(Array(5,6),1)
val rdd4 = rdd2.union(rdd3)// rdd4的前2個分區(qū)依賴rdd2,第3個分區(qū)依賴rdd3(窄依賴)

// 協(xié)同分區(qū)Join
val rddA = sc.parallelize(Seq(("a",1),("b",2))).partitionBy(new HashPartitioner(2))
val rddB = sc.parallelize(Seq(("a",3),("b",4))).partitionBy(new HashPartitioner(2))
val joinedRDD = rddA.join(rddB)// 分區(qū)器相同,為窄依賴Join

(2) 寬依賴:Shuffle的性能瓶頸

寬依賴指父RDD的一個分區(qū)被多個子RDD分區(qū)引用,此時需通過Shuffle將數(shù)據(jù)跨節(jié)點重新分區(qū)。Shuffle過程涉及磁盤I/O、網(wǎng)絡傳輸和序列化/反序列化,是Spark作業(yè)的主要性能瓶頸。常見觸發(fā)寬依賴的算子包括groupByKey、reduceByKey(需Shuffle)、join(非協(xié)同分區(qū)時)等。

寬依賴的底層機制:以groupByKey為例,父RDD的每個分區(qū)數(shù)據(jù)會按Key哈希分配到不同子分區(qū),過程中需將中間結果寫入本地磁盤(Shuffle Write),再由子RDD的對應分區(qū)通過網(wǎng)絡拉取(Shuffle Read)。此過程中,數(shù)據(jù)傾斜(某Key對應數(shù)據(jù)量過大)會導致部分Task耗時激增,進一步加劇性能問題。

代碼示例:寬依賴轉(zhuǎn)換

// groupByKey觸發(fā)寬依賴
val rdd = sc.parallelize(Seq(("a",1),("a",2),("b",3)),2)
val groupedRDD = rdd.groupByKey()// 寬依賴:父RDD分區(qū)數(shù)據(jù)按Key重新分布

// 非協(xié)同分區(qū)Join觸發(fā)寬依賴
val rddA = sc.parallelize(Seq(("a",1))).partitionBy(new HashPartitioner(2))
val rddB = sc.parallelize(Seq(("a",2))).partitionBy(new HashPartitioner(3))// 分區(qū)數(shù)不同
val shuffledJoinRDD = rddA.join(rddB)// 寬依賴:需Shuffle對齊分區(qū)

2. 依賴關系對Spark執(zhí)行的影響

依賴關系直接決定了Spark的容錯機制和任務并行度:

  • 容錯效率:窄依賴下,單個分區(qū)丟失僅需重算對應父分區(qū);寬依賴則需重算所有父分區(qū),恢復成本高。因此,Spark優(yōu)先對寬依賴結果進行Checkpoint。
  • 并行度:窄依賴支持流水線執(zhí)行(如map -> filter -> map可在單個Task內(nèi)完成),而寬依賴會阻斷流水線,需等待所有父分區(qū)完成才能開始子分區(qū)計算。
  • 資源利用率:寬依賴的Shuffle過程會導致大量網(wǎng)絡傳輸和磁盤I/O,若配置不當(如分區(qū)數(shù)過少),易造成Executor資源空閑或過載。

二、DAG生成與Stage劃分:從邏輯計劃到物理執(zhí)行

Spark將用戶代碼轉(zhuǎn)換為DAG后,需通過Stage劃分將邏輯計劃轉(zhuǎn)換為可執(zhí)行的物理計劃。Stage是Task的集合,每個Stage包含一組可并行執(zhí)行的Task,其劃分的核心依據(jù)是寬依賴。

1. DAG的構建過程

DAG的構建始于RDD轉(zhuǎn)換鏈,終于Action算子(如collect、count)。當觸發(fā)Action時,SparkContext會將RDD依賴鏈提交給DAGScheduler,由其構建DAG并劃分Stage。例如,以下代碼對應的DAG包含3個RDD轉(zhuǎn)換:

val result = sc.textFile("data.txt")// RDD1:HadoopRDD
.flatMap(_.split(" "))// RDD2:MapPartitionsRDD(窄依賴)
.map((_,1))// RDD3:MapPartitionsRDD(窄依賴)
.reduceByKey(_ + _)// RDD4:ShuffledRDD(寬依賴)
.collect()// Action算子,觸發(fā)DAG構建

2. Stage劃分的核心算法:回溯與寬依賴檢測

DAGScheduler采用從后往前回溯的算法劃分Stage:

  • 起點:以Action算子對應的ResultStage(最終輸出Stage)為起點。
  • 回溯依賴:遍歷當前RDD的依賴關系,若為窄依賴,則將其父RDD合并到當前Stage;若為寬依賴,則以寬依賴為邊界拆分Stage,父RDD作為新的ShuffleMapStage(需輸出Shuffle結果)。
  • 遞歸處理:對新拆分的ShuffleMapStage重復上述過程,直至所有RDD均被劃分到Stage中。

示例:上述WordCount代碼的Stage劃分如下:

  • Stage 1(ResultStage):包含reduceByKey操作,依賴寬依賴,需等待Shuffle完成。
  • Stage 0(ShuffleMapStage):包含textFile -> flatMap -> map操作,均為窄依賴,輸出結果用于Shuffle。

源碼邏輯簡化:

// DAGScheduler核心劃分邏輯(簡化版)
privatedef getParentStages(rdd: RDD[_]): List[Stage]={
val parents = mutable.HashSet[Stage]()
val visited = mutable.HashSet[RDD[_]]()
val stack = mutable.Stack[RDD[_]](rdd)

while(stack.nonEmpty){
val currentRDD = stack.pop()
if(!visited(currentRDD)){
      visited.add(currentRDD)
      currentRDD.dependencies.foreach {
case narrowDep: NarrowDependency[_]=>
          stack.push(narrowDep.rdd)// 窄依賴:合并到當前Stage
case shuffleDep: ShuffleDependency[_, _, _]=>
// 寬依賴:創(chuàng)建新的ShuffleMapStage
val stage = getOrCreateShuffleMapStage(shuffleDep)
          parents.add(stage)
}
}
}
  parents.toList
}

3. Stage的任務類型與執(zhí)行順序

劃分后的Stage分為兩類:

  • ShuffleMapStage:輸出結果用于Shuffle,對應ShuffleMapTask,任務數(shù)等于RDD分區(qū)數(shù)。
  • ResultStage:生成最終結果,對應ResultTask,任務數(shù)由Action算子決定(如collect對應1個任務)。

Stage的執(zhí)行順序遵循依賴關系:若Stage B依賴Stage A的Shuffle結果,則需等待Stage A完成后才能執(zhí)行Stage B。Spark通過廣度優(yōu)先調(diào)度提交Stage,以最大化并行度。

三、Spark Stage優(yōu)化策略:從靜態(tài)配置到動態(tài)自適應

Stage優(yōu)化是Spark性能調(diào)優(yōu)的核心,涵蓋Shuffle優(yōu)化、內(nèi)存管理、數(shù)據(jù)本地化等多個維度。Spark 3.x引入的自適應查詢執(zhí)行(AQE) 進一步實現(xiàn)了運行時動態(tài)優(yōu)化,顯著降低了人工調(diào)參成本。

1. 基于依賴關系的靜態(tài)優(yōu)化

(1) 減少寬依賴:算子選擇與邏輯調(diào)整

寬依賴是性能瓶頸的主要來源,優(yōu)化的核心是避免不必要的Shuffle或減少Shuffle數(shù)據(jù)量:

  • 用reduceByKey替代groupByKey:reduceByKey支持Map端預聚合(Combiner),減少Shuffle數(shù)據(jù)量。例如,對(key, value)按Key求和時,reduceByKey(_ + _)會先在每個分區(qū)內(nèi)局部聚合,再Shuffle全局聚合;而groupByKey().mapValues(_.sum)需將所有Value傳輸?shù)侥繕斯?jié)點后聚合,數(shù)據(jù)量更大。代碼對比:
// 低效:groupByKey無預聚合
val groupResult = rdd.groupByKey().mapValues(_.sum)

// 高效:reduceByKey預聚合
val reduceResult = rdd.reduceByKey(_ + _)// 減少Shuffle數(shù)據(jù)量約80%
  • 廣播小表優(yōu)化Join:當Join的一個表較小時(如<100MB),使用broadcast將其廣播到所有Executor,轉(zhuǎn)為Map端Join,避免Shuffle。Spark 3.x可通過spark.sql.autoBroadcastJoinThreshold自動觸發(fā),也可手動指定:代碼示例:
importorg.apache.spark.sql.functions.broadcast

val smallDF = spark.read.parquet("small_table")
val largeDF = spark.read.parquet("large_table")
val joinedDF = largeDF.join(broadcast(smallDF),"id")// 廣播小表,避免Shuffle

(2) 分區(qū)策略優(yōu)化:并行度與數(shù)據(jù)均衡

合理的分區(qū)數(shù)是提升并行度的關鍵。Spark推薦每個Task處理128MB~256MB數(shù)據(jù),分區(qū)數(shù)設置為Executor數(shù)量 × 核心數(shù) × 2~3。例如,50個Executor、每個4核的集群,推薦分區(qū)數(shù)為50×4×2=400。

  • repartition與coalesce:repartition會觸發(fā)Shuffle,用于增加分區(qū)或徹底重分區(qū);coalesce不觸發(fā)Shuffle,僅合并分區(qū)(適用于減少小文件)。代碼示例:
// 增加分區(qū)(觸發(fā)Shuffle)
val repartitionedRDD = rdd.repartition(400)

// 合并分區(qū)(不觸發(fā)Shuffle)
val coalescedRDD = rdd.coalesce(50)// 將100個小分區(qū)合并為50個
  • 動態(tài)分區(qū)裁剪(DPP):Spark 3.0引入的DPP可在Join時基于運行時條件過濾無關分區(qū)。例如,fact_table按date分區(qū),與dim_table Join時,若dim_table過濾出date='2023-01-01',DPP會僅掃描fact_table的對應分區(qū),減少I/O。

2. Spark 3.x自適應查詢執(zhí)行(AQE):動態(tài)優(yōu)化的革命

AQE(Adaptive Query Execution)是Spark 3.x的核心優(yōu)化特性,通過運行時統(tǒng)計信息動態(tài)調(diào)整執(zhí)行計劃,解決靜態(tài)優(yōu)化的局限性。其三大核心功能如下:

(1) 動態(tài)合并Shuffle分區(qū)

傳統(tǒng)Shuffle分區(qū)數(shù)固定(默認200),易導致小分區(qū)過多(調(diào)度開銷大)或大分區(qū)(數(shù)據(jù)傾斜)。AQE在Shuffle后根據(jù)實際數(shù)據(jù)量合并小分區(qū),目標分區(qū)大小由spark.sql.adaptive.advisoryPartitionSizeInBytes控制(默認64MB)。

案例:某電商日志分析任務初始設置spark.sql.shuffle.partitions=2000,AQE根據(jù)實際數(shù)據(jù)量合并為420個分區(qū),Shuffle耗時從58分鐘降至12分鐘(提升79%)。

(2) 動態(tài)調(diào)整Join策略

AQE在運行時根據(jù)表大小動態(tài)選擇Join策略:

  • 若小表大小<廣播閾值(spark.sql.autoBroadcastJoinThreshold),自動轉(zhuǎn)為Broadcast Join。
  • 若表大小適中,轉(zhuǎn)為Shuffled Hash Join。
  • 若表極大,保持Sort Merge Join。

案例:某金融Join任務中,靜態(tài)優(yōu)化誤判小表大小選擇Sort Merge Join(耗時2.1小時),AQE檢測到小表實際僅1GB,動態(tài)轉(zhuǎn)為Broadcast Join,耗時降至18分鐘(提升7倍)。

(3) 動態(tài)優(yōu)化傾斜Join

AQE自動檢測傾斜Key(默認分區(qū)大小>中位數(shù)5倍且>256MB),將傾斜分區(qū)分拆為多個子分區(qū),并行處理。例如,某支付數(shù)據(jù)中“熱門商品”Key占比80%,AQE將其拆分為20個子分區(qū),總耗時從6小時降至1.5小時(提升75%)。

AQE啟用配置:

spark.conf.set("spark.sql.adaptive.enabled","true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled","true")// 合并小分區(qū)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled","true")// 傾斜Join優(yōu)化

3. Tungsten引擎:內(nèi)存與CPU的極致優(yōu)化

Tungsten是Spark的底層優(yōu)化引擎,通過內(nèi)存管理和代碼生成提升性能,尤其對寬依賴Shuffle場景效果顯著:

  • 堆外內(nèi)存(Off-Heap):通過sun.misc.Unsafe API直接操作內(nèi)存,避免JVM對象開銷和GC。例如,Java字符串“abcd”在JVM中占48字節(jié)(含對象頭),Tungsten二進制存儲僅需4字節(jié)。
  • 向量化執(zhí)行:以Batch為單位處理數(shù)據(jù)(而非單行),利用CPU SIMD指令提升計算效率,TPC-DS基準測試性能提升40%。
  • 代碼生成(Whole-Stage CodeGen):將多個算子邏輯合并為單一Java方法,消除虛函數(shù)調(diào)用和中間對象,Shuffle聚合吞吐量提升10倍以上。

Tungsten啟用配置:

spark.conf.set("spark.memory.offHeap.enabled","true")
spark.conf.set("spark.memory.offHeap.size","4g")// 堆外內(nèi)存大小
spark.conf.set("spark.sql.codegen.wholeStage","true")// 啟用全階段代碼生成

四、實際應用中的依賴關系調(diào)整與性能調(diào)優(yōu)案例

1. 案例一:電商用戶畫像計算優(yōu)化(寬依賴→窄依賴)

背景:某電商平臺用戶畫像任務需關聯(lián)用戶行為表(100億行)與用戶標簽表(1億行),原始代碼使用join算子(寬依賴),Shuffle耗時4.2小時。

問題分析:用戶標簽表雖大但可全量加載到內(nèi)存,且用戶行為表按user_id分區(qū),可通過廣播+協(xié)同分區(qū)轉(zhuǎn)為窄依賴。

優(yōu)化步驟:

  • 廣播標簽表:broadcast(userTagsDF)避免Shuffle。
  • 預分區(qū)行為表:按user_id重分區(qū),確保與標簽表協(xié)同。

優(yōu)化后代碼:

val userTagsDF = spark.read.parquet("user_tags").repartition("user_id")// 預分區(qū)
val userBehaviorDF = spark.read.parquet("user_behavior").repartition("user_id")// 協(xié)同分區(qū)
val profileDF = userBehaviorDF.join(broadcast(userTagsDF),"user_id")// 窄依賴Join

效果:任務耗時從4.2小時降至23分鐘(提升11倍),Shuffle數(shù)據(jù)量減少98%。

2. 案例二:金融支付數(shù)據(jù)傾斜處理(寬依賴傾斜→拆分優(yōu)化)

背景:某銀行支付數(shù)據(jù)聚合任務中,5%的商戶占85%交易數(shù)據(jù),groupByKey(merchant_id)導致單個Task處理20GB數(shù)據(jù),耗時6小時。

問題分析:數(shù)據(jù)傾斜導致寬依賴Shuffle中個別Task過載。

優(yōu)化步驟:

  • 檢測傾斜Key:通過df.groupBy("merchant_id").count().orderBy(desc("count"))定位傾斜Key。
  • 拆分傾斜數(shù)據(jù):將傾斜Key單獨處理,添加隨機前綴打散分區(qū)。
  • 二次聚合:先按“Key+隨機前綴”聚合,再去掉前綴全局聚合。

優(yōu)化后代碼:

val skewedKeys = List("merchant_001","merchant_002")// 傾斜Key列表
val saltedDF = df.withColumn("salt", when(col("merchant_id").isin(skewedKeys: _*),
  concat(col("merchant_id"), lit("_"),(rand *10).cast("int"))// 加鹽打散
).otherwise(col("merchant_id")))

// 二次聚合
val resultDF = saltedDF.groupBy("salt").agg(sum("amount").alias("sum_amount"))
.withColumn("merchant_id", split(col("salt"),"_").getItem(0))
.groupBy("merchant_id").agg(sum("sum_amount").alias("total_amount"))

效果:單個Task數(shù)據(jù)量從20GB降至2GB,總耗時從6小時降至1.5小時(提升75%)。

3. 案例三:機器學習特征工程優(yōu)化(窄依賴流水線)

背景:某推薦系統(tǒng)特征工程需對1億用戶樣本執(zhí)行“清洗→特征提取→歸一化”三步轉(zhuǎn)換,原始代碼分三次觸發(fā)Action,導致重復計算。

問題分析:未充分利用窄依賴的流水線特性,多次Action觸發(fā)多次DAG執(zhí)行。

優(yōu)化步驟:

  • 合并轉(zhuǎn)換算子:將窄依賴操作串聯(lián),形成流水線。
  • 持久化中間結果:對復用的RDD使用persist緩存至內(nèi)存。

優(yōu)化后代碼:

val featureRDD = rawDataRDD
.filter(_.isValid)// 清洗(窄依賴)
.map(extractFeatures)// 特征提取(窄依賴)
.map(normalize)// 歸一化(窄依賴)
.persist(StorageLevel.MEMORY_AND_DISK)// 緩存中間結果

// 單次Action觸發(fā)所有轉(zhuǎn)換
val trainData = featureRDD.filter(_.label.isDefined)
val testData = featureRDD.filter(_.label.isEmpty)

效果:計算次數(shù)從3次降至1次,總耗時從90分鐘降至25分鐘(提升72%)。

RDD依賴關系與DAG優(yōu)化是Spark性能調(diào)優(yōu)的核心,其本質(zhì)是通過減少數(shù)據(jù)移動和提升并行效率實現(xiàn)計算加速。窄依賴的流水線執(zhí)行和寬依賴的Shuffle優(yōu)化構成了Spark任務調(diào)度的基礎,而Spark 3.x的AQE和Tungsten引擎進一步降低了調(diào)優(yōu)門檻,實現(xiàn)了“靜態(tài)配置+動態(tài)自適應”的雙重優(yōu)化。

附錄:關鍵配置參數(shù)參考

配置項

作用

推薦值

spark.default.parallelism

RDD默認并行度

Executor數(shù) × 核心數(shù) × 2

spark.sql.shuffle.partitions

SQL Shuffle分區(qū)數(shù)

400~1000(根據(jù)數(shù)據(jù)量調(diào)整)

spark.sql.adaptive.enabled

啟用AQE

true

spark.sql.autoBroadcastJoinThreshold

廣播Join閾值

100MB(大內(nèi)存集群可設200MB)

spark.memory.offHeap.enabled

啟用堆外內(nèi)存

true

spark.shuffle.file.buffer

Shuffle寫緩沖區(qū)

64KB→256KB(減少磁盤I/O)

spark.reducer.maxSizeInFlight

Shuffle讀緩沖區(qū)

48MB→96MB(減少網(wǎng)絡請求)

通過合理配置這些參數(shù),并結合依賴關系調(diào)整策略,可顯著提升Spark作業(yè)性能,充分發(fā)揮分布式計算的威力。

責任編輯:趙寧寧 來源: 大數(shù)據(jù)技能圈
相關推薦

2024-05-21 09:08:57

JVM調(diào)優(yōu)面試

2020-08-06 00:14:16

Spring IoC依賴注入開發(fā)

2012-03-26 10:55:03

JavaJava EE

2021-03-04 08:39:21

SparkRDD調(diào)優(yōu)

2018-07-18 12:12:20

Spark大數(shù)據(jù)代碼

2023-04-24 14:54:09

JVM性能調(diào)優(yōu)

2017-07-07 11:01:04

Spark性能調(diào)優(yōu)

2017-10-20 13:41:11

Spark集群代碼

2019-07-29 17:15:35

MySQL操作系統(tǒng)數(shù)據(jù)庫

2021-12-06 11:03:57

JVM性能調(diào)優(yōu)

2017-07-21 08:55:13

TomcatJVM容器

2021-12-26 00:03:25

Spark性能調(diào)優(yōu)

2012-06-20 11:05:47

性能調(diào)優(yōu)攻略

2020-07-14 14:59:00

控制反轉(zhuǎn)依賴注入容器

2010-04-20 15:41:38

Oracle sql

2025-06-23 10:25:00

Trino開源大數(shù)據(jù)

2024-04-22 00:00:00

幽靈依賴前端

2023-10-04 18:29:24

NFS小文件業(yè)務

2013-09-24 13:06:56

AngularJS性能優(yōu)化

2011-03-10 14:40:54

LAMPMysql
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 毛片com | 午夜免费观看体验区 | 色综合九九 | 99热在线免费 | 久久精品视频一区二区三区 | 精品国产伦一区二区三区观看方式 | 日本成人免费观看 | 在线观看黄免费 | 国产一区二区欧美 | 视频一区欧美 | 国产日韩欧美激情 | 国产欧美一区二区三区日本久久久 | 日韩成人在线观看 | 最新中文字幕在线 | 999精品网 | 91网在线观看 | 精品在线一区二区三区 | 九九免费视频 | 欧美日韩亚洲一区 | 在线不卡视频 | 欧美a√| 超碰av在线| 免费一区二区在线观看 | 青青久久av北条麻妃海外网 | 视频在线观看一区 | 亚洲日日夜夜 | 嫩草视频网 | 免费在线观看黄视频 | 性色av一区二区三区 | 国产成人精品av | 激情久久久久 | 久久久久久天堂 | 青青久在线视频 | 日韩免费av| 黄色在线免费网站 | 国产一区二区三区四区hd | 一区二区三区视频在线免费观看 | 中文二区| 狠狠涩 | 精品美女久久久久久免费 | 日本a视频 |