久久精品国产精品国产精品污,男人扒开添女人下部免费视频,一级国产69式性姿势免费视频,夜鲁夜鲁很鲁在线视频 视频,欧美丰满少妇一区二区三区,国产偷国产偷亚洲高清人乐享,中文 在线 日韩 亚洲 欧美,熟妇人妻无乱码中文字幕真矢织江,一区二区三区人妻制服国产

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

FlinkSQL实战开发

發(fā)布時(shí)間:2024/1/16 数据库 36 coder
生活随笔 收集整理的這篇文章主要介紹了 FlinkSQL实战开发 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

FlinkSQL實(shí)戰(zhàn)開發(fā)

1、基礎(chǔ)知識(shí)

FlinkSQL分為Table API和SQL API,是架構(gòu)于Flink Core之上用SQL予以方便快捷地進(jìn)行結(jié)構(gòu)化數(shù)據(jù)處理的上層庫(kù)。

  • 工作流程

SQL和Table在進(jìn)入Flink以后轉(zhuǎn)化成統(tǒng)一的數(shù)據(jù)結(jié)構(gòu)表達(dá)形式,也就是邏輯計(jì)劃(logic plan),其中catalog提供元數(shù)據(jù)信息,用于后續(xù)的優(yōu)化,邏輯計(jì)劃是優(yōu)化的入門,經(jīng)過一系列規(guī)則后,F(xiàn)link把初始的邏輯計(jì)劃優(yōu)化為物理計(jì)劃(phy plan),物理計(jì)劃通過代碼構(gòu)造器翻譯為Transformation,最后轉(zhuǎn)換為工作圖(job graph)。

整個(gè)過程沒有單獨(dú)的流處理和批處理,因?yàn)榱魈幚砗团幚韮?yōu)化過程和擴(kuò)建都是共享的。

  • 編程模型

創(chuàng)建Flink SQL運(yùn)行環(huán)境。

將數(shù)據(jù)源定義成表。

執(zhí)行SQL語(yǔ)義查詢。

將查詢結(jié)果輸出到目標(biāo)表中。

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.15.2</flink.version>
    <scala.version>2.12.2</scala.version>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--flink客戶端-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--本地運(yùn)行的webUI-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--flink與kafka整合-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.16</version>
    </dependency>
    <!--狀態(tài)后端-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--日志系統(tǒng)-->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>5.3.21</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-cep</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--json格式依賴-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--csv格式依賴-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink SQL -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink CDC 的依賴 -->
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.3.0</version>
    </dependency>

    <!-- flink與File整合的依賴 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- Flink On Hive-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>3.1.2</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.calcite.avatica</groupId>
          <artifactId>avatica</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.calcite</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.alibaba.fastjson2</groupId>
      <artifactId>fastjson2</artifactId>
      <version>2.0.41</version>
    </dependency>
      <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-compress</artifactId>
      <version>1.21</version>
    </dependency>
      <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-compress</artifactId>
      <version>1.21</version>
    </dependency>
  </dependencies>
  • emp.txt數(shù)據(jù)
{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":345830400000,"sal":800.0,"comm":null,"deptno":20}
{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":351619200000,"sal":1250.0,"comm":500.0,"deptno":30}
{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":354988800000,"sal":2975.0,"comm":null,"deptno":20}
{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":370454400000,"sal":1250.0,"comm":1400.0,"deptno":30}
{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":357494400000,"sal":2850.0,"comm":null,"deptno":30}
{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":360864000000,"sal":2450.0,"comm":null,"deptno":10}
{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":553100400000,"sal":3000.0,"comm":null,"deptno":20}
{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":null,"hiredate":374774400000,"sal":5000.0,"comm":null,"deptno":10}
{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":368726400000,"sal":1500.0,"comm":0.0,"deptno":30}
{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":553100400000,"sal":1100.0,"comm":null,"deptno":20}
{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":376156800000,"sal":950.0,"comm":null,"deptno":30}
{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":376156800000,"sal":3000.0,"comm":null,"deptno":20}
{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7782,"hiredate":380563200000,"sal":1300.0,"comm":null,"deptno":10}
  • JAVA代碼
    public static void main(String[] args) throws Exception {
        //快速入門
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
        //讀取文本文件數(shù)據(jù)轉(zhuǎn)為Table 對(duì)象
        DataStream<Emp> source = environment.readTextFile("data/emp.txt")
                .map(lines ->JSONObject.parseObject(lines, Emp.class));
        //把JAVA對(duì)象轉(zhuǎn)為table對(duì)象
        //注意Emp對(duì)象中hiredate時(shí)間戳是Long類型
//        {"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
        Table table = tableEnv.fromDataStream(source);
        table.select(Expressions.$("*")).execute().print();
    }
  • 運(yùn)行環(huán)境

TableEnvironment是Table API和SQL的核心概念:

  • 內(nèi)部catalog中注冊(cè)Table
  • 注冊(cè)外部的catalog
  • 加載可插拔模式
  • 執(zhí)行SQL查詢
  • 注冊(cè)自定義函數(shù)(scalar table aggregation)
  • DataStream和Table之間的轉(zhuǎn)換

Table與特定的TableEnvironment綁定,不能在同一條查詢中使用不同的TableEnvironment中的表。

輸入源流式還是批式,Table API和SQL查詢都會(huì)轉(zhuǎn)換成DataStream程序。

Table對(duì)象的標(biāo)識(shí)位:CataLog.DB.Table

  • 創(chuàng)建方式一
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
  • 創(chuàng)建方式二
   EnvironmentSettings build = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment TabEnv = TableEnvironment.create(build);
  • 創(chuàng)建表

標(biāo)識(shí)符由三個(gè)部分組成:catalog 名稱、數(shù)據(jù)庫(kù)名稱以及對(duì)象名稱。

如果catalog或者數(shù)據(jù)庫(kù)沒有指明,就會(huì)使用當(dāng)前默認(rèn)值。

Table可以是虛擬的(視圖views)也可以是常規(guī)的表Tables,其中視圖是臨時(shí)的存儲(chǔ)在內(nèi)存中,會(huì)話結(jié)束臨時(shí)表就消失,而tables表示永久化保存的外部數(shù)據(jù)物理表。

表分類:臨時(shí)表(僅存在flink會(huì)話中)永久表(元數(shù)據(jù)保存在catalog中)屏蔽特性(臨時(shí)表與永久表同名,臨時(shí)表存在永久表就無法訪問,刪除臨時(shí)表就可以訪問永久表)

  • 案例
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
        Table table = tabEnv.fromDataStream(source);
//Table table = tabEnv.fromDataStream(source,$("deptno").as("dno"));查詢指定列數(shù)據(jù)并設(shè)置別名。
        tabEnv.createTemporaryView("t_emp",table);
        tabEnv.sqlQuery("select * from t_emp").execute().print();
  • DataStream轉(zhuǎn)Table對(duì)象
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
        //設(shè)置別名并查詢指定列數(shù)據(jù)
        Table table = tabEnv.fromDataStream(source,$("deptno").as("dno"));
  • createTemporaryView

創(chuàng)建臨時(shí)視圖(臨時(shí)表),第一個(gè)參數(shù)是注冊(cè)的表名([catalog.db.]tableName),第二個(gè)參數(shù)可以是Tabe對(duì)象也可以是DataStream對(duì)象,第三個(gè)參數(shù)是指定的列字段名(可選)。

      Table table = tabEnv.fromDataStream(source);
//Table table = tabEnv.fromDataStream(source,$("deptno").as("dno"));查詢指定列數(shù)據(jù)并設(shè)置別名。
        tabEnv.createTemporaryView("t_emp",table);
=========================================================================================
  DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
        //設(shè)置別名  并指定查詢的列數(shù)據(jù)
        tabEnv.createTemporaryView("t_emp",source,$("deptno").as("dd"));
        tabEnv.sqlQuery("select * from t_emp").execute().print();

  • 數(shù)據(jù)類型
  • 原子類型:DataStream中支持的數(shù)據(jù)類型,Table也是支持的,也就是基本數(shù)據(jù)類和通用類型(Integer、Double、String等)
  • Tuple類型:從f0開始計(jì)數(shù),f0 f1 f2,所有字段都可以被重新排序,也可以提前一部分字段。
  • Pojo類型:Flink 也支持多種數(shù)據(jù)類型組合成的“復(fù)合類型”,最典型的就是簡(jiǎn)單 Java 對(duì)象(POJO 類型)。將 POJO 類型的 DataStream 轉(zhuǎn)換成 Table,如果不指定字段名稱,就會(huì)直接使用原始 POJO 類型 中的字段名稱。Pojo字段可以被重新排序、提取和重命名。
  • Row類型:Flink 中還定義了一個(gè)在關(guān)系型表中更加通用的數(shù)據(jù)類型——行(Row),它是 Table 中數(shù)據(jù)的基 本組織形式。長(zhǎng)度固定,無法推斷出每個(gè)字段的類型,在使用時(shí)必須聲明具體的類型信息。
  StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = environment.readTextFile("data/dept.txt");
  //所謂的字段重新排序就是查詢出來的指定字段順序可以自定義 
StreamTableEnvironment.create(environment).fromDataStream(source,$("f1")).execute().print();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
        DataStreamSource<Row> source = environment.fromElements(Row.ofKind(RowKind.INSERT, "張三", 20)
                , Row.ofKind(RowKind.INSERT, "李四", 25)
                //RowKind.UPDATE_BEFORE  打標(biāo)記的作用
                , Row.ofKind(RowKind.UPDATE_BEFORE, "yy", 12)
                , Row.ofKind(RowKind.UPDATE_AFTER, "aaa", 18));
        Table table = tabEnv.fromChangelogStream(source);
        table.execute().print();
  • 查詢表

Table API 是關(guān)于 Scala 和 Java 的集成語(yǔ)言式查詢 API。與 SQL 相反,Table API 的查詢不是由字符串指定,而是在宿主語(yǔ)言中逐步構(gòu)建。

table.groupBy(...).select() ,其中 groupBy(...) 指定 table 的分組,而 select(...) 在 table 分組上的投影

 //{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":345830400000,"sal":800.0,"comm":null,"deptno":20}
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        Table table = StreamTableEnvironment.create(environment).fromDataStream(source);
        table.where($("deptno").isEqual(10)).select($("ename"), $("job")).execute().print();
        table.groupBy($("deptno")).select($("deptno"),$("sal").avg().as("sal_avg")).execute().print();

  • SQL語(yǔ)法

StreamTableEnvironment對(duì)象有兩個(gè)常用的方法:sqlQuery()和executeSql()兩個(gè)方法。

  • sqlQuery()主要用于查詢數(shù)據(jù),并且可以查詢混用。
  • executeSql()可以用來增刪改查數(shù)據(jù)都可以。
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tbl = StreamTableEnvironment.create(environment);
        tbl.createTemporaryView("t_emp_demo",source);
        String sql="select deptno,avg(sal) " +
                " from t_emp_demo " +
                " group by deptno ";
         tbl.executeSql(sql).print();
=========================================================================================
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        Table empTable = tableEnvironment.fromDataStream(source);
tableEnvironment.sqlQuery("select * from "+empTable).execute().print();
  • 輸出表

insertInto:Table通過寫入TableSink輸出。TableSink是一個(gè)通用接口,包括:

  • 用于支持多種文件格式(如CSV、Apache Parquest、Apache Avro)
  • 存儲(chǔ)系統(tǒng)(如JDBC、Apache Hbase、Apache Cassandra、Es)
  • 消息隊(duì)列系統(tǒng)(如Apache kafka、Rabbit MQ)
  • 控制臺(tái)寫入并輸出
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.createTemporaryView("t_emp_d",source);
        Table tableSource = tblEnv.fromDataStream(source, $("empno"), $("ename"), $("job"));
        String sql=
                "create table t_emp_r(" +
                "empno Integer," +
                "ename String," +
                "job String) " +
                "with ( " +
                "'connector'='print')";
         tblEnv.executeSql(sql);
         tableSource.insertInto("t_emp_r").execute();
         //t_emp_r 不能當(dāng)做表進(jìn)行查詢 只能當(dāng)做sink端
//        tblEnv.executeSql("select * from t_emp_r").print();
        environment.execute();
  • kafka寫入
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        String sqlSource="create table kafka_source( " +
                "deptno int," +
                "dname String," +
                "loc String)" +
                "with (" +
                "'connector'='kafka'," +
                "'topic'='flink_kafka_source'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='flink-zwf'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='csv')";
        tblEnv.executeSql(sqlSource);

        String sqlSink="create table kafka_sink( " +
                "deptno int," +
                "dname String," +
                "loc String)" +
                "with (" +
                "'connector'='kafka'," +
                "'topic'='flink_kafka_sink'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='flink-zwf'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='json')";
        tblEnv.executeSql(sqlSink);
         //從一張表查詢數(shù)據(jù)插入到另外一張表中
        tblEnv.sqlQuery("select * from kafka_source").insertInto("kafka_sink").execute();
  • 查看執(zhí)行計(jì)劃
tblEnv.sqlQuery("select * from kafka_source").insertInto("kafka_sink").printExplain();
  • 表對(duì)象轉(zhuǎn)換為流對(duì)象

將一個(gè)Table對(duì)象轉(zhuǎn)換成DataStream,直接調(diào)用表環(huán)境中國(guó)的ToDataStream();

  tableEnv.toDataStream(table).print();
  • toChangelogStream

對(duì)于有更新操作的表,我們不要視圖直接把它轉(zhuǎn)換成DataStream打印,而是記錄一下它的更新日志(change log)。

對(duì)于表的更新操作的表,就變成了一條更新日志的流,可轉(zhuǎn)換成流打印輸出。

規(guī)則:Insert插入操作編碼是add消息。Delete刪除操作編碼為retract消息 update更新操作則為編碼更改行的retract消息和更新后行的add消息

tableEnv.toChangelogStream(table).print();
  • JDBC連接

Flink 支持連接到多個(gè)使用方言(dialect)的數(shù)據(jù)庫(kù),如 MySQL、Oracle、PostgreSQL、Derby 等。其中,Derby 通常是用于測(cè)試目的。下表列出了從關(guān)系數(shù)據(jù)庫(kù)數(shù)據(jù)類型到 Flink SQL 數(shù)據(jù)類型的類型映射,映射表可以使得在 Flink 中定義 JDBC 表更加簡(jiǎn)單。

  • 常見的數(shù)據(jù)類型映射

  • 依賴
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>1.15.4</version>
</dependency>
   <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.16</version>
    </dependency>
  • 案例
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
         String jdbcSQL=
                 "create table jdbc_scott_emp(" +
                 "empno int," +
                 "ename string," +
                 "job string," +
                 "mgr int," +
                 "hiredate date," +
                 "sal double," +
                 "comm double," +
                 "deptno int)" +
                 "with (" +
                 "'connector'='jdbc'," +
                 "'url'='jdbc:mysql://master:3306/scott?serverTimeZone=Asia/Shanghai'," +
                 "'table-name'='emp'," +
                 "'driver'='com.mysql.cj.jdbc.Driver'," +
                 "'username'='root'," +
                 "'password'='Root@123456.')";
        tblEnv.executeSql(jdbcSQL);
        tblEnv.sqlQuery("select * from jdbc_scott_emp").execute().print();
  • SQL語(yǔ)句(jdbc數(shù)據(jù)插入操作、時(shí)態(tài)關(guān)聯(lián)創(chuàng)建維表)
-- 從另一張表 "T" 將數(shù)據(jù)寫入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;


-- JDBC 表在時(shí)態(tài)表關(guān)聯(lián)中作為維表
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;
  • DataGen SQL連接器

用于生成模擬數(shù)據(jù),DataGen 連接器允許按數(shù)據(jù)生成規(guī)則進(jìn)行讀取。

不支持復(fù)雜類型: Array,Map,Row。 請(qǐng)用計(jì)算列構(gòu)造這些類型。

連接器參數(shù)

  • 案例
 //按照一定規(guī)則隨機(jī)生成數(shù)據(jù)
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        String SqlStr="CREATE TABLE datagen (\n" +
                " f_sequence INT,\n" +
                " f_random INT,\n" +
                " f_random_str STRING,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                " 'fields.f_sequence.kind'='sequence',\n" +
                " 'fields.f_sequence.start'='1',\n" +
                " 'fields.f_sequence.end'='1000',\n" +
                " 'fields.f_random.min'='1',\n" +
                " 'fields.f_random.max'='1000',\n" +
                " 'fields.f_random_str.length'='10'\n" +
                ")";
        tblEnv.executeSql(SqlStr);
        tblEnv.sqlQuery("select * from datagen").execute().print();
  • Upsert Kafka SQL連接器

由于flink是流式計(jì)算,會(huì)出現(xiàn)相同的key值數(shù)據(jù)寫入,在寫入kafka中,同一個(gè)key生成的value值會(huì)不斷被更新(update -u u+標(biāo)記),如果沒有重復(fù)的key則被插入(insert +i標(biāo)記),如果value為空值就會(huì)被標(biāo)記刪除(delete +d標(biāo)記)。

作為 sink,upsert-kafka 連接器可以消費(fèi) changelog 流。它會(huì)將 INSERT/UPDATE_AFTER 數(shù)據(jù)作為正常的 Kafka 消息寫入并將 DELETE 數(shù)據(jù)以 value 為空的 Kafka 消息寫入(表示對(duì)應(yīng) key 的消息被刪除)。Flink 將根據(jù)主鍵列的值對(duì)數(shù)據(jù)進(jìn)行分區(qū),從而保證主鍵上的消息有序,因此同一主鍵上的更新/刪除消息將落在同一分區(qū)中。

  • 案例
//使用datagen模擬數(shù)據(jù)
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        String dataGen=
                "create table t_dataGen(" +
                        "deptno int," +
                        "salnum int," +
                        "ts AS localtimestamp," +
                        "WATERMARK FOR ts AS ts" +
                        ") with ( " +
                        "'connector'='datagen'," +
                        "'rows-per-second'='2'," +
                        "'fields.deptno.min'='88'," +
                        "'fields.deptno.max'='99'," +
                        "'fields.salnum.min'='10'," +
                        "'fields.salnum.max'='20')";
        tblEnv.executeSql(dataGen);
//        tblEnv.sqlQuery("select deptno,sum(salnum) as salnum from t_dataGen group by deptno").execute().print();
        //kafka sink端
        String kafkaSink="create table upsert_kafka_num(" +
                "deptno int," +
                "salnum int," +
                "primary key(deptno) not enforced)" +
                "with(" +
                "'connector'='upsert-kafka'," +
                "'topic'='upsert_kafka'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'key.format'='csv'," +
                "'value.format'='json')";
        tblEnv.executeSql(kafkaSink);
          //插入數(shù)據(jù)
        tblEnv.executeSql("insert into upsert_kafka_num select deptno,sum(salnum) as salnum from t_dataGen group by deptno");
  • FileSystem連接器

文件系統(tǒng)分為:本地文件系統(tǒng)、外部文件系統(tǒng)。

本地文件系統(tǒng):ink 原生支持本地機(jī)器上的文件系統(tǒng),包括任何掛載到本地文件系統(tǒng)的 NFS 或 SAN 驅(qū)動(dòng)器,默認(rèn)即可使用,無需額外配置。本地文件可通過 file:// URI Scheme 引用。

外部文件系統(tǒng):常見的有HDFS、clickhouse、HBase,上述文件系統(tǒng)可以并且需要作為插件使用。

使用外部文件系統(tǒng)時(shí),在啟動(dòng) Flink 之前需將對(duì)應(yīng)的 JAR 文件從 opt 目錄復(fù)制到 Flink 發(fā)行版 plugin 目錄下的某一文件夾中。

  • 本地文件測(cè)試
 public static void main(String[] args) {
        //設(shè)置環(huán)境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        String sqlDemo="create table t_dept_d(" +
                "deptno int," +
                "dname string," +
                "loc string)" +
                "with(" +
                "'connector'='filesystem'," +
                "'path'='data/dept.txt'," +
                "'format'='csv'" +
                ")";
         tblEnv.executeSql(sqlDemo);
         tblEnv.sqlQuery("select * from t_dept_d").execute().print();
    }
  • HDFS分布文件系統(tǒng)測(cè)試
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.4</version>
    </dependency>
<!--加載一些其他配置文件 比如core-site.xml dfs-core.xml yarn-site.xml等配置文件進(jìn)resource目錄-->
  StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        String hdfsSql="create table dfs_dept(" +
                "deptno int," +
                "dname string," +
                "loc string)" +
                "with (" +
                "'connector'='filesystem'," +
                "'path'='hdfs://hdfs-zwf/dept.txt'," +
                "'format'='csv')";
            tblEnv.executeSql(hdfsSql);
            tblEnv.sqlQuery("select * from dfs_dept").execute().print();

    }

4、Schema結(jié)構(gòu)

  • Pythsical column

物理字段:源自于外部存儲(chǔ)系統(tǒng)本身schema中的字段

  • kafka消息的key、value中的字段
  • mysql表中的字段
  • hive表中的字段
  • parquet文件中的字段
  • computed column

表達(dá)式字段:在物理字段上施加一個(gè)sql表達(dá)式,并將表達(dá)式結(jié)果定義為一個(gè)字段.

// 第一種sqlAPI 
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);

        String sqlStr="create table upsert_info(" +
                "deptno int," +
                "salnum2 as salnum*100,"+    //計(jì)算列
                "salnum int)" +
                "with (" +
                "'connector'='kafka'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='zwf'," +
                "'topic'='upsert_kafka'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='json')";
        tblEnv.executeSql(sqlStr);
        tblEnv.sqlQuery("select * from upsert_info").execute().print();


//第二種方式  TableAPI
        tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
                        .schema(Schema.newBuilder()
                                .column("deptno", DataTypes.INT())
                                .column("salnum",DataTypes.INT())
                                .columnByExpression("salpluns","salnum*100")
                                .build()).option("connector","kafka")
                .option("topic","upsert_kafka")
                .option("scan.startup.mode","earliest-offset")
                .option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
                .format("json").build());
        tblEnv.sqlQuery("select * from kafka_dept").execute().print();
  • metadata column

元數(shù)據(jù)字段:來源于connector從外部存儲(chǔ)系統(tǒng)中獲取到外部系統(tǒng)元信息。

kafka消息,通常意義上的數(shù)據(jù)內(nèi)容是在record的key和value中,但是kafka還會(huì)攜帶所屬partition、offset、timestamp等元信息。而flink的連接器可以獲取并暴露這些元信息,允許用戶將信息定義成flinksql表中的字段。

//第一種sqlAPi
        String sqlStr="create table upsert_info(" +
                "deptno int," +
                "salnum2 as salnum*100," +   //計(jì)算列
                "event_time timestamp_ltz(3) metadata from 'timestamp',"+    //metadata列
                "salnum int)" +
                "with (" +
                "'connector'='kafka'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='zwf'," +
                "'topic'='upsert_kafka'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='json')";
        tblEnv.executeSql(sqlStr);
        tblEnv.sqlQuery("select * from upsert_info").execute().print();

//第二種方式  TableAPI
        tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
                        .schema(Schema.newBuilder()
                                .column("deptno", DataTypes.INT())
                                .column("salnum",DataTypes.INT())
                                
                                //metadata column
                                .columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(2),"timestamp",true)
                                .columnByMetadata("k_offset",DataTypes.INT(),"offset",true)
                                .build()).option("connector","kafka")
                .option("topic","upsert_kafka")
                .option("scan.startup.mode","earliest-offset")
                .option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
                .format("json").build());
        tblEnv.sqlQuery("select * from kafka_dept").execute().print();
  • 主鍵約束

單字段主鍵約束語(yǔ)法:

// SQL API
id INT PRIMARY KEY NOT ENFORCED,
name STRING

// Table Api
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
                        .schema(Schema.newBuilder()
                                .column("deptno", DataTypes.INT())
                                //設(shè)置主鍵字段 primary key
                                .primaryKey("deptno")
                                .column("salnum",DataTypes.INT())
                                //metadata column
                                .columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(2),"timestamp",true)
                                .columnByMetadata("k_offset",DataTypes.INT(),"offset",true)
                                .build()).option("connector","kafka")

多字段主鍵約束語(yǔ)法:

-- SQL API
id,
name,
PRIMARY KEY(id,name) NOT ENFORCED

//Table API    
 tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
                        .schema(Schema.newBuilder()
                                .column("deptno", DataTypes.INT())
                                //設(shè)置主鍵字段 primary key
                                .primaryKey("deptno","event_time")
                                .column("salnum",DataTypes.INT())
                                //metadata column
                                .columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(2),"timestamp",true)
                                .columnByMetadata("k_offset",DataTypes.INT(),"offset",true)
                                .build()).option("connector","kafka")
                    
                    
//第一種sqlAPi
        String sqlStr="create table upsert_info(" +
                "deptno int," +//計(jì)算列
                "event_time timestamp_ltz(3) metadata from 'timestamp',"+  //metadata列
                "dname string," +
                "loc string," +
                "primary key(deptno,loc) not enforced)" +
                "with (" +
                "'connector'='upsert-kafka'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='zwf'," +
                "'topic'='flink_kafka_source'," +
                "'key.format'='csv'," +
                "'value.format'='json')";
        tblEnv.executeSql(sqlStr);
        tblEnv.sqlQuery("select * from upsert_info").execute().print();

注意的是:kafka連接器模式下不能設(shè)置主鍵,但是upsert-kafka連接器模式必須設(shè)置主鍵!主鍵字段不能有空值

在upsert-kafka模式下,key和value值不能為空,否則在csv模式中會(huì)解析失敗!

5、FlinkSQL Format

connector 連接器:對(duì)接外部存儲(chǔ)時(shí), 根據(jù)外部存儲(chǔ)中的數(shù)據(jù)格式不同, 需要用到不同的 format 組件;

format 組件:作用就是告訴連接器, 如何解析外部存儲(chǔ)中的數(shù)據(jù)及映射到表 schema;

使用基本步驟:

  • 導(dǎo)入format組件的jar依賴
  • 指導(dǎo)format組件名稱
  • 設(shè)置format組件所需的參數(shù)
  • FlinkSQL支持的Format

  • 案例
<!--json格式依賴-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--csv格式依賴-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>
  • 案例
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
)

CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)

動(dòng)態(tài)表 是 Flink 的支持流數(shù)據(jù)的 Table API 和 SQL 的核心概念。與表示批處理數(shù)據(jù)的靜態(tài)表不同,動(dòng)態(tài)表是隨時(shí)間變化的。可以像查詢靜態(tài)批處理表一樣查詢它們。查詢動(dòng)態(tài)表將生成一個(gè)連續(xù)查詢(Continuous Query) 。一個(gè)連續(xù)查詢永遠(yuǎn)不會(huì)終止,結(jié)果會(huì)生成一個(gè)動(dòng)態(tài)表。查詢不斷更 新其(動(dòng)態(tài))結(jié)果表,以反映其(動(dòng)態(tài))輸入表上的更改。本質(zhì)上,動(dòng)態(tài)表上的連續(xù)查詢非常類似于定 義物化視圖的查詢。

需要注意的是,連續(xù)查詢的結(jié)果在語(yǔ)義上總是等價(jià)于以批處理模式在輸入表快照上執(zhí)行的相同查詢的結(jié)果。

與spark、hive組件中的表最大不同之處在于flink SQL中的表是動(dòng)態(tài)表。flink核心就是對(duì)有界或者*的數(shù)據(jù)流處理,是流式持續(xù)處理的過程。

  • 連續(xù)查詢

在動(dòng)態(tài)表上計(jì)算一個(gè)連續(xù)查詢,生成一個(gè)新的動(dòng)態(tài)表。與批處理查詢不同,連續(xù)查詢從不終止,根據(jù)其輸入表上的更新其結(jié)果表。在任何時(shí)候,連續(xù)查詢的結(jié)果在語(yǔ)義上與批處理模式在輸入表快照上執(zhí)行相同查詢的結(jié)果相同。

  • 事件時(shí)間

創(chuàng)建表的DDL,增加一個(gè)字段,通過watermark語(yǔ)句來定義事件時(shí)間屬性。

WATERMARK 語(yǔ)句主要用來定義水位線(watermark)的生成表達(dá)式,這個(gè)表達(dá)式會(huì)將帶有事件 時(shí)間戳的字段標(biāo)記為事件時(shí)間屬性,并在它基礎(chǔ)上給出水位線的延遲時(shí)間。

//水位線   設(shè)置延遲時(shí)間5s
        String eventTime="create table proc_dept_tbl(" +
                "deptno int," +
                "dname string," +
                "loc string," +
                "ts timestamp_ltz(3) metadata from 'timestamp'," +
                "watermark for ts as ts-interval '5' second" +    // pt是事件處理
                ")with( " +
                "'connector'='kafka'," +
                "'topic'='flink_kafka_sink'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='zwf'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='json')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(eventTime);
        tblEnv.sqlQuery("select * from proc_dept_tbl").execute().print();


//Table API
 tblEnv.createTable("t_water_mark", TableDescriptor.forConnector("kafka")
                         .option("topic","flink_kafka_sink")
                         .option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
                         .option("properties.group.id","zwf")
                         .option("scan.startup.mode","earliest-offset")
                         .format("json")
                         .schema(Schema.newBuilder()
                                 .column("deptno",DataTypes.INT())
                                 .column("dname",DataTypes.STRING())
                                 .column("loc",DataTypes.STRING())
                                 .columnByMetadata("ts",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true)
                                 .watermark("ts","ts-interval '5' second").build()).build());
        tblEnv.sqlQuery("select deptno,dname,ts from t_water_mark").execute().print();
  • 處理時(shí)間

定義處理時(shí)間屬性時(shí),必須要額外聲明一個(gè)字段,專門用來保存當(dāng)前的處理時(shí)間

在創(chuàng)建表的 DDL(CREATE TABLE 語(yǔ)句)中,可以增加一個(gè)額外的字段,通過調(diào)用系統(tǒng)內(nèi)置的 PROCTIME()函數(shù)來指定當(dāng)前的處理時(shí)間屬性,返回的類型是 TIMESTAMP_LTZ

  • 案例
   //Flink SQL 水位線   處理時(shí)間
        String procTime="create table proc_dept_tbl(" +
                "deptno int," +
                "dname string," +
                "loc string," +
                "pt as proctime()" +    // pt是事件處理
                ")with( " +
                "'connector'='kafka'," +
                "'topic'='flink_kafka_sink'," +
                "'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
                "'properties.group.id'='zwf'," +
                "'scan.startup.mode'='earliest-offset'," +
                "'format'='json')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(procTime);
        tblEnv.sqlQuery("select * from proc_dept_tbl").execute().print();


//使用TableApi執(zhí)行
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
        Table table = tblEnv.fromDataStream(source, Schema.newBuilder()
                        .column("empno",DataTypes.INT())
                        .column("ename", DataTypes.STRING())
                        .column("job",DataTypes.STRING())
                        .column("mgr",DataTypes.INT())
                        .column("hiredate",DataTypes.BIGINT())
                        .column("sal",DataTypes.DOUBLE())
                        .column("comm",DataTypes.DOUBLE())
                        .column("deptno",DataTypes.INT())
                        .columnByExpression("ts","proctime()")
                .build());
 tblEnv.sqlQuery("select empno,ename,ts from"+table.toString()).execute().print();
  • DataStream定義時(shí)間

處理時(shí)間屬性同樣可以在將DataStream轉(zhuǎn)換為表的時(shí)候來定義。我們調(diào)用fromDataStream()方法 創(chuàng)建表時(shí),可以用.proctime()后綴來指定處理時(shí)間屬性字段

由于處理時(shí)間是系統(tǒng)時(shí)間,原始數(shù)據(jù)中并沒有這個(gè)字段,所以處理時(shí)間屬性一定不能定義在一個(gè)已 有字段上,只能定義在表結(jié)構(gòu)所有字段的最后,作為額外的邏輯字段出現(xiàn)

      //快速入門
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        environment.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
        //讀取文本文件數(shù)據(jù)轉(zhuǎn)為Table 對(duì)象
        DataStream<Emp> source = environment.readTextFile("data/emp.txt")
                .map(lines ->JSONObject.parseObject(lines, Emp.class));
        //把JAVA對(duì)象轉(zhuǎn)為table對(duì)象
        //注意Emp對(duì)象中hiredate時(shí)間戳是Long類型
//        {"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
        Table table = tableEnv.fromDataStream(source,$("empno"),$("ename"),$("ts").proctime());

        table.select($("*")).execute().print();

7、FlinkSQL 窗口TVF

  • TVF窗口化表值函數(shù)
  • 目前flink提供了以下幾個(gè)窗口:
    • 滑動(dòng)窗口
    • 滾動(dòng)窗口
    • 累積窗口
    • 會(huì)話窗口
  • 窗口TVF的返回值中,除去原始表中的所有列,增加描述窗口的額外3個(gè)列:

? 窗口起始點(diǎn):窗口開始起始時(shí)間

? 窗口結(jié)束點(diǎn):窗口結(jié)束時(shí)間

? 窗口時(shí)間:窗口結(jié)束時(shí)間-1

滾動(dòng)窗口在DataStream API中的定義完全一樣,是長(zhǎng)度固定、時(shí)間對(duì)齊、無重疊的窗口,一般用于周期性的統(tǒng)計(jì)計(jì)算。

Tumble(table data,timecol,size[,offset])函數(shù)三個(gè)必需參數(shù):

data:表參數(shù),此表需要包含一個(gè)時(shí)間屬性列

timecol:一個(gè)描述符,指示數(shù)據(jù)的哪個(gè)時(shí)間屬性列應(yīng)該映射到滾動(dòng)的窗口

size:指定滾動(dòng)窗口的大小

  • 案例
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        //執(zhí)行SQL  隨機(jī)生成gid和sales   gid隨機(jī)值取10到20 sales隨機(jī)值取1到9
        //ts 使用本地時(shí)間 水位線是本地時(shí)間延遲5s
        tblEnv.executeSql("CREATE TABLE t_goods (\n" +
                " gid INT,\n" +
                " sales INT,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.gid.min'='10',\n" +
                " 'fields.gid.max'='20',\n" +
                " 'fields.sales.min'='1',\n" +
                " 'fields.sales.max'='9'\n" +
                ")");
//           tblEnv.sqlQuery("select * from t_goods").execute().print();
        //使用滾動(dòng)窗口  5s滾動(dòng)計(jì)算一次
//        String tumbleWin="select * from table(tumble(table t_goods,descriptor(ts),interval '5' second))";
//        tblEnv.sqlQuery(tumbleWin).execute().print();
        //每個(gè)時(shí)間窗口中每個(gè)guid中總銷售額信息
        tblEnv.sqlQuery(
                "select window_start,window_end,gid,sum(sales) as sum_sales " +
                        "from table(tumble(table t_goods,descriptor(ts),interval '5' second))" +
                        "group by window_start,window_end,gid"
        ).execute().print();
  • 滑動(dòng)窗口

Hopping windows也稱為"sliding windows"

HOP函數(shù)分配的窗口覆蓋大小間隔內(nèi)的行,并根據(jù)時(shí)間屬性性列移動(dòng)每個(gè)窗口

HOP函數(shù)有三個(gè)必需的參數(shù):HOP(Table data,slide,size[,offset])

  • data:表格值,帶有時(shí)間戳字段的表格。
  • slide:指定順序hopping窗口開始之間的持續(xù)時(shí)間。
  • size:指定hopping窗口寬度的持續(xù)時(shí)間,size必須是slide的整數(shù)倍

  • 案例
  //滑動(dòng)窗口表值函數(shù)  窗口表值函數(shù)
        //隨機(jī)生成gid大小是10到20 sales大小是1到10
       String datagen="create table t_datagen(" +
               "gid int," +
               "sales int," +
               "ts as localtimestamp," +
               "watermark for ts as ts-interval '5' second" +
               ") with (" +
               "'connector'='datagen'," +
               "'rows-per-second'='10'," +
               "'fields.gid.min'='10'," +
               "'fields.gid.max'='20'," +
               "'fields.sales.min'='1'," +
               "'fields.sales.max'='10')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(datagen);
//        tblEnv.sqlQuery("select * from t_datagen").execute().print();
        //窗口大小是15s 滑動(dòng)3s
         tblEnv.sqlQuery("select gid,sum(sales),window_start,window_end from table(hop(table t_datagen,descriptor(ts),interval '3' second,interval '15' second)) group by window_start,window_end,gid").execute().print();
  • 累積窗口

CUMULATE函數(shù)將元素分配給覆蓋在初始步長(zhǎng)間隔內(nèi)的行,并將每一步擴(kuò)展為多一個(gè)步長(zhǎng)(保持 window start固定),直到最大窗口大小。

可以把cumulative函數(shù)看作應(yīng)用TUMBLE窗口,首先使用最大窗口大小,然后將每個(gè)滾動(dòng)窗口分 割成幾個(gè)具有相同窗口開始和窗口結(jié)束步長(zhǎng)差異的窗口。

因此,累積窗口確實(shí)是重疊的,而且沒有固定的大小。

cumulate函數(shù)有三個(gè)必須的參數(shù):

cumulate(table data,descriptor(timecol),step,size)——必須參數(shù)有以下:

  • data:表格參數(shù),表格必須包含一個(gè)時(shí)間屬性列
  • timecol:時(shí)間屬性字段,也就是使用那個(gè)時(shí)間。
  • step:連續(xù)累積窗口結(jié)束之間增加的窗口大小的持續(xù)時(shí)間。
  • size:累積窗口的最大寬度的持續(xù)時(shí)間。大小必須是步長(zhǎng)的整數(shù)倍。
  • 案例
 //累加窗口大小時(shí)間
        //滑動(dòng)窗口表值函數(shù)  窗口表值函數(shù)
        //隨機(jī)生成gid大小是10到20 sales大小是1到10
        String datagen="create table t_datagen(" +
                "gid int," +
                "sales int," +
                "ts as localtimestamp," +
                "watermark for ts as ts-interval '5' second" +
                ") with (" +
                "'connector'='datagen'," +
                "'rows-per-second'='10'," +
                "'fields.gid.min'='10'," +
                "'fields.gid.max'='20'," +
                "'fields.sales.min'='1'," +
                "'fields.sales.max'='10')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
           environment.setParallelism(1);
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(datagen);
//每3s計(jì)算一次 并進(jìn)行累加  比如19:45-19:48:10  19:48-19:51:20=>19:45-19:51:30
        tblEnv.sqlQuery("select window_start,window_end,gid,sum(sales) as sales_sum from table(cumulate(table t_datagen,descriptor(ts),interval '3' second,interval '15' second)) group by window_start,window_end,gid").execute().print();
  • 分組去重

group+distinct:表示分組+去重,在用于uv統(tǒng)計(jì)時(shí)就需要!

  • 案例
   //用于網(wǎng)站統(tǒng)計(jì) uv 用戶訪問數(shù)  pv 頁(yè)面訪問數(shù)
        String websiteSQL="create table wbSiteNum(" +
                "gid int," +
                "url string," +
                "ts as localtimestamp," +
                "watermark for ts as ts-interval '5' second" +
                ")with(" +
                "'connector'='datagen'," +
                "'fields.gid.min'='1000'," +
                "'fields.gid.max'='2000'," +
                "'fields.url.length'='10')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(websiteSQL);
//        tblEnv.sqlQuery("select * from wbSiteNum").execute().print();
      tblEnv.sqlQuery(
              "select count(distinct gid) as uv,count(url) as pv\n" +
                      "from wbSiteNum"
      ).execute().print();

8、FlinkSQL聚合函數(shù)

  • 分組聚合

在SQL中一般所說的聚合,通過一些內(nèi)置的函數(shù)來實(shí)現(xiàn),比如SUM、MAX、MIN、AVG、以及count。

它得特點(diǎn)是對(duì)多條輸入數(shù)據(jù)進(jìn)行計(jì)算,得到一個(gè)唯一的值,屬于多對(duì)一的轉(zhuǎn)換。比如我們可以通過下面的代碼計(jì)算輸入數(shù)據(jù)的個(gè)數(shù)。更多時(shí)候,我們通過group by子句指定分組的鍵,從而對(duì)數(shù)據(jù)按照某個(gè)字段做一個(gè)分組統(tǒng)計(jì)。

  • 案例
 //分組求和
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
                "FROM (VALUES\n" +
                " ('省1','市1','縣1',100),\n" +
                " ('省1','市2','縣2',101),\n" +
                " ('省1','市2','縣1',102),\n" +
                " ('省2','市1','縣4',103),\n" +
                " ('省2','市2','縣1',104),\n" +
                " ('省2','市2','縣1',105),\n" +
                " ('省3','市1','縣1',106),\n" +
                " ('省3','市2','縣1',107),\n" +
                " ('省3','市2','縣2',108),\n" +
                " ('省4','市1','縣1',109),\n" +
                " ('省4','市2','縣1',110))\n" +
                "AS t_person_num(pid, cid, xid,num)\n" +
                "GROUP BY pid;").execute().print();
  • rollup

維度的上卷,字段維度從細(xì)粒度上轉(zhuǎn)變粗粒度!

  //分組求和  rollup(pid,cid,xid)  維度從粗粒度到細(xì)粒度 pid->cid->xid
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
                "FROM (VALUES\n" +
                " ('省1','市1','縣1',100),\n" +
                " ('省1','市2','縣2',101),\n" +
                " ('省1','市2','縣1',102),\n" +
                " ('省2','市1','縣4',103),\n" +
                " ('省2','市2','縣1',104),\n" +
                " ('省2','市2','縣1',105),\n" +
                " ('省3','市1','縣1',106),\n" +
                " ('省3','市2','縣1',107),\n" +
                " ('省3','市2','縣2',108),\n" +
                " ('省4','市1','縣1',109),\n" +
                " ('省4','市2','縣1',110))\n" +
                "AS t_person_num(pid, cid, xid,num)\n" +
                "GROUP BY rollup(pid,cid,xid)").execute().print();
  • cube

所有維度分組顯示,也就是正方體原則!比如(col1,col2,col3)2^3個(gè)維度表示。

tableEnvironment.sqlQuery("SELECT pid, cid, xid, sum(num) AS total\n" +
            "FROM (VALUES\n" +
            " ('省1','市1','縣1',100),\n" +
          " ('省1','市2','縣2',101),\n" +
           " ('省1','市2','縣1',102),\n" +
          " ('省2','市1','縣4',103),\n" +
            " ('省2','市2','縣1',104),\n" +
           " ('省2','市2','縣1',105),\n" +
             " ('省3','市1','縣1',106),\n" +
          " ('省3','市2','縣1',107),\n" +
           " ('省3','市2','縣2',108),\n" +
            " ('省4','市1','縣1',109),\n" +
                 " ('省4','市2','縣1',110))\n" +
           "AS t_person_num(pid, cid, xid, num)\n" +
          "GROUP BY CUBE(pid, cid, xid)").execute().print();
  • grouping Sets

自定義維度分組,以下案例(pid, cid, xid),(pid, cid),(pid), ()自定義四個(gè)維度分組。

  //自定義維度分組 
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
                "FROM (VALUES\n" +
                " ('省1','市1','縣1',100),\n" +
                " ('省1','市2','縣2',101),\n" +
                " ('省1','市2','縣1',102),\n" +
                " ('省2','市1','縣4',103),\n" +
                " ('省2','市2','縣1',104),\n" +
                " ('省2','市2','縣1',105),\n" +
                " ('省3','市1','縣1',106),\n" +
                " ('省3','市2','縣1',107),\n" +
                " ('省3','市2','縣2',108),\n" +
                " ('省4','市1','縣1',109),\n" +
                " ('省4','市2','縣1',110))\n" +
                "AS t_person_num(pid, cid, xid,num)\n" +
                "GROUP BY GROUPING SETS ((pid, cid, xid),(pid, cid),(pid), ())").execute().print();

9、開窗函數(shù)

比如說,我們可以以每一行數(shù)據(jù)為基準(zhǔn),計(jì)算它之前 1 小時(shí)內(nèi)所有數(shù)據(jù)的平均值;也可以計(jì)算它 之前 10 個(gè)數(shù)的平均值。 就好像是在每一行上打開了一扇窗戶、收集數(shù)據(jù)進(jìn)行統(tǒng)計(jì)一樣,這就是所謂的“開窗函數(shù)”。

分組聚合、窗口 TVF聚合都是“多對(duì)一”的關(guān)系,將數(shù)據(jù)分組之后每組只會(huì)得到一個(gè)聚合結(jié)果;

而開窗函數(shù)是對(duì)每行都要做一次開窗聚合,因此聚合之后表中的行數(shù)不會(huì)有任何減少,是一 個(gè)“多對(duì)多”的關(guān)系.

  • 基本語(yǔ)法
SELECT 
<聚合函數(shù)> OVER ( [PARTITION BY <字段 1>[, <字段 2>, ...]] ORDER BY <時(shí)間屬性字段> <開窗范圍>)
, ... 
FROM ...
  • over():關(guān)鍵字前面是一個(gè)聚合函數(shù),它會(huì)應(yīng)用在后面over定義的窗口上,有如下參數(shù):

? 1、partition by(可選)

? 用來指定分區(qū)的鍵,類似于group by的分組,這部分是可選的。

? 2、order by (必選)

? OVER 窗口是基于當(dāng)前行擴(kuò)展出的一段數(shù)據(jù)范圍,選擇的標(biāo)準(zhǔn)可以 基于時(shí)間也可以基于數(shù)量 。

? 在 Flink 的流處理中,目前只支持按照時(shí)間屬性的升序排列,所以這里 ORDER BY 后面 的字段必須是定義好的時(shí)間屬性

? 開窗范圍:

? 1、對(duì)于開窗函數(shù)而言,還有一個(gè)必須要指定的就是開窗的范圍,也就是到底要擴(kuò)展多少行來做聚合。

? 2、這個(gè)范圍是由between<下界>and<上界>來定義,也就是"從下界到上界"的范圍。

? 3、目前上界只能是current row,也就是定義一個(gè)”從之前某一行到當(dāng)前行“的范圍

? 4、開窗選擇的范圍可以基于時(shí)間,也可以基于數(shù)據(jù)的數(shù)量。所以開窗范圍還應(yīng)該在兩種模式之間做出選擇:

  • 行間隔(rows intervals )
  1. 行間隔以rows為前綴,就是直接確定要選多少行,由當(dāng)前行出發(fā)向前選取多少行。
  2. 例如開窗函數(shù)選擇當(dāng)前行之前的5行數(shù)據(jù):ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  • 范圍間隔(range intervals 以時(shí)間劃分范圍)

    • 范圍間隔:范圍間隔以range為前綴,就是基于order by指定時(shí)間字段去選擇一個(gè)范圍,一般就是當(dāng)前行時(shí)間戳之前的一段時(shí)間。
    • 例如:開窗范圍選擇當(dāng)前行之前1小時(shí)的數(shù)據(jù):RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  • 案例

//執(zhí)行環(huán)境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        //執(zhí)行SQL
        tableEnvironment.executeSql("CREATE TABLE t_goods (\n" +
                " gid STRING,\n" +
                " type INT,\n" +
                " price INT,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.gid.length'='10',\n" +
                " 'fields.type.min'='1',\n" +
                " 'fields.type.max'='5',\n" +
                " 'fields.price.min'='1',\n" +
                " 'fields.price.max'='9'\n" +
                ")");
        //截止當(dāng)前 前10s的每個(gè)類型的平均價(jià)格
        tableEnvironment.sqlQuery(
                "select tg.*,avg(price) over(partition by type order by ts range between interval '10' second preceding and current row) as price_avg\n" +
                        "from t_goods tg"
        ).execute().print();



        //截止當(dāng)前 前10行的每個(gè)類型商品的平均價(jià)格
        tableEnvironment.sqlQuery(
                "select tg.*,avg(price) over(partition by type order by ts rows between 10 preceding and current row) as price_avg\n" +
                        "from t_goods tg"
        ).execute().print();
  • TopN

在 Flink SQL 中,是通過 OVER 聚合和一個(gè)條件篩選來實(shí)現(xiàn)TopN的。

利用row_number()函數(shù)為每一行數(shù)據(jù)聚合得到一個(gè)排序之后的行號(hào),行號(hào)為row_num,并在外層的查詢中以row_num<=N作為條件進(jìn)行篩選,就可以得到根據(jù)排序字段統(tǒng)計(jì)的topN結(jié)果了。

FlinkSQL專門用over聚合做了優(yōu)化實(shí)現(xiàn),只有在topN的應(yīng)用場(chǎng)景中,over窗口oder by后才可以指定其他排序字段,要實(shí)現(xiàn)top N要嚴(yán)格按照上面格式定義,否則FlinkSQL優(yōu)化器將無法正常解析。而且目前TableApi不支持row_number()函數(shù),只有SQL API實(shí)現(xiàn)TopN方式

SELECT ... FROM ( SELECT ..., 
ROW_NUMBER() OVER ( [PARTITION BY <字段 1>[, <字段 1>...]] ORDER BY <排序字段 1> [asc|desc][, <排序字段 2> [asc|desc]...] ) AS row_num FROM ...) 
WHERE row_num <= N [AND <其它條件>]
  • 案例
 //窗口排序
        String dataGenDemo="create table t_datagen(" +
                "gid string," +
                "price int," +
                "type int," +
                "ts as localtimestamp," +
                "watermark for ts as ts-interval '10' second" +
                ")with(" +
                "'connector'='datagen'," +
                "'fields.gid.length'='10'," +
                "'rows-per-second'='10'," +
                "'fields.price.min'='100'," +
                "'fields.price.max'='999'," +
                "'fields.type.min'='1'," +
                "'fields.type.max'='1')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(dataGenDemo);
//        tblEnv.sqlQuery("select * from t_datagen").execute().print();
        String topNStr="select * from\n" +
                "(select d.*,row_number() over(partition by type order by price desc) as row_num from\n"+
                "t_datagen d) where row_num<=3";
         tblEnv.sqlQuery(topNStr).execute().print();

=========================================================================================
      //滾動(dòng)窗口每5s滾動(dòng)一次 每種類型排名前3的商品信息
        String topNWin=" select *\n" +
                "         from(\n" +
                "          select *,row_number() over(partition by type order by price desc) as row_num\n" +
                "          from table(tumble(table t_datagen,descriptor(ts),interval '5' second))\n" +
                "          ) where row_num<=3";
        tblEnv.sqlQuery(topNWin).execute().print();
=========================================================================================

  • 窗口TopN
//查詢10秒內(nèi)  每個(gè)窗口銷售總額最高的前三名的種類
        String topNWinSql="select * " +
                "        from(select type,t_price,window_start,window_end,row_number() over(partition by window_start,window_end order by t_price desc) as row_num\n" +
                "        from (\n" +
                "        select type,window_start,window_end,sum(price) as t_price\n" +
                "         from table(tumble(table t_datagen,descriptor(ts),interval '10' second))\n" +
                "         group by type,window_start,window_end\n" +
                "        ))where row_num<=3";
        tblEnv.sqlQuery(topNWinSql).execute().print();

//查詢10秒內(nèi)  每個(gè)種類中銷售總額最高的前三名的商品
        String topNWinSql="select * " +
                "  from (select gid,type,window_start,window_end,row_number() over(partition by window_start,window_end,type,gid order by price desc) as row_num\n" +
                "        from (\n" +
                "        select *\n" +
                "         from table(tumble(table t_datagen,descriptor(ts),interval '10' second))\n" +
                "        ) )" +
                "where row_num<=3";
        tblEnv.sqlQuery(topNWinSql).execute().print();

10、Join窗口聯(lián)結(jié)

與標(biāo)準(zhǔn)SQL一致,F(xiàn)link SQL的常規(guī)聯(lián)結(jié)分為內(nèi)聯(lián)結(jié)(inner join)和外聯(lián)結(jié)(outer join),區(qū)別在于結(jié)果中是否包含不符合聯(lián)結(jié)條件的行。目前僅支持等值條件作為聯(lián)結(jié)條件,也就是關(guān)鍵字ON后面必須是判斷兩表中字段相等的邏輯表達(dá)式

  • 等值內(nèi)聯(lián)結(jié),會(huì)返回兩表中符合聯(lián)接條件的所有行組合(動(dòng)態(tài)表關(guān)聯(lián))
    //生成兩股數(shù)據(jù)流
        String dataStr="create table  dataGen_demo(" +
                "gid string," +
                "type int," +
                "price int," +
                "ts1 as localtimestamp," +
                "watermark for ts1 as ts1-interval '5' second" +
                ") with (" +
                "'connector'='datagen'," +
                "'rows-per-second'='1'," +
                "'fields.gid.length'='10'," +
                "'fields.type.min'='1'," +
                "'fields.type.max'='30'," +
                "'fields.price.min'='100'," +
                "'fields.price.max'='999')";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        tblEnv.executeSql(dataStr);
//        tblEnv.sqlQuery("select * from dataGen_demo").execute().print();

        String dataStr1="create table  dataGen_demo1(" +
                "type int," +
                "tname string," +
                "price int," +
                "ts2 as localtimestamp," +
                "watermark for ts2 as ts2-interval '5' second" +
                ") with (" +
                "'connector'='datagen'," +
                "'rows-per-second'='1'," +
                "'fields.tname.length'='10'," +
                "'fields.type.kind'='sequence'," +
                "'fields.type.start'='1'," +
                "'fields.type.end'='50'," +
                "'fields.price.min'='300'," +
                "'fields.price.max'='400')";
        tblEnv.executeSql(dataStr1);
        tblEnv.sqlQuery("select * from dataGen_demo inner join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
  • 等值外聯(lián)結(jié)

left join: 左外連接 ,左表數(shù)據(jù)全部顯示,在內(nèi)存等待數(shù)據(jù)匹配,匹配后刪除原來未匹配的數(shù)據(jù)重新顯示。

right join: 右外連接,右表數(shù)據(jù)全部顯示,在內(nèi)存等待數(shù)據(jù)匹配,匹配后刪除原來未匹配的數(shù)據(jù)重新顯示。

full join:不管數(shù)據(jù)是否匹配,左右表的數(shù)據(jù)全部顯示,不管哪個(gè)表在內(nèi)存中匹配到數(shù)據(jù)都先刪除未匹配的數(shù)據(jù),重新顯示已經(jīng)匹配的數(shù)據(jù)。

tblEnv.sqlQuery("select * from dataGen_demo left join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();

tblEnv.sqlQuery("select * from dataGen_demo full join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
  • 時(shí)間間隔聯(lián)接查詢

兩條流的join對(duì)應(yīng)著SQL中兩個(gè)表的join,是流處理中特有的聯(lián)結(jié)方式。

目前 Flink SQL 還不支持窗口聯(lián)結(jié),而間隔聯(lián)結(jié)則已經(jīng)實(shí)現(xiàn),這里除了符合約束條件的兩條中數(shù)據(jù)的笛卡爾積,多了一個(gè)時(shí)間間隔的限制。

具體語(yǔ)法:間隔聯(lián)結(jié)不需要用join關(guān)鍵字,直接在from后將聯(lián)結(jié)兩表列出來的就可以,用逗號(hào)分割。聯(lián)結(jié)條件用where子句來定義,用一個(gè)等值表達(dá)式描述。交叉聯(lián)結(jié)之后用where進(jìn)行條件篩選,效果跟內(nèi)聯(lián)結(jié)inner join... on ... 非常類似,我們可以在where子句中,聯(lián)結(jié)條件后用and追加一個(gè)時(shí)間間隔的限制條件。

 String dataStr1="create table  dataGen_demo1(" +
                "type int," +
                "tname string," +
                "price int," +
                "ts2 as localtimestamp," +
                "watermark for ts2 as ts2-interval '5' second" +
                ") with (" +
                "'connector'='datagen'," +
                "'rows-per-second'='1'," +
                "'fields.tname.length'='10'," +
                "'fields.type.kind'='sequence'," +
                "'fields.type.start'='1'," +
                "'fields.type.end'='50'," +
                "'fields.price.min'='300'," +
                "'fields.price.max'='400')";
        tblEnv.executeSql(dataStr1);
        tblEnv.sqlQuery("select * from dataGen_demo d,dataGen_demo1 g where d.type=g.type and d.ts1 between g.ts2-interval '5' second and g.ts2+interval '5' second").execute().print();

11、FlinkSQL Client

Flink提供了SQL Client,有了它我們可以向hive的beeline一樣直接在控制臺(tái)編寫SQL并提交作業(yè)。

Flink SQL client支持運(yùn)行在standalone集群和yarn集群上。提交任務(wù)的命令有所不同。

  • Standalone集群(普通模式啟動(dòng))
##啟動(dòng)集群、前提已經(jīng)配置好flink環(huán)境變量
start-cluster.sh
##啟動(dòng)客戶端
sql-client.sh embedded
  • Yarn集群

前提要開啟hadoop-yarn大數(shù)據(jù)架構(gòu)。

flink每次啟動(dòng)yarn-session,都會(huì)創(chuàng)建一個(gè)/temp/.yarn-properties-root文件,記錄了最近一次提交的yarn session對(duì)應(yīng)的Application ID。注意:?jiǎn)?dòng)Yarn Session和SQL client必須使用相同的用戶。

##啟動(dòng)YarnSession模式 前提已經(jīng)配置好flink環(huán)境變量
yarn-session.sh -n 3 -jm 1024 -tm 1024

##啟動(dòng)客戶端  必須與上面命令在同一個(gè)服務(wù)器節(jié)點(diǎn)上
sql-client.sh embedded -s yarn-session

## 客戶端控制臺(tái)測(cè)試
select 'hello word';  #測(cè)試連接是否成功
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;  #測(cè)試數(shù)據(jù)

# client界面執(zhí)行下面命令
# 在專門的界面展示,使用分頁(yè)table格式。可按照界面下方說明,使用快捷鍵前后翻頁(yè)和退出到SQL命令行
SET sql-client.execution.result-mode = table;
# changelog格式展示,可展示數(shù)據(jù)增(I)刪(D)改(U)
SET sql-client.execution.result-mode = changelog;
# 接近傳統(tǒng)數(shù)據(jù)庫(kù)的展示方式,不使用專門界面
SET sql-client.execution.result-mode = tableau;
  • 安裝依賴

如果運(yùn)行sql client時(shí),需要使用第三方依賴包時(shí),就需要將項(xiàng)目中用到的依賴放入flink安裝位置的lib目錄下

例如:flink-connector-kafka_2.11-1.13.2.jar: 讀寫Kafka支持。

12、FlinkSQL 官方文檔

  • Table API

https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/tableapi/

Table API 是批處理和流處理的統(tǒng)一的關(guān)系型 API。Table API 的查詢不需要修改代碼就可以采用批 輸入或流輸入來運(yùn)行。Table API 是 SQL 語(yǔ)言的超集,并且是針對(duì) Apache Flink 專門設(shè)計(jì)的。 Table API 集成了 Scala,Java 和 Python 語(yǔ)言的 API。Table API 的查詢是使用 Java,Scala 或 Python 語(yǔ)言嵌入的風(fēng)格定義的,有諸如自動(dòng)補(bǔ)全和語(yǔ)法校驗(yàn)的 IDE 支持,而不是像普通 SQL 一樣 使用字符串類型的值來指定查詢。

  • SQL API

https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/overview/

Flink 所支持的 SQL 語(yǔ)言,包括數(shù)據(jù)定義語(yǔ)言(Data Definition Language,DDL)、數(shù)據(jù)操縱語(yǔ)(Data Manipulation Language,DML)以及查詢語(yǔ)言。Flink 對(duì) SQL 的支持基于實(shí)現(xiàn)了 SQL 標(biāo)準(zhǔn)的 Apache Calcite。

13、FlinkSQL函數(shù)

SQL中,我們可以把一些數(shù)據(jù)的轉(zhuǎn)換操作包裝起來,嵌入到SQL查詢中統(tǒng)一調(diào)用,這是函數(shù)。

Flink的Table API和SQL同樣提供了函數(shù)的功能。兩者在調(diào)用時(shí)略有不同:

  • Table API中的函數(shù)是通過數(shù)據(jù)對(duì)象的方法調(diào)用來實(shí)現(xiàn)的
  • SQL則是直接引用函數(shù)名稱,傳入數(shù)據(jù)作為參數(shù)。
  • Table API是內(nèi)嵌在java語(yǔ)言中,很多方法需要在類中額外添加,目前支持的函數(shù)比較少。

官方文檔:https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/functions/overview/

  • 函數(shù)類型

Flink 中的函數(shù)有兩個(gè)劃分標(biāo)準(zhǔn):

  • 一個(gè)劃分標(biāo)準(zhǔn)是:系統(tǒng)函數(shù)和catalog函數(shù)。
  • 一個(gè)劃分標(biāo)準(zhǔn)是臨時(shí)函數(shù)和持久函數(shù)
  • 因此提供了4種函數(shù):臨時(shí)性系統(tǒng)函數(shù)、系統(tǒng)函數(shù)、臨時(shí)性catalog函數(shù)、catalog函數(shù)

flink中可以通過精確、模糊兩種引用方式引用函數(shù):精確函數(shù)允許用戶跨catalog、數(shù)據(jù)庫(kù),也就是指定catalog和database函數(shù);模糊函數(shù)不用指定catalog和database使用默認(rèn)catalog和database。

  • 系統(tǒng)函數(shù)

系統(tǒng)函數(shù)(System Functions)也叫內(nèi)置函數(shù)(Built-in Functions),是在系統(tǒng)中預(yù)先實(shí)現(xiàn)好的 功能模塊。可以通過固定的函數(shù)名直接調(diào)用,實(shí)現(xiàn)想要的轉(zhuǎn)換操作。又分為兩大類:標(biāo)量函數(shù)和聚合函數(shù)。

函數(shù)分類:標(biāo)量函數(shù)、聚合函數(shù)、時(shí)間間隔單位和時(shí)間點(diǎn)標(biāo)識(shí)符、列函數(shù)

  • 標(biāo)量函數(shù):

  • 自定義函數(shù)

Flink 的 Table API 和 SQL 提供了多種自定義函數(shù)的接口,以抽象類的形式定義。

當(dāng)前UDF主要有以下幾類:

  • 標(biāo)量函數(shù):將輸入的標(biāo)量值轉(zhuǎn)換成一個(gè)新的標(biāo)量值
  • 表函數(shù):將標(biāo)量值轉(zhuǎn)換成一個(gè)或多個(gè)新的行數(shù)據(jù),也就是擴(kuò)展成一個(gè)表。
  • 聚合函數(shù):將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成一個(gè)新的標(biāo)量值。
  • 表聚合函數(shù):將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成一個(gè)或多個(gè)新的 行數(shù)據(jù)。
  • UDF標(biāo)量函數(shù)

自定義方式:需要自定義一個(gè)類來繼承抽象類 ScalarFunction,并實(shí)現(xiàn)叫作 eval() 的求值方法。

標(biāo)量函數(shù)的行為就取決于求值方法的定義,它必須是公有的(public),而且名字必須是 eval

求值方法 eval 可以重載多次,任何數(shù)據(jù)類型都可作為求值方法的參數(shù)和返回值類型,寫完后將類注冊(cè)到表環(huán)境就可以直接在SQL中調(diào)用了。

  • 代碼實(shí)現(xiàn)
import org.apache.flink.table.functions.ScalarFunction;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 21:34
 */
//自定義標(biāo)量函數(shù)
public class ScalarUDFDemo extends ScalarFunction {
    // 接受任意類型輸入,返回 INT 型輸出 必須使用公共權(quán)限的eval方法
    public String eval(String input) {
        //字符串連接字符串長(zhǎng)度
        return input.concat(String.valueOf(input.length()));
    }

}

 //創(chuàng)建模擬數(shù)據(jù)
        //執(zhí)行環(huán)境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //執(zhí)行SQL
        tableEnvironment.executeSql("CREATE TABLE t_datagen (\n" +
                " f_sequence INT,\n" +
                " f_random INT,\n" +
                " f_random_str STRING,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.f_sequence.kind'='sequence',\n" +
                " 'fields.f_sequence.start'='1',\n" +
                " 'fields.f_sequence.end'='1000',\n" +
                " 'fields.f_random.min'='1',\n" +
                " 'fields.f_random.max'='1000',\n" +
                " 'fields.f_random_str.length'='10'\n" +
                ")");
//         tableEnvironment.sqlQuery("select * from t_datagen").execute().print();
       //使用Table API 直接內(nèi)嵌函數(shù)執(zhí)行  第一種方式
//        tableEnvironment.from("t_datagen").select(call(ScalarUDFDemo.class, $("f_random_str"))).execute().print();
       //第二種方式
        tableEnvironment.createTemporarySystemFunction("sfsl",ScalarUDFDemo.class);
        tableEnvironment.sqlQuery("select sfsl(f_random_str) from t_datagen").execute().print();
  • UDF表值函數(shù)

自定義方式:

要實(shí)現(xiàn)自定義的表函數(shù),需要自定義類來繼承抽象類 TableFunction,內(nèi)部必須要實(shí)現(xiàn)的也 是一個(gè)名為 eval 的求值方法

與標(biāo)量函數(shù)不同的是,TableFunction 類本身是有一個(gè)泛型參數(shù)T 的,這就是表函數(shù)返回?cái)?shù)據(jù)的類型。

eval()方法沒有返回類型,內(nèi)部也沒有 return語(yǔ)句,是通過調(diào)用 collect()方法來發(fā)送想要 輸出的行數(shù)據(jù)的

  • 數(shù)據(jù)
1,尋夢(mèng)環(huán)游記,喜劇:8_動(dòng)畫:7_冒險(xiǎn):3_音樂:9_家庭:6
2,至愛梵高,劇情:8_傳記:7_動(dòng)畫:3
3,小丑回魂,劇情:6_兒童:7_恐怖:9
  • 案例代碼
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 21:53
 */

/**
 * Row<type STRING,score INT> 輸出字段名type、score 數(shù)據(jù)類型分別是STRING、INT
 */
@FunctionHint(output = @DataTypeHint("Row<type STRING,score INT>"))
public class UDFTableFunction extends TableFunction<Row> {
    //輸入數(shù)據(jù)類型是字符串

    /**
     * 喜劇:8_動(dòng)畫:7_冒險(xiǎn):3_音樂:9_家庭:6
     * @param line
     */
    public void eval(String line){
        String[] split = line.split("_");
        for (String s : split) {
            String[] v = s.split(":");
            collect(Row.of(v[0],Integer.parseInt(v[1])));
        }
    }
}


=========================================================================================

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
        //使用FileSystem讀取文件
        String fs="create table t_movie(" +
                "id int," +
                "name string," +
                "types string" +
                ") with (" +
                "'connector'='filesystem'," +
                "'path'='data/movie.txt'," +
                "'format'='csv')";
        //sql讀取數(shù)據(jù)
        tblEnv.executeSql(fs);
//        tblEnv.sqlQuery("select * from t_movie").execute().print();
        //Table API
           tblEnv
                 .from("t_movie")
                 .joinLateral(call(UDFTableFunction.class, $("types")).as("type", "score"))
                 .select($("id"),$("name"),$("type"),$("score"))
                 .execute()
                 .print();
           //SQL API
        tblEnv.createTemporarySystemFunction("tbl_f",UDFTableFunction.class);
        tblEnv.sqlQuery("select id,name,type,score from t_movie ,lateral table(tbl_f(types))").execute().print();

  • UDF聚合函數(shù)

自定義方式:

  • 自定義聚合函數(shù)需要繼承抽象類 AggregateFunction。
  • AggregateFunction 有兩個(gè)泛型參數(shù),T 表示聚合輸出的結(jié)果類型,ACC 則表示聚 合的中間狀態(tài)類型。
  • 每個(gè) AggregateFunction 都 必須 實(shí)現(xiàn)以下幾個(gè)方法:
  • createAccumulator():這是創(chuàng)建累加器的方法。沒有輸入?yún)?shù),返回類型為累加器類型 ACC
  • accumulate(): 這是進(jìn)行聚合計(jì)算的核心方法,每來一行數(shù)據(jù)都會(huì)調(diào)用。它的第一個(gè)參數(shù)是確定 的,就是當(dāng)前的累加器,類型為 ACC,表示當(dāng)前聚合的中間狀態(tài);
  • getValue():這是得到最終返回結(jié)果的方法。輸入?yún)?shù)是 ACC 類型的累加器,輸出類型為 T。 在遇到復(fù)雜類型時(shí),F(xiàn)link 的類型推導(dǎo)可能會(huì)無法得到正確的結(jié)果。
  • 代碼實(shí)現(xiàn)
package com.zwf.udf;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;
/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 22:31
 */

/**
 * AggregateFunction<Double, Tuple2<Integer,Integer>> 輸出類型是Double  中間狀態(tài)類型是Tuple2<Integer,Integer>
 *     必須要實(shí)現(xiàn)getValue()  createAccumulator() accumulate() 三個(gè)方法
 */
public class UDFAggregationDemo extends AggregateFunction<Double, Tuple2<Integer,Integer>> {

    /**
     * 輸出的函數(shù)邏輯代碼
     * @param integerIntegerTuple2
     * @return
     */
    @Override
    public Double getValue(Tuple2<Integer, Integer> integerIntegerTuple2) {
        if (integerIntegerTuple2.f0==0){
            return 0.0;
        }
        return integerIntegerTuple2.f0*1.0/integerIntegerTuple2.f1;
    }

    /**
     *
     * @return 初始化中間狀態(tài)值
     */
    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0,0);
    }
     //輸入類型是兩個(gè)int類型數(shù)據(jù)

    /**
     *   如果不加 @FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")})注解
     *   傳入的字段數(shù)據(jù)類必須有not null的約束
     * @param acc
     * @param weight
     * @param price
     */
     @FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")})
    public void accumulate(Tuple2<Integer,Integer> acc ,Integer weight,Integer price){
        acc.f0+=weight*price;
        acc.f1+=weight;
    }

}

========================================================================================

package com.zwf.flinkSQL;

import com.zwf.udf.UDFAggregationDemo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 22:43
 */
public class UDFDemo3 {
    public static void main(String[] args) {
        //執(zhí)行環(huán)境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //執(zhí)行SQL
        tableEnvironment.executeSql("CREATE TABLE t_order (\n" +
                " id INT,\n" +
                " type INT,\n" +
                " weight INT,\n" +
                " price INT\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.id.kind'='sequence',\n" +
                " 'fields.id.start'='1',\n" +
                " 'fields.id.end'='1000',\n" +
                " 'fields.type.min'='1',\n" +
                " 'fields.type.max'='3',\n" +
                " 'fields.weight.min'='10',\n" +
                " 'fields.weight.max'='20',\n" +
                " 'fields.price.min'='100',\n" +
                " 'fields.price.max'='200'\n" +
                ")");
        tableEnvironment.createTemporarySystemFunction("aggre", UDFAggregationDemo.class);
        tableEnvironment.sqlQuery("select type,aggre(weight,price) from t_order group by type").execute().print();

    }

}

  • UDF表值聚合函數(shù)

用戶自定義表聚合函數(shù)(UDTAGG)可以把一行或多行數(shù)據(jù)(也就是一個(gè)表)聚合成另一張表,結(jié)果表中可以有多行多列

自定義方式:

  • createAccumulator():創(chuàng)建累加器的方法,與 AggregateFunction 中用法相同
  • accumulate():聚合計(jì)算的核心方法,與 AggregateFunction 中用法相同
  • emitValue():所有輸入行處理完成后,輸出最終計(jì)算結(jié)果的方法。這個(gè)方法對(duì)應(yīng)著 AggregateFunction中的 getValue()方法;區(qū)別在于 emitValue 沒 有輸出類型,而輸入?yún)?shù)有兩個(gè):第一個(gè)是 ACC類型的累加器 第二個(gè)則是用于輸出數(shù)據(jù)的“收集器”out,它的類型為 Collect。
  • 代碼
package com.zwf.udf;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 22:56
 */

/**
 * TableAggregateFunction<out,acc>: out 輸出類型  acc中間值類型
 */
public class TableAggregateUDF extends TableAggregateFunction<String, Tuple3<Integer,Integer,Boolean>> {
    /**
     * 初始化中間值
     * @return
     */
    @Override
    public Tuple3<Integer, Integer, Boolean> createAccumulator() {
        return Tuple3.of(0,0,false);
    }

    /**
     *
     * @param acc 中間值
     * @param price 輸入值
     */
    public void accumulate(Tuple3<Integer,Integer,Boolean> acc,Integer price){
        if(price>acc.f0){
            acc.f0=price;
            acc.f1=acc.f0;
            acc.f2=true;
        }else if (price>acc.f1){
          acc.f1=price;
          acc.f2=true;
        }else {
            acc.f2=false;
        }

    }

    /**
     *
     * @param acc 中間值
     * @param out 輸出集合
     */
    public void emitValue(Tuple3<Integer, Integer, Boolean> acc, Collector<String> out){
        if(acc.f2){
         acc.f2=false;
         out.collect("First[" + acc.f0 + "]Second[" + acc.f1 + "]");
        }
    }
}

=========================================================================================
    
package com.zwf.flinkSQL;

import com.zwf.udf.TableAggregateUDF;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-13 23:06
 */
public class UDFDemo4 {

    public static void main(String[] args) {
        //執(zhí)行環(huán)境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //執(zhí)行SQL
        tableEnvironment.executeSql("CREATE TABLE t_order (\n" +
                " id INT,\n" +
                " type INT,\n" +
                " price INT\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='1',\n" +
                " 'fields.id.kind'='sequence',\n" +
                " 'fields.id.start'='1',\n" +
                " 'fields.id.end'='1000',\n" +
                " 'fields.type.min'='1',\n" +
                " 'fields.type.max'='3',\n" +
                " 'fields.price.min'='100',\n" +
                " 'fields.price.max'='200'\n" +
                ")");

        //普通查詢
        // tableEnvironment.sqlQuery("select * from t_order").execute().print();

        // 注冊(cè)函數(shù)
        tableEnvironment.createTemporarySystemFunction("tafop", TableAggregateUDF.class);
        tableEnvironment.sqlQuery("select type,tafop(price) from t_order group by type").execute().print();

    }
}

14、FlinkSQL CDC

CDC,Change Data Capture變動(dòng)數(shù)據(jù)獲取的簡(jiǎn)稱,使用CDC從數(shù)據(jù)庫(kù)獲取已提交的更改并將這些更改發(fā)送到下游,供下游使用。

  • Flink CDC

在以前的數(shù)據(jù)同步中,如果想實(shí)時(shí)獲取數(shù)據(jù)庫(kù)的數(shù)據(jù),一般采用架構(gòu)就是采用第三方工具,比如canal、debezium等,實(shí)時(shí)采集數(shù)據(jù)庫(kù)的變更日志,然后將數(shù)據(jù)發(fā)送到kafka消息隊(duì)列,最后通過其他組件、比如flink、spark等消費(fèi)kafka中的數(shù)據(jù),計(jì)算之后發(fā)送到下游系統(tǒng)。

新架構(gòu)下flink直接消費(fèi)數(shù)據(jù)庫(kù)的增量日志,替代了原來的數(shù)據(jù)采集層,然后直接對(duì)數(shù)據(jù)進(jìn)行計(jì)算, 最后將計(jì)算結(jié)果發(fā)送到下游.

工作原理:?jiǎn)?dòng)MySQL CDC源時(shí),它將獲取一個(gè)全局讀取鎖(FLUSH TABLES WITH READ LOCK),該 鎖將阻止其他數(shù)據(jù)庫(kù)的寫入。然后,它讀取當(dāng)前binlog位置以及數(shù)據(jù)庫(kù)和表的schema之后, 將釋放 全局讀取鎖。然后,它掃描數(shù)據(jù)庫(kù)表并從先前記錄的位置讀取binlog。Flink將定期執(zhí) 行checkpoints以記錄binlog位置。如果發(fā)生故障,作業(yè)將重新啟動(dòng)并從checkpoint完成的 binlog位置恢復(fù)。因此,它保證了僅一次的語(yǔ)義。

優(yōu)點(diǎn):開箱即用,簡(jiǎn)單易上手 減少維護(hù)的組件,簡(jiǎn)化實(shí)時(shí)鏈路,減輕部署成本 減小端到端延遲

  • ChangeLOg

Flink SQL 內(nèi)部支持了完整的 changelog 機(jī)制,所以 Flink 對(duì)接 CDC 數(shù)據(jù)只需要把CDC 數(shù)據(jù)轉(zhuǎn)換成 Flink 認(rèn)識(shí)的數(shù)據(jù),以便更好支持和集成 CDC

重構(gòu)后的 TableSource 輸出的都是 RowData 數(shù)據(jù)結(jié)構(gòu),代表了一行的數(shù)據(jù)。在RowData 上面會(huì) 有一個(gè)元數(shù)據(jù)的信息,我們稱為 RowKind.

RowKind 里面包括了插入、更新前、更新后、刪除,這樣和數(shù)據(jù)庫(kù)里面的 binlog 概念十分類似。

通過 Debezium 采集的 JSON 格式,包含了舊數(shù)據(jù)和新數(shù)據(jù)行以及原數(shù)據(jù)信息,對(duì)接 Debezium JSON 的數(shù)據(jù),其實(shí)就是將這種原始的 JSON 數(shù)據(jù)轉(zhuǎn)換成 Flink 認(rèn)識(shí)的 RowData。

  • mysql CDC

官方文檔:https://github.com/ververica/flink-cdc-connectors

mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)新增或者修改,將實(shí)時(shí)獲取到flink上進(jìn)行計(jì)算處理并傳輸?shù)较掠危?/mark>

目前支持的數(shù)據(jù)庫(kù)有以下:

  • Mysql修改配置文件 (vim /etc/my.cnf)
# 服務(wù)器ID
server_id=12345
log_bin=/var/lib/mysql/mysql-bin
expire_logs_days=7
# 必須為ROW
binlog_format=ROW
binlog_cache_size=16M
max_binlog_size=100M
max_binlog_cache_size=256M
relay_log_recovery=1
# 必須為FULL,MySQL-5.7后才有該參數(shù)
binlog_row_image=FULL
expire_logs_days=30
binlog_do_db=scott
  • 創(chuàng)建數(shù)據(jù)庫(kù)表
DROP TABLE IF EXISTS `dept`;
CREATE TABLE `dept` (
`deptno` int(11) NOT NULL,
`dname` varchar(255) DEFAULT NULL,
`loc` varchar(255) DEFAULT NULL,
PRIMARY KEY (`deptno`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
--代碼運(yùn)行之后再開始插入數(shù)據(jù)
INSERT INTO `dept` VALUES ('10', 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES ('20', 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES ('30', 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES ('40', 'OPERATIONS', 'BOSTON');
  • pom.xml
<!-- Flink CDC 的依賴 -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!--驅(qū)動(dòng)包版本必須是8.0.27及其以上版本-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>

  • 代碼實(shí)現(xiàn)
 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //創(chuàng)建表
        tableEnvironment.executeSql("CREATE TABLE flink_cdc_dept (\n" +
                "     deptno INT,\n" +
                "     dname STRING,\n" +
                "     loc STRING,\n" +
                "     PRIMARY KEY(deptno) NOT ENFORCED\n" +
                "     ) WITH (\n" +
                "     'connector' = 'mysql-cdc',\n" +
                "     'hostname' = '192.168.147.120',\n" +
                "     'port' = '3306',\n" +
                "     'username' = 'root',\n" +
                "     'password' = 'Root@123456.',\n" +
                "     'database-name' = 'scott',\n" +
                "     'table-name' = 'dept')");

        //簡(jiǎn)單查詢
        tableEnvironment.sqlQuery("select * from flink_cdc_dept").execute().print();

Catalog 提供了元數(shù)據(jù)信息,例如數(shù)據(jù)庫(kù)、表、分區(qū)、視圖以及數(shù)據(jù)庫(kù)或其他外部系統(tǒng)中存儲(chǔ)的函 數(shù)和信息。

元數(shù)據(jù)可以是臨時(shí)的,例如臨時(shí)表、或者通過 TableEnvironment 注冊(cè)的 UDF。 元數(shù)據(jù)也可以是持久化的,例如 Hive Metastore 中的元數(shù)據(jù)。 Catalog 提供了一個(gè)統(tǒng)一的API,用于管理元數(shù)據(jù),并使其可以從 Table API 和 SQL 查詢語(yǔ)句中來 訪問。

GenericInMemoryCatalog: 基于內(nèi)存實(shí)現(xiàn),所有元數(shù)據(jù)只在session聲明周期可用

JdbcCatalog:將flink通過jdbc協(xié)議連接到關(guān)系數(shù)據(jù)庫(kù)。Postgres Catalog 和 MySQL Catalog 是目前 JDBC Catalog 僅有的兩種實(shí)現(xiàn)

HiveCatalog:作為原生 Flink 元數(shù)據(jù)的持久化存儲(chǔ),以及作為讀寫現(xiàn)有 Hive 元數(shù)據(jù)的接口

用戶自定義Catalog:編寫類實(shí)現(xiàn)對(duì)應(yīng)的 CatalogFactory 接口來自定義開發(fā)Catalog

  • 連接hive集群

將flink catalog中的元數(shù)據(jù)信息持久化存儲(chǔ)到hive metastore對(duì)應(yīng)的元數(shù)據(jù)庫(kù)中,flink打通hive集成,如同使用spark SQL或者impala操作hive中的數(shù)據(jù)一樣,直接使用flink直接讀寫hive中的表。

  • pom.xml
<!-- Flink On Hive-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
  • 連接hive寄去哪
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
//獲取hive中元數(shù)據(jù)注冊(cè)flink中的catalog。
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
//使用hive中的catalog
tableEnv.useCatalog("myhive") 
  • 相關(guān)配置參數(shù)參考:

flink提供了兩種優(yōu)化器:

  • RBO(基于規(guī)則的優(yōu)化器)
  • CBO(基于成本的優(yōu)化器)

優(yōu)化方案:

  • 基于 Apache Calcite 的子查詢解相關(guān)
  • 投影下推(Projection Pushdown)
  • 分區(qū)剪裁(Partition Prune)
  • 謂詞下推(Predicate Pushdown)
  • 常量折疊(Constant Folding)
  • 子計(jì)劃消除重復(fù)數(shù)據(jù)以避免重復(fù)計(jì)算
  • 特殊子查詢重寫:使用left semi-joins left anti-join
  • 可選 join 重新排序: 通過 table.optimizer.join-reorder-enabled 啟用

優(yōu)化器不僅基于計(jì)劃,而且還基于可從數(shù)據(jù)源獲得的豐富統(tǒng)計(jì)信息以及每個(gè)算子(例如 io,cpu, 網(wǎng)絡(luò)和內(nèi)存)的細(xì)粒度成本來做出明智的決策。

  • 常量折疊(常量替換)

常量折疊:對(duì)sql中的常量的加減乘除等操作進(jìn)行預(yù)計(jì)算,避免執(zhí)行過程頻繁對(duì)常量重復(fù)執(zhí)行加減 乘除計(jì)算: 折疊前:1+2+t1.value;折疊后:3+t1.value.

  • 謂詞下推

在from數(shù)據(jù)源中過濾出重要數(shù)據(jù),降低了數(shù)據(jù)的掃描范圍,提升了數(shù)據(jù)庫(kù)查詢的效率!

  • 投影下推(列裁剪)

投影下推:可以用來避免加載不需要的字段,只需要查詢出需要查詢的數(shù)據(jù)庫(kù)字段。由于SQL中沒用到,加載多余字段就是浪費(fèi),所以將project操作下推執(zhí)行,就不需要加載無 用字段。而且此時(shí)假如是列存儲(chǔ),只需要加載指定的列,優(yōu)化更大。

  • Hash Join

兩表進(jìn)行join時(shí),先把大表中的重要數(shù)據(jù)過濾出來變成小表,然后通過sortmergejoin, hashjoin, boradcasthashjoin,把表中數(shù)據(jù)過濾后再進(jìn)行join,減少笛卡爾積值。

  • Transformation Tree

  • 性能調(diào)整

MiniBatch 聚合:MiniBatch 聚合的核心思想是將一組輸入的數(shù)據(jù)緩存在聚合算子內(nèi)部的緩沖區(qū)中。當(dāng)輸入的數(shù)據(jù)被觸發(fā)處理時(shí),每個(gè) key 只需一個(gè)操作即可訪問狀態(tài)。這樣可以大大減少狀態(tài)開銷并獲得更好的吞 吐量。但是,這可能會(huì)增加一些延遲,因?yàn)樗鼤?huì)緩沖一些記錄而不是立即處理它們。這是吞吐量和 延遲之間的權(quán)衡。

  • Local-Global 聚合

Local-Global 聚合是為解決數(shù)據(jù)傾斜問題提出的,通過將一組聚合分為兩個(gè)階段,首先在上游進(jìn)行 本地聚合,然后在下游進(jìn)行全局聚合,類似于 MapReduce 中的 Combine + Reduce 模式。

  • 拆分distinct 聚合

把要去重的字段中的使用hash shuffle打散到不同分區(qū)中進(jìn)行分區(qū),然后進(jìn)行去重字段聚合計(jì)算!

SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

  • distinct 聚合過濾

使用filter對(duì)去重的字段進(jìn)行過濾,過濾后去重字段值后最后進(jìn)行分組聚合!

SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone'))
AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS
web_uv
FROM T
GROUP BY day

17、SQL時(shí)間日期轉(zhuǎn)換

-- flinksql里面最常用的事情就是時(shí)間格式轉(zhuǎn)換,比如各種時(shí)間格式轉(zhuǎn)換成TIMESTAMP(3).
now() bigint      -- CAST(TO_TIMESTAMP(log_time) as TIMESTAMP(3)) ,log_time=now()
localtimestamp timestamp(3)
timestamp  -- 不帶括號(hào)數(shù)字表示timestamp(6)
now() 1403006911000 bigint -- 毫秒時(shí)間戳數(shù)值 1528257600000

localtimestamp 1636272032500 timestamp(3) -- 毫秒時(shí)間戳
timestamp(3) 1636272032500 -- 毫秒時(shí)間戳
timestamp(9)
timestamp(6)
TIMESTAMP(9) TO_TIMESTAMP(BIGINT time)
TIMESTAMP(9) TO_TIMESTAMP(STRING time)
TIMESTAMP(9) TO_TIMESTAMP(STRING time, STRING format)
BIGINT TIMESTAMP_TO_MS(TIMTSTAMP time)

BIGINT TIMESTAMP_TO_MS(STRING time, STRING format)

TO_DATE(CAST(LOCALTIMESTAMP AS VARCHAR))
FROM_UNIXTIME(TIMESTAMP_TO_MS(localtimestamp)/1000, ‘yyyy-MM-dd HH:mm:ss’) event_time   -- 6點(diǎn)到6點(diǎn)
time_pt as cast(to_timestamp(eventTime - 6 * 3600 * 1000) as TIMESTAMP(3)) -- 偏移6小時(shí)

總結(jié)

以上是生活随笔為你收集整理的FlinkSQL实战开发的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

精品成人av一区二区三区 | 日韩人妻无码一区二区三区久久99 | 亚洲天堂2017无码中文 | 黑人巨大精品欧美黑寡妇 | 亚洲一区二区三区在线观看网站 | 欧美日韩色另类综合 | 亚洲中文字幕成人无码 | 亚洲小说图区综合在线 | 日日摸天天摸爽爽狠狠97 | 荫蒂添的好舒服视频囗交 | 无码人妻丰满熟妇区五十路百度 | 国产精品福利视频导航 | 爱做久久久久久 | 亚洲码国产精品高潮在线 | 成在人线av无码免费 | 一本久道久久综合婷婷五月 | 日本xxxx色视频在线观看免费 | 狠狠cao日日穞夜夜穞av | 免费无码午夜福利片69 | 香蕉久久久久久av成人 | 国产av人人夜夜澡人人爽麻豆 | 久久久精品456亚洲影院 | 国产成人人人97超碰超爽8 | 色情久久久av熟女人妻网站 | 97精品国产97久久久久久免费 | 国内少妇偷人精品视频免费 | 最近中文2019字幕第二页 | 色偷偷人人澡人人爽人人模 | 人人妻人人澡人人爽精品欧美 | 亚洲欧美日韩国产精品一区二区 | 2020久久超碰国产精品最新 | 性史性农村dvd毛片 | 国产美女极度色诱视频www | 色五月五月丁香亚洲综合网 | 精品无码一区二区三区爱欲 | 久久综合给久久狠狠97色 | 中文字幕人妻无码一区二区三区 | 亚洲国产精品美女久久久久 | 嫩b人妻精品一区二区三区 | 日韩无码专区 | 青青青爽视频在线观看 | 免费无码av一区二区 | 国产精品18久久久久久麻辣 | 四十如虎的丰满熟妇啪啪 | 麻豆精品国产精华精华液好用吗 | 无码免费一区二区三区 | 麻豆国产丝袜白领秘书在线观看 | 成人免费视频一区二区 | 人人妻人人澡人人爽欧美一区九九 | 午夜精品久久久久久久 | www国产亚洲精品久久久日本 | 欧美人妻一区二区三区 | 人妻插b视频一区二区三区 | 成人影院yy111111在线观看 | 性开放的女人aaa片 | 在线а√天堂中文官网 | 午夜理论片yy44880影院 | 国产精品亚洲lv粉色 | 国产猛烈高潮尖叫视频免费 | 成人无码视频在线观看网站 | 乱人伦人妻中文字幕无码久久网 | 麻豆国产人妻欲求不满谁演的 | 国内精品九九久久久精品 | 国产精品内射视频免费 | 国产精品亚洲专区无码不卡 | 日本乱人伦片中文三区 | 国产xxx69麻豆国语对白 | 日韩视频 中文字幕 视频一区 | 77777熟女视频在线观看 а天堂中文在线官网 | 日韩欧美中文字幕公布 | 国产精品亚洲综合色区韩国 | 国产高清不卡无码视频 | 人妻少妇精品视频专区 | 无码纯肉视频在线观看 | 亚洲国产精品成人久久蜜臀 | 蜜臀aⅴ国产精品久久久国产老师 | 午夜肉伦伦影院 | 欧美 亚洲 国产 另类 | av人摸人人人澡人人超碰下载 | 亚洲人成网站在线播放942 | 日产国产精品亚洲系列 | 久久久成人毛片无码 | 久精品国产欧美亚洲色aⅴ大片 | 5858s亚洲色大成网站www | 熟妇人妻中文av无码 | 久久久久成人片免费观看蜜芽 | 性生交大片免费看女人按摩摩 | 国产成人一区二区三区在线观看 | 欧美丰满老熟妇xxxxx性 | 亚洲中文字幕乱码av波多ji | 国产精品沙发午睡系列 | 国产熟女一区二区三区四区五区 | 久久www免费人成人片 | 漂亮人妻洗澡被公强 日日躁 | 国产人妻大战黑人第1集 | 少妇愉情理伦片bd | 亚洲精品国产第一综合99久久 | 色婷婷综合中文久久一本 | 丝袜 中出 制服 人妻 美腿 | 极品尤物被啪到呻吟喷水 | 亚洲国产精品毛片av不卡在线 | 狠狠色噜噜狠狠狠狠7777米奇 | 国产内射爽爽大片视频社区在线 | 青青草原综合久久大伊人精品 | 天天躁日日躁狠狠躁免费麻豆 | 男女爱爱好爽视频免费看 | 免费观看的无遮挡av | 亚洲理论电影在线观看 | 麻豆人妻少妇精品无码专区 | 成人亚洲精品久久久久软件 | 精品 日韩 国产 欧美 视频 | 日韩av无码中文无码电影 | 国产精品第一区揄拍无码 | 亚洲国产午夜精品理论片 | 无码人妻丰满熟妇区五十路百度 | 久久国语露脸国产精品电影 | 亚洲精品一区三区三区在线观看 | 伊人色综合久久天天小片 | 亚洲国产精品无码一区二区三区 | 无码人妻丰满熟妇区五十路百度 | 日韩人妻少妇一区二区三区 | 色偷偷人人澡人人爽人人模 | 免费人成网站视频在线观看 | 久久精品国产99精品亚洲 | 又大又紧又粉嫩18p少妇 | 亚洲国产欧美日韩精品一区二区三区 | 一本无码人妻在中文字幕免费 | 欧美性生交活xxxxxdddd | 精品无码av一区二区三区 | 日产精品高潮呻吟av久久 | 黑人大群体交免费视频 | 久久久久成人精品免费播放动漫 | 国产精品久久久久久无码 | 久久综合狠狠综合久久综合88 | 国产区女主播在线观看 | 亚洲男人av香蕉爽爽爽爽 | 国产国语老龄妇女a片 | 日韩欧美群交p片內射中文 | 好男人社区资源 | 人妻夜夜爽天天爽三区 | 国产亚洲精品久久久久久大师 | 中文精品无码中文字幕无码专区 | 人妻少妇被猛烈进入中文字幕 | 亚洲一区av无码专区在线观看 | 精品久久久久久人妻无码中文字幕 | 国产亚洲精品久久久久久国模美 | 国产免费观看黄av片 | 青草视频在线播放 | 久久久av男人的天堂 | 麻豆成人精品国产免费 | 国产农村妇女aaaaa视频 撕开奶罩揉吮奶头视频 | 国产明星裸体无码xxxx视频 | 久久综合久久自在自线精品自 | 精品一区二区三区波多野结衣 | 亚洲精品一区二区三区四区五区 | 又色又爽又黄的美女裸体网站 | 最近中文2019字幕第二页 | 久久精品无码一区二区三区 | 欧美日韩精品 | 99久久婷婷国产综合精品青草免费 | 亚洲大尺度无码无码专区 | 国产熟女一区二区三区四区五区 | 亚洲精品一区二区三区四区五区 | 国产精品va在线观看无码 | 国产精品沙发午睡系列 | 亚洲色偷偷偷综合网 | 国产成人无码av在线影院 | 久久熟妇人妻午夜寂寞影院 | 娇妻被黑人粗大高潮白浆 | 无码精品国产va在线观看dvd | 色婷婷综合激情综在线播放 | 天天拍夜夜添久久精品 | 国产人妻精品一区二区三区 | 国产熟妇高潮叫床视频播放 | 狠狠亚洲超碰狼人久久 | 欧美丰满熟妇xxxx | 少妇久久久久久人妻无码 | 亚洲爆乳精品无码一区二区三区 | 精品久久久中文字幕人妻 | 久久无码专区国产精品s | 亚洲国产精品久久久久久 | 亚洲欧美日韩成人高清在线一区 | 亚洲а∨天堂久久精品2021 | 亚洲国产欧美国产综合一区 | 国产高清av在线播放 | 国产真实夫妇视频 | 国产成人无码专区 | 精品久久久中文字幕人妻 | 大胆欧美熟妇xx | 亚洲综合无码久久精品综合 | 国产成人无码av一区二区 | 麻豆果冻传媒2021精品传媒一区下载 | 99精品国产综合久久久久五月天 | 人人妻人人澡人人爽人人精品浪潮 | 亚洲欧美日韩综合久久久 | 亚洲中文字幕在线无码一区二区 | 日本www一道久久久免费榴莲 | 免费视频欧美无人区码 | 亚洲gv猛男gv无码男同 | 国产人妻大战黑人第1集 | 啦啦啦www在线观看免费视频 | 国产午夜手机精彩视频 | 欧美性生交活xxxxxdddd | 国产欧美亚洲精品a | 人妻夜夜爽天天爽三区 | 国内精品久久久久久中文字幕 | 国产亚洲精品久久久久久国模美 | 国产人妻精品一区二区三区 | 精品久久综合1区2区3区激情 | 欧美熟妇另类久久久久久不卡 | 特大黑人娇小亚洲女 | 久久亚洲精品中文字幕无男同 | 久久国产自偷自偷免费一区调 | 成人女人看片免费视频放人 | 国产精品沙发午睡系列 | 中文字幕av伊人av无码av | 中文字幕无码免费久久9一区9 | 人人妻人人澡人人爽人人精品浪潮 | 国产精品久久久av久久久 | 内射巨臀欧美在线视频 | 成人免费无码大片a毛片 | 人人爽人人澡人人人妻 | 国产福利视频一区二区 | 日本熟妇人妻xxxxx人hd | 日韩人妻无码一区二区三区久久99 | 久久99久久99精品中文字幕 | 人人妻人人澡人人爽欧美一区九九 | 日本熟妇人妻xxxxx人hd | 清纯唯美经典一区二区 | 高中生自慰www网站 | 天海翼激烈高潮到腰振不止 | 国产午夜无码精品免费看 | 少妇激情av一区二区 | 亚洲 激情 小说 另类 欧美 | 久久精品国产大片免费观看 | 无码国产乱人伦偷精品视频 | 奇米影视7777久久精品 | 国产综合色产在线精品 | 麻豆av传媒蜜桃天美传媒 | 亚洲中文字幕无码一久久区 | 国产三级精品三级男人的天堂 | 黑人粗大猛烈进出高潮视频 | 在线观看国产午夜福利片 | 老熟女重囗味hdxx69 | 超碰97人人射妻 | 日产精品99久久久久久 | 久久久久成人精品免费播放动漫 | 波多野结衣av一区二区全免费观看 | 国产在线aaa片一区二区99 | 中文字幕色婷婷在线视频 | 水蜜桃亚洲一二三四在线 | 人妻与老人中文字幕 | 99精品视频在线观看免费 | 又黄又爽又色的视频 | 亚洲 欧美 激情 小说 另类 | 少妇性荡欲午夜性开放视频剧场 | 亚洲 高清 成人 动漫 | 婷婷丁香六月激情综合啪 | 国产精品.xx视频.xxtv | 久久精品国产日本波多野结衣 | 色婷婷综合中文久久一本 | 精品人妻人人做人人爽夜夜爽 | 国产成人无码av在线影院 | 玩弄人妻少妇500系列视频 | 日韩无码专区 | 熟妇女人妻丰满少妇中文字幕 | 99久久精品午夜一区二区 | 日日碰狠狠躁久久躁蜜桃 | 水蜜桃亚洲一二三四在线 | 牲欲强的熟妇农村老妇女视频 | 无码人妻精品一区二区三区不卡 | 久久久精品欧美一区二区免费 | 日日橹狠狠爱欧美视频 | 日本在线高清不卡免费播放 | 久久精品视频在线看15 | 国产一区二区不卡老阿姨 | 欧美黑人巨大xxxxx | 亚洲日韩精品欧美一区二区 | 日本xxxx色视频在线观看免费 | 麻豆国产丝袜白领秘书在线观看 | 国产人妻久久精品二区三区老狼 | 在线观看欧美一区二区三区 | 亚洲国产精品美女久久久久 | av无码不卡在线观看免费 | 亚洲色欲色欲天天天www | 18无码粉嫩小泬无套在线观看 | 日本一区二区三区免费高清 | 嫩b人妻精品一区二区三区 | 色偷偷人人澡人人爽人人模 | 中文字幕精品av一区二区五区 | 人妻夜夜爽天天爽三区 | 国产av无码专区亚洲a∨毛片 | 亚洲精品成人av在线 | 国产人成高清在线视频99最全资源 | 扒开双腿吃奶呻吟做受视频 | 美女扒开屁股让男人桶 | 午夜成人1000部免费视频 | 大地资源中文第3页 | 激情国产av做激情国产爱 | 久久久亚洲欧洲日产国码αv | 国产亚洲精品久久久久久久久动漫 | 日本va欧美va欧美va精品 | 欧美一区二区三区视频在线观看 | 无码人中文字幕 | 18无码粉嫩小泬无套在线观看 | 亚洲中文字幕av在天堂 | 好爽又高潮了毛片免费下载 | 人人妻人人澡人人爽精品欧美 | 亚洲色无码一区二区三区 | 激情内射日本一区二区三区 | 丰满人妻翻云覆雨呻吟视频 | 日韩成人一区二区三区在线观看 | 学生妹亚洲一区二区 | 日本熟妇人妻xxxxx人hd | 美女黄网站人色视频免费国产 | 伦伦影院午夜理论片 | 亚洲国产精品无码一区二区三区 | 领导边摸边吃奶边做爽在线观看 | 东京热男人av天堂 | 扒开双腿吃奶呻吟做受视频 | 精品国产精品久久一区免费式 | 国产亚洲tv在线观看 | 人妻熟女一区 | 亚洲娇小与黑人巨大交 | 帮老师解开蕾丝奶罩吸乳网站 | 欧美人与禽zoz0性伦交 | 国产xxx69麻豆国语对白 | 成年美女黄网站色大免费视频 | 在线a亚洲视频播放在线观看 | 国语自产偷拍精品视频偷 | 少妇太爽了在线观看 | 亚洲国产精品无码一区二区三区 | 亚洲中文字幕久久无码 | 人人爽人人澡人人人妻 | 国产午夜视频在线观看 | 日韩人妻无码中文字幕视频 | 精品偷拍一区二区三区在线看 | 国产成人无码a区在线观看视频app | 宝宝好涨水快流出来免费视频 | 国产综合色产在线精品 | 亚洲欧美日韩国产精品一区二区 | 青春草在线视频免费观看 | 牲欲强的熟妇农村老妇女 | 在教室伦流澡到高潮hnp视频 | 国产人成高清在线视频99最全资源 | 一本久道久久综合婷婷五月 | 一本久久a久久精品vr综合 | 国产莉萝无码av在线播放 | 国产成人一区二区三区在线观看 | 亚洲国产成人a精品不卡在线 | 久久综合九色综合欧美狠狠 | 99在线 | 亚洲 | av无码电影一区二区三区 | 啦啦啦www在线观看免费视频 | 无遮挡啪啪摇乳动态图 | 人妻互换免费中文字幕 | 亚洲另类伦春色综合小说 | 亚洲熟悉妇女xxx妇女av | 天海翼激烈高潮到腰振不止 | 国产精品爱久久久久久久 | 国产做国产爱免费视频 | 高清国产亚洲精品自在久久 | 未满小14洗澡无码视频网站 | 国产午夜精品一区二区三区嫩草 | 日韩欧美中文字幕在线三区 | 人妻有码中文字幕在线 | 人人爽人人爽人人片av亚洲 | 国产精品无码mv在线观看 | 久久久久久国产精品无码下载 | 人人妻人人澡人人爽人人精品 | 鲁大师影院在线观看 | 国产疯狂伦交大片 | 性生交大片免费看l | 高潮毛片无遮挡高清免费 | 欧美国产日产一区二区 | 亚洲成在人网站无码天堂 | 精品 日韩 国产 欧美 视频 | 又大又紧又粉嫩18p少妇 | 成熟人妻av无码专区 | 免费人成在线视频无码 | 九一九色国产 | 宝宝好涨水快流出来免费视频 | 妺妺窝人体色www婷婷 | 免费观看又污又黄的网站 | 精品亚洲韩国一区二区三区 | 伊人久久婷婷五月综合97色 | 疯狂三人交性欧美 | 久久国产精品二国产精品 | 成熟妇人a片免费看网站 | 日韩精品久久久肉伦网站 | 国内精品人妻无码久久久影院 | 夜精品a片一区二区三区无码白浆 | 国产熟妇另类久久久久 | av在线亚洲欧洲日产一区二区 | 久久99精品久久久久久 | 中文字幕 亚洲精品 第1页 | 久久国产36精品色熟妇 | 久精品国产欧美亚洲色aⅴ大片 | 久久精品国产99精品亚洲 | 久久久久久久女国产乱让韩 | 久久久久免费精品国产 | 成人无码视频在线观看网站 | 久久97精品久久久久久久不卡 | 少妇久久久久久人妻无码 | 波多野42部无码喷潮在线 | 少妇厨房愉情理9仑片视频 | 久久精品视频在线看15 | 久久久亚洲欧洲日产国码αv | 色婷婷av一区二区三区之红樱桃 | 久久久久久久久蜜桃 | 又大又硬又黄的免费视频 | 少妇一晚三次一区二区三区 | 麻豆成人精品国产免费 | 无码国产色欲xxxxx视频 | 国产精品第一国产精品 | 在线观看国产午夜福利片 | 国产深夜福利视频在线 | 伊人久久大香线焦av综合影院 | 兔费看少妇性l交大片免费 | 亚洲精品中文字幕乱码 | 亚洲综合在线一区二区三区 | 国精品人妻无码一区二区三区蜜柚 | 精品久久久久久人妻无码中文字幕 | 国产精品美女久久久 | 国产色精品久久人妻 | 日本精品人妻无码77777 天堂一区人妻无码 | 久久99精品久久久久久 | 欧美亚洲国产一区二区三区 | 亚洲天堂2017无码中文 | 久久久国产精品无码免费专区 | 国产极品视觉盛宴 | 成人精品天堂一区二区三区 | 精品熟女少妇av免费观看 | 亚洲国产精品无码一区二区三区 | 精品国产一区二区三区四区 | 欧美成人午夜精品久久久 | 少妇一晚三次一区二区三区 | 亚洲综合伊人久久大杳蕉 | 国产欧美精品一区二区三区 | 精品水蜜桃久久久久久久 | 熟妇人妻无码xxx视频 | 女人高潮内射99精品 | 亚洲精品一区二区三区在线观看 | 国产无遮挡吃胸膜奶免费看 | 成人欧美一区二区三区黑人免费 | 国产国产精品人在线视 | 久久这里只有精品视频9 | 亚洲无人区午夜福利码高清完整版 | 精品国产一区二区三区av 性色 | 欧美性生交活xxxxxdddd | 国产精品爱久久久久久久 | 水蜜桃av无码 | 久久无码专区国产精品s | 色狠狠av一区二区三区 | 亚洲日本一区二区三区在线 | 男女猛烈xx00免费视频试看 | 伦伦影院午夜理论片 | 在线看片无码永久免费视频 | 色婷婷欧美在线播放内射 | 我要看www免费看插插视频 | 免费人成网站视频在线观看 | 国产口爆吞精在线视频 | 亚洲综合色区中文字幕 | 国产精品人人妻人人爽 | 激情五月综合色婷婷一区二区 | 人人妻人人澡人人爽精品欧美 | 久久精品人妻少妇一区二区三区 | yw尤物av无码国产在线观看 | 欧美国产日韩亚洲中文 | 精品偷自拍另类在线观看 | 国产av无码专区亚洲a∨毛片 | 久久伊人色av天堂九九小黄鸭 | 三上悠亚人妻中文字幕在线 | 人妻插b视频一区二区三区 | 欧洲极品少妇 | 玩弄人妻少妇500系列视频 | 久久99国产综合精品 | 久久久www成人免费毛片 | 伊人久久婷婷五月综合97色 | 国产成人综合美国十次 | 国产精品无码永久免费888 | 久久久中文字幕日本无吗 | 天天躁夜夜躁狠狠是什么心态 | 亚洲色成人中文字幕网站 | 成人亚洲精品久久久久 | 青青青爽视频在线观看 | 在线观看国产一区二区三区 | 午夜性刺激在线视频免费 | 夜先锋av资源网站 | 中文字幕无码日韩专区 | 午夜福利不卡在线视频 | 久久午夜无码鲁丝片秋霞 | 久久精品人妻少妇一区二区三区 | 免费无码午夜福利片69 | 日韩精品a片一区二区三区妖精 | 一本久久a久久精品亚洲 | 国产成人综合色在线观看网站 | 国产偷抇久久精品a片69 | 亲嘴扒胸摸屁股激烈网站 | 一本大道伊人av久久综合 | 国产色在线 | 国产 | 久久精品视频在线看15 | 超碰97人人做人人爱少妇 | 亚洲精品久久久久中文第一幕 | 正在播放东北夫妻内射 | 人人妻在人人 | 国产精品欧美成人 | 99久久精品国产一区二区蜜芽 | www一区二区www免费 | 国产成人综合色在线观看网站 | 亚洲a无码综合a国产av中文 | 小泽玛莉亚一区二区视频在线 | 男人扒开女人内裤强吻桶进去 | 天天躁夜夜躁狠狠是什么心态 | 国产两女互慰高潮视频在线观看 | 图片小说视频一区二区 | 激情五月综合色婷婷一区二区 | 搡女人真爽免费视频大全 | 宝宝好涨水快流出来免费视频 | 天天拍夜夜添久久精品大 | 97精品国产97久久久久久免费 | 国产9 9在线 | 中文 | 亚洲综合另类小说色区 | 牲欲强的熟妇农村老妇女视频 | 性欧美videos高清精品 | 人人澡人人透人人爽 | 国产乱人伦偷精品视频 | 亚洲aⅴ无码成人网站国产app | 97久久精品无码一区二区 | 九九久久精品国产免费看小说 | aa片在线观看视频在线播放 | 久久婷婷五月综合色国产香蕉 | 日韩无套无码精品 | 中文无码伦av中文字幕 | 人人妻人人澡人人爽欧美一区九九 | 一本大道伊人av久久综合 | 国产精品.xx视频.xxtv | 亚洲精品中文字幕久久久久 | 国产亚洲人成在线播放 | 性欧美熟妇videofreesex | 久久精品国产大片免费观看 | 久久婷婷五月综合色国产香蕉 | 人妻少妇精品无码专区动漫 | 欧美激情一区二区三区成人 | 丝袜人妻一区二区三区 | 日本一卡2卡3卡四卡精品网站 | 国产在线一区二区三区四区五区 | 欧美丰满熟妇xxxx性ppx人交 | 亚洲国产欧美在线成人 | 兔费看少妇性l交大片免费 | 老熟妇乱子伦牲交视频 | 久久精品无码一区二区三区 | 青青草原综合久久大伊人精品 | 午夜嘿嘿嘿影院 | 日本熟妇浓毛 | 六月丁香婷婷色狠狠久久 | 美女黄网站人色视频免费国产 | 日本爽爽爽爽爽爽在线观看免 | 日本熟妇人妻xxxxx人hd | 人妻与老人中文字幕 | 国产精品办公室沙发 | 国产片av国语在线观看 | 无码吃奶揉捏奶头高潮视频 | 99久久婷婷国产综合精品青草免费 | 久久久久久久人妻无码中文字幕爆 | 亚洲欧美国产精品专区久久 | 亚洲熟妇色xxxxx欧美老妇y | 免费观看又污又黄的网站 | 亚洲欧美精品伊人久久 | 亚洲色欲久久久综合网东京热 | 国产精品久久久久久亚洲影视内衣 | 亚洲乱亚洲乱妇50p | v一区无码内射国产 | 亚洲精品久久久久久久久久久 | 女人色极品影院 | 少妇无码吹潮 | 国产亚洲视频中文字幕97精品 | 中文毛片无遮挡高清免费 | 欧美性猛交内射兽交老熟妇 | 青青久在线视频免费观看 | 正在播放老肥熟妇露脸 | 日韩成人一区二区三区在线观看 | 精品久久久久久人妻无码中文字幕 | 色狠狠av一区二区三区 | 成人亚洲精品久久久久 | 55夜色66夜色国产精品视频 | 亚洲精品久久久久中文第一幕 | 51国偷自产一区二区三区 | 久久综合香蕉国产蜜臀av | 久9re热视频这里只有精品 | 麻豆av传媒蜜桃天美传媒 | 国产日产欧产精品精品app | 青青久在线视频免费观看 | 成人性做爰aaa片免费看 | 丰满少妇弄高潮了www | 色综合久久中文娱乐网 | 黑人巨大精品欧美一区二区 | 国产精品国产自线拍免费软件 | 中国女人内谢69xxxxxa片 | 亚洲区欧美区综合区自拍区 | 人人妻人人澡人人爽欧美精品 | 久久精品国产精品国产精品污 | 欧美猛少妇色xxxxx | 玩弄人妻少妇500系列视频 | 一本色道婷婷久久欧美 | 精品国产一区二区三区av 性色 | 国产精品无码久久av | 国产一区二区三区四区五区加勒比 | 无码午夜成人1000部免费视频 | 中文字幕无码免费久久99 | 人人妻人人藻人人爽欧美一区 | 国产亚洲精品久久久久久大师 | 六月丁香婷婷色狠狠久久 | 久久伊人色av天堂九九小黄鸭 | 亚洲乱亚洲乱妇50p | 亚洲日韩精品欧美一区二区 | 欧美日韩亚洲国产精品 | 黑人大群体交免费视频 | 131美女爱做视频 | 亚洲精品鲁一鲁一区二区三区 | 在线看片无码永久免费视频 | √天堂中文官网8在线 | 欧美丰满老熟妇xxxxx性 | 国产网红无码精品视频 | 国产69精品久久久久app下载 | 麻豆果冻传媒2021精品传媒一区下载 | 精品国产精品久久一区免费式 | 国产无套粉嫩白浆在线 | 亚洲大尺度无码无码专区 | 4hu四虎永久在线观看 | 亚洲日韩乱码中文无码蜜桃臀网站 | 中国大陆精品视频xxxx | 日本成熟视频免费视频 | 成人免费视频在线观看 | 亚洲国产精品美女久久久久 | 亚洲欧洲日本综合aⅴ在线 | 亚洲一区二区三区香蕉 | 樱花草在线社区www | 亚洲色欲久久久综合网东京热 | 乌克兰少妇性做爰 | 一本久道高清无码视频 | 久久久久99精品国产片 | 扒开双腿疯狂进出爽爽爽视频 | 国产欧美精品一区二区三区 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 色综合久久中文娱乐网 | 中文字幕 亚洲精品 第1页 | 亚洲中文字幕av在天堂 | 国产成人综合在线女婷五月99播放 | 一区二区三区乱码在线 | 欧洲 | 亚洲日本一区二区三区在线 | 国产精品高潮呻吟av久久 | 亚洲成a人片在线观看日本 | 中文精品无码中文字幕无码专区 | 国产熟妇另类久久久久 | 好男人www社区 | 粉嫩少妇内射浓精videos | 小sao货水好多真紧h无码视频 | 88国产精品欧美一区二区三区 | 人人妻人人澡人人爽欧美一区 | 欧美人与牲动交xxxx | 蜜桃视频插满18在线观看 | 日韩人妻系列无码专区 | 国产精品无码mv在线观看 | 久久久久久久人妻无码中文字幕爆 | 亚洲熟妇色xxxxx欧美老妇 | 国产超级va在线观看视频 | 黑人巨大精品欧美一区二区 | 国产精品-区区久久久狼 | 任你躁国产自任一区二区三区 | 野狼第一精品社区 | 久久精品国产日本波多野结衣 | 亚洲の无码国产の无码影院 | 中文字幕中文有码在线 | 欧美xxxx黑人又粗又长 | 久久熟妇人妻午夜寂寞影院 | av人摸人人人澡人人超碰下载 | 动漫av网站免费观看 | 久久精品丝袜高跟鞋 | 丝袜足控一区二区三区 | 久久久精品人妻久久影视 | 国产一区二区三区日韩精品 | 国产午夜精品一区二区三区嫩草 | 国产艳妇av在线观看果冻传媒 | 国产后入清纯学生妹 | 中文亚洲成a人片在线观看 | 天天拍夜夜添久久精品 | 天堂а√在线中文在线 | 丰满人妻被黑人猛烈进入 | 精品国产青草久久久久福利 | 久久久婷婷五月亚洲97号色 | 无遮挡国产高潮视频免费观看 | 亚洲色偷偷男人的天堂 | 日韩精品a片一区二区三区妖精 | 欧美精品在线观看 | 夜夜影院未满十八勿进 | 131美女爱做视频 | 牲欲强的熟妇农村老妇女视频 | 老司机亚洲精品影院 | 亚洲欧美日韩综合久久久 | 国产无套粉嫩白浆在线 | 任你躁在线精品免费 | 黄网在线观看免费网站 | 日本www一道久久久免费榴莲 | 麻豆精产国品 | 久久午夜夜伦鲁鲁片无码免费 | 性色欲情网站iwww九文堂 | 日本熟妇人妻xxxxx人hd | 日本大香伊一区二区三区 | 欧美老妇与禽交 | 国产 浪潮av性色四虎 | 国产av人人夜夜澡人人爽麻豆 | 蜜臀aⅴ国产精品久久久国产老师 | 欧洲极品少妇 | 真人与拘做受免费视频 | a片在线免费观看 | 亚洲区小说区激情区图片区 | 人人澡人人妻人人爽人人蜜桃 | 一本久久伊人热热精品中文字幕 | 欧美猛少妇色xxxxx | 午夜无码区在线观看 | 美女毛片一区二区三区四区 | 日本高清一区免费中文视频 | 国产精品自产拍在线观看 | 色综合久久久久综合一本到桃花网 | 久久 国产 尿 小便 嘘嘘 | 久久精品国产一区二区三区 | 精品偷自拍另类在线观看 | aa片在线观看视频在线播放 | 国产午夜亚洲精品不卡下载 | 女人和拘做爰正片视频 | 亚洲日韩精品欧美一区二区 | 日韩亚洲欧美精品综合 | 国产av一区二区三区最新精品 | 中文字幕无码免费久久9一区9 | 波多野结衣av一区二区全免费观看 | 久精品国产欧美亚洲色aⅴ大片 | 玩弄中年熟妇正在播放 | 国产又爽又黄又刺激的视频 | а天堂中文在线官网 | 又大又紧又粉嫩18p少妇 | 岛国片人妻三上悠亚 | 无套内谢的新婚少妇国语播放 | 亚洲欧美国产精品久久 | 国产精品对白交换视频 | 亚洲最大成人网站 | 日日夜夜撸啊撸 | 人妻互换免费中文字幕 | 福利一区二区三区视频在线观看 | 精品人妻人人做人人爽夜夜爽 | 小sao货水好多真紧h无码视频 | 少妇无码av无码专区在线观看 | 国内精品一区二区三区不卡 | 国产精品igao视频网 | 纯爱无遮挡h肉动漫在线播放 | 性色欲情网站iwww九文堂 | 亚洲精品无码人妻无码 | 欧美人与善在线com | av香港经典三级级 在线 | 无码纯肉视频在线观看 | 伦伦影院午夜理论片 | 无码任你躁久久久久久久 | 免费人成网站视频在线观看 | 亚洲天堂2017无码 | 中文字幕 人妻熟女 | 国产精品人人爽人人做我的可爱 | 男女爱爱好爽视频免费看 | 精品久久久久久亚洲精品 | 久久久久久a亚洲欧洲av冫 | 欧美日韩色另类综合 | 色窝窝无码一区二区三区色欲 | 国内精品九九久久久精品 | 波多野结衣 黑人 | 免费无码的av片在线观看 | 麻豆蜜桃av蜜臀av色欲av | 少妇高潮喷潮久久久影院 | 欧美日韩色另类综合 | 性欧美牲交在线视频 | 亚洲色无码一区二区三区 | 久久综合香蕉国产蜜臀av | 精品国产福利一区二区 | 久久精品人人做人人综合 | 亚洲男人av香蕉爽爽爽爽 | 熟妇人妻中文av无码 | 人人妻人人澡人人爽欧美一区 | 亚洲大尺度无码无码专区 | 中文字幕无码乱人伦 | 久久久久久av无码免费看大片 | 丁香花在线影院观看在线播放 | 国产亚洲精品久久久ai换 | 国产真实乱对白精彩久久 | 国产成人综合色在线观看网站 | 久久99国产综合精品 | 中文亚洲成a人片在线观看 | 人妻夜夜爽天天爽三区 | 国产精品人人爽人人做我的可爱 | 成熟女人特级毛片www免费 | 亚洲人成影院在线无码按摩店 | 六十路熟妇乱子伦 | 亚洲gv猛男gv无码男同 | 久久综合九色综合97网 | 久久久久成人片免费观看蜜芽 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 亚洲国产精品无码久久久久高潮 | 67194成是人免费无码 | 精品无码一区二区三区的天堂 | 亚洲色无码一区二区三区 | 综合人妻久久一区二区精品 | 久久99精品国产麻豆蜜芽 | 国产香蕉尹人视频在线 | 亚洲精品一区二区三区婷婷月 | 午夜不卡av免费 一本久久a久久精品vr综合 | 好男人社区资源 | 欧美成人高清在线播放 | 天天爽夜夜爽夜夜爽 | 国产sm调教视频在线观看 | 亚洲精品一区二区三区婷婷月 | 久久精品国产99久久6动漫 | 免费乱码人妻系列无码专区 | 丰满肥臀大屁股熟妇激情视频 | aa片在线观看视频在线播放 | 国产高潮视频在线观看 | 99久久久无码国产精品免费 | 人妻中文无码久热丝袜 | 色诱久久久久综合网ywww | 中文字幕无码视频专区 | 欧美日本免费一区二区三区 | 搡女人真爽免费视频大全 | 青春草在线视频免费观看 | 人人妻人人澡人人爽人人精品浪潮 | 久久精品中文字幕一区 | 国精品人妻无码一区二区三区蜜柚 | 日本饥渴人妻欲求不满 | 欧洲vodafone精品性 | 亚洲综合伊人久久大杳蕉 | 精品久久久久香蕉网 | 色综合久久久无码中文字幕 | 蜜桃臀无码内射一区二区三区 | 国产午夜福利亚洲第一 | 国产亚洲精品久久久久久久久动漫 | 亚洲高清偷拍一区二区三区 | 在线а√天堂中文官网 | 中文字幕 人妻熟女 | 乱人伦人妻中文字幕无码久久网 | 亚洲一区二区观看播放 | 午夜精品久久久内射近拍高清 | 国产无遮挡吃胸膜奶免费看 | 97久久国产亚洲精品超碰热 | 人妻aⅴ无码一区二区三区 | 亚洲精品中文字幕久久久久 | 夜夜影院未满十八勿进 | 一本久久伊人热热精品中文字幕 | 国产午夜手机精彩视频 | 亚洲国产午夜精品理论片 | 国产suv精品一区二区五 | 极品尤物被啪到呻吟喷水 | 男人扒开女人内裤强吻桶进去 | 久青草影院在线观看国产 | 久久午夜无码鲁丝片 | 欧洲精品码一区二区三区免费看 | 亚洲熟悉妇女xxx妇女av | 欧美成人午夜精品久久久 | 日本欧美一区二区三区乱码 | 中文字幕无码乱人伦 | 狠狠色噜噜狠狠狠狠7777米奇 | 又大又黄又粗又爽的免费视频 | 日日橹狠狠爱欧美视频 | 欧美亚洲日韩国产人成在线播放 | 人妻与老人中文字幕 | 激情内射日本一区二区三区 | 黄网在线观看免费网站 | 日韩av无码一区二区三区不卡 | 色综合视频一区二区三区 | √天堂资源地址中文在线 | 亚洲精品国产品国语在线观看 | 两性色午夜免费视频 | 亚洲成av人综合在线观看 | 狠狠亚洲超碰狼人久久 | 中文无码伦av中文字幕 | 国产成人无码a区在线观看视频app | 暴力强奷在线播放无码 | 日韩精品无码免费一区二区三区 | 亚洲一区二区三区四区 | 中文字幕久久久久人妻 | 亚洲精品久久久久avwww潮水 | 国产午夜福利100集发布 | 玩弄人妻少妇500系列视频 | 久久久久99精品成人片 | 亚洲 欧美 激情 小说 另类 | 最近的中文字幕在线看视频 | 国产无遮挡吃胸膜奶免费看 | 1000部啪啪未满十八勿入下载 | 国产成人亚洲综合无码 | 麻豆国产丝袜白领秘书在线观看 | 99视频精品全部免费免费观看 | 丝袜美腿亚洲一区二区 | 欧美亚洲国产一区二区三区 | 国产尤物精品视频 | 在线天堂新版最新版在线8 | 7777奇米四色成人眼影 | 国产精品国产自线拍免费软件 | 少妇激情av一区二区 | 正在播放东北夫妻内射 | 激情亚洲一区国产精品 | 亚洲日韩一区二区三区 | 精品熟女少妇av免费观看 | 久在线观看福利视频 | 国产亚洲精品精品国产亚洲综合 | 国产又爽又黄又刺激的视频 | 亚洲国产成人av在线观看 | 在线播放亚洲第一字幕 | 亚洲欧美色中文字幕在线 | 国产成人无码av一区二区 | 东京热一精品无码av | 九九久久精品国产免费看小说 | 牲欲强的熟妇农村老妇女 | 18精品久久久无码午夜福利 | 中文字幕无码av激情不卡 | 国产莉萝无码av在线播放 | 人妻无码αv中文字幕久久琪琪布 | 久久久久久av无码免费看大片 | 婷婷五月综合缴情在线视频 | 黑人玩弄人妻中文在线 | 亚洲国产一区二区三区在线观看 | 成年美女黄网站色大免费全看 | 熟女体下毛毛黑森林 | 国产另类ts人妖一区二区 | 国产乱人无码伦av在线a | 成人免费无码大片a毛片 | 精品无码一区二区三区的天堂 | 日产精品高潮呻吟av久久 | 亚洲欧美精品伊人久久 | 国产手机在线αⅴ片无码观看 | 精品久久久无码中文字幕 | 亚洲国产精品久久久久久 | 狠狠噜狠狠狠狠丁香五月 | 亚洲日韩精品欧美一区二区 | 亚洲一区二区三区含羞草 | 久久亚洲日韩精品一区二区三区 | 欧美猛少妇色xxxxx | 亚洲中文字幕无码一久久区 | 夜先锋av资源网站 | 国产特级毛片aaaaaaa高清 | 亚洲中文字幕乱码av波多ji | 国产精品99爱免费视频 | 日韩 欧美 动漫 国产 制服 | 免费男性肉肉影院 | 人人妻人人澡人人爽精品欧美 | 老司机亚洲精品影院无码 | 亲嘴扒胸摸屁股激烈网站 | 无码国产乱人伦偷精品视频 | 在线观看免费人成视频 | 少妇性l交大片欧洲热妇乱xxx | 成人三级无码视频在线观看 | 成年美女黄网站色大免费视频 | 亚洲色偷偷偷综合网 | 我要看www免费看插插视频 | 中文字幕 人妻熟女 | 国产精品无码一区二区三区不卡 | 国产人妻精品午夜福利免费 | 亚洲一区二区三区播放 | av在线亚洲欧洲日产一区二区 | 日欧一片内射va在线影院 | 黑人巨大精品欧美黑寡妇 | 国产激情无码一区二区app | 波多野结衣av在线观看 | 日韩少妇内射免费播放 | 大乳丰满人妻中文字幕日本 | 亚洲综合无码一区二区三区 | 国产偷自视频区视频 | 久久人人爽人人爽人人片av高清 | 九九在线中文字幕无码 | 国产亚洲精品久久久闺蜜 | 成年美女黄网站色大免费视频 | 亚洲爆乳精品无码一区二区三区 | 中文字幕av无码一区二区三区电影 | 中文字幕无码日韩专区 | 久久aⅴ免费观看 | 欧美国产日韩久久mv | 乱码午夜-极国产极内射 | 国产精品嫩草久久久久 | 中文无码伦av中文字幕 | 亚洲国产午夜精品理论片 | 亚洲高清偷拍一区二区三区 | 色综合久久久无码中文字幕 | 伊人久久婷婷五月综合97色 | 99精品视频在线观看免费 | 久久久久久亚洲精品a片成人 | 人妻aⅴ无码一区二区三区 | 国产成人无码一二三区视频 | 东京一本一道一二三区 | 无人区乱码一区二区三区 | 动漫av一区二区在线观看 | 人妻天天爽夜夜爽一区二区 | 十八禁视频网站在线观看 | 久久精品国产99久久6动漫 | 亚洲综合色区中文字幕 | 国产国语老龄妇女a片 | 中文字幕乱码中文乱码51精品 | 亚洲国产一区二区三区在线观看 | 东北女人啪啪对白 | www一区二区www免费 | 樱花草在线播放免费中文 | 日本在线高清不卡免费播放 | 天堂一区人妻无码 | 天天爽夜夜爽夜夜爽 | 日韩人妻无码中文字幕视频 | 国产精品亚洲五月天高清 | 一本无码人妻在中文字幕免费 | 国产超碰人人爽人人做人人添 | 色狠狠av一区二区三区 | 国产人妻精品一区二区三区不卡 | 久久久婷婷五月亚洲97号色 | 亚洲热妇无码av在线播放 | 无套内射视频囯产 | 国产精品怡红院永久免费 | 亚洲国产欧美日韩精品一区二区三区 | 久久久国产精品无码免费专区 | 国产成人精品视频ⅴa片软件竹菊 | 亚洲热妇无码av在线播放 | 97人妻精品一区二区三区 | 亚洲色www成人永久网址 | 亚洲一区二区三区偷拍女厕 | 欧美性生交活xxxxxdddd | 亚洲精品中文字幕久久久久 | 在线播放无码字幕亚洲 | 激情内射日本一区二区三区 | ass日本丰满熟妇pics | 少女韩国电视剧在线观看完整 | 国产精品内射视频免费 | 麻豆md0077饥渴少妇 | 国产在线一区二区三区四区五区 | 老熟妇仑乱视频一区二区 | 精品无人区无码乱码毛片国产 | 3d动漫精品啪啪一区二区中 | 婷婷丁香五月天综合东京热 | 领导边摸边吃奶边做爽在线观看 | 美女扒开屁股让男人桶 | 国产在线aaa片一区二区99 | 色老头在线一区二区三区 | 国产激情无码一区二区 | 国产精品va在线播放 | 色一情一乱一伦 | 久久久久se色偷偷亚洲精品av | 又大又紧又粉嫩18p少妇 | 性色欲网站人妻丰满中文久久不卡 | 亚洲综合久久一区二区 | 国产人妻人伦精品 | 在线天堂新版最新版在线8 | 国产9 9在线 | 中文 | 久久天天躁狠狠躁夜夜免费观看 | 日韩视频 中文字幕 视频一区 | 1000部啪啪未满十八勿入下载 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 婷婷丁香五月天综合东京热 | 97人妻精品一区二区三区 | 精品国产一区二区三区av 性色 | 99久久无码一区人妻 | 欧美人与善在线com | 精品国产aⅴ无码一区二区 | 国产精品高潮呻吟av久久4虎 | 亚洲午夜福利在线观看 | 台湾无码一区二区 | 国产精品久久久久影院嫩草 | 77777熟女视频在线观看 а天堂中文在线官网 | 日本乱人伦片中文三区 | 男人的天堂av网站 | 国产美女精品一区二区三区 | 麻豆av传媒蜜桃天美传媒 | 性色av无码免费一区二区三区 | 国产又粗又硬又大爽黄老大爷视 | 综合网日日天干夜夜久久 | 日本乱人伦片中文三区 | 在线播放免费人成毛片乱码 | 纯爱无遮挡h肉动漫在线播放 | 狂野欧美性猛xxxx乱大交 | 久久亚洲日韩精品一区二区三区 | 欧美性生交活xxxxxdddd | 亚洲乱亚洲乱妇50p | 成人无码视频免费播放 | 久久99久久99精品中文字幕 | 久久久国产一区二区三区 | 内射白嫩少妇超碰 | av无码久久久久不卡免费网站 | 成人欧美一区二区三区 | 婷婷五月综合缴情在线视频 | 亚洲自偷自偷在线制服 | 又湿又紧又大又爽a视频国产 | 天下第一社区视频www日本 | 国产免费观看黄av片 | 亚洲伊人久久精品影院 | 久久久久久国产精品无码下载 | 成熟人妻av无码专区 | 爱做久久久久久 | 成人免费无码大片a毛片 | 日本熟妇大屁股人妻 | 天天摸天天透天天添 | 中文字幕 人妻熟女 | 久久久久久av无码免费看大片 | 国产亚洲人成在线播放 | 国产精品久久久av久久久 | √8天堂资源地址中文在线 | 九九久久精品国产免费看小说 | 色 综合 欧美 亚洲 国产 | 无遮挡啪啪摇乳动态图 | 黑人玩弄人妻中文在线 | 丰满少妇女裸体bbw | 国产麻豆精品精东影业av网站 | 久久午夜夜伦鲁鲁片无码免费 | 亚洲а∨天堂久久精品2021 | 久久久久久久女国产乱让韩 | 98国产精品综合一区二区三区 | 西西人体www44rt大胆高清 | 亚洲精品一区二区三区婷婷月 | 色五月五月丁香亚洲综合网 | 日日躁夜夜躁狠狠躁 | 在线欧美精品一区二区三区 | 国产又粗又硬又大爽黄老大爷视 | 十八禁视频网站在线观看 | 最新国产乱人伦偷精品免费网站 | 人人妻人人澡人人爽欧美一区九九 | 在线观看国产午夜福利片 | 最新版天堂资源中文官网 | 国内老熟妇对白xxxxhd | 亚洲精品国产第一综合99久久 | 国产人妻人伦精品1国产丝袜 | 欧美精品在线观看 | 国产精品久久久久久亚洲毛片 | 国产亚洲精品久久久久久 | 精品厕所偷拍各类美女tp嘘嘘 | 国产精品怡红院永久免费 | 国产猛烈高潮尖叫视频免费 | 在线播放无码字幕亚洲 | 精品无人国产偷自产在线 | 九九热爱视频精品 | 日产国产精品亚洲系列 | 久久久久99精品成人片 | 亚洲欧美精品伊人久久 | 亚洲综合无码久久精品综合 | 国产成人人人97超碰超爽8 | 无套内谢的新婚少妇国语播放 | 在线播放免费人成毛片乱码 | 久久精品人人做人人综合试看 | 爱做久久久久久 | 国产情侣作爱视频免费观看 | 最新国产乱人伦偷精品免费网站 | 国产又粗又硬又大爽黄老大爷视 | 国产精品18久久久久久麻辣 | 久久婷婷五月综合色国产香蕉 | 中文字幕人妻无码一区二区三区 | 日本熟妇大屁股人妻 | 国产在线精品一区二区高清不卡 | 亚洲国产精品一区二区第一页 | 国产艳妇av在线观看果冻传媒 | 俺去俺来也www色官网 | 熟妇女人妻丰满少妇中文字幕 | 久久久久久a亚洲欧洲av冫 | 亚洲欧美日韩综合久久久 | 国产尤物精品视频 | 一本久道高清无码视频 | 亚洲日韩一区二区三区 | 国产精品久久久久久亚洲毛片 | 久久伊人色av天堂九九小黄鸭 | 熟女体下毛毛黑森林 | 爆乳一区二区三区无码 | 中文字幕无线码 | 国产成人精品久久亚洲高清不卡 | 亚洲国产精品久久久天堂 | 俄罗斯老熟妇色xxxx | 亚洲精品综合一区二区三区在线 | 曰本女人与公拘交酡免费视频 | 天堂а√在线地址中文在线 | 5858s亚洲色大成网站www | 国产真人无遮挡作爱免费视频 | 亚洲阿v天堂在线 | 亚洲国产欧美日韩精品一区二区三区 | 日日碰狠狠躁久久躁蜜桃 | 午夜福利一区二区三区在线观看 | 中国大陆精品视频xxxx | 99久久精品日本一区二区免费 | 欧美性猛交xxxx富婆 | 亚洲伊人久久精品影院 | 精品久久久久香蕉网 | 欧美丰满老熟妇xxxxx性 | 亚拍精品一区二区三区探花 | 一个人免费观看的www视频 | 无码人妻丰满熟妇区五十路百度 | 在线а√天堂中文官网 | 国产尤物精品视频 | 爽爽影院免费观看 | 在线а√天堂中文官网 | 欧美日韩久久久精品a片 | 人人妻人人澡人人爽欧美精品 | 久久午夜夜伦鲁鲁片无码免费 | 亚洲精品www久久久 | 亚洲 高清 成人 动漫 | 成人免费视频在线观看 | 日本护士毛茸茸高潮 | 丰满人妻翻云覆雨呻吟视频 | 久久精品国产99久久6动漫 | 国产9 9在线 | 中文 | 亚洲综合在线一区二区三区 | 日本熟妇乱子伦xxxx | 99久久99久久免费精品蜜桃 | 男人扒开女人内裤强吻桶进去 | 四虎国产精品一区二区 | 亚洲欧美日韩综合久久久 | 亚洲中文无码av永久不收费 | 久久久久成人精品免费播放动漫 | 色偷偷人人澡人人爽人人模 | 国产香蕉尹人视频在线 | 国产舌乚八伦偷品w中 | 精品无码国产自产拍在线观看蜜 | 亚洲成av人影院在线观看 | 高潮毛片无遮挡高清免费 | 兔费看少妇性l交大片免费 | 人妻少妇精品久久 | 国产在热线精品视频 | 3d动漫精品啪啪一区二区中 | 久久人人爽人人爽人人片av高清 | 亚洲综合无码一区二区三区 | 色窝窝无码一区二区三区色欲 | 特级做a爰片毛片免费69 | 久久久久久久女国产乱让韩 | 国产区女主播在线观看 | 中文字幕色婷婷在线视频 | 最新国产乱人伦偷精品免费网站 | 亚洲精品一区三区三区在线观看 | 岛国片人妻三上悠亚 | 丰满少妇弄高潮了www | 最新国产麻豆aⅴ精品无码 | 蜜桃视频韩日免费播放 | 久精品国产欧美亚洲色aⅴ大片 | 久久久国产一区二区三区 | 夜精品a片一区二区三区无码白浆 | 免费人成网站视频在线观看 | 老熟妇仑乱视频一区二区 | 无套内射视频囯产 | 无码成人精品区在线观看 | 国产色xx群视频射精 | 欧美老妇与禽交 | 国产成人无码一二三区视频 | 亚洲无人区一区二区三区 | 在线观看欧美一区二区三区 | 成人无码精品1区2区3区免费看 | 国产精品无套呻吟在线 | 无码人妻精品一区二区三区下载 | 国产成人一区二区三区在线观看 | 欧美亚洲国产一区二区三区 | 欧美 丝袜 自拍 制服 另类 | 强奷人妻日本中文字幕 | 国产综合久久久久鬼色 | 激情内射日本一区二区三区 | 少妇无码av无码专区在线观看 | 欧美日韩视频无码一区二区三 | 日日躁夜夜躁狠狠躁 | 国产精品美女久久久久av爽李琼 | 四虎永久在线精品免费网址 | 中文字幕无码视频专区 | 中文字幕无线码 | 乱码午夜-极国产极内射 | 午夜福利一区二区三区在线观看 | 亚洲精品鲁一鲁一区二区三区 | 中文字幕无码日韩专区 | 日本va欧美va欧美va精品 | 国产精品鲁鲁鲁 | 国产成人无码a区在线观看视频app | 亚洲欧美日韩国产精品一区二区 | 一本加勒比波多野结衣 | 国产xxx69麻豆国语对白 | 小sao货水好多真紧h无码视频 | 性开放的女人aaa片 | 色欲av亚洲一区无码少妇 | 亚拍精品一区二区三区探花 | 久精品国产欧美亚洲色aⅴ大片 | 国产精品二区一区二区aⅴ污介绍 | 精品无码国产自产拍在线观看蜜 | 精品无人国产偷自产在线 | аⅴ资源天堂资源库在线 | 亚洲成a人一区二区三区 | 麻豆国产97在线 | 欧洲 | 露脸叫床粗话东北少妇 | 色综合久久久久综合一本到桃花网 | 精品偷自拍另类在线观看 | 99久久精品午夜一区二区 | 亚洲 日韩 欧美 成人 在线观看 | 久久aⅴ免费观看 | 好男人社区资源 | 国产乱人偷精品人妻a片 | 精品国产一区二区三区av 性色 | 国产av一区二区精品久久凹凸 | 国产精品亚洲一区二区三区喷水 | 性欧美大战久久久久久久 | 亚洲综合无码久久精品综合 | 欧美老熟妇乱xxxxx | 中文毛片无遮挡高清免费 | 小鲜肉自慰网站xnxx | 人人妻人人澡人人爽人人精品浪潮 | 蜜臀av在线播放 久久综合激激的五月天 | 中文字幕日韩精品一区二区三区 | 国内精品人妻无码久久久影院 | 蜜桃av抽搐高潮一区二区 | 亚洲色无码一区二区三区 | 人妻插b视频一区二区三区 | 丝袜足控一区二区三区 | 精品无码一区二区三区爱欲 | 午夜精品久久久久久久久 | 亚洲精品一区二区三区在线观看 | 少妇被粗大的猛进出69影院 | 日本大香伊一区二区三区 | 精品久久久无码人妻字幂 | 国产精品久久久久7777 | 亚洲精品一区二区三区婷婷月 | 亚洲色大成网站www国产 | 国产亲子乱弄免费视频 | 国产精品久久久久9999小说 | 日产精品99久久久久久 | 国内精品人妻无码久久久影院蜜桃 | 精品人妻人人做人人爽 | 精品少妇爆乳无码av无码专区 | 久久人人97超碰a片精品 | 日韩亚洲欧美中文高清在线 | 波多野结衣乳巨码无在线观看 | yw尤物av无码国产在线观看 | 樱花草在线社区www | 亚洲精品一区二区三区在线观看 | 亚洲精品综合一区二区三区在线 | 高中生自慰www网站 | 99久久精品午夜一区二区 | 日韩无套无码精品 | 国产suv精品一区二区五 | 成年美女黄网站色大免费全看 | 国产亚洲精品久久久ai换 | 国产69精品久久久久app下载 | 麻豆国产人妻欲求不满 | 九九久久精品国产免费看小说 | 欧美日韩色另类综合 | 亚洲中文字幕久久无码 | www成人国产高清内射 | 麻豆md0077饥渴少妇 | 欧美日韩在线亚洲综合国产人 | 国产av一区二区三区最新精品 | 久久精品国产精品国产精品污 | 午夜丰满少妇性开放视频 | 又粗又大又硬毛片免费看 | 在线看片无码永久免费视频 | 久久久久久九九精品久 | 欧美日本免费一区二区三区 | 国产性生大片免费观看性 | 亚洲精品成人av在线 | 欧美熟妇另类久久久久久多毛 | 又湿又紧又大又爽a视频国产 | 久久人妻内射无码一区三区 | 美女毛片一区二区三区四区 | 日韩av激情在线观看 | 精品久久久久久亚洲精品 | 久久亚洲中文字幕无码 | 久久国内精品自在自线 | 欧美人与物videos另类 | 无码帝国www无码专区色综合 | 亚洲狠狠婷婷综合久久 | 性开放的女人aaa片 | 国产亚洲精品久久久闺蜜 | 中文字幕无码免费久久9一区9 | 成人精品天堂一区二区三区 | 国产午夜视频在线观看 | 熟女体下毛毛黑森林 | 亚洲七七久久桃花影院 | 国产口爆吞精在线视频 | 国产内射爽爽大片视频社区在线 | 曰韩少妇内射免费播放 | 亚洲欧美精品aaaaaa片 | 亚洲中文字幕在线观看 | 无码人妻丰满熟妇区毛片18 | 精品人妻人人做人人爽 | 亚洲码国产精品高潮在线 | 久在线观看福利视频 | 性做久久久久久久免费看 | 亚洲精品久久久久中文第一幕 | 正在播放老肥熟妇露脸 | 精品久久综合1区2区3区激情 | 久久精品国产99久久6动漫 | 最新国产乱人伦偷精品免费网站 | 免费无码一区二区三区蜜桃大 | 秋霞成人午夜鲁丝一区二区三区 | 一个人看的视频www在线 | 亚洲日韩一区二区三区 | 中文毛片无遮挡高清免费 | 久久精品国产亚洲精品 | 综合人妻久久一区二区精品 | 野外少妇愉情中文字幕 | 国内精品人妻无码久久久影院蜜桃 | 人人妻人人藻人人爽欧美一区 | 人妻无码久久精品人妻 | 国产精品a成v人在线播放 | 久久久久久久久蜜桃 | 亚洲中文字幕乱码av波多ji | 久久国内精品自在自线 | 久久久精品国产sm最大网站 | 国产人妻人伦精品 | 天干天干啦夜天干天2017 | www国产亚洲精品久久久日本 | 国产精品va在线播放 | 婷婷综合久久中文字幕蜜桃三电影 | 人妻少妇精品无码专区动漫 | 国产精品成人av在线观看 | 亚洲va欧美va天堂v国产综合 | 无码午夜成人1000部免费视频 | 色爱情人网站 | 日日摸天天摸爽爽狠狠97 | 99国产欧美久久久精品 | 久久综合狠狠综合久久综合88 | 国内精品人妻无码久久久影院 | 国产97在线 | 亚洲 | 高清无码午夜福利视频 | 免费观看激色视频网站 | 男女下面进入的视频免费午夜 | av小次郎收藏 | 在教室伦流澡到高潮hnp视频 | 欧美日本免费一区二区三区 | 精品无码一区二区三区爱欲 | 国产乱人伦av在线无码 | 亚洲精品一区三区三区在线观看 | 国产特级毛片aaaaaaa高清 | 亚洲国产午夜精品理论片 | 亚洲日本在线电影 | 国产人成高清在线视频99最全资源 | 久久精品99久久香蕉国产色戒 | 无套内谢的新婚少妇国语播放 | 精品成在人线av无码免费看 | 亚洲日韩精品欧美一区二区 | 天堂亚洲免费视频 | 全黄性性激高免费视频 | 自拍偷自拍亚洲精品被多人伦好爽 | 亚洲天堂2017无码中文 | 色一情一乱一伦一区二区三欧美 | 欧美人与牲动交xxxx | 日韩精品无码一区二区中文字幕 | 亚洲成av人片天堂网无码】 | 18禁黄网站男男禁片免费观看 | 东京无码熟妇人妻av在线网址 | 秋霞特色aa大片 | 精品久久久无码中文字幕 | 无码免费一区二区三区 | 一本久久a久久精品亚洲 | 亚洲中文字幕无码一久久区 | 无码乱肉视频免费大全合集 | 日韩精品乱码av一区二区 | 亚洲经典千人经典日产 | 红桃av一区二区三区在线无码av | 亚洲精品国产第一综合99久久 | 精品日本一区二区三区在线观看 | 东京热一精品无码av | 最新国产乱人伦偷精品免费网站 | 国产熟妇另类久久久久 | 欧美激情一区二区三区成人 | 国产 精品 自在自线 | 免费中文字幕日韩欧美 | 桃花色综合影院 | 免费无码午夜福利片69 | 日日鲁鲁鲁夜夜爽爽狠狠 | 未满小14洗澡无码视频网站 | av无码电影一区二区三区 | 97久久超碰中文字幕 | 国产精品久久久一区二区三区 | 亚洲午夜福利在线观看 | 国产九九九九九九九a片 | 日本精品少妇一区二区三区 | 国产特级毛片aaaaaaa高清 | 亚洲一区二区三区在线观看网站 | 日韩精品无码一本二本三本色 | а√天堂www在线天堂小说 | 一区二区三区高清视频一 | 天堂а√在线地址中文在线 | 国产精品亚洲五月天高清 | 男女超爽视频免费播放 | 免费播放一区二区三区 | 日韩欧美成人免费观看 | 综合激情五月综合激情五月激情1 | 永久免费观看美女裸体的网站 | 国产精品国产自线拍免费软件 | 国产人妻精品午夜福利免费 | 兔费看少妇性l交大片免费 | 日韩亚洲欧美精品综合 | 18禁黄网站男男禁片免费观看 | 成人免费无码大片a毛片 | 成熟妇人a片免费看网站 | 97久久国产亚洲精品超碰热 | 蜜桃视频插满18在线观看 | 亚洲 欧美 激情 小说 另类 | 国产成人无码午夜视频在线观看 | 乱人伦人妻中文字幕无码久久网 | 福利一区二区三区视频在线观看 | 亚洲热妇无码av在线播放 | 色一情一乱一伦 | 天天拍夜夜添久久精品 | 亚洲成av人片在线观看无码不卡 | 青青青爽视频在线观看 | 久久精品女人的天堂av | 四虎影视成人永久免费观看视频 | 日产精品高潮呻吟av久久 | 婷婷六月久久综合丁香 | 纯爱无遮挡h肉动漫在线播放 | 亚洲欧洲中文日韩av乱码 | 无码人妻精品一区二区三区下载 | 国产精品无码永久免费888 | 综合网日日天干夜夜久久 | 国产精品久久福利网站 | 久久久久99精品国产片 | 国产一区二区三区精品视频 | 欧美性猛交内射兽交老熟妇 | 曰韩少妇内射免费播放 | 亚洲男人av天堂午夜在 | 国产乱人偷精品人妻a片 | 中文字幕精品av一区二区五区 | 东京无码熟妇人妻av在线网址 | 日本www一道久久久免费榴莲 | 精品熟女少妇av免费观看 | 任你躁在线精品免费 | 国精品人妻无码一区二区三区蜜柚 | 无码人妻精品一区二区三区下载 | 强辱丰满人妻hd中文字幕 | 久久久久av无码免费网 | 亚洲 欧美 激情 小说 另类 | 国产亚洲人成a在线v网站 | 欧美色就是色 | 暴力强奷在线播放无码 | 国产小呦泬泬99精品 | 精品无人区无码乱码毛片国产 | 久久无码专区国产精品s | 成年美女黄网站色大免费全看 | av香港经典三级级 在线 | 国产美女极度色诱视频www | 99在线 | 亚洲 | 国产乱人偷精品人妻a片 | 野外少妇愉情中文字幕 | 97精品国产97久久久久久免费 | 红桃av一区二区三区在线无码av | 亚洲色无码一区二区三区 | 亚洲aⅴ无码成人网站国产app | 久久久久久国产精品无码下载 | 亚洲中文字幕成人无码 | 国产成人无码一二三区视频 | 国产人妻精品一区二区三区不卡 | 久久99精品国产.久久久久 | 日日躁夜夜躁狠狠躁 | 国产亚洲精品久久久久久大师 | 亚洲国产欧美国产综合一区 | 久久久久免费精品国产 | 久久久久久av无码免费看大片 | 人妻少妇被猛烈进入中文字幕 | 久久综合九色综合欧美狠狠 | 色窝窝无码一区二区三区色欲 | 亚洲日韩乱码中文无码蜜桃臀网站 | аⅴ资源天堂资源库在线 | 亚洲中文字幕va福利 | 一本一道久久综合久久 | 少妇无码一区二区二三区 | 欧美日韩综合一区二区三区 | 东北女人啪啪对白 | 久久综合九色综合欧美狠狠 | 乱码午夜-极国产极内射 | 亚洲日韩av一区二区三区四区 | 国产97色在线 | 免 | 狠狠亚洲超碰狼人久久 | 无码免费一区二区三区 | 99久久久无码国产精品免费 | 日本精品人妻无码77777 天堂一区人妻无码 | 麻豆国产人妻欲求不满 | 日韩在线不卡免费视频一区 | 亚洲成色在线综合网站 | 久久这里只有精品视频9 | 久久久国产精品无码免费专区 | 久久综合狠狠综合久久综合88 | 国产精品久久久久久久9999 | 扒开双腿吃奶呻吟做受视频 | 人妻有码中文字幕在线 | 久久精品国产亚洲精品 | 人妻尝试又大又粗久久 | 日日干夜夜干 | 国产内射老熟女aaaa | 久久久国产精品无码免费专区 | 亚洲精品午夜无码电影网 | 漂亮人妻洗澡被公强 日日躁 | 亚洲精品www久久久 | 亚洲人成人无码网www国产 | 久久国内精品自在自线 | 澳门永久av免费网站 | 欧美熟妇另类久久久久久不卡 | 中文字幕 亚洲精品 第1页 | 欧美日韩一区二区三区自拍 | 乱码av麻豆丝袜熟女系列 | 性欧美疯狂xxxxbbbb | 九月婷婷人人澡人人添人人爽 | 一本无码人妻在中文字幕免费 | 国产免费久久精品国产传媒 | 国产女主播喷水视频在线观看 | 蜜桃无码一区二区三区 | 欧美国产日产一区二区 | 日韩亚洲欧美精品综合 | 国产精品丝袜黑色高跟鞋 | www国产亚洲精品久久久日本 | 一本色道久久综合狠狠躁 | av无码久久久久不卡免费网站 | 精品熟女少妇av免费观看 | 欧美人与禽猛交狂配 | 国产精华av午夜在线观看 | 久久久婷婷五月亚洲97号色 | 国产人妖乱国产精品人妖 | 欧美日韩在线亚洲综合国产人 | 欧美亚洲日韩国产人成在线播放 | 少妇人妻大乳在线视频 | 性生交大片免费看l | 奇米影视888欧美在线观看 | 亚洲人交乣女bbw | 国产在线精品一区二区三区直播 | 色综合久久久久综合一本到桃花网 | 国产精品自产拍在线观看 | 乱中年女人伦av三区 | 综合激情五月综合激情五月激情1 | 欧美性猛交内射兽交老熟妇 | 曰韩无码二三区中文字幕 | 久久无码专区国产精品s | 精品久久综合1区2区3区激情 | 男人和女人高潮免费网站 | 99久久久国产精品无码免费 | 亚洲熟妇色xxxxx欧美老妇 | 77777熟女视频在线观看 а天堂中文在线官网 | 亚洲乱码日产精品bd | 国产精品久久久久影院嫩草 | 国产精品久久久午夜夜伦鲁鲁 | 人妻少妇精品无码专区动漫 | 亚洲中文字幕av在天堂 | 欧美熟妇另类久久久久久多毛 | 亚洲日韩中文字幕在线播放 | 无码国产激情在线观看 | 国产麻豆精品一区二区三区v视界 | 天堂久久天堂av色综合 | 在线精品亚洲一区二区 | 丁香啪啪综合成人亚洲 | 色婷婷香蕉在线一区二区 | 三级4级全黄60分钟 | 国产69精品久久久久app下载 | 欧洲vodafone精品性 | 国内精品九九久久久精品 | 2020久久香蕉国产线看观看 | 精品国产一区二区三区av 性色 | 亚洲中文字幕久久无码 | 日本熟妇乱子伦xxxx | 欧美日本日韩 | 亚洲乱亚洲乱妇50p | 午夜嘿嘿嘿影院 | 国产精品内射视频免费 | 国产av一区二区三区最新精品 | 国产精品久免费的黄网站 | 一二三四社区在线中文视频 | 国产超碰人人爽人人做人人添 | 国产艳妇av在线观看果冻传媒 | 小sao货水好多真紧h无码视频 | 欧美人与动性行为视频 | 精品国产乱码久久久久乱码 | 日韩精品无码免费一区二区三区 | 精品少妇爆乳无码av无码专区 | 18无码粉嫩小泬无套在线观看 | 欧美大屁股xxxxhd黑色 | 久久久亚洲欧洲日产国码αv | 国产成人精品优优av | 香港三级日本三级妇三级 | 免费播放一区二区三区 | 扒开双腿疯狂进出爽爽爽视频 | 小sao货水好多真紧h无码视频 | 久久亚洲中文字幕精品一区 | 久久人人97超碰a片精品 | 亚洲熟熟妇xxxx | 成人无码视频在线观看网站 | 国产莉萝无码av在线播放 |