久久精品国产精品国产精品污,男人扒开添女人下部免费视频,一级国产69式性姿势免费视频,夜鲁夜鲁很鲁在线视频 视频,欧美丰满少妇一区二区三区,国产偷国产偷亚洲高清人乐享,中文 在线 日韩 亚洲 欧美,熟妇人妻无乱码中文字幕真矢织江,一区二区三区人妻制服国产

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring Cloud Alibaba RocketMQ 快速入门

發布時間:2023/12/14 javascript 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Cloud Alibaba RocketMQ 快速入门 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spring Cloud Alibaba 消息隊列 RocketMQ 入門_weixin_42073629的博客-CSDN博客_spring cloud alibaba rocketmq1. 概述本文我們來學習Spring Cloud Alibaba提供的Spring Cloud Stream RocketMQ組件,基于Spring Cloud Stream的編程模型,接入 RocketMQ 作為消息中間件,實現消息驅動的微服務。RocketMQ是一款開源的分布式消息系統,基于高可用分布式集群技術,提供低延時的、高可靠的消息發布與訂閱服務。同時,廣泛應用于多個領域,包括異步通信解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通信、移動應用...https://blog.csdn.net/weixin_42073629/article/details/106535675

?

1. 概述

本文我們來學習?Spring Cloud Alibaba?提供的?Spring Cloud Stream RocketMQ?組件,基于?Spring Cloud Stream?的編程模型,接入 RocketMQ 作為消息中間件,實現消息驅動的微服務。

RocketMQ?是一款開源的分布式消息系統,基于高可用分布式集群技術,提供低延時的、高可靠的消息發布與訂閱服務。同時,廣泛應用于多個領域,包括異步通信解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通信、移動應用、手游、視頻、物聯網、車聯網等。

具有以下特點:

  • 能夠保證嚴格的消息順序
  • 提供豐富的消息拉取模式
  • 高效的訂閱者水平擴展能力
  • 實時的消息訂閱機制
  • 億級消息堆積能力

在開始本文之前,胖友需要對 RocketMQ 進行簡單的學習。可以閱讀《RocketMQ 極簡入門》文章,將第一二小節看完,在本機搭建一個 RocketMQ 服務。

2. Spring Cloud Stream 介紹

Spring Cloud Stream?是一個用于構建基于消息的微服務應用框架,使用?Spring Integration?與 Broker 進行連接。

友情提示:可能有胖友對 Broker 不太了解,我們來簡單解釋下。

一般來說,消息隊列中間件都有一個?Broker Server(代理服務器),消息中轉角色,負責存儲消息、轉發消息。

例如說在 RocketMQ 中,Broker 負責接收從生產者發送來的消息并存儲、同時為消費者的拉取請求作準備。另外,Broker 也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。

Spring Cloud Stream 提供了消息中間件的統一抽象,推出了 publish-subscribe、consumer groups、partition 這些統一的概念。

Spring Cloud Stream 內部有兩個概念:Binder?和?Binding

①?Binder,跟消息中間件集成的組件,用來創建對應的 Binding。各消息中間件都有自己的 Binder 具體實現。

public interface Binder<T, C extends ConsumerProperties, // 消費者配置P extends ProducerProperties> { // 生產者配置// 創建消費者的 BindingBinding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);// 創建生產者的 BindingBinding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);}
  • Kafka 實現了?KafkaMessageChannelBinder
  • RabbitMQ 實現了?RabbitMessageChannelBinder
  • RocketMQ 實現了?RocketMQMessageChannelBinder

②?Binding,包括 Input Binding 和 Output Binding。Binding 在消息中間件與應用程序提供的 Provider 和 Consumer 之間提供了一個橋梁,實現了開發者只需使用應用程序的 Provider 或 Consumer 生產或消費數據即可,屏蔽了開發者與底層消息中間件的接觸。

最終整體交互如下圖所示:

可能看完之后,胖友對 Spring Cloud Stream 還是有點懵逼,并且覺得概念怎么這么多呢?不要慌,我們先來快速入個門,會有更加具象的感受。

3. 快速入門

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-demo

本小節,我們一起來快速入門下,會創建 2 個項目,分別作為生產者和消費者。最終項目如下圖所示:

3.1 搭建生產者

創建?labx-06-sca-stream-rocketmq-producer-demo?項目,作為生產者。

3.1.1 引入依賴

創建?pom.xml?文件中,引入 Spring Cloud Alibaba RocketMQ 相關依賴。

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>labx-06</artifactId><groupId>cn.iocoder.springboot.labs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>labx-06-sca-stream-rocketmq-producer-demo</artifactId><properties><maven.compiler.target>1.8</maven.compiler.target><maven.compiler.source>1.8</maven.compiler.source><spring.boot.version>2.2.4.RELEASE</spring.boot.version><spring.cloud.version>Hoxton.SR1</spring.cloud.version><spring.cloud.alibaba.version>2.2.0.RELEASE</spring.cloud.alibaba.version></properties><!--引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,進行依賴版本的管理,防止不兼容。在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 開發團隊推薦了三者的依賴關系--><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring.cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring.cloud.alibaba.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><!-- 引入 SpringMVC 相關依賴,并實現對其的自動配置 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相關依賴,將 RocketMQ 作為消息隊列,并實現對其的自動配置 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency></dependencies></project>

通過引入?spring-cloud-starter-stream-rocketmq?依賴,引入并實現 RocketMQ 的自動配置。在該依賴中,已經幫我們自動引入 RocketMQ 的大量依賴,非常方便,如下圖所示:

3.1.2 配置文件

創建?application.yaml?配置文件,添加 Spring Cloud Alibaba RocketMQ 相關配置。

spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSON# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: test # 生產者分組sync: true # 是否同步發送消息,默認為 false 異步。server:port: 18080

①?spring.cloud.stream?為 Spring Cloud Stream 配置項,對應?BindingServiceProperties?類。配置的層級有點深,我們一層一層來看看。

②?spring.cloud.stream.bindings?為 Binding 配置項,對應?BindingProperties?Map。其中,key?為 Binding 的名字。要注意,雖然說 Binding 分成 Input 和 Output 兩種類型,但是在配置項中并不會體現出來,而是要在稍后搭配?@Input?還是?@Output?注解,才會有具體的區分。

這里,我們配置了一個名字為?demo01-output?的 Binding。從命名上,我們的意圖是想作為 Output Binding,用于生產者發送消息。

  • destination:目的地。在 RocketMQ 中,使用 Topic 作為目的地。這里我們設置為?DEMO-TOPIC-01。

    主題(Topic):表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是 RocketMQ 進行消息訂閱的基本單位。

  • content-type:內容格式。這里使用 JSON 格式,因為稍后我們將發送消息的類型為 POJO,使用 JSON 進行序列化。

③?spring.cloud.stream.rocketmq?為 Spring Cloud Stream RocketMQ 配置項。

④?spring.cloud.stream.rocketmq.binder?為 RocketMQ Binder 配置項,對應?RocketMQBinderConfigurationProperties?類。

  • name-server:RocketMQ Namesrv 地址。

    名字服務(Name Server):名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的 Broker IP 列表。多個 Namesrv 實例組成集群,但相互獨立,沒有信息交換。

⑤?spring.cloud.stream.rocketmq.bindings?為 RocketMQ?自定義?Binding 配置項,用于對通用的?spring.cloud.stream.bindings?配置項的增強,實現 RocketMQ Binding 獨特的配置。該配置項對應?RocketMQBindingProperties?Map,其中?key?為 Binding 的名字,需要對應上噢。

這里,我們對名字為?demo01-output?的 Binding 進行增強,進行 Producer 的配置。其中,producer?為 RocketMQ Producer 配置項,對應?RocketMQProducerProperties?類。

  • group:生產者分組。

    生產者組(Producer Group):同一類 Producer 的集合,這類 Producer 發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之后崩潰,則 Broker 服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費。

  • sync:是否同步發送消息,默認為?false?異步。一般業務場景下,使用同步發送消息較多,所以這里我們設置為?true?同步消息。

    使用 RocketMQ 發送三種類型的消息:同步消息(sync)、異步消息(async)和單向消息(oneway)。其中前兩種消息是可靠的,因為會有發送是否成功的應答。

3.1.3 MySource

創建?MySource?接口,聲明名字為 Output Binding。代碼如下:

public interface MySource {@Output("demo01-output")MessageChannel demo01Output();}

這里,我們通過?@Output?注解,聲明了一個名字為?demo01-output?的 Output Binding。注意,這個名字要和我們配置文件中的?spring.cloud.stream.bindings?配置項對應上。

同時,@Output?注解的方法的返回結果為?MessageChannel?類型,可以使用它發送消息。MessageChannel 提供的發送消息的方法如下:

@FunctionalInterface public interface MessageChannel {long INDEFINITE_TIMEOUT = -1;default boolean send(Message<?> message) {return send(message, INDEFINITE_TIMEOUT);}boolean send(Message<?> message, long timeout);}

那么,我們是否要實現 MySource 接口呢?答案是不需要,全部交給 Spring Cloud Stream 的?BindableProxyFactory?來解決。BindableProxyFactory 會通過動態代理,自動實現 MySource 接口。 而?@Output?注解的方法的返回值,BindableProxyFactory 會掃描帶有?@Output?注解的方法,自動進行創建。

例如說,#demo01Output()?方法被自動創建返回結果為?DirectWithAttributesChannel,它是 MessageChannel 的子類。

友情提示:感興趣的胖友,可以在 BindableProxyFactory 的?#afterPropertiesSet()?和?#invoke(MethodInvocation invocation)?方法上,都打上一個斷點,然后進行愉快的調試。

3.1.4 Demo01Message

創建?Demo01Message?類,示例 Message 消息。代碼如下:

public class Demo01Message {/*** 編號*/private Integer id;// ... 省略 setter/getter/toString 方法}

3.1.5 Demo01Controller

創建?Demo01Controller?類,提供發送消息的 HTTP 接口。代碼如下:

@RestController @RequestMapping("/demo01") public class Demo01Controller {@Autowiredprivate MySource mySource; // <X>@GetMapping("/send")public boolean send() {// <1> 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// <2> 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).build();// <3> 發送消息return mySource.demo01Output().send(springMessage);}}
  • <X>?處,使用?@Autowired?注解,注入 MySource Bean。
  • <1>?處,創建 Demo01Message 對象。
  • <2>?處,使用?MessageBuilder?創建 Spring?Message?對象,并設置消息內容為 Demo01Message 對象。
  • <3>?處,通過 MySource 獲得 MessageChannel 對象,然后發送消息。

3.1.6 ProducerApplication

創建?ProducerApplication?類,啟動應用。代碼如下:

@SpringBootApplication @EnableBinding(MySource.class) public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}}

使用?@EnableBinding?注解,聲明指定接口開啟 Binding 功能,掃描其?@Input?和?@Output?注解。這里,我們設置為 MySource 接口。

3.2 搭建消費者

創建?labx-06-sca-stream-rocketmq-consumer-demo?項目,作為消費者。

3.2.1 引入依賴

創建?pom.xml?文件中,引入 Spring Cloud Alibaba RocketMQ 相關依賴。

友情提示:和「3.1.1 引入依賴」基本一樣,點擊?鏈接?查看。

3.2.2 配置文件

創建?application.yaml?配置文件,添加 Spring Cloud Alibaba RocketMQ 相關配置。

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

總體來說,和「3.1.2 配置文件」是比較接近的,所以我們只說差異點噢。

①?spring.cloud.stream.bindings?為 Binding 配置項。

這里,我們配置了一個名字為?demo01-input?的 Binding。從命名上,我們的意圖是想作為 Input Binding,用于消費者消費消息。

  • group:消費者分組。

    消費者組(Consumer Group):同一類 Consumer 的集合,這類 Consumer 通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的 Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。

②?spring.cloud.stream.rocketmq.bindings?為 RocketMQ?自定義?Binding 配置項。

這里,我們對名字為?demo01-input?的 Binding 進行增強,進行 Consumer 的配置。其中,consumer?為 RocketMQ Producer 配置項,對應?RocketMQConsumerProperties?類。

  • enabled:是否開啟消費,默認為?true。在日常開發時,如果在本地環境不想消費,可以通過設置?enabled?為?false?進行關閉。

  • broadcasting: 是否使用廣播消費,默認為?false?使用集群消費。

    • 集群消費(Clustering):集群消費模式下,相同 Consumer Group 的每個 Consumer 實例平均分攤消息。
    • 廣播消費(Broadcasting):廣播消費模式下,相同 Consumer Group 的每個 Consumer 實例都接收全量的消息。

這里一點要注意!!!艿艿加了三個感嘆號,一定要理解集群消費和廣播消費的差異。我們來舉個例子,以有兩個消費者分組 A 和 B 的場景舉例子:

  • 假設每個消費者分組各啟動一個實例,此時我們發送一條消息,該消息會被兩個消費者分組?"consumer_group_01"?和?"consumer_group_02"?都各自消費一次。
  • 假設每個消費者分組各啟動一個實例,此時我們發送一條消息,該消息會被分組 A 的某個實例消費一次,被分組 B 的某個實例也消費一次

通過集群消費的機制,我們可以實現針對相同 Topic ,不同消費者分組實現各自的業務邏輯。例如說:用戶注冊成功時,發送一條 Topic 為?"USER_REGISTER"?的消息。然后,不同模塊使用不同的消費者分組,訂閱該 Topic ,實現各自的拓展邏輯:

  • 積分模塊:判斷如果是手機注冊,給用戶增加 20 積分。
  • 優惠劵模塊:因為是新用戶,所以發放新用戶專享優惠劵。
  • 站內信模塊:因為是新用戶,所以發送新用戶的歡迎語的站內信。
  • ... 等等

這樣,我們就可以將注冊成功后的業務拓展邏輯,實現業務上的解耦,未來也更加容易拓展。同時,也提高了注冊接口的性能,避免用戶需要等待業務拓展邏輯執行完成后,才響應注冊成功。

同時,相同消費者分組的多個實例,可以實現高可用,保證在一個實例意外掛掉的情況下,其它實例能夠頂上。并且,多個實例都進行消費,能夠提升消費速度

友情提示:如果還不理解的話,沒有關系,我們下面會演示下我們上面舉的例子。

3.2.3 MySink

創建?MySink?接口,聲明名字為 Input Binding。代碼如下:

public interface MySink {String DEMO01_INPUT = "demo01-input";@Input(DEMO01_INPUT)SubscribableChannel demo01Input();}

這里,我們通過?@Input?注解,聲明了一個名字為?demo01-input?的 Input Binding。注意,這個名字要和我們配置文件中的?spring.cloud.stream.bindings?配置項對應上。

同時,@Input?注解的方法的返回結果為?SubscribableChannel?類型,可以使用它訂閱消息來消費。MessageChannel 提供的訂閱消息的方法如下:

public interface SubscribableChannel extends MessageChannel {boolean subscribe(MessageHandler handler); // 訂閱boolean unsubscribe(MessageHandler handler); // 取消訂閱}

那么,我們是否要實現 MySink 接口呢?答案也是不需要,還是全部交給 Spring Cloud Stream 的?BindableProxyFactory?大兄弟來解決。BindableProxyFactory 會通過動態代理,自動實現 MySink 接口。 而?@Input?注解的方法的返回值,BindableProxyFactory 會掃描帶有?@Input?注解的方法,自動進行創建。

例如說,#demo01Input()?方法被自動創建返回結果為?DirectWithAttributesChannel,它也是 SubscribableChannel 的子類。

友情提示:感興趣的胖友,可以在 BindableProxyFactory 的?#afterPropertiesSet()?和?#invoke(MethodInvocation invocation)?方法上,都打上一個斷點,然后進行愉快的調試。

3.2.4 Demo01Message

創建?Demo01Message?類,示例 Message 消息。

友情提示:和「3.1.4 Demo01Message」基本一樣,點擊?鏈接?查看。

3.2.5 Demo01Consumer

創建?Demo01Consumer?類,消費消息。代碼如下:

@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT)public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);}}

在方法上,添加?@StreamListener?注解,聲明對應的?Input?Binding。這里,我們使用?MySink.DEMO01_INPUT。

又因為我們消費的消息是 POJO 類型,所以我們需要添加?@Payload?注解,聲明需要進行反序列化成 POJO 對象。

3.2.6 ConsumerApplication

創建?ConsumerApplication?類,啟動應用。代碼如下:

@SpringBootApplication @EnableBinding(MySink.class) public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}}

使用?@EnableBinding?注解,聲明指定接口開啟 Binding 功能,掃描其?@Input?和?@Output?注解。這里,我們設置為 MySink 接口。

3.3 測試單集群多實例的場景

本小節,我們會在一個消費者集群啟動兩個實例,測試在集群消費的情況下的表現。

① 執行?ConsumerApplication 兩次,啟動兩個消費者的實例,從而實現在消費者分組?demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。此時在 IDEA 控制臺看到 RocketMQ 相關的日志如下:

2020-02-22 09:32:54.462 INFO 50472 --- [ main] s.b.r.c.RocketMQListenerBindingContainer : running container: RocketMQListenerBindingContainer{consumerGroup='demo01-consumer-group-DEMO-TOPIC-01', nameServer='[127.0.0.1:9876]', topic='DEMO-TOPIC-01', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='null', messageModel=CLUSTERING} 2020-02-22 09:32:54.462 INFO 50472 --- [ main] .c.s.b.r.i.RocketMQInboundChannelAdapter : started com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter@1cd3b138

友情提示:因為 IDEA 默認同一個程序只允許啟動 1 次,所以我們需要配置 DemoProviderApplication 為?Allow parallel run。如下圖所示:

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口三次,發送三條消息。此時在 IDEA 控制臺看到消費者打印日志如下:

// ConsumerApplication 控制臺 01 2020-02-22 09:39:29.073 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-1682643477}] 2020-02-22 09:41:32.754 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=1890257867}]// ConsumerApplication 控制臺 02 2020-02-22 09:41:32.264 INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:80 消息內容:Demo01Message{id=1401668556}]

符合預期。從日志可以看出,每條消息僅被消費一次。

3.4 測試多集群多實例的場景

本小節,我們會在二個消費者集群啟動兩個實例,測試在集群消費的情況下的表現。

① 執行?ConsumerApplication 兩次,啟動兩個消費者的實例,從而實現在消費者分組?demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。

② 修改?labx-06-sca-stream-rocketmq-consumer-demo?項目的配置文件,修改?spring.cloud.stream.bindings.demo01-input.group?配置項,將消費者分組改成?X-demo01-consumer-group-DEMO-TOPIC-01。

然后,執行?ConsumerApplication 兩次,再啟動兩個消費者的實例,從而實現在消費者分組?X-demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。

③ 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口三次,發送三條消息。此時在 IDEA 控制臺看到消費者打印日志如下:

// 消費者分組 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 01 2020-02-22 10:17:07.886 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-276398167}] 2020-02-22 10:17:08.237 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-250975158}]// 消費者分組 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 02 2020-02-22 10:17:08.710 INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:80 消息內容:Demo01Message{id=412281482}]// 消費者分組 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 01 2020-02-22 10:17:07.887 INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:51 消息內容:Demo01Message{id=-276398167}] 2020-02-22 10:17:08.238 INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:51 消息內容:Demo01Message{id=-250975158}]// 消費者分組 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 02 2020-02-22 10:17:08.787 INFO 51096 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:77 消息內容:Demo01Message{id=412281482}]

符合預期。從日志可以看出,每條消息被每個消費者集群都進行了消費,且僅被消費一次。

3.5 小結

至此,我們已經完成了 Stream RocketMQ 的快速入門,是不是還是蠻簡答的噢。現在胖友可以在回過頭看看 Binder 和 Binding 的概念,是不是就清晰一些了。

4. 定時消息

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-demo

在 RocketMQ 中,提供定時消息的功能。

定時消息,是指消息發到 Broker 后,不能立刻被 Consumer 消費,要到特定的時間點或者等待特定的時間后才能被消費。

不過,RocketMQ 暫時不支持任意的時間精度的延遲,而是固化了 18 個延遲級別。如下表格:

延遲級別時間延遲級別時間延遲級別時間
11s73m139m
25s84m1410m
310s95m1520m
430s106m1630m
51m117m171h
62m128m182h

如果胖友想要任一時刻的定時消息,可以考慮借助 MySQL + Job 來實現。又或者考慮使用?DDMQ(滴滴打車基于 RocketMQ 和 Kafka 改造的開源消息隊列)。

下面,我們來搭建一個 RocketMQ 定時消息的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,修改?labx-06-sca-stream-rocketmq-producer-demo?發送定時消息,繼續使用?labx-06-sca-stream-rocketmq-consumer-demo?消費消息。

4.1 Demo01Controller

修改?Demo01Controller?類,增發送定時消息的 HTTP 接口。代碼如下:

private Logger logger = LoggerFactory.getLogger(getClass());@GetMapping("/send_delay") public boolean sendDelay() {// 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // <X> 設置延遲級別為 3,10 秒后消費。.build();// 發送消息boolean sendResult = mySource.demo01Output().send(springMessage);logger.info("[sendDelay][發送消息完成, 結果 = {}]", sendResult);return sendResult; }

在?<X>?處,通過添加頭?MessageConst.PROPERTY_DELAY_TIME_LEVEL,設置消息的延遲級別,從而發送定時消息。

4.2 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send_delay?接口,發送延遲 10 秒的定時消息。IDEA 控制臺輸出日志如下:

// Producer 的控制臺 2020-02-22 16:32:35.836 INFO 57143 --- [io-18080-exec-5] c.i.s.l.r.p.controller.Demo01Controller : [sendDelay][發送消息完成, 結果 = true]// Consumer 的控制臺 2020-02-22 16:32:45.841 INFO 57133 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:61 消息內容:Demo01Message{id=618574636}]

符合預期。在 Producer 發送的消息之后,Consumer 確實 10 秒后才消費消息。

5. 消費重試

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-retry

RocketMQ 提供消費重試的機制。在消息消費失敗的時候,RocketMQ 會通過消費重試機制,重新投遞該消息給 Consumer ,讓 Consumer 有機會重新消費消息,實現消費成功。

當然,RocketMQ 并不會無限重新投遞消息給 Consumer 重新消費,而是在默認情況下,達到 16 次重試次數時,Consumer 還是消費失敗時,該消息就會進入到死信隊列

死信隊列用于處理無法被正常消費的消息。當一條消息初次消費失敗,消息隊列會自動進行消息重試;達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中。

RocketMQ 將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。在 RocketMQ 中,可以通過使用 console 控制臺對死信隊列中的消息進行重發來使得消費者實例再次進行消費。

每條消息的失敗重試,是有一定的間隔時間。實際上,消費重試是基于「5. 定時消息」?來實現,第一次重試消費按照延遲級別為?3?開始。😈 所以,默認為 16 次重試消費,也非常好理解,畢竟延遲級別最高為 18 呀。

不過要注意,只有集群消費模式下,才有消息重試。

下面,我們來搭建一個 RocketMQ 消息重試的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,使用?labx-06-sca-stream-rocketmq-producer-demo?發送消息,從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-retry?來模擬消費失敗后的重試

5.1 復制項目

將「2. 快速入門」小節的?labx-06-sca-stream-rocketmq-consumer-demo,復制出?labx-06-sca-stream-rocketmq-consumer-retry。

5.2 配置文件

修改?application.yml?配置文件,增加消費重試相關的兩個配置項?delay-level-when-next-consume?和?max-attempts。最終配置如下:

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Consumer 配置項,對應 ConsumerProperties 類consumer:max-attempts: 1# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費delay-level-when-next-consume: 0 # 異步消費消息模式下消費失敗重試策略,默認為 0server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

① 對于?delay-level-when-next-consume?配置項,一共有三種選擇:

  • -1:不重復,直接放入死信隊列
  • 0:RocketMQ Broker 控制重試策略
  • 0:RocketMQ Consumer 控制重試策略

可能胖友對 Broker 和 Consumer 控制重試策略有點懵逼!?每天消息首次消費失敗時,Consumer 會發回給 Broker,并告訴 Broker 按照什么延遲級別開始,不斷重新投遞給 Consumer 直到消費成功或者到達最大延遲級別。

舉個例子,如果這里我們設置了?delay-level-when-next-consume?配置項為 18,則 2 小時后 Broker 會投遞該消息給 Consumer 進行重新消費。

一般情況下,我們設置?delay-level-when-next-consume?配置項為 0 即可,使用 Broker 控制重試策略即可。默認配置下,Broker 會使用延遲級別從 3 開始,10 秒后 Broker 會投遞該消息給 Consumer 進行重新消費。

② 對于?max-attempts?配置項,每次拉取到消息到本地時,如果消費重試,本地重試的最大總次數(包括第一次)。這個是 Spring Cloud Stream 提供的通用消費重試功能,是?Consumer?級別的,而 RocketMQ 提供的獨有消費重試功能,是?Broker?級別的。

因為 Spring Cloud Stream 提供的重試間隔,是通過 sleep 實現,會占掉當前線程,影響 Consumer 的消費速度,所以這里并不推薦使用,因此設置?max-attempts?配置項為 1,禁用 Spring Cloud Stream 提供的重試功能,使用 RocketMQ 提供的重試功能

友情提示:如果胖友無法保證消費重試不會帶來副作用,也就是說無法保證消費的冪等性,建議關閉消費重試功能,即設置?delay-level-when-next-consume?配置項為 -1,max-attempts?配置項為 1。

5.3 Demo01Consumer

修改?Demo01Consumer?類,在消費消息時拋出異常,從而模擬消費錯誤。代碼如下:

@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT)public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);// <X> 注意,此處拋出一個 RuntimeException 異常,模擬消費失敗throw new RuntimeException("我就是故意拋出一個異常");}}

5.4 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口,發送一條消息。IDEA 控制臺輸出日志如下:

// Demo01Consumer 第一次消費失敗,拋出 RuntimeException 異常 2020-02-22 19:18:52.241 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:69 消息內容:Demo01Message{id=-604160799}] 2020-02-22 19:18:52.245 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略// Demo01Consumer 第一次重試消費失敗,拋出 RuntimeException 異常。間隔了 10 秒,對應延遲級別 3 。 2020-02-22 19:19:02.259 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:69 消息內容:Demo01Message{id=-604160799}] 2020-02-22 19:19:02.259 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略// Demo01Consumer 第二次重試消費失敗,拋出 RuntimeException 異常。間隔了 30 秒,對應延遲級別 4 。 2020-02-22 19:19:32.266 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:69 消息內容:Demo01Message{id=-604160799}] 2020-02-22 19:19:32.266 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略// ... 省略,后續還有重試

符合預期。從日志中,我們可以看到,消息因為消費失敗后,又重試消費了多次。

6. 消費異常處理機制

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-error-handler

在 Spring Cloud Stream 中,提供了通用的消費異常處理機制,可以攔截到消費者消費消息時發生的異常,進行自定義的處理邏輯。

下面,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例。考慮方便,我們直接復用「5. 消費重試」小節的項目,使用?labx-06-sca-stream-rocketmq-producer-demo?發送消息,從?labx-06-sca-stream-rocketmq-consumer-retry?復制出?labx-06-sca-stream-rocketmq-consumer-error-handler?來演示消費異常處理機制

6.1 復制項目

將「5. 消費重試」小節的?labx-06-sca-stream-rocketmq-consumer-retry,復制出?labx-06-sca-stream-rocketmq-consumer-error-handler。

6.2 Demo01Consumer

修改?Demo01Consumer?類,增加消費異常處理方法。完整代碼如下:

@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT) // 對應 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);// <X> 注意,此處拋出一個 RuntimeException 異常,模擬消費失敗throw new RuntimeException("我就是故意拋出一個異常");}@ServiceActivator(inputChannel = "DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.errors")public void handleError(ErrorMessage errorMessage) {logger.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());logger.error("[handleError][headers:{}]", errorMessage.getHeaders());}@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannelpublic void globalHandleError(ErrorMessage errorMessage) {logger.error("[globalHandleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());}}

① 在 Spring Integration 的設定中,若?#onMessage(@Payload Demo01Message message)?方法消費消息發生異常時,會發送錯誤消息(ErrorMessage)到對應的錯誤 Channel(<destination>.<group>.errors)中。同時,所有錯誤 Channel 都橋接到了 Spring Integration 定義的全局錯誤 Channel(errorChannel)

友情提示:先暫時記住 Spring Integration 這樣的設定,艿艿也沒去深究 T T,也是一臉懵逼。

因此,我們有兩種方式來實現異常處理:

  • 局部的異常處理:通過訂閱指定錯誤 Channel
  • 全局的異常處理:通過訂閱全局錯誤 Channel

② 在?#handleError(ErrorMessage errorMessage)?方法上,我們聲明了?@ServiceActivator?注解,訂閱指定錯誤 Channel的錯誤消息,實現?#onMessage(@Payload Demo01Message message)?方法的局部異常處理。如下圖所示:

③ 在?#globalHandleError(ErrorMessage errorMessage)?方法上,我們聲明了?@StreamListener?注解,訂閱全局錯誤 Channel的錯誤消息,實現全局異常處理。

④ 在全局局部異常處理都定義的情況下,錯誤消息僅會被符合條件局部錯誤異常處理。如果沒有符合條件的,錯誤消息才會被全局異常處理。

6.3 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口,發送一條消息。IDEA 控制臺輸出日志如下:

// onMessage 方法 2020-02-20 00:47:34.487 INFO 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:60 消息內容:Demo01Message{id=-317670393}]// handleError 方法 2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][payload:RuntimeException: 我就是故意拋出一個異常] 2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][originalMessage:GenericMessage [payload=byte[17], headers={rocketmq_QUEUE_ID=3, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_RECONSUME_TIMES=0, rocketmq_MESSAGE_ID=0A258102FE8918B4AAC2620411310017, rocketmq_SYS_FLAG=0, id=dc6dafb1-b303-7931-5977-45f319b935d9, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, rocketmq_BORN_TIMESTAMP=1582130833713, timestamp=1582130854444}]] 2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][headers:{id=cdf37b5d-878c-3d85-1f40-7711a3642a16, timestamp=1582130854489}]

不過要注意,如果異常處理方法成功,沒有重新拋出異常,會認定為該消息被消費成功,所以就不會進行消費重試。

7. 廣播消費

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-broadcasting

在上述的示例中,我們看到的都是使用集群消費,也是最常用的消費模式。而在一些場景下,我們需要使用廣播消費

廣播消費模式下,相同 Consumer Group 的每個 Consumer 實例都接收全量的消息。

例如說,在應用中,緩存了數據字典等配置表在內存中,可以通過 RocketMQ 廣播消費,實現每個應用節點都消費消息,刷新本地內存的緩存。

又例如說,我們基于 WebSocket 實現了 IM 聊天,在我們給用戶主動發送消息時,因為我們不知道用戶連接的是哪個提供 WebSocket 的應用,所以可以通過 RocketMQ 廣播消費,每個應用判斷當前用戶是否是和自己提供的 WebSocket 服務連接,如果是,則推送消息給用戶。

下面,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,使用?labx-06-sca-stream-rocketmq-producer-demo?發送消息,從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting?來演示廣播消費

7.1 復制項目

將「2. 快速入門」小節的?labx-06-sca-stream-rocketmq-consumer-demo,復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting。

7.2 配置文件

修改?application.yml?配置文件,設置?broadcasting?配置項為?true,開啟廣播消費的模式。完整配置如下:

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01-X # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: true # 是否使用廣播消費,默認為 false 使用集群消費server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

7.3 簡單測試

① 執行?ConsumerApplication 兩次,啟動兩個消費者的實例,從而實現在消費者分組?demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口三次,發送三條消息。此時在 IDEA 控制臺看到消費者打印日志如下:

// ConsumerApplication 控制臺 01 2020-02-20 01:20:06.886 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-335590634}] 2020-02-20 01:20:18.368 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=283364059}] 2020-02-20 01:20:24.422 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-1253930234}]// ConsumerApplication 控制臺 02 2020-02-20 01:20:06.884 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:75 消息內容:Demo01Message{id=-335590634}] 2020-02-20 01:20:18.368 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:75 消息內容:Demo01Message{id=283364059}] 2020-02-20 01:20:24.422 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:75 消息內容:Demo01Message{id=-1253930234}]

符合預期。從日志可以看出,每條消息僅被每個消費者消費了一次。

8. 順序消息

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-orderly
  • 消費者:labx-06-sca-stream-rocketmq-consumer-orderly

RocketMQ 提供了兩種順序級別:

  • 普通順序消息:Producer 將相關聯的消息發送到相同的消息隊列。
  • 完全嚴格順序:在【普通順序消息】的基礎上,Consumer 嚴格順序消費。

官方文檔是這么描述的:

消息有序,指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了三條消息分別是訂單創建、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以并行消費的。RocketMQ 可以嚴格的保證消息有序。

順序消息分為全局順序消息與分區順序消息,全局順序是指某個 Topic 下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可。

  • 全局順序:對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發布和消費。適用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息發布和消費的場景
  • 分區順序:對于指定的一個 Topic,所有消息根據 Sharding key 進行區塊分區。 同一個分區內的消息按照嚴格的 FIFO 順序進行發布和消費。Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。適用場景:性能要求高,以 Sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息發布和消費的場景。

注意,分區順序就是普通順序消息,全局順序就是完全嚴格順序。

下面,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:

  • 從?labx-06-sca-stream-rocketmq-producer-demo?復制出?labx-06-sca-stream-rocketmq-producer-orderly?來演示發送順序消息
  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting?來演示順序消費消息

8.1 搭建生產者

從?labx-06-sca-stream-rocketmq-producer-demo?復制出?labx-06-sca-stream-rocketmq-producer-orderly?來演示發送順序消息

8.1.1 配置文件

修改?application.yml?配置文件,添加?partition-key-expression?配置項,設置 Producer 發送順序消息的 Sharding key。完整配置如下:

spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSON# Producer 配置項,對應 ProducerProperties 類producer:partition-key-expression: payload['id'] # 分區 key 表達式。該表達式基于 Spring EL,從消息中獲得分區 key。# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: test # 生產者分組sync: true # 是否同步發送消息,默認為 false 異步。server:port: 18080

①?partition-key-expression?配置項,該表達式基于?Spring EL,從消息中獲得 Sharding key。

這里,我們設置該配置項為?payload['id'],表示從 Spring?Message?的 payload 的?id。稍后我們發送的消息的 payload 為 Demo01Message,那么?id?就是?Demo01Message.id。

如果我們想從消息的 headers 中獲得 Sharding key,可以設置為?headers['partitionKey']。

② Spring Cloud Stream 使用?PartitionHandler?進行 Sharding key 的獲得與計算,最終 Sharding key 的結果為?key.hashCode() % partitionCount。

感興趣的胖友,可以閱讀 PartitionHandler 的?#determinePartition(Message<?> message)?方法。

在獲取到 Sharding key 之后,Spring Cloud Alibaba Stream RocketMQ 提供的?PartitionMessageQueueSelector?選擇消息發送的隊列。

我們以發送一條?id?為 1 的 Demo01Message 消息為示例,最終會發送到對應 RocketMQ Topic 的隊列為 1。計算過程如下:

// 第一步,PartitionHandler 使用 `partition-key-expression` 表達式,從 Message 中獲得 Sharding key key => 1// 第二步,PartitionHandler 計算最終的 Sharding key // 默認情況下,每個 RocketMQ Topic 的隊列總數是 4。 key => key.hashCode() % partitionCount = 1.hashCode() % 4 = 1 % 4 = 1// 第三步,PartitionMessageQueueSelector 獲得對應 RocketMQ Topic 的隊列 隊列 => queues.get(key) = queues.get(1)

這樣,我們就能保證相同 Sharding Key?的消息,發送到相同的對應 RocketMQ Topic 的隊列中。當前,前提是該 Topic 的隊列總數不能變噢,不然計算的 Sharding Key 會發生變化。

8.1.2 Demo01Controller

修改?Demo01Controller?類,增加發送 3 條順序消息的 HTTP 接口。代碼如下:

@GetMapping("/send_orderly") public boolean sendOrderly() {// 發送 3 條相同 id 的消息int id = new Random().nextInt();for (int i = 0; i < 3; i++) {// 創建 MessageDemo01Message message = new Demo01Message().setId(id);// 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).build();// 發送消息mySource.demo01Output().send(springMessage);}return true; }

每次發送的 3 條消息使用相同的?id,配合上我們使用它作為 Sharding key,就可以發送對應 Topic 的相同隊列中。

另外,整列發送的雖然是順序消息,但是和發送普通消息的代碼是一模一樣的。

8.2 搭建消費者

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting?來演示順序消費消息

8.2.1 配置文件

修改?application.yml?配置文件,添加?orderly?配置項,設置 Consumer 順序消費消息。完整配置如下:

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費orderly: true # 是否順序消費,默認為 false 并發消費。server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

8.2.2 Demo01Consumer

修改?Demo01Consumer?類,在消費消息時,打印出消息所在隊列編號線程編號,這樣我們通過隊列編號可以判斷消息是否順序發送,通過線程編號可以判斷消息是否順序消費。代碼如下:

@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT)public void onMessage(Message<?> message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);}}

8.3 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send_orderly?接口,發送順序消息。IDEA 控制臺輸出日志如下:

2020-02-20 21:26:52.044 INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212037, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD850006, rocketmq_SYS_FLAG=0, id=945725a1-abfb-218a-d480-b220adff9549, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212044}]] 2020-02-20 21:26:52.046 INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212039, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD870007, rocketmq_SYS_FLAG=0, id=86a0e912-3cba-8b5b-3928-a7ef0ad80036, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212046}]] 2020-02-20 21:26:52.046 INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212041, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD890008, rocketmq_SYS_FLAG=0, id=b04416a3-60c2-bf42-a5a4-fe3c5079cc55, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212046}]]

id?為 58569988 的消息被發送到 RocketMQ 消息隊列編號為 0,并且在線程編號為 76 的線程中消費。😈 胖友可以自己在多調用幾次接口,繼續嘗試。

9. 消息過濾

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-filter

RocketMQ 提供了兩種方式給 Consumer 進行消息的過濾:

  • 基于 Tag 過濾

    標簽(Tag):為消息設置的標志,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優化 RocketMQ 提供的查詢系統。消費者可以根據 Tag 實現對不同子主題的不同消費邏輯,實現更好的擴展性。

  • 基于?SQL92?過濾

消息過濾目前是在?Broker?端實現的,優點是減少了 Broker 和 Consumer 之間的無用消息的網絡傳輸,缺點是增加了 Broker 的負擔、而且實現相對復雜。

一般情況下,我們使用 Tag 過濾較多,我們來搭建一個 RocketMQ 使用 Tag 進行消息過濾的示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:

  • 修改?labx-06-sca-stream-rocketmq-producer-demo?發送帶有 Tag 的消息
  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter?來使用 Tag 過濾消息來消費。

先搭建消費者。

9.1 Demo01Controller

修改?Demo01Controller?類,增加發送 3 條帶 Tag 的消息的 HTTP 接口。代碼如下:

@GetMapping("/send_tag") public boolean sendTag() {for (String tag : new String[]{"yunai", "yutou", "tudou"}) {// 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_TAGS, tag) // <X> 設置 Tag.build();// 發送消息mySource.demo01Output().send(springMessage);}return true; }

在?<X>?處,通過添加頭?MessageConst.PROPERTY_TAGS,設置發送消息的?Tag

再搭建消費者。

9.2 復制項目

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter?來使用 Tag 過濾消息來消費。

9.3 配置文件

修改?application.yml?配置文件,設置?tags?配置項為?yunai || yutou,只消費帶有 Tag 為?yunai?或?yutou?的消息。完整配置如下:

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費tags: yunai || yutou # 基于 Tag 訂閱,多個 Tag 使用 || 分隔,默認為空sql: # 基于 SQL 訂閱,默認為空server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

如果胖友想要基于 SQL92 過濾消息,可以通過設置?sql?配置項。

9.4 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send_tag?接口,發送帶有 Tag 的消息。IDEA 控制臺輸出日志如下:

2020-02-20 22:41:57.639 INFO 81013 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:Demo01Message{id=687868446}] 2020-02-20 22:41:57.641 INFO 81013 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:Demo01Message{id=1088622557}]

只消費了兩條消息,目測 Tag 為?tudou?的消息已經被過濾了。要注意,被過濾掉的消息,后續是無法被消費掉了,效果和消費成功是一樣的。

9.5 Demo01Consumer

咳咳咳:不知道如何取這標題,暫時用這個噶。

上面我們看到的是 RocketMQ?獨有的?Broker級別的消息過濾機制,而 Spring Cloud Stream 提供了通用的?Consumer?級別的效率過濾器機制。我們只需要使用?@StreamListener?注解的?condition?屬性,設置消息滿足指定 Spring EL 表達式的情況下,才進行消費。

> /** > * A condition that must be met by all items that are dispatched to this method. > * @return a SpEL expression that must evaluate to a {@code boolean} value. > */ > String condition() default ""; >

修改?Demo01Consumer?類,使用?@StreamListener?注解的?condition?屬性來過濾消息。代碼如下:

@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());// @StreamListener(MySink.DEMO01_INPUT) // public void onMessage(@Payload Demo01Message message) { // logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message); // }@StreamListener(value = MySink.DEMO01_INPUT, condition = "headers['rocketmq_TAGS'] == 'yunai'")public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);}}

這里我們設置消息的 Header 帶有的?rocketmq_TAGS?值為?yunai?時,才進行消費。

9.6 再次測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send_tag?接口,發送帶有 Tag 的消息。IDEA 控制臺輸出日志如下:

/ Tag 為 `yunai` 的消息被消費 2020-02-20 22:59:11.597 INFO 81438 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:64 消息內容:Demo01Message{id=124549390}]// Tag 為 `yutou` 的消息被過濾 2020-02-20 22:59:11.599 WARN 81438 --- [MessageThread_1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: 5edff575-b9a7-e011-154a-532077994685

只消費了一條消息,目測 Tag 為?tudou?的消息被 Broker 過濾,Tag 為?yutou?的消息被 Consumer 過濾。要注意,被過濾掉的消息,后續是無法被消費掉了,效果和消費成功是一樣的。

10. 事務消息

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-transaction
  • 消費者:labx-06-sca-stream-rocketmq-consumer-demo

在分布式消息隊列中,目前唯一提供完整的事務消息的,只有 RocketMQ 。關于這一點,還是可以鼓吹下的。

可能會有胖友怒噴艿艿,RabbitMQ 和 Kafka 也有事務消息啊,也支持發送事務消息的發送,以及后續的事務消息的 commit提交或 rollbackc 回滾。但是要考慮一個極端的情況,在本地數據庫事務已經提交的時時候,如果因為網絡原因,又或者崩潰等等意外,導致事務消息沒有被 commit ,最終導致這條事務消息丟失,分布式事務出現問題。

相比來說,RocketMQ 提供事務回查機制,如果應用超過一定時長未 commit 或 rollback 這條事務消息,RocketMQ 會主動回查應用,詢問這條事務消息是 commit 還是 rollback ,從而實現事務消息的狀態最終能夠被 commit 或是 rollback ,達到最終事務的一致性。

這也是為什么艿艿在上面專門加粗“完整的”三個字的原因。可能上述的描述,對于絕大多數沒有了解過分布式事務的胖友,會比較陌生,所以推薦閱讀如下兩篇文章:

  • 《阿里云消息隊列 MQ —— 事務消息》
  • 《芋道 RocketMQ 源碼解析 —— 事務消息》

熱心的艿艿:雖然說 RabbitMQ、Kafka 并未提供完整的事務消息,但是社區里,已經基于它們之上拓展,提供了事務回查的功能。例如說:Myth?,采用消息隊列解決分布式事務的開源框架, 基于 Java 語言來開發(JDK1.8),支持 Dubbo,Spring Cloud,Motan 等 RPC 框架進行分布式事務。

下面,我們來搭建一個 RocketMQ 定時消息的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,修改?labx-06-sca-stream-rocketmq-producer-transaction?發送事務消息,繼續使用?labx-06-sca-stream-rocketmq-consumer-demo?消費消息。

10.1 復制項目

從?labx-06-sca-stream-rocketmq-producer-demo?復制出?labx-06-sca-stream-rocketmq-producer-transaction?來發送事務消息

10.2 配置文件

修改?application.yml?配置文件,添加?transactional?配置項為?true,設置 Producer 發送事務消息。完整配置如下:

spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSON# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: test # 生產者分組sync: true # 是否同步發送消息,默認為 false 異步。transactional: true # 是否發送事務消息,默認為 false。server:port: 18080

10.3 Demo01Controller

修改?Demo01Controller?類,增加發送事務消息的 HTTP 接口。代碼如下:

@GetMapping("/send_transaction") public boolean sendTransaction() {// 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// 創建 Spring Message 對象Args args = new Args().setArgs1(1).setArgs2("2");Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader("args", JSON.toJSONString(args)) // <X>.build();// 發送消息return mySource.demo01Output().send(springMessage); }public static class Args { // 這里作為示例,所以直接這么寫了private Integer args1;private String args2;// ... 省略 setter、getter、toString 方法 }

因為 Spring Cloud Stream 在設計時,并沒有考慮事務消息,所以我們只好在?<X>?處,通過 Header 傳遞參數。

又因為 Header 后續會被轉換成 String 類型,導致我們無法獲得正確的真實的原始參數,所以這里我們先使用 JSON 將?args?參數序列化成字符串,這樣后續我們可以使用 JSON 反序列化回來。

10.4 TransactionListenerImpl

創建?TransactionListenerImpl?類,實現 MQ 事務的監聽。代碼如下:

@RocketMQTransactionListener(txProducerGroup = "test") public class TransactionListenerImpl implements RocketMQLocalTransactionListener {private Logger logger = LoggerFactory.getLogger(getClass());@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 從消息 Header 中解析到 args 參數,并使用 JSON 反序列化Demo01Controller.Args args = JSON.parseObject(msg.getHeaders().get("args", String.class),Demo01Controller.Args.class);// ... local transaction process, return rollback, commit or unknownlogger.info("[executeLocalTransaction][執行本地事務,消息:{} args:{}]", msg, args);return RocketMQLocalTransactionState.UNKNOWN;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// ... check transaction status and return rollback, commit or unknownlogger.info("[checkLocalTransaction][回查消息:{}]", msg);return RocketMQLocalTransactionState.COMMIT;}}

① 在類上,添加?@RocketMQTransactionListener?注解,聲明監聽器的是生產者分組是?"test"?的 Producer 發送的事務消息。因為 RocketMQ 是回查(請求)指定指定生產分組下的 Producer,從而獲得事務消息的狀態,所以一定要正確設置。

② 實現?RocketMQLocalTransactionListener?接口,實現執行本地事務和檢查本地事務的方法。

③ 實現?#executeLocalTransaction(...)?方法,實現執行本地事務。

  • 注意,這是一個模板方法。在調用這個方法之前,Spring Cloud Alibaba Stream RocketMQ 已經使用 Producer 發送了一條事務消息。然后根據該方法執行的返回的?RocketMQLocalTransactionState?結果,提交還是回滾該事務消息。

    友情提示:感興趣的胖友,可以看看 DefaultMQProducerImpl 的?#sendMessageInTransaction(...)?的源碼,整個模板方法是怎么執行的。

  • 😈 這里,我們為了模擬 RocketMQ 回查 Producer 來獲得事務消息的狀態,所以返回了?RocketMQLocalTransactionState.UNKNOWN?未知狀態。

④ 實現?#checkLocalTransaction(...)?方法,檢查本地事務。

  • 在事務消息長事件未被提交或回滾時,RocketMQ 會回查事務消息對應的生產者分組下的 Producer ,獲得事務消息的狀態。此時,該方法就會被調用。
  • 😈 這里,我們直接返回?RocketMQLocalTransactionState.COMMIT?提交狀態。

一般來說,有兩種方式實現本地事務回查時,返回事務消息的狀態。

第一種,通過?msg?消息,獲得某個業務上的標識或者編號,然后去數據庫中查詢業務記錄,從而判斷該事務消息的狀態是提交還是回滾。

第二種,記錄?msg?的事務編號,與事務狀態到數據庫中。

  • 第一步,在?#executeLocalTransaction(...)?方法中,先存儲一條?id?為?msg?的事務編號,狀態為?RocketMQLocalTransactionState.UNKNOWN?的記錄。
  • 第二步,調用帶有事務的業務 Service 的方法。在該 Service 方法中,在邏輯都執行成功的情況下,更新?id?為?msg?的事務編號,狀態變更為?RocketMQLocalTransactionState.COMMIT?。這樣,我們就可以伴隨這個事務的提交,更新?id?為?msg?的事務編號的記錄的狀為?RocketMQLocalTransactionState.COMMIT?,美滋滋。。
  • 第三步,要以?try-catch?的方式,調用業務 Service 的方法。如此,如果發生異常,回滾事務的時候,可以在?catch?中,更新?id?為?msg?的事務編號的記錄的狀態為?RocketMQLocalTransactionState.ROLLBACK?。😭 極端情況下,可能更新失敗,則打印 error 日志,告警知道,人工介入。
  • 如此三步之后,我們在?#executeLocalTransaction(...)?方法中,就可以通過查找數據庫,id?為?msg?的事務編號的記錄的狀態,然后返回。

相比來說,艿艿傾向第二種,實現更加簡單通用,對于業務開發者,更加友好。和有幾個朋友溝通了下,他們也是采用第二種。

10.5 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send_transaction?接口,發送事務消息。IDEA 控制臺輸出日志如下:

// ProduerApplication 控制臺 // ### TransactionListenerImpl 執行 executeLocalTransaction 方法,先執行本地事務的邏輯 2020-02-21 00:14:08.773 INFO 83052 --- [io-18080-exec-1] c.i.s.l.r.p.l.TransactionListenerImpl : [executeLocalTransaction][執行本地事務,消息:GenericMessage [payload=byte[17], headers={args={"args1":1,"args2":"2"}, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=0A258102446C18B4AAC2670C237B0000, id=d8604733-9083-5d19-15b4-bda0c549e9d1, contentType=application/json, timestamp=1582215248772}] args:Args{args1=1, args2='2'}] // ### Producer 發送事務消息成功,但是因為 executeLocalTransaction 方法返回的是 UNKOWN 狀態,所以事務消息并未提交或者回滾 // ### RocketMQ Broker 在發送事務消息 30 秒后,發現事務消息還未提交或是回滾,所以回查 Producer 。此時,checkLocalTransaction 方法返回 COMMIT ,所以該事務消息被提交 2020-02-21 00:14:48.685 INFO 83052 --- [pool-1-thread-1] c.i.s.l.r.p.l.TransactionListenerImpl : [checkLocalTransaction][回查消息:GenericMessage [payload=byte[17], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=1, rocketmq_BORN_TIMESTAMP=1582215248763, args={"args1":1,"args2":"2"}, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=0A25810200002A9F000000000002868F, rocketmq_TRANSACTION_ID=0A258102446C18B4AAC2670C237B0000, rocketmq_SYS_FLAG=0, id=62383992-5015-f957-41e7-75ec5ace4496, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582215288685}]]// ConsumerApplication 控制臺 // ### 事務消息被提交,所以該消息被 Consumer 消費 2020-02-21 00:14:48.756 INFO 83058 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:79 消息內容:Demo01Message{id=1950986029}]

整個的執行過程,看看艿艿在日志上添加的說明。

11. 監控端點

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-actuator
  • 消費者:labx-06-sca-stream-rocketmq-consumer-actuator

Spring Cloud Stream 的?endpoint?模塊,基于 Spring Boot Actuator,提供了自定義監控端點?bindings?和?channels,用于獲取 Spring Cloud Stream 的 Binding 和 Channel 信息。

同時,Spring Cloud Alibaba Stream RocketMQ 拓展了 Spring Boot Actuator 內置的?health?端點,通過自定義的?RocketMQBinderHealthIndicator,獲取 RocketMQ 客戶端的健康狀態。

友情提示:對 Spring Boot Actuator 不了解的胖友,可以后續閱讀《芋道 Spring Boot 監控端點 Actuator 入門》文章。

我們來搭建一個 Stream RocketMQ 監控端點的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:

  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-actuator,查看生產者的監控端點結果。
  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter,查看消費者的監控端點結果。

11.1 搭建生產者

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-actuator,查看生產者的監控端點結果。

11.1.1 引入依賴

在?pom.xml?文件中,額外引入 Spring Boot Actuator 相關依賴。代碼如下:

<!-- 實現對 Actuator 的自動化配置 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId> </dependency>

11.1.2 配置文件

修改?application.yaml?配置文件,額外增加 Spring Boot Actuator 配置項。配置如下:

management:endpoints:web:exposure:include: '*' # 需要開放的端點。默認值只打開 health 和 info 兩個端點。通過設置 * ,可以開放所有端點。endpoint:# Health 端點配置項,對應 HealthProperties 配置類health:enabled: true # 是否開啟。默認為 true 開啟。show-details: ALWAYS # 何時顯示完整的健康信息。默認為 NEVER 都不展示。可選 WHEN_AUTHORIZED 當經過授權的用戶;可選 ALWAYS 總是展示。

每個配置項的作用,胖友看下艿艿添加的注釋。如果還不理解的話,后續看下《芋道 Spring Boot 監控端點 Actuator 入門》文章。

11.1.3 簡單測試

① 使用 ProducerApplication 啟動生產者。

② 訪問應用的?bindings?監控端點?http://127.0.0.1:18080/actuator/bindings,返回結果如下圖:

③ 訪問應用的?channels?監控端點?http://127.0.0.1:18080/actuator/channels,返回結果如下圖:

④ 訪問應用的?health?監控端點?http://127.0.0.1:18080/actuator/health,返回結果如下圖:

11.2 搭建消費者

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter,查看消費者的監控端點結果。

11.2.1 引入依賴

在?pom.xml?文件中,額外引入 Spring Boot Actuator 相關依賴。代碼如下:

<!-- 實現對 Actuator 的自動化配置 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId> </dependency>

11.2.2 配置文件

修改?application.yaml?配置文件,額外增加 Spring Boot Actuator 配置項。配置如下:

management:endpoints:web:exposure:include: '*' # 需要開放的端點。默認值只打開 health 和 info 兩個端點。通過設置 * ,可以開放所有端點。endpoint:# Health 端點配置項,對應 HealthProperties 配置類health:enabled: true # 是否開啟。默認為 true 開啟。show-details: ALWAYS # 何時顯示完整的健康信息。默認為 NEVER 都不展示。可選 WHEN_AUTHORIZED 當經過授權的用戶;可選 ALWAYS 總是展示。

每個配置項的作用,胖友看下艿艿添加的注釋。如果還不理解的話,后續看下《芋道 Spring Boot 監控端點 Actuator 入門》文章。

112.3 簡單測試

① 使用 ConsumerApplication 啟動消費者,隨機端口為 19541。

② 訪問應用的?bindings?監控端點?http://127.0.0.1:19541/actuator/bindings,返回結果如下圖:

③ 訪問應用的?channels?監控端點?http://127.0.0.1:19541/actuator/channels,返回結果如下圖:

④ 訪問應用的?health?監控端點?http://127.0.0.1:19541/actuator/health,返回結果如下圖:

12. 更多的配置項信息

Spring Cloud Alibaba Stream RocketMQ 提供的配置項挺多的,我們參考文檔將配置項一起梳理下。

RocketMQ Binder Properties

以?spring.cloud.stream.rocketmq.binder?為前綴。

配置項說明默認值
name-serverRocketMQ NameServer 地址127.0.0.1:9876
access-key阿里云賬號 AccessKey
secret-key阿里云賬號 SecretKey
enable-msg-trace是否為 Producer 和 Consumer 開啟消息軌跡功能true
customized-trace-topic消息軌跡開啟后存儲的 Topic 名稱RMQ_SYS_TRACE_TOPIC

RocketMQ Consumer Properties

以?spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.?為前綴。

配置項說明默認值
enable是否啟用 Consumertrue
tagsConsumer 基于 TAGS 訂閱,多個 tag 以?||?分割
sqlConsumer 基于 SQL 訂閱
broadcasting是Consumer 是否是廣播消費模式。如果想讓所有的訂閱者都能接收到消息,可以使用廣播模式false
orderlyConsumer 是否同步消費消息模式false
delayLevelWhenNextConsume異步消費消息模式下消費失敗重試策略:-1, 不重復,直接放入死信隊列;0, Broker 控制重試策略;>0, Client 控制重試策略0
suspendCurrentQueueTimeMillis同步消費消息模式下消費失敗后再次消費的時間間隔1000

RocketMQ Provider Properties

配置項說明默認值
enable是否啟用 Producertrue
groupProducer 分組
maxMessageSize消息發送的最大字節數8249344
transactional是否發送事務消息false
sync是否使用同步得方式發送消息false
vipChannelEnabled是否在 Vip Channel 上發送消息true
sendMessageTimeout發送消息的超時時間(毫秒)3000
compressMessageBodyThreshold消息體壓縮閥值(當消息體超過 4k 的時候會被壓縮)4096
retryTimesWhenSendFailed在同步發送消息的模式下,消息發送失敗的重試次數2
retryTimesWhenSendAsyncFailed在異步發送消息的模式下,消息發送失敗的重試次數2
retryNextServer消息發送失敗的情況下是否重試其它的 Brokerfalse

13.接入阿里云的消息隊列 RocketMQ

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-aliyun
  • 消費者:labx-06-sca-stream-rocketmq-consumer-aliyun

在阿里云上,提供消息隊列?RocketMQ?服務。那么,我們是否能夠使用 Spring Cloud Alibaba Stream RocketMQ 實現阿里云 RocketMQ 的消息的發送與消費呢?

答案是可以。在?《阿里云 —— 消息隊列 MQ —— 開源 Java SDK 接入說明》?中,提到目前開源的 Java SDK 可以接入阿里云 RocketMQ 服務。

如果您已使用開源 Java SDK 進行生產,只需參考方法,重新配置參數,即可實現無縫上云。

前提條件

  • 已在阿里云 MQ 控制臺創建資源,包括 Topic、Group ID(GID)、接入點(Endpoint),以及 AccessKeyId 和 AccessKeySecret。
  • 已下載開源 RocketMQ 4.5.1 或以上版本,以支持連接阿里云 MQ。

我們來搭建一個 Stream RocketMQ 監控端點的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:

  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為生產者
  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為消費者

13.1 搭建生產者

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為生產者

修改?application.yaml?配置文件,添加?access-key、secret-key?配置項,設置訪問阿里云 RocketMQ 的賬號。完全配置如下:

spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: TOPIC_YUNAI_TEST # 目的地。這里使用 RocketMQ Topic <ALIYUN>content-type: application/json # 內容格式。這里使用 JSON# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80 # RocketMQ Namesrv 地址 <ALIYUN>access-key: ${ALIYUN_ACCESS_KEY} # 阿里云賬號 AccessKeysecret-key: ${ALIYUN_SECRET_KEY} # 阿里云賬號 SecretKey# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: GID_PRODUCER_GROUP_YUNAI_TEST # 生產者分組 <ALIYUN>sync: true # 是否同步發送消息,默認為 false 異步。server:port: 18080

注意,<ALIYUN>?處的三個配置項,也要修改成阿里云 RocketMQ 的 Namesrv、Topic、Producer Group。

13.2 搭建消費者

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為消費者

修改?application.yaml?配置文件,添加?access-key、secret-key?配置項,設置訪問阿里云 RocketMQ 的賬號。完全配置如下:

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: TOPIC_YUNAI_TEST # 目的地。這里使用 RocketMQ Topic <ALIYUN>content-type: application/json # 內容格式。這里使用 JSONgroup: GID_PRODUCER_GROUP_YUNAI_TEST # 消費者分組 <ALIYUN># Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80 # RocketMQ Namesrv 地址 <ALIYUN>access-key: ${ALIYUN_ACCESS_KEY} # 阿里云賬號 AccessKeysecret-key: ${ALIYUN_SECRET_KEY} # 阿里云賬號 SecretKey# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

注意,<ALIYUN>?處的三個配置項,也要修改成阿里云 RocketMQ 的 Namesrv、Topic、Consumer Group。

13.3 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口,發送消息。IDEA 控制臺輸出日志如下:

// ConsumerApplication 控制臺 2020-02-21 01:45:16.982 INFO 85901 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:89 消息內容:Demo01Message{id=-724066118}]

總結

以上是生活随笔為你收集整理的Spring Cloud Alibaba RocketMQ 快速入门的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

aⅴ亚洲 日韩 色 图网站 播放 | 永久黄网站色视频免费直播 | 国产超级va在线观看视频 | 亚洲精品鲁一鲁一区二区三区 | 亚洲小说春色综合另类 | 18禁止看的免费污网站 | 成人毛片一区二区 | 国产在热线精品视频 | 一本久道久久综合狠狠爱 | 无码国内精品人妻少妇 | 成人无码视频免费播放 | 人人澡人人妻人人爽人人蜜桃 | 成人性做爰aaa片免费看 | 亚洲国产精品久久久久久 | 国产女主播喷水视频在线观看 | 亚洲乱亚洲乱妇50p | 性史性农村dvd毛片 | 麻花豆传媒剧国产免费mv在线 | 精品无码一区二区三区的天堂 | 狂野欧美性猛xxxx乱大交 | 国产精品久久久久无码av色戒 | 国产免费观看黄av片 | 中国女人内谢69xxxx | 亚洲色大成网站www国产 | 国产精品免费大片 | 久久综合九色综合欧美狠狠 | 国产精品福利视频导航 | 国产精品毛片一区二区 | 日产国产精品亚洲系列 | 日日鲁鲁鲁夜夜爽爽狠狠 | 国产精品手机免费 | 少妇被粗大的猛进出69影院 | 天天躁夜夜躁狠狠是什么心态 | 亚洲国产综合无码一区 | 国产艳妇av在线观看果冻传媒 | 白嫩日本少妇做爰 | 麻豆蜜桃av蜜臀av色欲av | 国产手机在线αⅴ片无码观看 | 骚片av蜜桃精品一区 | 2020久久超碰国产精品最新 | 色婷婷av一区二区三区之红樱桃 | 少妇被黑人到高潮喷出白浆 | 最新国产麻豆aⅴ精品无码 | 久久亚洲国产成人精品性色 | 亚洲国产精华液网站w | 亚洲男人av香蕉爽爽爽爽 | 亚洲日本在线电影 | 思思久久99热只有频精品66 | 亚洲区欧美区综合区自拍区 | 免费男性肉肉影院 | 国产成人无码av片在线观看不卡 | 中文字幕人妻无码一区二区三区 | 久久99热只有频精品8 | 国产综合久久久久鬼色 | 永久免费观看美女裸体的网站 | 国产成人无码a区在线观看视频app | 国产香蕉97碰碰久久人人 | 亚洲七七久久桃花影院 | 久精品国产欧美亚洲色aⅴ大片 | 天天拍夜夜添久久精品大 | 青青草原综合久久大伊人精品 | 国产人妻人伦精品1国产丝袜 | 男人的天堂2018无码 | 欧美自拍另类欧美综合图片区 | 亚洲欧美国产精品专区久久 | 人人爽人人爽人人片av亚洲 | 国产激情一区二区三区 | 国内精品久久毛片一区二区 | 久久久久av无码免费网 | 日本高清一区免费中文视频 | 久久精品国产99精品亚洲 | 无码纯肉视频在线观看 | 丰满岳乱妇在线观看中字无码 | 大地资源中文第3页 | 国产成人无码区免费内射一片色欲 | 影音先锋中文字幕无码 | 日本一区二区三区免费播放 | 中文无码精品a∨在线观看不卡 | 欧美黑人巨大xxxxx | 久久久久成人精品免费播放动漫 | 国产sm调教视频在线观看 | 国产精品自产拍在线观看 | 成人免费视频一区二区 | 国产在线一区二区三区四区五区 | 久久综合色之久久综合 | 欧美老人巨大xxxx做受 | 好爽又高潮了毛片免费下载 | 日本精品久久久久中文字幕 | 欧美国产亚洲日韩在线二区 | 色一情一乱一伦一区二区三欧美 | 日日摸夜夜摸狠狠摸婷婷 | 久久精品人人做人人综合试看 | 性生交大片免费看女人按摩摩 | 人人妻人人澡人人爽人人精品浪潮 | 亚洲日韩av片在线观看 | 国产成人亚洲综合无码 | 国产精品办公室沙发 | 在线播放免费人成毛片乱码 | 一本无码人妻在中文字幕免费 | 亚洲中文无码av永久不收费 | 亚洲精品无码人妻无码 | 国产亚洲tv在线观看 | 乱人伦中文视频在线观看 | 日本熟妇乱子伦xxxx | 亚洲精品综合一区二区三区在线 | 亚洲大尺度无码无码专区 | 成人精品一区二区三区中文字幕 | 老头边吃奶边弄进去呻吟 | 日本一卡2卡3卡四卡精品网站 | 老熟女重囗味hdxx69 | 亚洲热妇无码av在线播放 | 国产精品手机免费 | 日韩精品无码一本二本三本色 | 久久久精品国产sm最大网站 | 国产精品久久久午夜夜伦鲁鲁 | 国产成人无码一二三区视频 | 欧美人与善在线com | 男人的天堂av网站 | 久久精品人人做人人综合试看 | 天天做天天爱天天爽综合网 | 帮老师解开蕾丝奶罩吸乳网站 | 国产精品无码一区二区三区不卡 | 亚洲中文字幕乱码av波多ji | 丰满护士巨好爽好大乳 | 色爱情人网站 | 欧美日本精品一区二区三区 | 少妇性俱乐部纵欲狂欢电影 | 欧美freesex黑人又粗又大 | 日韩av无码一区二区三区不卡 | 国产精品美女久久久久av爽李琼 | 51国偷自产一区二区三区 | 久久久精品成人免费观看 | 粉嫩少妇内射浓精videos | 日本精品久久久久中文字幕 | 色噜噜亚洲男人的天堂 | 亚洲精品一区二区三区大桥未久 | 给我免费的视频在线观看 | 3d动漫精品啪啪一区二区中 | 国产成人综合美国十次 | 亚洲国产精品久久人人爱 | 伊人久久婷婷五月综合97色 | 色一情一乱一伦一区二区三欧美 | 免费看少妇作爱视频 | 丝袜足控一区二区三区 | 国产一区二区三区影院 | 亚洲精品一区二区三区四区五区 | 久久精品国产一区二区三区肥胖 | 久久精品国产大片免费观看 | 精品水蜜桃久久久久久久 | 性色欲情网站iwww九文堂 | 日韩人妻系列无码专区 | 中文无码成人免费视频在线观看 | 人妻插b视频一区二区三区 | 亚洲国产精品一区二区美利坚 | 亚洲第一网站男人都懂 | 女高中生第一次破苞av | www国产精品内射老师 | 国产精品18久久久久久麻辣 | 精品国产一区二区三区四区在线看 | 76少妇精品导航 | 国产熟妇高潮叫床视频播放 | 成人aaa片一区国产精品 | 夫妻免费无码v看片 | 国产亚洲精品久久久久久久久动漫 | 久久亚洲中文字幕无码 | 免费看少妇作爱视频 | 日日摸夜夜摸狠狠摸婷婷 | 亚洲日韩中文字幕在线播放 | 日韩精品乱码av一区二区 | 精品国产一区av天美传媒 | 亚洲一区二区三区香蕉 | 亚洲国产日韩a在线播放 | 国产无套内射久久久国产 | 97se亚洲精品一区 | 免费观看黄网站 | 日韩精品一区二区av在线 | 亚洲色欲久久久综合网东京热 | 99精品国产综合久久久久五月天 | 国产黄在线观看免费观看不卡 | 中文毛片无遮挡高清免费 | 国产偷自视频区视频 | 永久免费观看美女裸体的网站 | 国产乱码精品一品二品 | 精品偷拍一区二区三区在线看 | 成人无码影片精品久久久 | 国产乱人伦av在线无码 | 成年美女黄网站色大免费全看 | 无码人妻久久一区二区三区不卡 | 成 人 网 站国产免费观看 | 欧美 丝袜 自拍 制服 另类 | 九九综合va免费看 | 国产精品自产拍在线观看 | 377p欧洲日本亚洲大胆 | 中文字幕无线码免费人妻 | 亚洲精品国偷拍自产在线麻豆 | 亚洲区欧美区综合区自拍区 | 欧美猛少妇色xxxxx | 成 人 免费观看网站 | 久久精品丝袜高跟鞋 | 无套内谢老熟女 | 中文毛片无遮挡高清免费 | 国内老熟妇对白xxxxhd | 国产精品高潮呻吟av久久 | 国产精品久久久久9999小说 | 激情人妻另类人妻伦 | 香蕉久久久久久av成人 | 国内精品久久毛片一区二区 | 国产猛烈高潮尖叫视频免费 | 波多野结衣av一区二区全免费观看 | 大肉大捧一进一出视频出来呀 | 最新国产乱人伦偷精品免费网站 | 东京热无码av男人的天堂 | 日韩成人一区二区三区在线观看 | 久9re热视频这里只有精品 | 久久天天躁夜夜躁狠狠 | 日日橹狠狠爱欧美视频 | 无码纯肉视频在线观看 | 久久精品99久久香蕉国产色戒 | 蜜桃臀无码内射一区二区三区 | 久久久久成人片免费观看蜜芽 | 国产农村妇女aaaaa视频 撕开奶罩揉吮奶头视频 | 999久久久国产精品消防器材 | 十八禁真人啪啪免费网站 | 日韩精品久久久肉伦网站 | 国产成人综合美国十次 | 色综合久久中文娱乐网 | 激情内射日本一区二区三区 | 少妇性俱乐部纵欲狂欢电影 | 一个人看的视频www在线 | 欧美成人家庭影院 | 亚洲国产精品无码一区二区三区 | 乱人伦中文视频在线观看 | 国产 精品 自在自线 | 熟妇人妻中文av无码 | 国产明星裸体无码xxxx视频 | 双乳奶水饱满少妇呻吟 | 色综合久久88色综合天天 | 精品成在人线av无码免费看 | 最近中文2019字幕第二页 | 好男人www社区 | 未满小14洗澡无码视频网站 | 亚洲精品中文字幕乱码 | 成人无码精品一区二区三区 | 无码一区二区三区在线观看 | 亚洲精品一区国产 | 日本欧美一区二区三区乱码 | 国产特级毛片aaaaaaa高清 | 熟女俱乐部五十路六十路av | 日本一区二区三区免费高清 | 青草青草久热国产精品 | 亚洲无人区午夜福利码高清完整版 | 日本熟妇浓毛 | 日本乱偷人妻中文字幕 | 亚洲娇小与黑人巨大交 | 亚洲国产精品一区二区美利坚 | 无码一区二区三区在线观看 | 真人与拘做受免费视频 | 2020最新国产自产精品 | 亚洲一区二区三区国产精华液 | 亚洲精品久久久久久一区二区 | 激情国产av做激情国产爱 | 精品无码av一区二区三区 | 亚洲国产精品成人久久蜜臀 | 亚洲人成网站色7799 | 无码播放一区二区三区 | 精品水蜜桃久久久久久久 | 久9re热视频这里只有精品 | 久久天天躁夜夜躁狠狠 | 中文字幕人妻丝袜二区 | 久久精品女人天堂av免费观看 | 国精产品一品二品国精品69xx | 亚洲精品中文字幕乱码 | 日本一区二区三区免费高清 | 国产手机在线αⅴ片无码观看 | 久久久亚洲欧洲日产国码αv | 内射白嫩少妇超碰 | 亚洲成av人综合在线观看 | aⅴ在线视频男人的天堂 | 国产va免费精品观看 | 精品厕所偷拍各类美女tp嘘嘘 | 色婷婷综合中文久久一本 | 我要看www免费看插插视频 | 久久午夜无码鲁丝片午夜精品 | 人妻无码久久精品人妻 | 久久亚洲国产成人精品性色 | 日韩欧美中文字幕在线三区 | 午夜精品一区二区三区在线观看 | 日韩在线不卡免费视频一区 | 国产亚av手机在线观看 | 久久久久99精品成人片 | 午夜成人1000部免费视频 | 日日碰狠狠躁久久躁蜜桃 | 中文字幕久久久久人妻 | 麻豆蜜桃av蜜臀av色欲av | 日本精品高清一区二区 | 97久久精品无码一区二区 | www国产精品内射老师 | 欧洲熟妇精品视频 | 乱中年女人伦av三区 | 鲁一鲁av2019在线 | 少妇被黑人到高潮喷出白浆 | 在线а√天堂中文官网 | 精品偷拍一区二区三区在线看 | 色五月五月丁香亚洲综合网 | 久久99精品久久久久婷婷 | 日产精品99久久久久久 | 亚洲 a v无 码免 费 成 人 a v | 99久久精品国产一区二区蜜芽 | 国产精品va在线观看无码 | 国产av无码专区亚洲awww | aⅴ在线视频男人的天堂 | 六月丁香婷婷色狠狠久久 | 少妇性俱乐部纵欲狂欢电影 | 亚洲小说春色综合另类 | 国产精品亚洲lv粉色 | 久久久国产精品无码免费专区 | 国产绳艺sm调教室论坛 | 国产成人综合在线女婷五月99播放 | 国产真实伦对白全集 | 97人妻精品一区二区三区 | 蜜桃视频插满18在线观看 | 国产情侣作爱视频免费观看 | 亚洲人成影院在线无码按摩店 | 欧美日韩在线亚洲综合国产人 | 成熟女人特级毛片www免费 | 国产亚av手机在线观看 | 性色av无码免费一区二区三区 | 最新版天堂资源中文官网 | 人人妻人人澡人人爽欧美精品 | 曰韩少妇内射免费播放 | 国产精品久久久久久亚洲影视内衣 | 亚洲一区二区三区无码久久 | 欧洲vodafone精品性 | 国产人妻久久精品二区三区老狼 | 黑人巨大精品欧美一区二区 | 精品无码一区二区三区的天堂 | 欧美大屁股xxxxhd黑色 | 18精品久久久无码午夜福利 | 综合人妻久久一区二区精品 | 国产真实乱对白精彩久久 | 国产成人综合色在线观看网站 | 亚洲中文字幕在线观看 | 国产明星裸体无码xxxx视频 | 99久久人妻精品免费一区 | 国产办公室秘书无码精品99 | 中文字幕无线码免费人妻 | 狠狠色丁香久久婷婷综合五月 | 牛和人交xxxx欧美 | 四十如虎的丰满熟妇啪啪 | 欧美精品一区二区精品久久 | 一本精品99久久精品77 | 天天av天天av天天透 | 亚洲成熟女人毛毛耸耸多 | 中文字幕无码热在线视频 | 亚洲区小说区激情区图片区 | 亚洲成av人在线观看网址 | 精品人妻中文字幕有码在线 | 人人妻人人澡人人爽欧美精品 | 亚洲国产av美女网站 | 亚洲日韩av一区二区三区中文 | 中文无码伦av中文字幕 | 国产猛烈高潮尖叫视频免费 | 黑人巨大精品欧美黑寡妇 | 无码国产色欲xxxxx视频 | 帮老师解开蕾丝奶罩吸乳网站 | 精品久久久中文字幕人妻 | 国产无遮挡又黄又爽免费视频 | yw尤物av无码国产在线观看 | 2019午夜福利不卡片在线 | 人人妻人人澡人人爽人人精品浪潮 | 成人一在线视频日韩国产 | 人人妻人人澡人人爽人人精品 | 亚洲毛片av日韩av无码 | 日本熟妇人妻xxxxx人hd | 波多野结衣av一区二区全免费观看 | 国产欧美亚洲精品a | 丰满少妇弄高潮了www | 国産精品久久久久久久 | 久久久久免费精品国产 | 老熟妇乱子伦牲交视频 | 免费无码av一区二区 | 狂野欧美性猛xxxx乱大交 | 日本乱人伦片中文三区 | 国产人妻精品一区二区三区 | 任你躁国产自任一区二区三区 | 久久99热只有频精品8 | 国产精品人人爽人人做我的可爱 | 少妇人妻av毛片在线看 | 伊人久久大香线蕉亚洲 | 综合人妻久久一区二区精品 | 全黄性性激高免费视频 | 免费无码肉片在线观看 | 99riav国产精品视频 | 无码吃奶揉捏奶头高潮视频 | 亚洲色偷偷偷综合网 | 一二三四社区在线中文视频 | 正在播放老肥熟妇露脸 | 国产区女主播在线观看 | 日欧一片内射va在线影院 | 无码人妻精品一区二区三区不卡 | v一区无码内射国产 | 久久久成人毛片无码 | 300部国产真实乱 | 欧美乱妇无乱码大黄a片 | 99久久婷婷国产综合精品青草免费 | 精品国产av色一区二区深夜久久 | 亚洲高清偷拍一区二区三区 | 色欲人妻aaaaaaa无码 | 中文精品久久久久人妻不卡 | 国产精品久久福利网站 | 国产精品久久久午夜夜伦鲁鲁 | 国产精品高潮呻吟av久久 | 久久亚洲a片com人成 | 九月婷婷人人澡人人添人人爽 | 嫩b人妻精品一区二区三区 | 国产精品欧美成人 | 夜夜影院未满十八勿进 | 精品一区二区三区无码免费视频 | 日韩欧美群交p片內射中文 | 国产9 9在线 | 中文 | av香港经典三级级 在线 | 日本熟妇大屁股人妻 | 撕开奶罩揉吮奶头视频 | 日本xxxx色视频在线观看免费 | 日本欧美一区二区三区乱码 | 欧美 亚洲 国产 另类 | 亚洲啪av永久无码精品放毛片 | 国产香蕉尹人综合在线观看 | 亚洲а∨天堂久久精品2021 | 亚洲无人区午夜福利码高清完整版 | 曰韩无码二三区中文字幕 | 日韩欧美群交p片內射中文 | 精品人妻人人做人人爽 | 国产亚洲精品久久久久久久 | 国产精品丝袜黑色高跟鞋 | 51国偷自产一区二区三区 | 欧美成人免费全部网站 | 国产精品久久久久久久9999 | 精品国产成人一区二区三区 | 精品久久久中文字幕人妻 | 欧美老熟妇乱xxxxx | 久久午夜无码鲁丝片 | 日韩少妇内射免费播放 | 最新国产麻豆aⅴ精品无码 | 全球成人中文在线 | 人妻有码中文字幕在线 | 中文字幕无码免费久久9一区9 | 国产精品人妻一区二区三区四 | 欧美性猛交内射兽交老熟妇 | 欧美国产日韩亚洲中文 | 亚洲成a人片在线观看日本 | 亚洲精品成a人在线观看 | 少妇厨房愉情理9仑片视频 | 欧美丰满老熟妇xxxxx性 | 牲欲强的熟妇农村老妇女 | 久久人妻内射无码一区三区 | 欧美高清在线精品一区 | 国产性生大片免费观看性 | 亚洲国产精品成人久久蜜臀 | 一本大道久久东京热无码av | 天堂无码人妻精品一区二区三区 | 欧美日韩久久久精品a片 | 日韩精品无码免费一区二区三区 | 国产精品办公室沙发 | 草草网站影院白丝内射 | 国产精品高潮呻吟av久久4虎 | 国产两女互慰高潮视频在线观看 | 强伦人妻一区二区三区视频18 | 欧美黑人乱大交 | 大肉大捧一进一出视频出来呀 | 欧美xxxx黑人又粗又长 | 国产av无码专区亚洲awww | 久久 国产 尿 小便 嘘嘘 | 午夜福利试看120秒体验区 | 国产精品久久国产精品99 | 荫蒂被男人添的好舒服爽免费视频 | 亚洲成a人片在线观看无码3d | 日韩人妻无码中文字幕视频 | 精品熟女少妇av免费观看 | 精品成在人线av无码免费看 | 中文字幕日产无线码一区 | 无码播放一区二区三区 | 宝宝好涨水快流出来免费视频 | 内射巨臀欧美在线视频 | 玩弄人妻少妇500系列视频 | 国产精品a成v人在线播放 | 色狠狠av一区二区三区 | 岛国片人妻三上悠亚 | 欧洲vodafone精品性 | 人妻熟女一区 | 国产精品无码永久免费888 | av小次郎收藏 | 九九综合va免费看 | 国产精品亚洲综合色区韩国 | 成人影院yy111111在线观看 | 影音先锋中文字幕无码 | 天下第一社区视频www日本 | 久久国产精品_国产精品 | 国产成人久久精品流白浆 | 亚洲欧美日韩国产精品一区二区 | 5858s亚洲色大成网站www | 高潮毛片无遮挡高清免费 | 久久视频在线观看精品 | 欧美第一黄网免费网站 | 欧美freesex黑人又粗又大 | 亚洲国产日韩a在线播放 | 国产精品久免费的黄网站 | 日日摸日日碰夜夜爽av | 国产精品a成v人在线播放 | 亚洲综合另类小说色区 | 熟女体下毛毛黑森林 | 欧美放荡的少妇 | 国产激情综合五月久久 | 亚洲一区二区三区香蕉 | 亚洲国产精品无码久久久久高潮 | 暴力强奷在线播放无码 | 亚洲欧美国产精品专区久久 | 久久伊人色av天堂九九小黄鸭 | 98国产精品综合一区二区三区 | 又大又硬又爽免费视频 | 成人av无码一区二区三区 | 一本一道久久综合久久 | 中文字幕无码热在线视频 | 国产人成高清在线视频99最全资源 | 丝袜人妻一区二区三区 | 久久精品国产日本波多野结衣 | 曰韩少妇内射免费播放 | 久久综合久久自在自线精品自 | 成人性做爰aaa片免费看 | 国产特级毛片aaaaaa高潮流水 | 露脸叫床粗话东北少妇 | 欧美激情一区二区三区成人 | 国产午夜视频在线观看 | 色综合久久网 | 精品aⅴ一区二区三区 | 丝袜人妻一区二区三区 | 思思久久99热只有频精品66 | 亚洲成色www久久网站 | 国产又爽又猛又粗的视频a片 | 99re在线播放 | 波多野结衣av一区二区全免费观看 | 久久人人爽人人爽人人片av高清 | 亚洲色无码一区二区三区 | 日产国产精品亚洲系列 | 天天做天天爱天天爽综合网 | 精品国产福利一区二区 | 人人妻在人人 | 好爽又高潮了毛片免费下载 | 正在播放老肥熟妇露脸 | 国产小呦泬泬99精品 | 精品国偷自产在线视频 | 蜜桃视频韩日免费播放 | 2020久久超碰国产精品最新 | 国产女主播喷水视频在线观看 | 久久国产36精品色熟妇 | 国产精品无码mv在线观看 | 成人精品视频一区二区 | 欧美人与动性行为视频 | 伊人久久大香线焦av综合影院 | 丰满少妇熟乱xxxxx视频 | 日产精品高潮呻吟av久久 | 亚洲日韩精品欧美一区二区 | 欧美精品一区二区精品久久 | 十八禁真人啪啪免费网站 | 亚洲精品综合五月久久小说 | 无码乱肉视频免费大全合集 | 国产成人精品三级麻豆 | 国产成人无码午夜视频在线观看 | 亲嘴扒胸摸屁股激烈网站 | 两性色午夜视频免费播放 | 久久综合给合久久狠狠狠97色 | 久久久无码中文字幕久... | 久久人人97超碰a片精品 | 内射白嫩少妇超碰 | 亚洲七七久久桃花影院 | 性开放的女人aaa片 | 人人妻在人人 | a国产一区二区免费入口 | av无码电影一区二区三区 | 国产精品资源一区二区 | 蜜桃无码一区二区三区 | 欧美国产亚洲日韩在线二区 | а天堂中文在线官网 | 久久五月精品中文字幕 | 国产精品办公室沙发 | 麻豆成人精品国产免费 | 久久久久成人片免费观看蜜芽 | 成人三级无码视频在线观看 | 狠狠色欧美亚洲狠狠色www | 人妻尝试又大又粗久久 | 久久无码中文字幕免费影院蜜桃 | 亚洲欧洲中文日韩av乱码 | 无码人妻丰满熟妇区毛片18 | 亚洲国产成人av在线观看 | 特大黑人娇小亚洲女 | 日产精品99久久久久久 | 激情五月综合色婷婷一区二区 | 一二三四在线观看免费视频 | 欧洲精品码一区二区三区免费看 | 国产人妻精品一区二区三区 | 国产人妻精品午夜福利免费 | 欧美黑人乱大交 | 国产熟妇高潮叫床视频播放 | 日本一卡2卡3卡四卡精品网站 | 中文字幕+乱码+中文字幕一区 | 成人无码视频免费播放 | 国产激情综合五月久久 | 蜜臀aⅴ国产精品久久久国产老师 | 无码av中文字幕免费放 | 东京无码熟妇人妻av在线网址 | 久久久久国色av免费观看性色 | 国产精品久久久 | 久久99精品国产麻豆 | 亚洲精品一区二区三区在线观看 | 人人澡人人妻人人爽人人蜜桃 | 亚洲第一无码av无码专区 | 丁香啪啪综合成人亚洲 | 久久99精品国产麻豆蜜芽 | 久久婷婷五月综合色国产香蕉 | 国产精品自产拍在线观看 | 又黄又爽又色的视频 | 熟女少妇在线视频播放 | 高清国产亚洲精品自在久久 | 国产国语老龄妇女a片 | 亚洲国产欧美日韩精品一区二区三区 | 人妻少妇精品久久 | 无码成人精品区在线观看 | 国产成人一区二区三区别 | 精品乱码久久久久久久 | 中文字幕无码人妻少妇免费 | 欧美日韩色另类综合 | 4hu四虎永久在线观看 | av在线亚洲欧洲日产一区二区 | 欧美性生交活xxxxxdddd | 日韩人妻无码一区二区三区久久99 | 国产美女精品一区二区三区 | 四虎永久在线精品免费网址 | 午夜嘿嘿嘿影院 | 亚洲人成影院在线观看 | 日本乱人伦片中文三区 | 巨爆乳无码视频在线观看 | 久久亚洲a片com人成 | 天堂在线观看www | 欧美刺激性大交 | 日韩在线不卡免费视频一区 | 综合激情五月综合激情五月激情1 | 强开小婷嫩苞又嫩又紧视频 | 中文字幕乱码人妻无码久久 | 精品人妻中文字幕有码在线 | 国产精品多人p群无码 | 久久久精品国产sm最大网站 | 国产av剧情md精品麻豆 | 精品夜夜澡人妻无码av蜜桃 | 久久亚洲国产成人精品性色 | 麻豆国产丝袜白领秘书在线观看 | 欧美人与物videos另类 | 成人免费无码大片a毛片 | 精品aⅴ一区二区三区 | 天天综合网天天综合色 | 人妻少妇精品无码专区二区 | 国内老熟妇对白xxxxhd | 99精品无人区乱码1区2区3区 | 九九热爱视频精品 | 六月丁香婷婷色狠狠久久 | 国产精品无码mv在线观看 | 欧美日韩人成综合在线播放 | 熟女少妇在线视频播放 | 天堂а√在线地址中文在线 | 性欧美牲交在线视频 | 黑人粗大猛烈进出高潮视频 | 亚洲成熟女人毛毛耸耸多 | 国产精品资源一区二区 | 国产凸凹视频一区二区 | 超碰97人人射妻 | 成人片黄网站色大片免费观看 | 色一情一乱一伦一视频免费看 | 欧美丰满熟妇xxxx性ppx人交 | 国产成人精品无码播放 | av无码不卡在线观看免费 | 亚洲啪av永久无码精品放毛片 | 中文字幕人妻丝袜二区 | 性色欲网站人妻丰满中文久久不卡 | 2020最新国产自产精品 | 欧美亚洲日韩国产人成在线播放 | 无码人妻黑人中文字幕 | 日韩人妻无码中文字幕视频 | 亚洲精品一区三区三区在线观看 | 18无码粉嫩小泬无套在线观看 | 精品一区二区三区波多野结衣 | 国产麻豆精品精东影业av网站 | 久久精品99久久香蕉国产色戒 | 我要看www免费看插插视频 | 亚洲中文字幕无码中文字在线 | 大色综合色综合网站 | 丰满肥臀大屁股熟妇激情视频 | 精品国产一区二区三区四区在线看 | 精品人妻人人做人人爽夜夜爽 | 女人被男人躁得好爽免费视频 | 大乳丰满人妻中文字幕日本 | 亚洲精品欧美二区三区中文字幕 | 欧美性黑人极品hd | 亚洲一区二区三区国产精华液 | 国产午夜福利100集发布 | 久久久中文字幕日本无吗 | 国产成人精品无码播放 | 成人女人看片免费视频放人 | 国产激情一区二区三区 | 国产成人无码专区 | 国产av一区二区精品久久凹凸 | 久久久久久国产精品无码下载 | 精品国产aⅴ无码一区二区 | 内射老妇bbwx0c0ck | 在线亚洲高清揄拍自拍一品区 | 亚洲日韩乱码中文无码蜜桃臀网站 | 综合网日日天干夜夜久久 | 欧美一区二区三区视频在线观看 | 无码播放一区二区三区 | 国产一精品一av一免费 | 亚洲综合伊人久久大杳蕉 | 无码人妻少妇伦在线电影 | 人人妻人人澡人人爽精品欧美 | 天堂久久天堂av色综合 | 色窝窝无码一区二区三区色欲 | 亚洲日韩乱码中文无码蜜桃臀网站 | 999久久久国产精品消防器材 | 人妻有码中文字幕在线 | 夜夜影院未满十八勿进 | 日韩欧美成人免费观看 | 大肉大捧一进一出视频出来呀 | 免费观看激色视频网站 | 国产精品多人p群无码 | 曰本女人与公拘交酡免费视频 | 国语精品一区二区三区 | 成熟女人特级毛片www免费 | 女人和拘做爰正片视频 | 一区二区传媒有限公司 | 波多野结衣高清一区二区三区 | 亚洲成色在线综合网站 | 国产精品怡红院永久免费 | 丰满少妇女裸体bbw | 国产精品va在线观看无码 | 亚洲s色大片在线观看 | 亚洲区小说区激情区图片区 | 99精品视频在线观看免费 | 奇米影视7777久久精品人人爽 | 国产日产欧产精品精品app | 特黄特色大片免费播放器图片 | 中文字幕无码av波多野吉衣 | 欧美 日韩 亚洲 在线 | 亚洲精品综合五月久久小说 | 国产 浪潮av性色四虎 | 国产香蕉尹人视频在线 | 十八禁视频网站在线观看 | 大胆欧美熟妇xx | 国产免费久久久久久无码 | 日日摸日日碰夜夜爽av | 国产精品亚洲lv粉色 | 天堂亚洲免费视频 | 少妇无码av无码专区在线观看 | 波多野结衣av在线观看 | 国产精品美女久久久网av | 无遮挡国产高潮视频免费观看 | 久青草影院在线观看国产 | 欧美黑人性暴力猛交喷水 | 少妇性荡欲午夜性开放视频剧场 | 女人被男人躁得好爽免费视频 | 国产97在线 | 亚洲 | 成人动漫在线观看 | 狠狠噜狠狠狠狠丁香五月 | 日本va欧美va欧美va精品 | 欧美丰满老熟妇xxxxx性 | 国精产品一区二区三区 | 国产欧美精品一区二区三区 | 国产午夜手机精彩视频 | 国产三级精品三级男人的天堂 | 亚洲欧美精品伊人久久 | 国产婷婷色一区二区三区在线 | 欧美一区二区三区视频在线观看 | 精品少妇爆乳无码av无码专区 | 国产精品高潮呻吟av久久4虎 | a在线观看免费网站大全 | 午夜肉伦伦影院 | 国产舌乚八伦偷品w中 | 少妇无码av无码专区在线观看 | 精品日本一区二区三区在线观看 | 国产精品久久福利网站 | 国产亚洲精品久久久久久 | 国精产品一区二区三区 | 人人妻人人澡人人爽精品欧美 | 亚洲欧美日韩国产精品一区二区 | 熟女少妇在线视频播放 | 亚洲a无码综合a国产av中文 | 久久国产精品_国产精品 | 99er热精品视频 | 久久久精品456亚洲影院 | 国产偷国产偷精品高清尤物 | 熟女少妇人妻中文字幕 | 亚洲国产精品久久久天堂 | 曰韩无码二三区中文字幕 | 少妇人妻av毛片在线看 | 好爽又高潮了毛片免费下载 | 久9re热视频这里只有精品 | 国产综合久久久久鬼色 | 亚洲第一网站男人都懂 | 久久成人a毛片免费观看网站 | 国产明星裸体无码xxxx视频 | 亚洲精品国偷拍自产在线观看蜜桃 | 日产精品99久久久久久 | 国内丰满熟女出轨videos | 亚洲精品一区二区三区在线观看 | 欧美35页视频在线观看 | 亚洲理论电影在线观看 | 欧美亚洲国产一区二区三区 | 婷婷综合久久中文字幕蜜桃三电影 | 无码av中文字幕免费放 | 国产午夜亚洲精品不卡下载 | 激情内射日本一区二区三区 | 久久久精品456亚洲影院 | 在线观看国产一区二区三区 | 国产亚洲欧美在线专区 | 国产精品无码永久免费888 | 亚洲中文字幕成人无码 | 亚洲欧洲日本综合aⅴ在线 | 又大又黄又粗又爽的免费视频 | 天堂一区人妻无码 | 伊人色综合久久天天小片 | 亚洲自偷自拍另类第1页 | 久久久无码中文字幕久... | 国产香蕉尹人综合在线观看 | 中文字幕乱妇无码av在线 | 国产无遮挡又黄又爽免费视频 | 纯爱无遮挡h肉动漫在线播放 | 国产亚洲精品久久久久久 | 黑人粗大猛烈进出高潮视频 | 一个人看的www免费视频在线观看 | 波多野结衣 黑人 | 亚洲日本在线电影 | 99久久精品午夜一区二区 | 久久综合九色综合欧美狠狠 | 岛国片人妻三上悠亚 | 国产香蕉尹人综合在线观看 | 亚洲精品欧美二区三区中文字幕 | 精品一区二区三区无码免费视频 | 日韩精品无码一本二本三本色 | 国产激情一区二区三区 | 精品国产国产综合精品 | 无码国产色欲xxxxx视频 | 又粗又大又硬毛片免费看 | 中文字幕 人妻熟女 | 内射后入在线观看一区 | 一本无码人妻在中文字幕免费 | av香港经典三级级 在线 | 3d动漫精品啪啪一区二区中 | 人妻与老人中文字幕 | 无码人妻av免费一区二区三区 | 成人无码精品一区二区三区 | 欧美精品国产综合久久 | 久久久久人妻一区精品色欧美 | 日韩精品无码一本二本三本色 | 水蜜桃亚洲一二三四在线 | 免费看少妇作爱视频 | 国产人妻人伦精品1国产丝袜 | 无码人妻丰满熟妇区毛片18 | 中文字幕乱妇无码av在线 | 欧美熟妇另类久久久久久多毛 | 无码人妻久久一区二区三区不卡 | 一本大道伊人av久久综合 | 国产真实乱对白精彩久久 | 东京一本一道一二三区 | 色婷婷综合中文久久一本 | 久久久久免费看成人影片 | 欧美喷潮久久久xxxxx | 人人超人人超碰超国产 | 亚洲毛片av日韩av无码 | а√天堂www在线天堂小说 | 国产成人无码av一区二区 | 青草青草久热国产精品 | 久久久中文久久久无码 | 一本大道伊人av久久综合 | 国产亚洲精品精品国产亚洲综合 | 国产亚洲精品久久久ai换 | 国产亚洲精品精品国产亚洲综合 | 久久国产精品萌白酱免费 | 狠狠色噜噜狠狠狠狠7777米奇 | 亚洲欧美综合区丁香五月小说 | 97久久精品无码一区二区 | 国产精品永久免费视频 | 欧美猛少妇色xxxxx | 性做久久久久久久免费看 | 精品久久久无码人妻字幂 | 狂野欧美性猛xxxx乱大交 | 亚洲一区二区三区 | 四虎永久在线精品免费网址 | 日韩精品成人一区二区三区 | 成人精品视频一区二区 | 2020久久香蕉国产线看观看 | av无码电影一区二区三区 | 国产av无码专区亚洲a∨毛片 | 亚洲gv猛男gv无码男同 | 国产精品沙发午睡系列 | 国产高清av在线播放 | 国产午夜无码视频在线观看 | 人妻有码中文字幕在线 | 无套内谢的新婚少妇国语播放 | 日韩无码专区 | 国产黄在线观看免费观看不卡 | 人人妻人人藻人人爽欧美一区 | 大地资源网第二页免费观看 | 国产午夜亚洲精品不卡 | 噜噜噜亚洲色成人网站 | 亚洲综合无码久久精品综合 | 男女下面进入的视频免费午夜 | 日本精品人妻无码77777 天堂一区人妻无码 | 中文字幕无码日韩专区 | 国内揄拍国内精品少妇国语 | 人妻少妇被猛烈进入中文字幕 | 成年女人永久免费看片 | 国产在线aaa片一区二区99 | 精品国产成人一区二区三区 | 色一情一乱一伦 | 久久97精品久久久久久久不卡 | 国产麻豆精品精东影业av网站 | 久久aⅴ免费观看 | 欧美性猛交内射兽交老熟妇 | 久久久久久亚洲精品a片成人 | 国产精品久久久久影院嫩草 | 日韩成人一区二区三区在线观看 | 天天拍夜夜添久久精品 | 国产偷国产偷精品高清尤物 | 国产精华av午夜在线观看 | 国产做国产爱免费视频 | 99精品视频在线观看免费 | 青青青爽视频在线观看 | 国产精品欧美成人 | 88国产精品欧美一区二区三区 | 国产欧美精品一区二区三区 | 国产激情精品一区二区三区 | 76少妇精品导航 | 亚洲中文字幕久久无码 | 国产精品美女久久久网av | 亚洲成熟女人毛毛耸耸多 | 精品日本一区二区三区在线观看 | 丰满人妻精品国产99aⅴ | 帮老师解开蕾丝奶罩吸乳网站 | 少妇高潮喷潮久久久影院 | 国产av无码专区亚洲awww | 国产成人精品三级麻豆 | 亚洲午夜久久久影院 | 亚洲成av人在线观看网址 | 大地资源网第二页免费观看 | 免费无码一区二区三区蜜桃大 | 国产高清不卡无码视频 | 偷窥日本少妇撒尿chinese | 色偷偷av老熟女 久久精品人妻少妇一区二区三区 | 成人精品天堂一区二区三区 | 日韩人妻无码一区二区三区久久99 | 日本一本二本三区免费 | 日韩少妇白浆无码系列 | 强辱丰满人妻hd中文字幕 | 成年美女黄网站色大免费全看 | 草草网站影院白丝内射 | 免费播放一区二区三区 | 97精品人妻一区二区三区香蕉 | 日本爽爽爽爽爽爽在线观看免 | 久久精品人人做人人综合 | 亚洲a无码综合a国产av中文 | 乱中年女人伦av三区 | 国产美女极度色诱视频www | 久久精品无码一区二区三区 | 国产精品嫩草久久久久 | 国产成人综合色在线观看网站 | 欧美性生交活xxxxxdddd | 国产黄在线观看免费观看不卡 | 亚洲精品鲁一鲁一区二区三区 | 国内老熟妇对白xxxxhd | 2020最新国产自产精品 | 少女韩国电视剧在线观看完整 | 国产麻豆精品精东影业av网站 | 午夜免费福利小电影 | 国精产品一区二区三区 | 国产女主播喷水视频在线观看 | 在线观看欧美一区二区三区 | 无码精品国产va在线观看dvd | 日本精品少妇一区二区三区 | 亚洲成av人综合在线观看 | 双乳奶水饱满少妇呻吟 | 少妇太爽了在线观看 | 色综合久久网 | 亚洲精品鲁一鲁一区二区三区 | 国产成人综合在线女婷五月99播放 | 国产成人无码a区在线观看视频app | 国产超碰人人爽人人做人人添 | 鲁一鲁av2019在线 | 狠狠亚洲超碰狼人久久 | 欧美丰满熟妇xxxx性ppx人交 | 国产成人精品视频ⅴa片软件竹菊 | 人人澡人摸人人添 | 天堂а√在线地址中文在线 | 成人免费视频一区二区 | 奇米影视888欧美在线观看 | 玩弄人妻少妇500系列视频 | 久久天天躁夜夜躁狠狠 | 久久五月精品中文字幕 | 国产偷自视频区视频 | 亚洲综合久久一区二区 | 天天做天天爱天天爽综合网 | 2020久久超碰国产精品最新 | 久久综合激激的五月天 | 婷婷色婷婷开心五月四房播播 | 麻豆精产国品 | 成熟女人特级毛片www免费 | 久久人人爽人人爽人人片av高清 | 久久久久免费看成人影片 | 欧美国产日产一区二区 | 内射老妇bbwx0c0ck | 又黄又爽又色的视频 | 澳门永久av免费网站 | 欧美成人高清在线播放 | 国产av人人夜夜澡人人爽麻豆 | 欧美 丝袜 自拍 制服 另类 | 一本无码人妻在中文字幕免费 | 在线天堂新版最新版在线8 | 狠狠cao日日穞夜夜穞av | 精品久久久久久人妻无码中文字幕 | 粗大的内捧猛烈进出视频 | av无码电影一区二区三区 | 老熟妇乱子伦牲交视频 | 岛国片人妻三上悠亚 | 乌克兰少妇性做爰 | 国产熟妇高潮叫床视频播放 | 沈阳熟女露脸对白视频 | 久久精品人人做人人综合 | 国产免费观看黄av片 | 中文精品久久久久人妻不卡 | 丰满人妻翻云覆雨呻吟视频 | 精品国产aⅴ无码一区二区 | 久久午夜无码鲁丝片秋霞 | 亚洲乱码国产乱码精品精 | 亚洲一区二区三区香蕉 | 色偷偷人人澡人人爽人人模 | 国产熟妇高潮叫床视频播放 | 精品aⅴ一区二区三区 | 18精品久久久无码午夜福利 | 成人三级无码视频在线观看 | 宝宝好涨水快流出来免费视频 | 国内丰满熟女出轨videos | 超碰97人人做人人爱少妇 | 欧美 亚洲 国产 另类 | 免费无码av一区二区 | 欧美freesex黑人又粗又大 | 成人欧美一区二区三区黑人免费 | 女人被男人爽到呻吟的视频 | 欧美自拍另类欧美综合图片区 | 国产 精品 自在自线 | 久久97精品久久久久久久不卡 | 美女张开腿让人桶 | 黑人大群体交免费视频 | 国内精品一区二区三区不卡 | 麻豆果冻传媒2021精品传媒一区下载 | 樱花草在线播放免费中文 | 特级做a爰片毛片免费69 | 色婷婷av一区二区三区之红樱桃 | 蜜桃av抽搐高潮一区二区 | 亚洲欧美精品伊人久久 | 色诱久久久久综合网ywww | 一本久道高清无码视频 | 亚洲色偷偷男人的天堂 | 乱人伦人妻中文字幕无码 | 亚洲欧洲日本无在线码 | 欧美刺激性大交 | 中文字幕无线码免费人妻 | 欧美国产日产一区二区 | 久久国产精品偷任你爽任你 | ass日本丰满熟妇pics | 国产乱码精品一品二品 | 国产97在线 | 亚洲 | 曰韩无码二三区中文字幕 | 最新国产麻豆aⅴ精品无码 | 国产极品视觉盛宴 | 国产人妻精品一区二区三区不卡 | 在线播放亚洲第一字幕 | 亚洲欧美国产精品专区久久 | 最近中文2019字幕第二页 | 色婷婷欧美在线播放内射 | 精品国偷自产在线 | 国产疯狂伦交大片 | 人妻体内射精一区二区三四 | 精品久久久久久亚洲精品 | 欧美日韩一区二区综合 | 亚洲精品久久久久久一区二区 | 国产九九九九九九九a片 | 狠狠色丁香久久婷婷综合五月 | 成人无码视频免费播放 | 爆乳一区二区三区无码 | 国产亚洲人成在线播放 | 婷婷综合久久中文字幕蜜桃三电影 | 精品欧美一区二区三区久久久 | 亚洲国产欧美国产综合一区 | 欧美人与善在线com | 亚洲自偷自拍另类第1页 | 熟女俱乐部五十路六十路av | 国产农村妇女高潮大叫 | 欧美日韩一区二区综合 | 精品一区二区三区无码免费视频 | 日本熟妇乱子伦xxxx | 亚洲欧美日韩综合久久久 | 未满小14洗澡无码视频网站 | 国产人妻精品一区二区三区不卡 | 午夜福利试看120秒体验区 | 国产片av国语在线观看 | 精品久久久无码中文字幕 | 欧美一区二区三区 | 国产人妻久久精品二区三区老狼 | 99久久久国产精品无码免费 | 乌克兰少妇xxxx做受 | 国产综合在线观看 | 欧美人与禽猛交狂配 | 18禁黄网站男男禁片免费观看 | 亚洲国产一区二区三区在线观看 | 日本va欧美va欧美va精品 | 无码人妻黑人中文字幕 | 窝窝午夜理论片影院 | 俺去俺来也在线www色官网 | 俺去俺来也www色官网 | 精品成在人线av无码免费看 | 香港三级日本三级妇三级 | 亚洲日韩精品欧美一区二区 | 国产欧美亚洲精品a | 亚洲日韩乱码中文无码蜜桃臀网站 | 老头边吃奶边弄进去呻吟 | 国产欧美亚洲精品a | 欧美老妇交乱视频在线观看 | 亚洲欧美中文字幕5发布 | 亚洲の无码国产の无码步美 | 国产无遮挡又黄又爽又色 | 红桃av一区二区三区在线无码av | 国内少妇偷人精品视频免费 | 天天综合网天天综合色 | 无码国产色欲xxxxx视频 | a片在线免费观看 | 亚欧洲精品在线视频免费观看 | 精品无码av一区二区三区 | 99久久久国产精品无码免费 | 国产乱人伦偷精品视频 | 国产xxx69麻豆国语对白 | 乱码午夜-极国产极内射 | 日韩av无码一区二区三区不卡 | 色综合视频一区二区三区 | 99久久精品午夜一区二区 | 少女韩国电视剧在线观看完整 | 国产精品资源一区二区 | 日日天日日夜日日摸 | 内射爽无广熟女亚洲 | 国产极品美女高潮无套在线观看 | 亚洲 欧美 激情 小说 另类 | 久久人人爽人人爽人人片av高清 | 男女性色大片免费网站 | 国产成人精品优优av | 一本大道久久东京热无码av | 亚洲码国产精品高潮在线 | 亚洲 日韩 欧美 成人 在线观看 | 曰韩少妇内射免费播放 | 国产午夜精品一区二区三区嫩草 | 真人与拘做受免费视频 | 女高中生第一次破苞av | 色婷婷综合中文久久一本 | 大乳丰满人妻中文字幕日本 | 色婷婷综合中文久久一本 | 国产人妖乱国产精品人妖 | 国产熟妇另类久久久久 | а√资源新版在线天堂 | 日本免费一区二区三区最新 | 欧美日本免费一区二区三区 | 老太婆性杂交欧美肥老太 | 奇米影视7777久久精品人人爽 | 色一情一乱一伦一区二区三欧美 | 国产精品美女久久久 | 狂野欧美性猛xxxx乱大交 | 精品夜夜澡人妻无码av蜜桃 | 鲁一鲁av2019在线 | 综合人妻久久一区二区精品 | 无码av最新清无码专区吞精 | 日日碰狠狠丁香久燥 | 国产精品久久久久9999小说 | 亚洲 另类 在线 欧美 制服 | 成 人影片 免费观看 | 精品欧美一区二区三区久久久 | 久久精品国产大片免费观看 | 精品无码一区二区三区的天堂 | 国产精品内射视频免费 | 国产特级毛片aaaaaaa高清 | 色婷婷香蕉在线一区二区 | 亚洲国产精品久久久天堂 | 黑人巨大精品欧美一区二区 | 日本精品高清一区二区 | 日本精品人妻无码免费大全 | 久久久无码中文字幕久... | 亚洲爆乳大丰满无码专区 | 亚洲爆乳大丰满无码专区 | 丰满肥臀大屁股熟妇激情视频 | 色狠狠av一区二区三区 | 真人与拘做受免费视频一 | 国产 浪潮av性色四虎 | 骚片av蜜桃精品一区 | 精品无码av一区二区三区 | 精品无码一区二区三区爱欲 | 天堂无码人妻精品一区二区三区 | 在线观看欧美一区二区三区 | 疯狂三人交性欧美 | 亚洲国产av精品一区二区蜜芽 | 亚洲国产日韩a在线播放 | 亚洲综合在线一区二区三区 | 粗大的内捧猛烈进出视频 | 国产精品无码一区二区三区不卡 | 人妻人人添人妻人人爱 | 国产xxx69麻豆国语对白 | 亚洲精品国偷拍自产在线观看蜜桃 | 女高中生第一次破苞av | 国产av无码专区亚洲a∨毛片 | 国产精品久久久久9999小说 | 国产精品久久久一区二区三区 | 久久人人97超碰a片精品 | 久久无码中文字幕免费影院蜜桃 | 欧美人与牲动交xxxx | 极品尤物被啪到呻吟喷水 | 亚洲成av人片在线观看无码不卡 | 无码精品人妻一区二区三区av | 99精品国产综合久久久久五月天 | 色综合久久久无码网中文 | 精品久久久久久亚洲精品 | 网友自拍区视频精品 | 日日鲁鲁鲁夜夜爽爽狠狠 | 亚洲自偷自拍另类第1页 | 国产尤物精品视频 | 成人试看120秒体验区 | 日日橹狠狠爱欧美视频 | 学生妹亚洲一区二区 | 亚洲综合久久一区二区 | 亚洲精品国产品国语在线观看 | 国产三级久久久精品麻豆三级 | 国产精品无套呻吟在线 | 国产在线精品一区二区高清不卡 | 四虎国产精品一区二区 | 黄网在线观看免费网站 | 亚洲成在人网站无码天堂 | 激情五月综合色婷婷一区二区 | 国产无av码在线观看 | 131美女爱做视频 | 东京热无码av男人的天堂 | 久久久久成人精品免费播放动漫 | 国产午夜无码视频在线观看 | 亚洲一区二区三区播放 | 国产成人久久精品流白浆 | 少妇性俱乐部纵欲狂欢电影 | 丰满妇女强制高潮18xxxx | 无码国内精品人妻少妇 | 国产国产精品人在线视 | 高中生自慰www网站 | 少妇性l交大片欧洲热妇乱xxx | 在线欧美精品一区二区三区 | 国内精品一区二区三区不卡 | 玩弄少妇高潮ⅹxxxyw | 狠狠色丁香久久婷婷综合五月 | 国产精品久久久一区二区三区 | 国产亚洲精品久久久久久久久动漫 | 成人欧美一区二区三区黑人 | 久久久精品成人免费观看 | 97精品人妻一区二区三区香蕉 | 好爽又高潮了毛片免费下载 | 亚洲国产午夜精品理论片 | 中国大陆精品视频xxxx | 激情亚洲一区国产精品 | 水蜜桃亚洲一二三四在线 | 婷婷五月综合缴情在线视频 | 无码人妻丰满熟妇区五十路百度 | 51国偷自产一区二区三区 | 国产超碰人人爽人人做人人添 | 国产口爆吞精在线视频 | 国产精品99爱免费视频 | 亚洲精品www久久久 | 国产美女极度色诱视频www | 久久久久久国产精品无码下载 | 亚洲国产精品久久久天堂 | 国产精品高潮呻吟av久久 | 性色欲网站人妻丰满中文久久不卡 | 日本大乳高潮视频在线观看 | 国产精品高潮呻吟av久久4虎 | 亚洲自偷自拍另类第1页 | 亚洲精品国产a久久久久久 | 亚洲色无码一区二区三区 | 国产熟妇高潮叫床视频播放 | 欧美肥老太牲交大战 | 亚洲精品一区国产 | 亚洲男人av天堂午夜在 | 中文字幕精品av一区二区五区 | 夫妻免费无码v看片 | 成人一区二区免费视频 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 国产亚洲视频中文字幕97精品 | 又粗又大又硬毛片免费看 | 又大又紧又粉嫩18p少妇 | 99久久精品日本一区二区免费 | 国产亚洲日韩欧美另类第八页 | 未满成年国产在线观看 | 国产乱码精品一品二品 | 国产亚洲美女精品久久久2020 | 国产内射爽爽大片视频社区在线 | 亚洲中文字幕乱码av波多ji | 亚洲日韩av片在线观看 | 亚洲综合另类小说色区 | 亚洲欧美日韩成人高清在线一区 | 久久久久久国产精品无码下载 | 亚洲精品国产品国语在线观看 | 扒开双腿吃奶呻吟做受视频 | 蜜桃视频插满18在线观看 | 精品人人妻人人澡人人爽人人 | 中文字幕无码视频专区 | 国产亚洲欧美日韩亚洲中文色 | 久久国产精品萌白酱免费 | 装睡被陌生人摸出水好爽 | 国产色xx群视频射精 | 亚洲aⅴ无码成人网站国产app | 色狠狠av一区二区三区 | 黑人巨大精品欧美一区二区 | 免费人成在线观看网站 | 女人被爽到呻吟gif动态图视看 | www一区二区www免费 | 亚洲国产精品一区二区美利坚 | 乱人伦中文视频在线观看 | 奇米影视7777久久精品人人爽 | 色爱情人网站 | 最新国产乱人伦偷精品免费网站 | 亚洲 另类 在线 欧美 制服 | 中国女人内谢69xxxxxa片 | 国内丰满熟女出轨videos | 日本va欧美va欧美va精品 | 又粗又大又硬又长又爽 | 色五月五月丁香亚洲综合网 | 日韩少妇白浆无码系列 | 亚洲日韩乱码中文无码蜜桃臀网站 | 最近免费中文字幕中文高清百度 | 天天拍夜夜添久久精品大 | 大地资源中文第3页 | 一区二区传媒有限公司 | 精品国产一区二区三区四区 | 日本免费一区二区三区最新 | 欧美成人午夜精品久久久 | 亚洲国产一区二区三区在线观看 | 夜精品a片一区二区三区无码白浆 | 激情内射亚州一区二区三区爱妻 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 久久精品成人欧美大片 | 亚洲综合无码久久精品综合 | 国产精品多人p群无码 | 丁香花在线影院观看在线播放 | 亚洲天堂2017无码中文 | 国产亚洲人成在线播放 | 色欲久久久天天天综合网精品 | 日本乱人伦片中文三区 | 亚洲人交乣女bbw | 色综合久久中文娱乐网 | 女人被爽到呻吟gif动态图视看 | 好爽又高潮了毛片免费下载 | 欧美人与禽猛交狂配 | 日韩人妻系列无码专区 | 国产成人精品久久亚洲高清不卡 | 国产av无码专区亚洲a∨毛片 | 国产片av国语在线观看 | 成人免费无码大片a毛片 | 图片区 小说区 区 亚洲五月 | 在线欧美精品一区二区三区 | 天干天干啦夜天干天2017 | 欧美三级不卡在线观看 | 蜜桃视频插满18在线观看 | 蜜桃av蜜臀av色欲av麻 999久久久国产精品消防器材 | 免费乱码人妻系列无码专区 | 任你躁在线精品免费 | 波多野结衣av一区二区全免费观看 | 无码任你躁久久久久久久 | 久久久久久亚洲精品a片成人 | 日韩无码专区 | 中文字幕av伊人av无码av | 麻豆成人精品国产免费 | 欧美日本精品一区二区三区 | 国产人妖乱国产精品人妖 | 老熟女重囗味hdxx69 | 牲欲强的熟妇农村老妇女视频 | 亚洲一区二区三区偷拍女厕 | 夜夜躁日日躁狠狠久久av | 131美女爱做视频 | 丰满人妻被黑人猛烈进入 | 亚洲国产精品无码一区二区三区 | 国产精品多人p群无码 | 欧美阿v高清资源不卡在线播放 | 国产xxx69麻豆国语对白 | 国产肉丝袜在线观看 | 中文字幕av无码一区二区三区电影 | 99精品国产综合久久久久五月天 | 青青青爽视频在线观看 | 国产深夜福利视频在线 | 国产精品va在线观看无码 | 亚洲一区二区三区播放 | 日本在线高清不卡免费播放 | 国产精品久久久av久久久 | 最新国产麻豆aⅴ精品无码 | 在线精品国产一区二区三区 | 少妇性俱乐部纵欲狂欢电影 | 亚洲熟女一区二区三区 | 久久国产劲爆∧v内射 | 国产欧美熟妇另类久久久 | 夜夜夜高潮夜夜爽夜夜爰爰 | 国产人妻精品一区二区三区不卡 | 性欧美videos高清精品 | 亚洲欧美色中文字幕在线 | 国产人妻人伦精品1国产丝袜 | 无码中文字幕色专区 | 少妇人妻av毛片在线看 | 天天做天天爱天天爽综合网 | 精品无码av一区二区三区 | 亚洲精品久久久久avwww潮水 | 又大又硬又爽免费视频 | 未满小14洗澡无码视频网站 | 双乳奶水饱满少妇呻吟 | 免费乱码人妻系列无码专区 | 亚洲爆乳无码专区 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 国产亚洲tv在线观看 | 中文亚洲成a人片在线观看 | 久久久久免费看成人影片 | 少妇一晚三次一区二区三区 | 欧美国产日韩久久mv | 俺去俺来也在线www色官网 | 麻豆国产丝袜白领秘书在线观看 | 麻豆国产人妻欲求不满 | 国产乱码精品一品二品 | 人妻与老人中文字幕 | 东京热男人av天堂 | 国产疯狂伦交大片 | 国产suv精品一区二区五 | 亚洲 另类 在线 欧美 制服 | 日韩av无码一区二区三区 | 国产亚洲精品久久久闺蜜 | 久久精品中文闷骚内射 | 给我免费的视频在线观看 | 亚洲国产精品无码久久久久高潮 | 人妻少妇被猛烈进入中文字幕 | 人人爽人人爽人人片av亚洲 | 亚洲中文无码av永久不收费 | 精品厕所偷拍各类美女tp嘘嘘 | 国产情侣作爱视频免费观看 | 中文字幕中文有码在线 | 日韩亚洲欧美中文高清在线 | 亚洲中文无码av永久不收费 | 国产亚av手机在线观看 | 永久免费观看美女裸体的网站 | 国产午夜无码视频在线观看 | 午夜福利一区二区三区在线观看 | 六月丁香婷婷色狠狠久久 | 亚洲另类伦春色综合小说 | 国产精品va在线观看无码 | 精品无人区无码乱码毛片国产 | 国产内射爽爽大片视频社区在线 | 装睡被陌生人摸出水好爽 | 少妇高潮喷潮久久久影院 | 国産精品久久久久久久 | 亚洲色欲久久久综合网东京热 | 综合网日日天干夜夜久久 | 国产精品亚洲一区二区三区喷水 | 亚洲爆乳大丰满无码专区 | 强开小婷嫩苞又嫩又紧视频 | 日本护士毛茸茸高潮 | 成人试看120秒体验区 | 综合人妻久久一区二区精品 | 亚洲精品欧美二区三区中文字幕 | 任你躁国产自任一区二区三区 | a片免费视频在线观看 | 黄网在线观看免费网站 | 中文字幕av日韩精品一区二区 | 久久久久久久女国产乱让韩 | 丰满少妇熟乱xxxxx视频 | 亚洲精品一区三区三区在线观看 | 综合激情五月综合激情五月激情1 | 亲嘴扒胸摸屁股激烈网站 | 青青久在线视频免费观看 | 久久久亚洲欧洲日产国码αv | 精品偷自拍另类在线观看 | 天堂一区人妻无码 | 九月婷婷人人澡人人添人人爽 | 日韩精品久久久肉伦网站 | 国産精品久久久久久久 | 欧美三级不卡在线观看 | 我要看www免费看插插视频 | 国产精品无码一区二区三区不卡 | 久久人妻内射无码一区三区 | 国产深夜福利视频在线 | 东京无码熟妇人妻av在线网址 | 日韩成人一区二区三区在线观看 | 国产av一区二区三区最新精品 | 免费人成网站视频在线观看 | 国产免费观看黄av片 | 亚洲午夜久久久影院 | 国产97色在线 | 免 | 午夜精品久久久久久久 | 国产艳妇av在线观看果冻传媒 | 国产精品亚洲五月天高清 | 青青久在线视频免费观看 | 日韩精品无码免费一区二区三区 | 中文字幕精品av一区二区五区 | 又黄又爽又色的视频 | 久久久久久a亚洲欧洲av冫 | √天堂中文官网8在线 | 久久久久成人片免费观看蜜芽 | 东京热男人av天堂 | 欧美人与禽zoz0性伦交 | 亚洲成av人片在线观看无码不卡 | 秋霞特色aa大片 | aⅴ亚洲 日韩 色 图网站 播放 | 亚洲欧美日韩国产精品一区二区 | 四虎影视成人永久免费观看视频 | 日本在线高清不卡免费播放 | 国产三级久久久精品麻豆三级 | 久激情内射婷内射蜜桃人妖 | 最近免费中文字幕中文高清百度 | 欧美人妻一区二区三区 | 国产特级毛片aaaaaaa高清 | 51国偷自产一区二区三区 | 国产精品18久久久久久麻辣 | 亚洲色在线无码国产精品不卡 | 久久久无码中文字幕久... | 天天躁夜夜躁狠狠是什么心态 | 国产亚洲精品久久久久久久 | 扒开双腿吃奶呻吟做受视频 | 国精产品一品二品国精品69xx | 玩弄少妇高潮ⅹxxxyw | 四虎国产精品免费久久 | 亚洲精品国产第一综合99久久 | 亚洲一区二区三区含羞草 | 少妇无码吹潮 | 久久精品丝袜高跟鞋 | 国产suv精品一区二区五 | 亚洲精品鲁一鲁一区二区三区 | 成人性做爰aaa片免费看 | 日本一区二区三区免费高清 | 男女爱爱好爽视频免费看 | 国产97在线 | 亚洲 | 亚洲国产精品一区二区美利坚 | 天天综合网天天综合色 | 麻豆蜜桃av蜜臀av色欲av | 玩弄少妇高潮ⅹxxxyw | 色综合久久久无码网中文 | 亚洲七七久久桃花影院 | 岛国片人妻三上悠亚 | 日韩av无码一区二区三区 | 亚洲成av人在线观看网址 | 日韩欧美群交p片內射中文 | 无码毛片视频一区二区本码 | 亚洲人成影院在线观看 | 精品国产一区二区三区四区在线看 | 亚洲熟妇自偷自拍另类 | 亚洲精品鲁一鲁一区二区三区 | 亚洲色无码一区二区三区 | 久久成人a毛片免费观看网站 | 国产成人精品一区二区在线小狼 | 丰满少妇人妻久久久久久 | 精品夜夜澡人妻无码av蜜桃 | 久久久av男人的天堂 | 亚洲码国产精品高潮在线 | 成年女人永久免费看片 | 曰韩无码二三区中文字幕 | 久久久久亚洲精品男人的天堂 | 美女张开腿让人桶 | 久久综合狠狠综合久久综合88 | 国产精品第一国产精品 | 日本一卡二卡不卡视频查询 | 精品国产一区二区三区四区 | 荫蒂被男人添的好舒服爽免费视频 | 97人妻精品一区二区三区 | 天天做天天爱天天爽综合网 | 国产精品久久久久影院嫩草 | 欧美熟妇另类久久久久久不卡 | 欧美激情综合亚洲一二区 | 丰满肥臀大屁股熟妇激情视频 | 国产疯狂伦交大片 | 中文亚洲成a人片在线观看 | 亚洲欧美国产精品久久 | 宝宝好涨水快流出来免费视频 | 国产情侣作爱视频免费观看 | 色婷婷欧美在线播放内射 | 国产欧美亚洲精品a | 美女扒开屁股让男人桶 | 极品嫩模高潮叫床 | 激情亚洲一区国产精品 | 欧美人与禽猛交狂配 | 97无码免费人妻超级碰碰夜夜 | 夜夜影院未满十八勿进 | 国产精品资源一区二区 | 麻豆国产97在线 | 欧洲 | 永久免费观看国产裸体美女 | 999久久久国产精品消防器材 | 亚洲爆乳精品无码一区二区三区 | 麻花豆传媒剧国产免费mv在线 | 欧美人与善在线com | 日韩少妇白浆无码系列 | 色综合久久久无码中文字幕 | 欧美兽交xxxx×视频 | 蜜桃臀无码内射一区二区三区 | 中国女人内谢69xxxxxa片 | 高清不卡一区二区三区 | 欧美 丝袜 自拍 制服 另类 | 亚洲爆乳精品无码一区二区三区 | 蜜臀aⅴ国产精品久久久国产老师 | 免费中文字幕日韩欧美 | 亚洲日本一区二区三区在线 | 特级做a爰片毛片免费69 | 精品一二三区久久aaa片 | 黑人巨大精品欧美黑寡妇 | 欧美三级不卡在线观看 | 4hu四虎永久在线观看 | 国产精品国产三级国产专播 | 中文字幕av无码一区二区三区电影 | 亚洲精品国产第一综合99久久 | 亚洲 a v无 码免 费 成 人 a v | 2020最新国产自产精品 | 国产成人精品视频ⅴa片软件竹菊 | 精品无码成人片一区二区98 | 国产激情综合五月久久 | 在线a亚洲视频播放在线观看 | 蜜桃无码一区二区三区 | 精品无码国产自产拍在线观看蜜 | 亚洲成av人综合在线观看 | 97久久精品无码一区二区 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 成人欧美一区二区三区 | 国产麻豆精品一区二区三区v视界 | 欧洲极品少妇 | 无码一区二区三区在线 | 亚洲国产精品成人久久蜜臀 | 国内少妇偷人精品视频免费 | 亚洲欧美国产精品久久 | 国产精品亚洲一区二区三区喷水 | 精品厕所偷拍各类美女tp嘘嘘 | 亚洲爆乳精品无码一区二区三区 | 国产乱人伦av在线无码 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 乱码av麻豆丝袜熟女系列 | 88国产精品欧美一区二区三区 | 国产激情综合五月久久 | 中文字幕日韩精品一区二区三区 | 97人妻精品一区二区三区 | 亚洲精品久久久久久久久久久 | 国产精品国产自线拍免费软件 | 亚洲熟女一区二区三区 | 无码吃奶揉捏奶头高潮视频 | 老头边吃奶边弄进去呻吟 | 精品人妻人人做人人爽夜夜爽 | 久久久久久久女国产乱让韩 | 日韩欧美群交p片內射中文 | 装睡被陌生人摸出水好爽 | 少妇性l交大片 | 日日麻批免费40分钟无码 | 久久久精品欧美一区二区免费 | 影音先锋中文字幕无码 | 精品无码一区二区三区的天堂 | 中文精品无码中文字幕无码专区 | 亚洲色www成人永久网址 | 国产又粗又硬又大爽黄老大爷视 | 免费播放一区二区三区 | 国产人成高清在线视频99最全资源 | 1000部啪啪未满十八勿入下载 | 久久久精品456亚洲影院 | 国语精品一区二区三区 | 少妇激情av一区二区 | 精品偷拍一区二区三区在线看 | 免费观看黄网站 | 亚洲の无码国产の无码影院 | 久久精品人妻少妇一区二区三区 |