久久精品国产精品国产精品污,男人扒开添女人下部免费视频,一级国产69式性姿势免费视频,夜鲁夜鲁很鲁在线视频 视频,欧美丰满少妇一区二区三区,国产偷国产偷亚洲高清人乐享,中文 在线 日韩 亚洲 欧美,熟妇人妻无乱码中文字幕真矢织江,一区二区三区人妻制服国产

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Storm概念学习系列之storm-starter项目(完整版)(博主推荐)

發布時間:2025/5/22 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Storm概念学习系列之storm-starter项目(完整版)(博主推荐) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這是書籍《從零開始學Storm》趙必廈 2014年出版的配套代碼!

  storm-starter項目包含使用storm的各種各樣的例子。項目托管在GitHub上面,其網址為:?http://github.com/nathanmarz/storm-starter

?

?

?

?

?

?

?

  或者

  ?

?

?

?

 storm-starter項目的包結構:

?

?

?

?  storm-starter項目的拓撲結構:

?

?

?

  新建maven項目的方式

  以“新建Maven項目的方式”導入storm-starter項目的步驟如下:

1、新建一個Maven項目,項目名稱可以隨意,如storm-starter。

?

?

?

?

?

?

?

?

?

?

2、把storm-starter項目根目錄的src\jvm目錄中的全部文件復制到Maven項目的src/main/java目錄下。

?

?

?

?

?

?

?

?

?

  storm-starter-master\src\jvm\storm\starter下的BasicDRPCTopology.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;/*** This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a* "!" to any string you send the DRPC function.* <p/>* See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on doing distributed RPC on top of* Storm.*/ public class BasicDRPCTopology {public static class ExclaimBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);Config conf = new Config();if (args == null || args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));for (String word : new String[]{ "hello", "goodbye" }) {System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));}cluster.shutdown();drpc.shutdown();}else {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());}} }

?

?

?

?

?

?

  storm-starter-master\src\jvm\storm\starter下的的ExclamationTopology.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.TestWordSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils;import java.util.Map;/*** This is a basic example of a Storm topology.*/ public class ExclamationTopology {public static class ExclamationBolt extends BaseRichBolt {OutputCollector _collector;@Overridepublic void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;}@Overridepublic void execute(Tuple tuple) {_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));_collector.ack(tuple);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("word", new TestWordSpout(), 10);builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");Config conf = new Config();conf.setDebug(true);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createTopology());}else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("test", conf, builder.createTopology());Utils.sleep(10000);cluster.killTopology("test");cluster.shutdown();}} }

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的ManualDRPC.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.drpc.DRPCSpout; import backtype.storm.drpc.ReturnResults; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;public class ManualDRPC {public static class ExclamationBolt extends BaseBasicBolt {@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("result", "return-info"));}@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String arg = tuple.getString(0);Object retInfo = tuple.getValue(1);collector.emit(new Values(arg + "!!!", retInfo));}}public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();LocalDRPC drpc = new LocalDRPC();DRPCSpout spout = new DRPCSpout("exclamation", drpc);builder.setSpout("drpc", spout);builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");LocalCluster cluster = new LocalCluster();Config conf = new Config();cluster.submitTopology("exclaim", conf, builder.createTopology());System.out.println(drpc.execute("exclamation", "aaa"));System.out.println(drpc.execute("exclamation", "bbb"));} }

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的PrintSampleStream.java

/* // to use this example, uncomment the twitter4j dependency information in the project.clj, // uncomment storm.starter.spout.TwitterSampleSpout, and uncomment this classpackage storm.starter;import storm.starter.spout.TwitterSampleSpout; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; import storm.starter.bolt.PrinterBolt;public class PrintSampleStream { public static void main(String[] args) {String username = args[0];String pwd = args[1];TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new TwitterSampleSpout(username, pwd));builder.setBolt("print", new PrinterBolt()).shuffleGrouping("spout");Config conf = new Config();LocalCluster cluster = new LocalCluster();cluster.submitTopology("test", conf, builder.createTopology());Utils.sleep(10000);cluster.shutdown();} } */

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的ReachTopology.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.topology.base.BaseBatchBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;import java.util.*;/*** This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can* compute the reach for any URL on Twitter in realtime by parallelizing the whole computation.* <p/>* Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people* who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the* unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower* records.* <p/>* This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes* minutes on a single machine into one that takes just a couple seconds.* <p/>* For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps.* <p/>* See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on Distributed RPC.*/ public class ReachTopology {public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));}};public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));put("tim", Arrays.asList("alex"));put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));put("adam", Arrays.asList("david", "carissa"));put("mike", Arrays.asList("john", "bob"));put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));}};public static class GetTweeters extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {Object id = tuple.getValue(0);String url = tuple.getString(1);List<String> tweeters = TWEETERS_DB.get(url);if (tweeters != null) {for (String tweeter : tweeters) {collector.emit(new Values(id, tweeter));}}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "tweeter"));}}public static class GetFollowers extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {Object id = tuple.getValue(0);String tweeter = tuple.getString(1);List<String> followers = FOLLOWERS_DB.get(tweeter);if (followers != null) {for (String follower : followers) {collector.emit(new Values(id, follower));}}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "follower"));}}public static class PartialUniquer extends BaseBatchBolt {BatchOutputCollector _collector;Object _id;Set<String> _followers = new HashSet<String>();@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {_followers.add(tuple.getString(1));}@Overridepublic void finishBatch() {_collector.emit(new Values(_id, _followers.size()));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "partial-count"));}}public static class CountAggregator extends BaseBatchBolt {BatchOutputCollector _collector;Object _id;int _count = 0;@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {_count += tuple.getInteger(1);}@Overridepublic void finishBatch() {_collector.emit(new Values(_id, _count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "reach"));}}public static LinearDRPCTopologyBuilder construct() {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");builder.addBolt(new GetTweeters(), 4);builder.addBolt(new GetFollowers(), 12).shuffleGrouping();builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));return builder;}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = construct();Config conf = new Config();if (args == null || args.length == 0) {conf.setMaxTaskParallelism(3);LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };for (String url : urlsToTry) {System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));}cluster.shutdown();drpc.shutdown();}else {conf.setNumWorkers(6);StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());}} }

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的RollingTopWords.java

package storm.starter;import backtype.storm.Config; import backtype.storm.testing.TestWordSpout; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import storm.starter.bolt.IntermediateRankingsBolt; import storm.starter.bolt.RollingCountBolt; import storm.starter.bolt.TotalRankingsBolt; import storm.starter.util.StormRunner;/*** This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.* The top N computation is done in a completely scalable way, and a similar approach could be used to compute things* like trending topics or trending images on Twitter.*/ public class RollingTopWords {private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;private static final int TOP_N = 5;private final TopologyBuilder builder;private final String topologyName;private final Config topologyConfig;private final int runtimeInSeconds;public RollingTopWords() throws InterruptedException {builder = new TopologyBuilder();topologyName = "slidingWindowCounts";topologyConfig = createTopologyConfiguration();runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;wireTopology();}private static Config createTopologyConfiguration() {Config conf = new Config();conf.setDebug(true);return conf;}private void wireTopology() throws InterruptedException {String spoutId = "wordGenerator";String counterId = "counter";String intermediateRankerId = "intermediateRanker";String totalRankerId = "finalRanker";builder.setSpout(spoutId, new TestWordSpout(), 5);builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);}public void run() throws InterruptedException {StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);}public static void main(String[] args) throws Exception {new RollingTopWords().run();} }

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的SingleJoinExample.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.testing.FeederSpout; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import storm.starter.bolt.SingleJoinBolt;public class SingleJoinExample {public static void main(String[] args) {FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));TopologyBuilder builder = new TopologyBuilder();builder.setSpout("gender", genderSpout);builder.setSpout("age", ageSpout);builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id")).fieldsGrouping("age", new Fields("id"));Config conf = new Config();conf.setDebug(true);LocalCluster cluster = new LocalCluster();cluster.submitTopology("join-example", conf, builder.createTopology());for (int i = 0; i < 10; i++) {String gender;if (i % 2 == 0) {gender = "male";}else {gender = "female";}genderSpout.feed(new Values(i, gender));}for (int i = 9; i >= 0; i--) {ageSpout.feed(new Values(i, i + 20));}Utils.sleep(2000);cluster.shutdown();} }

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的TransactionalGlobalCount.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.MemoryTransactionalSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBatchBolt; import backtype.storm.topology.base.BaseTransactionalBolt; import backtype.storm.transactional.ICommitter; import backtype.storm.transactional.TransactionAttempt; import backtype.storm.transactional.TransactionalTopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;/*** This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a* database. The source of data and the databases are mocked out as in memory maps for demonstration purposes. This* class is defined in depth on the wiki at https://github.com/nathanmarz/storm/wiki/Transactional-topologies*/ public class TransactionalGlobalCount {public static final int PARTITION_TAKE_PER_BATCH = 3;public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{put(0, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("dog"));add(new Values("chicken"));add(new Values("cat"));add(new Values("dog"));add(new Values("apple"));}});put(1, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("dog"));add(new Values("apple"));add(new Values("banana"));}});put(2, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("dog"));add(new Values("dog"));add(new Values("dog"));add(new Values("dog"));}});}};public static class Value {int count = 0;BigInteger txid;}public static Map<String, Value> DATABASE = new HashMap<String, Value>();public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";public static class BatchCount extends BaseBatchBolt {Object _id;BatchOutputCollector _collector;int _count = 0;@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {_count++;}@Overridepublic void finishBatch() {_collector.emit(new Values(_id, _count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "count"));}}public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {TransactionAttempt _attempt;BatchOutputCollector _collector;int _sum = 0;@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {_collector = collector;_attempt = attempt;}@Overridepublic void execute(Tuple tuple) {_sum += tuple.getInteger(1);}@Overridepublic void finishBatch() {Value val = DATABASE.get(GLOBAL_COUNT_KEY);Value newval;if (val == null || !val.txid.equals(_attempt.getTransactionId())) {newval = new Value();newval.txid = _attempt.getTransactionId();if (val == null) {newval.count = _sum;}else {newval.count = _sum + val.count;}DATABASE.put(GLOBAL_COUNT_KEY, newval);}else {newval = val;}_collector.emit(new Values(_attempt, newval.count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "sum"));}}public static void main(String[] args) throws Exception {MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");LocalCluster cluster = new LocalCluster();Config config = new Config();config.setDebug(true);config.setMaxSpoutPending(3);cluster.submitTopology("global-count-topology", config, builder.buildTopology());Thread.sleep(3000);cluster.shutdown();} }

?

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的TransactionalWords.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.MemoryTransactionalSpout; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.topology.base.BaseTransactionalBolt; import backtype.storm.transactional.ICommitter; import backtype.storm.transactional.TransactionAttempt; import backtype.storm.transactional.TransactionalTopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;/*** This class defines a more involved transactional topology then TransactionalGlobalCount. This topology processes a* stream of words and produces two outputs:* <p/>* 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores in* the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on.* <p/>* A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move* between buckets as their counts accumulate.*/ public class TransactionalWords {public static class CountValue {Integer prev_count = null;int count = 0;BigInteger txid = null;}public static class BucketValue {int count = 0;BigInteger txid;}public static final int BUCKET_SIZE = 10;public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();public static final int PARTITION_TAKE_PER_BATCH = 3;public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{put(0, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("dog"));add(new Values("chicken"));add(new Values("cat"));add(new Values("dog"));add(new Values("apple"));}});put(1, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("dog"));add(new Values("apple"));add(new Values("banana"));}});put(2, new ArrayList<List<Object>>() {{add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("cat"));add(new Values("dog"));add(new Values("dog"));add(new Values("dog"));add(new Values("dog"));}});}};public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {Map<String, Integer> _counts = new HashMap<String, Integer>();BatchOutputCollector _collector;TransactionAttempt _id;int _count = 0;@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {String key = tuple.getString(1);Integer curr = _counts.get(key);if (curr == null)curr = 0;_counts.put(key, curr + 1);}@Overridepublic void finishBatch() {for (String key : _counts.keySet()) {CountValue val = COUNT_DATABASE.get(key);CountValue newVal;if (val == null || !val.txid.equals(_id)) {newVal = new CountValue();newVal.txid = _id.getTransactionId();if (val != null) {newVal.prev_count = val.count;newVal.count = val.count;}newVal.count = newVal.count + _counts.get(key);COUNT_DATABASE.put(key, newVal);}else {newVal = val;}_collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "key", "count", "prev-count"));}}public static class Bucketize extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);int curr = tuple.getInteger(2);Integer prev = tuple.getInteger(3);int currBucket = curr / BUCKET_SIZE;Integer prevBucket = null;if (prev != null) {prevBucket = prev / BUCKET_SIZE;}if (prevBucket == null) {collector.emit(new Values(attempt, currBucket, 1));}else if (currBucket != prevBucket) {collector.emit(new Values(attempt, currBucket, 1));collector.emit(new Values(attempt, prevBucket, -1));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("attempt", "bucket", "delta"));}}public static class BucketCountUpdater extends BaseTransactionalBolt {Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();BatchOutputCollector _collector;TransactionAttempt _attempt;int _count = 0;@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {_collector = collector;_attempt = attempt;}@Overridepublic void execute(Tuple tuple) {Integer bucket = tuple.getInteger(1);Integer delta = tuple.getInteger(2);Integer curr = _accum.get(bucket);if (curr == null)curr = 0;_accum.put(bucket, curr + delta);}@Overridepublic void finishBatch() {for (Integer bucket : _accum.keySet()) {BucketValue currVal = BUCKET_DATABASE.get(bucket);BucketValue newVal;if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {newVal = new BucketValue();newVal.txid = _attempt.getTransactionId();newVal.count = _accum.get(bucket);if (currVal != null)newVal.count += currVal.count;BUCKET_DATABASE.put(bucket, newVal);}else {newVal = currVal;}_collector.emit(new Values(_attempt, bucket, newVal.count));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "bucket", "count"));}}public static void main(String[] args) throws Exception {MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));LocalCluster cluster = new LocalCluster();Config config = new Config();config.setDebug(true);config.setMaxSpoutPending(3);cluster.submitTopology("top-n-topology", config, builder.buildTopology());Thread.sleep(3000);cluster.shutdown();} }

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter下的WordCountTopology.java

package storm.starter;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.ShellBolt; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import storm.starter.spout.RandomSentenceSpout;import java.util.HashMap; import java.util.Map;/*** This topology demonstrates Storm's stream groupings and multilang capabilities.*/ public class WordCountTopology {public static class SplitSentence extends ShellBolt implements IRichBolt {public SplitSentence() {super("python", "splitsentence.py");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}public static class WordCount extends BaseBasicBolt {Map<String, Integer> counts = new HashMap<String, Integer>();@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String word = tuple.getString(0);Integer count = counts.get(word);if (count == null)count = 0;count++;counts.put(word, count);collector.emit(new Values(word, count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new RandomSentenceSpout(), 5);builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));Config conf = new Config();conf.setDebug(true);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createTopology());}else {conf.setMaxTaskParallelism(3);LocalCluster cluster = new LocalCluster();cluster.submitTopology("word-count", conf, builder.createTopology());Thread.sleep(10000);cluster.shutdown();}} }

?

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\spout的RandomSentenceSpout.java

package storm.starter.spout;import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils;import java.util.Map; import java.util.Random;public class RandomSentenceSpout extends BaseRichSpout {SpoutOutputCollector _collector;Random _rand;@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;_rand = new Random();}@Overridepublic void nextTuple() {Utils.sleep(100);String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away","four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };String sentence = sentences[_rand.nextInt(sentences.length)];_collector.emit(new Values(sentence));}@Overridepublic void ack(Object id) {}@Overridepublic void fail(Object id) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\spout的TwitterSampleSpout.java

/*package storm.starter.spout;import backtype.storm.Config; import twitter4j.conf.ConfigurationBuilder; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener;public class TwitterSampleSpout extends BaseRichSpout {SpoutOutputCollector _collector;LinkedBlockingQueue<Status> queue = null;TwitterStream _twitterStream;String _username;String _pwd;public TwitterSampleSpout(String username, String pwd) {_username = username;_pwd = pwd;}@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {queue = new LinkedBlockingQueue<Status>(1000);_collector = collector;StatusListener listener = new StatusListener() {@Overridepublic void onStatus(Status status) {queue.offer(status);}@Overridepublic void onDeletionNotice(StatusDeletionNotice sdn) {}@Overridepublic void onTrackLimitationNotice(int i) {}@Overridepublic void onScrubGeo(long l, long l1) {}@Overridepublic void onException(Exception e) {}};TwitterStreamFactory fact = new TwitterStreamFactory(new ConfigurationBuilder().setUser(_username).setPassword(_pwd).build());_twitterStream = fact.getInstance();_twitterStream.addListener(listener);_twitterStream.sample();}@Overridepublic void nextTuple() {Status ret = queue.poll();if(ret==null) {Utils.sleep(50);} else {_collector.emit(new Values(ret));}}@Overridepublic void close() {_twitterStream.shutdown();}@Overridepublic Map<String, Object> getComponentConfiguration() {Config ret = new Config();ret.setMaxTaskParallelism(1);return ret;} @Overridepublic void ack(Object id) {}@Overridepublic void fail(Object id) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("tweet"));}} */

?

?

?

?

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的AbstractRankerBolt.java

package storm.starter.bolt;import backtype.storm.Config; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import org.apache.log4j.Logger; import storm.starter.tools.Rankings; import storm.starter.util.TupleHelpers;import java.util.HashMap; import java.util.Map;/*** This abstract bolt provides the basic behavior of bolts that rank objects according to their count.* <p/>* It uses a template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to allow* actual bolt implementations to specify how incoming tuples are processed, i.e. how the objects embedded within those* tuples are retrieved and counted.*/ public abstract class AbstractRankerBolt extends BaseBasicBolt {private static final long serialVersionUID = 4931640198501530202L;private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;private static final int DEFAULT_COUNT = 10;private final int emitFrequencyInSeconds;private final int count;private final Rankings rankings;public AbstractRankerBolt() {this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);}public AbstractRankerBolt(int topN) {this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);}public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {if (topN < 1) {throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");}if (emitFrequencyInSeconds < 1) {throw new IllegalArgumentException("The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");}count = topN;this.emitFrequencyInSeconds = emitFrequencyInSeconds;rankings = new Rankings(count);}protected Rankings getRankings() {return rankings;}/*** This method functions as a template method (design pattern).*/@Overridepublic final void execute(Tuple tuple, BasicOutputCollector collector) {if (TupleHelpers.isTickTuple(tuple)) {getLogger().debug("Received tick tuple, triggering emit of current rankings");emitRankings(collector);}else {updateRankingsWithTuple(tuple);}}abstract void updateRankingsWithTuple(Tuple tuple);private void emitRankings(BasicOutputCollector collector) {collector.emit(new Values(rankings.copy()));getLogger().debug("Rankings: " + rankings);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("rankings"));}@Overridepublic Map<String, Object> getComponentConfiguration() {Map<String, Object> conf = new HashMap<String, Object>();conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);return conf;}abstract Logger getLogger(); }

?

?

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的IntermediateRankingsBolt.java

package storm.starter.bolt;import backtype.storm.tuple.Tuple; import org.apache.log4j.Logger; import storm.starter.tools.Rankable; import storm.starter.tools.RankableObjectWithFields;/*** This bolt ranks incoming objects by their count.* <p/>* It assumes the input tuples to adhere to the following format: (object, object_count, additionalField1,* additionalField2, ..., additionalFieldN).*/ public final class IntermediateRankingsBolt extends AbstractRankerBolt {private static final long serialVersionUID = -1369800530256637409L;private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);public IntermediateRankingsBolt() {super();}public IntermediateRankingsBolt(int topN) {super(topN);}public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {super(topN, emitFrequencyInSeconds);}@Overridevoid updateRankingsWithTuple(Tuple tuple) {Rankable rankable = RankableObjectWithFields.from(tuple);super.getRankings().updateWith(rankable);}@OverrideLogger getLogger() {return LOG;} }

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的PrinterBolt.java

package storm.starter.bolt;import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple;public class PrinterBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {System.out.println(tuple);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer ofd) {}}

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的RollingCountBolt.java

package storm.starter.bolt;import backtype.storm.Config; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import org.apache.log4j.Logger; import storm.starter.tools.NthLastModifiedTimeTracker; import storm.starter.tools.SlidingWindowCounter; import storm.starter.util.TupleHelpers;import java.util.HashMap; import java.util.Map; import java.util.Map.Entry;/*** This bolt performs rolling counts of incoming objects, i.e. sliding window based counting.* <p/>* The bolt is configured by two parameters, the length of the sliding window in seconds (which influences the output* data of the bolt, i.e. how it will count objects) and the emit frequency in seconds (which influences how often the* bolt will output the latest window counts). For instance, if the window length is set to an equivalent of five* minutes and the emit frequency to one minute, then the bolt will output the latest five-minute sliding window every* minute.* <p/>* The bolt emits a rolling count tuple per object, consisting of the object itself, its latest rolling count, and the* actual duration of the sliding window. The latter is included in case the expected sliding window length (as* configured by the user) is different from the actual length, e.g. due to high system load. Note that the actual* window length is tracked and calculated for the window, and not individually for each object within a window.* <p/>* Note: During the startup phase you will usually observe that the bolt warns you about the actual sliding window* length being smaller than the expected length. This behavior is expected and is caused by the way the sliding window* counts are initially "loaded up". You can safely ignore this warning during startup (e.g. you will see this warning* during the first ~ five minutes of startup time if the window length is set to five minutes).*/ public class RollingCountBolt extends BaseRichBolt {private static final long serialVersionUID = 5537727428628598519L;private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);private static final int NUM_WINDOW_CHUNKS = 5;private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;private static final String WINDOW_LENGTH_WARNING_TEMPLATE ="Actual window length is %d seconds when it should be %d seconds"+ " (you can safely ignore this warning during the startup phase)";private final SlidingWindowCounter<Object> counter;private final int windowLengthInSeconds;private final int emitFrequencyInSeconds;private OutputCollector collector;private NthLastModifiedTimeTracker lastModifiedTracker;public RollingCountBolt() {this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);}public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {this.windowLengthInSeconds = windowLengthInSeconds;this.emitFrequencyInSeconds = emitFrequencyInSeconds;counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,this.emitFrequencyInSeconds));}private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {return windowLengthInSeconds / windowUpdateFrequencyInSeconds;}@SuppressWarnings("rawtypes")@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,this.emitFrequencyInSeconds));}@Overridepublic void execute(Tuple tuple) {if (TupleHelpers.isTickTuple(tuple)) {LOG.debug("Received tick tuple, triggering emit of current window counts");emitCurrentWindowCounts();}else {countObjAndAck(tuple);}}private void emitCurrentWindowCounts() {Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();lastModifiedTracker.markAsModified();if (actualWindowLengthInSeconds != windowLengthInSeconds) {LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));}emit(counts, actualWindowLengthInSeconds);}private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {for (Entry<Object, Long> entry : counts.entrySet()) {Object obj = entry.getKey();Long count = entry.getValue();collector.emit(new Values(obj, count, actualWindowLengthInSeconds));}}private void countObjAndAck(Tuple tuple) {Object obj = tuple.getValue(0);counter.incrementCount(obj);collector.ack(tuple);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));}@Overridepublic Map<String, Object> getComponentConfiguration() {Map<String, Object> conf = new HashMap<String, Object>();conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);return conf;} }

?

?

?

?

?

?

?

?

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的SingleJoinBolt.java

package storm.starter.bolt;import backtype.storm.Config; import backtype.storm.generated.GlobalStreamId; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.utils.TimeCacheMap;import java.util.*;public class SingleJoinBolt extends BaseRichBolt {OutputCollector _collector;Fields _idFields;Fields _outFields;int _numSources;TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;Map<String, GlobalStreamId> _fieldLocations;public SingleJoinBolt(Fields outFields) {_outFields = outFields;}@Overridepublic void prepare(Map conf, TopologyContext context, OutputCollector collector) {_fieldLocations = new HashMap<String, GlobalStreamId>();_collector = collector;int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();_pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());_numSources = context.getThisSources().size();Set<String> idFields = null;for (GlobalStreamId source : context.getThisSources().keySet()) {Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());Set<String> setFields = new HashSet<String>(fields.toList());if (idFields == null)idFields = setFields;elseidFields.retainAll(setFields);for (String outfield : _outFields) {for (String sourcefield : fields) {if (outfield.equals(sourcefield)) {_fieldLocations.put(outfield, source);}}}}_idFields = new Fields(new ArrayList<String>(idFields));if (_fieldLocations.size() != _outFields.size()) {throw new RuntimeException("Cannot find all outfields among sources");}}@Overridepublic void execute(Tuple tuple) {List<Object> id = tuple.select(_idFields);GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());if (!_pending.containsKey(id)) {_pending.put(id, new HashMap<GlobalStreamId, Tuple>());}Map<GlobalStreamId, Tuple> parts = _pending.get(id);if (parts.containsKey(streamId))throw new RuntimeException("Received same side of single join twice");parts.put(streamId, tuple);if (parts.size() == _numSources) {_pending.remove(id);List<Object> joinResult = new ArrayList<Object>();for (String outField : _outFields) {GlobalStreamId loc = _fieldLocations.get(outField);joinResult.add(parts.get(loc).getValueByField(outField));}_collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);for (Tuple part : parts.values()) {_collector.ack(part);}}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(_outFields);}private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {@Overridepublic void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {for (Tuple tuple : tuples.values()) {_collector.fail(tuple);}}} }

?

?

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\bolt的TotalRankingsBolt.java

?

package storm.starter.bolt;import backtype.storm.tuple.Tuple; import org.apache.log4j.Logger; import storm.starter.tools.Rankings;/*** This bolt merges incoming {@link Rankings}.* <p/>* It can be used to merge intermediate rankings generated by {@link IntermediateRankingsBolt} into a final,* consolidated ranking. To do so, configure this bolt with a globalGrouping on {@link IntermediateRankingsBolt}.*/ public final class TotalRankingsBolt extends AbstractRankerBolt {private static final long serialVersionUID = -8447525895532302198L;private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);public TotalRankingsBolt() {super();}public TotalRankingsBolt(int topN) {super(topN);}public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {super(topN, emitFrequencyInSeconds);}@Overridevoid updateRankingsWithTuple(Tuple tuple) {Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);super.getRankings().updateWith(rankingsToBeMerged);super.getRankings().pruneZeroCounts();}@OverrideLogger getLogger() {return LOG;}}

?

?

?

?

?

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的NthLastModifiedTimeTracker.java

package storm.starter.tools;import backtype.storm.utils.Time; import org.apache.commons.collections.buffer.CircularFifoBuffer;/*** This class tracks the time-since-last-modify of a "thing" in a rolling fashion.* <p/>* For example, create a 5-slot tracker to track the five most recent time-since-last-modify.* <p/>* You must manually "mark" that the "something" that you want to track -- in terms of modification times -- has just* been modified.*/ public class NthLastModifiedTimeTracker {private static final int MILLIS_IN_SEC = 1000;private final CircularFifoBuffer lastModifiedTimesMillis;public NthLastModifiedTimeTracker(int numTimesToTrack) {if (numTimesToTrack < 1) {throw new IllegalArgumentException("numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")");}lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);initLastModifiedTimesMillis();}private void initLastModifiedTimesMillis() {long nowCached = now();for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {lastModifiedTimesMillis.add(Long.valueOf(nowCached));}}private long now() {return Time.currentTimeMillis();}public int secondsSinceOldestModification() {long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue();return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);}public void markAsModified() {updateLastModifiedTime();}private void updateLastModifiedTime() {lastModifiedTimesMillis.add(now());}}

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的Rankable.java

package storm.starter.tools;public interface Rankable extends Comparable<Rankable> {Object getObject();long getCount();/*** Note: We do not defensively copy the object wrapped by the Rankable. It is passed as is.** @return a defensive copy*/Rankable copy(); }

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的RankableObjectWithFields.java

package storm.starter.tools;import backtype.storm.tuple.Tuple; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists;import java.io.Serializable; import java.util.List;/*** This class wraps an objects and its associated count, including any additional data fields.* <p/>* This class can be used, for instance, to track the number of occurrences of an object in a Storm topology.*/ public class RankableObjectWithFields implements Rankable, Serializable {private static final long serialVersionUID = -9102878650001058090L;private static final String toStringSeparator = "|";private final Object obj;private final long count;private final ImmutableList<Object> fields;public RankableObjectWithFields(Object obj, long count, Object... otherFields) {if (obj == null) {throw new IllegalArgumentException("The object must not be null");}if (count < 0) {throw new IllegalArgumentException("The count must be >= 0");}this.obj = obj;this.count = count;fields = ImmutableList.copyOf(otherFields);}/*** Construct a new instance based on the provided {@link Tuple}.* <p/>* This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of* occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be* extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}.** @param tuple** @return new instance based on the provided tuple*/public static RankableObjectWithFields from(Tuple tuple) {List<Object> otherFields = Lists.newArrayList(tuple.getValues());Object obj = otherFields.remove(0);Long count = (Long) otherFields.remove(0);return new RankableObjectWithFields(obj, count, otherFields.toArray());}public Object getObject() {return obj;}public long getCount() {return count;}/*** @return an immutable list of any additional data fields of the object (may be empty but will never be null)*/public List<Object> getFields() {return fields;}@Overridepublic int compareTo(Rankable other) {long delta = this.getCount() - other.getCount();if (delta > 0) {return 1;}else if (delta < 0) {return -1;}else {return 0;}}@Overridepublic boolean equals(Object o) {if (this == o) {return true;}if (!(o instanceof RankableObjectWithFields)) {return false;}RankableObjectWithFields other = (RankableObjectWithFields) o;return obj.equals(other.obj) && count == other.count;}@Overridepublic int hashCode() {int result = 17;int countHash = (int) (count ^ (count >>> 32));result = 31 * result + countHash;result = 31 * result + obj.hashCode();return result;}public String toString() {StringBuffer buf = new StringBuffer();buf.append("[");buf.append(obj);buf.append(toStringSeparator);buf.append(count);for (Object field : fields) {buf.append(toStringSeparator);buf.append(field);}buf.append("]");return buf.toString();}/*** Note: We do not defensively copy the wrapped object and any accompanying fields. We do guarantee, however,* do return a defensive (shallow) copy of the List object that is wrapping any accompanying fields.** @return*/@Overridepublic Rankable copy() {List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields());return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields);}}

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的Rankings.java

package storm.starter.tools;import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists;import java.io.Serializable; import java.util.Collections; import java.util.List;public class Rankings implements Serializable {private static final long serialVersionUID = -1549827195410578903L;private static final int DEFAULT_COUNT = 10;private final int maxSize;private final List<Rankable> rankedItems = Lists.newArrayList();public Rankings() {this(DEFAULT_COUNT);}public Rankings(int topN) {if (topN < 1) {throw new IllegalArgumentException("topN must be >= 1");}maxSize = topN;}/*** Copy constructor.* @param other*/public Rankings(Rankings other) {this(other.maxSize());updateWith(other);}/*** @return the maximum possible number (size) of ranked objects this instance can hold*/public int maxSize() {return maxSize;}/*** @return the number (size) of ranked objects this instance is currently holding*/public int size() {return rankedItems.size();}/*** The returned defensive copy is only "somewhat" defensive. We do, for instance, return a defensive copy of the* enclosing List instance, and we do try to defensively copy any contained Rankable objects, too. However, the* contract of {@link storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within* a Rankable will be defensively copied, too.** @return a somewhat defensive copy of ranked items*/public List<Rankable> getRankings() {List<Rankable> copy = Lists.newLinkedList();for (Rankable r: rankedItems) {copy.add(r.copy());}return ImmutableList.copyOf(copy);}public void updateWith(Rankings other) {for (Rankable r : other.getRankings()) {updateWith(r);}}public void updateWith(Rankable r) {synchronized(rankedItems) {addOrReplace(r);rerank();shrinkRankingsIfNeeded();}}private void addOrReplace(Rankable r) {Integer rank = findRankOf(r);if (rank != null) {rankedItems.set(rank, r);}else {rankedItems.add(r);}}private Integer findRankOf(Rankable r) {Object tag = r.getObject();for (int rank = 0; rank < rankedItems.size(); rank++) {Object cur = rankedItems.get(rank).getObject();if (cur.equals(tag)) {return rank;}}return null;}private void rerank() {Collections.sort(rankedItems);Collections.reverse(rankedItems);}private void shrinkRankingsIfNeeded() {if (rankedItems.size() > maxSize) {rankedItems.remove(maxSize);}}/*** Removes ranking entries that have a count of zero.*/public void pruneZeroCounts() {int i = 0;while (i < rankedItems.size()) {if (rankedItems.get(i).getCount() == 0) {rankedItems.remove(i);}else {i++;}}}public String toString() {return rankedItems.toString();}/*** Creates a (defensive) copy of itself.*/public Rankings copy() {return new Rankings(this);} }

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的SlidingWindowCounter.java

package storm.starter.tools;import java.io.Serializable; import java.util.Map;/*** This class counts objects in a sliding window fashion.* <p/>* It is designed 1) to give multiple "producer" threads write access to the counter, i.e. being able to increment* counts of objects, and 2) to give a single "consumer" thread (e.g. {@link PeriodicSlidingWindowCounter}) read access* to the counter. Whenever the consumer thread performs a read operation, this class will advance the head slot of the* sliding window counter. This means that the consumer thread indirectly controls where writes of the producer threads* will go to. Also, by itself this class will not advance the head slot.* <p/>* A note for analyzing data based on a sliding window count: During the initial <code>windowLengthInSlots</code>* iterations, this sliding window counter will always return object counts that are equal or greater than in the* previous iteration. This is the effect of the counter "loading up" at the very start of its existence. Conceptually,* this is the desired behavior.* <p/>* To give an example, using a counter with 5 slots which for the sake of this example represent 1 minute of time each:* <p/>* <pre>* {@code* Sliding window counts of an object X over time** Minute (timeline):* 1 2 3 4 5 6 7 8** Observed counts per minute:* 1 1 1 1 0 0 0 0** Counts returned by counter:* 1 2 3 4 4 3 2 1* }* </pre>* <p/>* As you can see in this example, for the first <code>windowLengthInSlots</code> (here: the first five minutes) the* counter will always return counts equal or greater than in the previous iteration (1, 2, 3, 4, 4). This initial load* effect needs to be accounted for whenever you want to perform analyses such as trending topics; otherwise your* analysis algorithm might falsely identify the object to be trending as the counter seems to observe continuously* increasing counts. Also, note that during the initial load phase <em>every object</em> will exhibit increasing* counts.* <p/>* On a high-level, the counter exhibits the following behavior: If you asked the example counter after two minutes,* "how often did you count the object during the past five minutes?", then it should reply "I have counted it 2 times* in the past five minutes", implying that it can only account for the last two of those five minutes because the* counter was not running before that time.** @param <T> The type of those objects we want to count.*/ public final class SlidingWindowCounter<T> implements Serializable {private static final long serialVersionUID = -2645063988768785810L;private SlotBasedCounter<T> objCounter;private int headSlot;private int tailSlot;private int windowLengthInSlots;public SlidingWindowCounter(int windowLengthInSlots) {if (windowLengthInSlots < 2) {throw new IllegalArgumentException("Window length in slots must be at least two (you requested " + windowLengthInSlots + ")");}this.windowLengthInSlots = windowLengthInSlots;this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);this.headSlot = 0;this.tailSlot = slotAfter(headSlot);}public void incrementCount(T obj) {objCounter.incrementCount(obj, headSlot);}/*** Return the current (total) counts of all tracked objects, then advance the window.* <p/>* Whenever this method is called, we consider the counts of the current sliding window to be available to and* successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent* objects within the next "chunk" of the sliding window.** @return The current (total) counts of all tracked objects.*/public Map<T, Long> getCountsThenAdvanceWindow() {Map<T, Long> counts = objCounter.getCounts();objCounter.wipeZeros();objCounter.wipeSlot(tailSlot);advanceHead();return counts;}private void advanceHead() {headSlot = tailSlot;tailSlot = slotAfter(tailSlot);}private int slotAfter(int slot) {return (slot + 1) % windowLengthInSlots;}}

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\tools的SlotBasedCounter.java

package storm.starter.tools;import java.io.Serializable; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set;/*** This class provides per-slot counts of the occurrences of objects.* <p/>* It can be used, for instance, as a building block for implementing sliding window counting of objects.** @param <T> The type of those objects we want to count.*/ public final class SlotBasedCounter<T> implements Serializable {private static final long serialVersionUID = 4858185737378394432L;private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();private final int numSlots;public SlotBasedCounter(int numSlots) {if (numSlots <= 0) {throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")");}this.numSlots = numSlots;}public void incrementCount(T obj, int slot) {long[] counts = objToCounts.get(obj);if (counts == null) {counts = new long[this.numSlots];objToCounts.put(obj, counts);}counts[slot]++;}public long getCount(T obj, int slot) {long[] counts = objToCounts.get(obj);if (counts == null) {return 0;}else {return counts[slot];}}public Map<T, Long> getCounts() {Map<T, Long> result = new HashMap<T, Long>();for (T obj : objToCounts.keySet()) {result.put(obj, computeTotalCount(obj));}return result;}private long computeTotalCount(T obj) {long[] curr = objToCounts.get(obj);long total = 0;for (long l : curr) {total += l;}return total;}/*** Reset the slot count of any tracked objects to zero for the given slot.** @param slot*/public void wipeSlot(int slot) {for (T obj : objToCounts.keySet()) {resetSlotCountToZero(obj, slot);}}private void resetSlotCountToZero(T obj, int slot) {long[] counts = objToCounts.get(obj);counts[slot] = 0;}private boolean shouldBeRemovedFromCounter(T obj) {return computeTotalCount(obj) == 0;}/*** Remove any object from the counter whose total count is zero (to free up memory).*/public void wipeZeros() {Set<T> objToBeRemoved = new HashSet<T>();for (T obj : objToCounts.keySet()) {if (shouldBeRemovedFromCounter(obj)) {objToBeRemoved.add(obj);}}for (T obj : objToBeRemoved) {objToCounts.remove(obj);}}}

?

?

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\trident的TridentReach.java

package storm.starter.trident;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.generated.StormTopology; import backtype.storm.task.IMetricsContext; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.CombinerAggregator; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Sum; import storm.trident.state.ReadOnlyState; import storm.trident.state.State; import storm.trident.state.StateFactory; import storm.trident.state.map.ReadOnlyMapState; import storm.trident.tuple.TridentTuple;import java.util.*;public class TridentReach {public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));}};public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));put("tim", Arrays.asList("alex"));put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));put("adam", Arrays.asList("david", "carissa"));put("mike", Arrays.asList("john", "bob"));put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));}};public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {public static class Factory implements StateFactory {Map _map;public Factory(Map map) {_map = map;}@Overridepublic State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {return new StaticSingleKeyMapState(_map);}}Map _map;public StaticSingleKeyMapState(Map map) {_map = map;}@Overridepublic List<Object> multiGet(List<List<Object>> keys) {List<Object> ret = new ArrayList();for (List<Object> key : keys) {Object singleKey = key.get(0);ret.add(_map.get(singleKey));}return ret;}}public static class One implements CombinerAggregator<Integer> {@Overridepublic Integer init(TridentTuple tuple) {return 1;}@Overridepublic Integer combine(Integer val1, Integer val2) {return 1;}@Overridepublic Integer zero() {return 1;}}public static class ExpandList extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {List l = (List) tuple.getValue(0);if (l != null) {for (Object o : l) {collector.emit(new Values(o));}}}}public static StormTopology buildTopology(LocalDRPC drpc) {TridentTopology topology = new TridentTopology();TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"),new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields("one")).aggregate(new Fields("one"), new Sum(), new Fields("reach"));return topology.build();}public static void main(String[] args) throws Exception {LocalDRPC drpc = new LocalDRPC();Config conf = new Config();LocalCluster cluster = new LocalCluster();cluster.submitTopology("reach", conf, buildTopology(drpc));Thread.sleep(2000);System.out.println("REACH: " + drpc.execute("reach", "aaa"));System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));cluster.shutdown();drpc.shutdown();} }

?

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\trident的TridentWordCount.java

package storm.starter.trident;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.Count; import storm.trident.operation.builtin.FilterNull; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Sum; import storm.trident.testing.FixedBatchSpout; import storm.trident.testing.MemoryMapState; import storm.trident.tuple.TridentTuple;public class TridentWordCount {public static class Split extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for (String word : sentence.split(" ")) {collector.emit(new Values(word));}}}public static StormTopology buildTopology(LocalDRPC drpc) {FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),new Values("how many apples can you eat"), new Values("to be or not to be the person"));spout.setCycle(true);TridentTopology topology = new TridentTopology();TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),new Count(), new Fields("count")).parallelismHint(16);topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));return topology.build();}public static void main(String[] args) throws Exception {Config conf = new Config();conf.setMaxSpoutPending(20);if (args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("wordCounter", conf, buildTopology(drpc));for (int i = 0; i < 100; i++) {System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));Thread.sleep(1000);}}else {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, buildTopology(null));}} }

?

?

?

?

?

?

?

?

?

?

?

?

?

storm-starter-master\src\jvm\storm\starter\util的StormRunner.java

package storm.starter.util;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology;public final class StormRunner {private static final int MILLIS_IN_SEC = 1000;private StormRunner() {}public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)throws InterruptedException {LocalCluster cluster = new LocalCluster();cluster.submitTopology(topologyName, conf, topology);Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);cluster.killTopology(topologyName);cluster.shutdown();} }

?

?

?

?

?

?

?storm-starter-master\src\jvm\storm\starter\util的TupleHelpers.java

package storm.starter.util;import backtype.storm.Constants; import backtype.storm.tuple.Tuple;public final class TupleHelpers {private TupleHelpers() {}public static boolean isTickTuple(Tuple tuple) {return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);}}

藏經閣技術資料分享群二維碼

轉載于:https://www.cnblogs.com/wangsongbai/p/9122725.html

總結

以上是生活随笔為你收集整理的Storm概念学习系列之storm-starter项目(完整版)(博主推荐)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

亚洲欧洲中文日韩av乱码 | 久久综合给合久久狠狠狠97色 | 久久99精品久久久久婷婷 | 亚洲国产成人a精品不卡在线 | 曰韩无码二三区中文字幕 | 日韩人妻无码一区二区三区久久99 | 免费无码午夜福利片69 | 国产美女极度色诱视频www | 国产情侣作爱视频免费观看 | 福利一区二区三区视频在线观看 | 人人妻人人澡人人爽欧美精品 | 久青草影院在线观看国产 | 亚洲区欧美区综合区自拍区 | 国产精品久久久av久久久 | 日日天干夜夜狠狠爱 | 国产sm调教视频在线观看 | 夜夜夜高潮夜夜爽夜夜爰爰 | 色婷婷欧美在线播放内射 | 国产免费久久久久久无码 | a在线亚洲男人的天堂 | 国产综合久久久久鬼色 | 国产成人精品无码播放 | 日本一区二区三区免费高清 | 亚洲中文字幕无码中字 | 国产亚洲人成在线播放 | 精品国产一区二区三区四区在线看 | 一本久久伊人热热精品中文字幕 | 鲁大师影院在线观看 | 精品少妇爆乳无码av无码专区 | 黑人粗大猛烈进出高潮视频 | 啦啦啦www在线观看免费视频 | 亚洲午夜久久久影院 | 三上悠亚人妻中文字幕在线 | 国产内射爽爽大片视频社区在线 | 捆绑白丝粉色jk震动捧喷白浆 | 亚洲第一网站男人都懂 | 国产精品成人av在线观看 | 色情久久久av熟女人妻网站 | 亚洲精品久久久久久久久久久 | 国产精品久久精品三级 | 人人妻人人澡人人爽欧美一区 | 成人一区二区免费视频 | 人妻尝试又大又粗久久 | 俺去俺来也在线www色官网 | 老熟妇仑乱视频一区二区 | 国产在线无码精品电影网 | 免费无码一区二区三区蜜桃大 | 曰本女人与公拘交酡免费视频 | 亚洲精品欧美二区三区中文字幕 | 少妇无码av无码专区在线观看 | 香蕉久久久久久av成人 | 久久久久成人片免费观看蜜芽 | 捆绑白丝粉色jk震动捧喷白浆 | 18禁黄网站男男禁片免费观看 | 少妇久久久久久人妻无码 | 一二三四社区在线中文视频 | 亚洲中文字幕av在天堂 | 亚洲 日韩 欧美 成人 在线观看 | 色情久久久av熟女人妻网站 | 国产人妻人伦精品1国产丝袜 | 国精品人妻无码一区二区三区蜜柚 | 国产亚洲视频中文字幕97精品 | 亲嘴扒胸摸屁股激烈网站 | 老子影院午夜伦不卡 | 黑人巨大精品欧美黑寡妇 | 国产精品.xx视频.xxtv | 成人无码精品一区二区三区 | 亚洲中文无码av永久不收费 | 免费男性肉肉影院 | 妺妺窝人体色www婷婷 | 无码人妻精品一区二区三区不卡 | 国产内射爽爽大片视频社区在线 | 天下第一社区视频www日本 | 黑人粗大猛烈进出高潮视频 | 天堂а√在线中文在线 | 中文字幕无码av激情不卡 | 丰满少妇高潮惨叫视频 | 精品夜夜澡人妻无码av蜜桃 | 亚洲 另类 在线 欧美 制服 | 人妻尝试又大又粗久久 | 一本色道久久综合亚洲精品不卡 | 色婷婷久久一区二区三区麻豆 | 黑人大群体交免费视频 | 久久成人a毛片免费观看网站 | 丝袜美腿亚洲一区二区 | 国产无套内射久久久国产 | 久久无码中文字幕免费影院蜜桃 | 国产肉丝袜在线观看 | 国产午夜手机精彩视频 | 澳门永久av免费网站 | 免费看男女做好爽好硬视频 | 在线精品亚洲一区二区 | 国产偷抇久久精品a片69 | 国产午夜精品一区二区三区嫩草 | 少妇无码一区二区二三区 | 亚洲精品一区二区三区四区五区 | 成人欧美一区二区三区 | 亚洲色欲色欲天天天www | 成人亚洲精品久久久久 | 欧美成人午夜精品久久久 | 少妇无码一区二区二三区 | 国产明星裸体无码xxxx视频 | 婷婷色婷婷开心五月四房播播 | 午夜成人1000部免费视频 | 人妻尝试又大又粗久久 | 亚洲爆乳无码专区 | 少妇性俱乐部纵欲狂欢电影 | 蜜臀av在线播放 久久综合激激的五月天 | 欧美亚洲日韩国产人成在线播放 | 国产香蕉尹人视频在线 | 一本久久a久久精品亚洲 | 久久视频在线观看精品 | 又大又黄又粗又爽的免费视频 | 丰满人妻被黑人猛烈进入 | 成人女人看片免费视频放人 | 四十如虎的丰满熟妇啪啪 | 国产精品99爱免费视频 | 亚欧洲精品在线视频免费观看 | 欧美大屁股xxxxhd黑色 | 大乳丰满人妻中文字幕日本 | 玩弄人妻少妇500系列视频 | 欧洲精品码一区二区三区免费看 | 欧美人与牲动交xxxx | 亚洲熟悉妇女xxx妇女av | 国产极品美女高潮无套在线观看 | 亚洲欧洲日本综合aⅴ在线 | 精品国产福利一区二区 | 亚洲а∨天堂久久精品2021 | 国模大胆一区二区三区 | 欧美 日韩 人妻 高清 中文 | 国产精品久久久午夜夜伦鲁鲁 | 99久久人妻精品免费二区 | 一个人看的视频www在线 | 性做久久久久久久免费看 | 免费国产黄网站在线观看 | 日韩精品一区二区av在线 | 欧美 日韩 人妻 高清 中文 | 国产福利视频一区二区 | 久久久久久av无码免费看大片 | 亚洲精品国偷拍自产在线观看蜜桃 | 亚洲国产精品一区二区美利坚 | 综合网日日天干夜夜久久 | 国产免费无码一区二区视频 | 美女极度色诱视频国产 | 日韩人妻少妇一区二区三区 | 久久精品视频在线看15 | 四虎影视成人永久免费观看视频 | 久精品国产欧美亚洲色aⅴ大片 | 国产麻豆精品精东影业av网站 | 亚洲成a人片在线观看日本 | 亚洲人成网站在线播放942 | 久久久精品成人免费观看 | 无码纯肉视频在线观看 | 国精产品一区二区三区 | 日本护士xxxxhd少妇 | 久久午夜无码鲁丝片秋霞 | 日日鲁鲁鲁夜夜爽爽狠狠 | 欧美老熟妇乱xxxxx | 中文无码伦av中文字幕 | 少妇的肉体aa片免费 | 国产日产欧产精品精品app | 性色欲网站人妻丰满中文久久不卡 | 丰腴饱满的极品熟妇 | 4hu四虎永久在线观看 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 97夜夜澡人人爽人人喊中国片 | 国产精品人人妻人人爽 | 国产精品久久福利网站 | 国产成人亚洲综合无码 | 中国大陆精品视频xxxx | 精品国产福利一区二区 | 丰满肥臀大屁股熟妇激情视频 | 国产97人人超碰caoprom | 无人区乱码一区二区三区 | 久久久久99精品国产片 | 国产精品内射视频免费 | 精品水蜜桃久久久久久久 | 99精品久久毛片a片 | 人妻夜夜爽天天爽三区 | 精品国产精品久久一区免费式 | 97久久国产亚洲精品超碰热 | 久久精品国产精品国产精品污 | 呦交小u女精品视频 | 国产精品.xx视频.xxtv | 97无码免费人妻超级碰碰夜夜 | 国产精品久免费的黄网站 | 欧美日本日韩 | 色欲av亚洲一区无码少妇 | 久久成人a毛片免费观看网站 | 国产乱人伦av在线无码 | 中文字幕人妻无码一区二区三区 | 精品偷自拍另类在线观看 | 成人免费无码大片a毛片 | www国产亚洲精品久久久日本 | 亚洲男人av香蕉爽爽爽爽 | 一个人看的视频www在线 | 强辱丰满人妻hd中文字幕 | 日日碰狠狠丁香久燥 | 亚洲乱亚洲乱妇50p | 亚洲精品国产精品乱码视色 | 无套内谢老熟女 | 天堂无码人妻精品一区二区三区 | av人摸人人人澡人人超碰下载 | 人妻尝试又大又粗久久 | 久久久久免费精品国产 | 无人区乱码一区二区三区 | 中文字幕无码免费久久9一区9 | 曰韩无码二三区中文字幕 | 性色av无码免费一区二区三区 | 国产 浪潮av性色四虎 | 无码一区二区三区在线 | 久久人人爽人人人人片 | 国产精品亚洲一区二区三区喷水 | 久久人人97超碰a片精品 | 夜先锋av资源网站 | 一二三四在线观看免费视频 | 精品乱子伦一区二区三区 | 国产午夜福利100集发布 | 性做久久久久久久免费看 | 人妻有码中文字幕在线 | 正在播放老肥熟妇露脸 | 黑人大群体交免费视频 | 日韩av无码一区二区三区 | 狠狠亚洲超碰狼人久久 | 国产亚洲美女精品久久久2020 | 台湾无码一区二区 | 午夜丰满少妇性开放视频 | 人人澡人摸人人添 | 人人澡人人妻人人爽人人蜜桃 | 亚洲自偷自拍另类第1页 | 丰满少妇熟乱xxxxx视频 | 狂野欧美性猛交免费视频 | 国产激情无码一区二区 | 日韩亚洲欧美中文高清在线 | 少妇无套内谢久久久久 | 无码播放一区二区三区 | 日韩在线不卡免费视频一区 | 一本大道伊人av久久综合 | 中文字幕人妻丝袜二区 | 99久久精品日本一区二区免费 | 久久99精品久久久久婷婷 | 国产三级久久久精品麻豆三级 | 亚洲欧美综合区丁香五月小说 | 色偷偷av老熟女 久久精品人妻少妇一区二区三区 | 亚洲日韩av一区二区三区中文 | 扒开双腿疯狂进出爽爽爽视频 | 国产精品国产自线拍免费软件 | 免费播放一区二区三区 | 久久www免费人成人片 | 国産精品久久久久久久 | 国产香蕉尹人视频在线 | 亚洲国产精品一区二区第一页 | 亚洲国产综合无码一区 | 国产精品内射视频免费 | 精品夜夜澡人妻无码av蜜桃 | 日韩人妻少妇一区二区三区 | 亚洲国产综合无码一区 | 无码av岛国片在线播放 | 免费网站看v片在线18禁无码 | 国产成人无码一二三区视频 | 亚洲理论电影在线观看 | 无码国产色欲xxxxx视频 | 国产精品无套呻吟在线 | 亚洲人成网站在线播放942 | 红桃av一区二区三区在线无码av | 国产精品国产自线拍免费软件 | 熟妇女人妻丰满少妇中文字幕 | 国产熟女一区二区三区四区五区 | 久久久久久久人妻无码中文字幕爆 | 国产精品美女久久久久av爽李琼 | 天堂久久天堂av色综合 | 奇米影视7777久久精品人人爽 | 久热国产vs视频在线观看 | 国产免费观看黄av片 | 国产免费观看黄av片 | 亚洲精品久久久久avwww潮水 | 国产无套粉嫩白浆在线 | 成人女人看片免费视频放人 | 天干天干啦夜天干天2017 | 亚洲国产欧美国产综合一区 | 日韩精品成人一区二区三区 | 亚洲精品一区二区三区在线 | 少妇被黑人到高潮喷出白浆 | 鲁鲁鲁爽爽爽在线视频观看 | 日本精品少妇一区二区三区 | 亚洲精品成a人在线观看 | 日韩av激情在线观看 | 成人欧美一区二区三区 | 婷婷六月久久综合丁香 | 久久久精品成人免费观看 | 亚洲乱码日产精品bd | 牲欲强的熟妇农村老妇女视频 | 亚洲国产精品一区二区美利坚 | 国产精品嫩草久久久久 | 午夜嘿嘿嘿影院 | 性欧美牲交在线视频 | 亚洲色无码一区二区三区 | 内射爽无广熟女亚洲 | 精品国产麻豆免费人成网站 | 国产精品人人爽人人做我的可爱 | 欧美兽交xxxx×视频 | 黑森林福利视频导航 | 在线天堂新版最新版在线8 | 老子影院午夜伦不卡 | 捆绑白丝粉色jk震动捧喷白浆 | 精品久久久久久亚洲精品 | 国产综合在线观看 | 精品人妻人人做人人爽夜夜爽 | 亚洲a无码综合a国产av中文 | 岛国片人妻三上悠亚 | 欧美人与禽zoz0性伦交 | 久久99精品久久久久婷婷 | 精品无码成人片一区二区98 | 精品久久8x国产免费观看 | 精品国产精品久久一区免费式 | 亚洲阿v天堂在线 | 中国女人内谢69xxxxxa片 | 最新国产麻豆aⅴ精品无码 | 强伦人妻一区二区三区视频18 | 一区二区传媒有限公司 | 亚洲日韩一区二区 | 国产口爆吞精在线视频 | 欧美三级a做爰在线观看 | 久久精品国产一区二区三区肥胖 | 国产麻豆精品一区二区三区v视界 | 国产精品高潮呻吟av久久4虎 | 99国产欧美久久久精品 | 2020久久香蕉国产线看观看 | 欧美精品无码一区二区三区 | 国产av一区二区三区最新精品 | 未满小14洗澡无码视频网站 | 欧美黑人乱大交 | 无码中文字幕色专区 | 亚洲精品综合一区二区三区在线 | 国产成人综合色在线观看网站 | 高潮毛片无遮挡高清免费视频 | 亚洲七七久久桃花影院 | 国产精品永久免费视频 | 国产做国产爱免费视频 | 国产精品欧美成人 | 亚洲日韩中文字幕在线播放 | 夫妻免费无码v看片 | 亚洲熟妇色xxxxx亚洲 | 欧美日韩色另类综合 | 成人无码视频免费播放 | 久久久久成人精品免费播放动漫 | 亚洲 另类 在线 欧美 制服 | 亚洲成av人片天堂网无码】 | 久久国内精品自在自线 | 无码播放一区二区三区 | 性色欲情网站iwww九文堂 | 欧美日韩久久久精品a片 | 永久免费精品精品永久-夜色 | 国产真实夫妇视频 | 日韩 欧美 动漫 国产 制服 | 男女下面进入的视频免费午夜 | 国产成人无码av一区二区 | 18禁止看的免费污网站 | 伊人久久婷婷五月综合97色 | 波多野结衣 黑人 | 久久国产36精品色熟妇 | 亚洲成在人网站无码天堂 | 婷婷六月久久综合丁香 | 亚洲gv猛男gv无码男同 | 人人妻人人澡人人爽人人精品 | 国产两女互慰高潮视频在线观看 | 呦交小u女精品视频 | 国产电影无码午夜在线播放 | 亚洲乱码中文字幕在线 | 狂野欧美性猛xxxx乱大交 | 亚洲国产精品无码一区二区三区 | 青青青手机频在线观看 | 国产做国产爱免费视频 | 亚洲一区二区观看播放 | 强开小婷嫩苞又嫩又紧视频 | 久久午夜无码鲁丝片午夜精品 | 国产乱子伦视频在线播放 | 熟女少妇在线视频播放 | 麻豆av传媒蜜桃天美传媒 | 久久久国产精品无码免费专区 | 国产精品人妻一区二区三区四 | 国产suv精品一区二区五 | 欧美阿v高清资源不卡在线播放 | 国产乡下妇女做爰 | 久久熟妇人妻午夜寂寞影院 | 88国产精品欧美一区二区三区 | 国产激情综合五月久久 | 亚洲欧美国产精品专区久久 | 蜜桃无码一区二区三区 | 亚洲爆乳精品无码一区二区三区 | 少妇被粗大的猛进出69影院 | 丝袜 中出 制服 人妻 美腿 | 中文字幕无线码免费人妻 | 狠狠躁日日躁夜夜躁2020 | 在线亚洲高清揄拍自拍一品区 | 无码精品人妻一区二区三区av | 成 人 免费观看网站 | 日韩亚洲欧美精品综合 | 精品无码成人片一区二区98 | 亚洲成a人一区二区三区 | 日韩人妻无码一区二区三区久久99 | 巨爆乳无码视频在线观看 | 超碰97人人射妻 | 色情久久久av熟女人妻网站 | 日日摸日日碰夜夜爽av | 日韩人妻系列无码专区 | 精品乱码久久久久久久 | 国产熟女一区二区三区四区五区 | 两性色午夜视频免费播放 | www国产亚洲精品久久久日本 | 亚洲色大成网站www国产 | 一区二区三区乱码在线 | 欧洲 | 午夜不卡av免费 一本久久a久久精品vr综合 | 欧美真人作爱免费视频 | 蜜臀av在线观看 在线欧美精品一区二区三区 | 欧洲精品码一区二区三区免费看 | 久久精品国产精品国产精品污 | 丰满人妻被黑人猛烈进入 | 亚洲国产av精品一区二区蜜芽 | 色情久久久av熟女人妻网站 | 久久视频在线观看精品 | 夜夜高潮次次欢爽av女 | 狠狠综合久久久久综合网 | 日韩精品一区二区av在线 | 成人一在线视频日韩国产 | 国产精品国产三级国产专播 | 捆绑白丝粉色jk震动捧喷白浆 | 97无码免费人妻超级碰碰夜夜 | 亚洲a无码综合a国产av中文 | 国产成人综合在线女婷五月99播放 | 撕开奶罩揉吮奶头视频 | 国产精品99久久精品爆乳 | 亚洲精品国产精品乱码视色 | 国産精品久久久久久久 | yw尤物av无码国产在线观看 | 天堂亚洲免费视频 | 人妻体内射精一区二区三四 | 乱人伦人妻中文字幕无码久久网 | 国产九九九九九九九a片 | 性史性农村dvd毛片 | 中文字幕人妻无码一夲道 | 蜜桃av抽搐高潮一区二区 | 亚洲熟妇自偷自拍另类 | 欧美亚洲日韩国产人成在线播放 | 久久久久成人片免费观看蜜芽 | 呦交小u女精品视频 | 午夜福利不卡在线视频 | 国产精品va在线播放 | 国产av久久久久精东av | 性做久久久久久久免费看 | 中文字幕无码av激情不卡 | 欧美成人家庭影院 | 任你躁国产自任一区二区三区 | 欧美日韩在线亚洲综合国产人 | 丰满人妻被黑人猛烈进入 | 精品一区二区三区无码免费视频 | 亚洲欧美日韩国产精品一区二区 | 99久久人妻精品免费二区 | 久久久久99精品成人片 | 欧洲熟妇色 欧美 | 国产在线精品一区二区三区直播 | 人人妻人人澡人人爽欧美精品 | 国产又爽又黄又刺激的视频 | 国内丰满熟女出轨videos | 一本大道伊人av久久综合 | 国产免费久久精品国产传媒 | 亚洲中文字幕无码中文字在线 | a在线观看免费网站大全 | 久久综合久久自在自线精品自 | 人人爽人人澡人人人妻 | 国产亚洲视频中文字幕97精品 | 亚洲 高清 成人 动漫 | 久久综合激激的五月天 | 荫蒂被男人添的好舒服爽免费视频 | 亚洲精品一区国产 | 国产精品无码mv在线观看 | 国产精品对白交换视频 | 性做久久久久久久免费看 | 97人妻精品一区二区三区 | 精品国产麻豆免费人成网站 | 亚洲第一网站男人都懂 | 亚洲 高清 成人 动漫 | 影音先锋中文字幕无码 | 中文精品久久久久人妻不卡 | 久久久av男人的天堂 | 夜夜躁日日躁狠狠久久av | 成人一在线视频日韩国产 | 免费观看又污又黄的网站 | 正在播放老肥熟妇露脸 | 激情综合激情五月俺也去 | 久久久久久九九精品久 | 欧洲熟妇精品视频 | 欧美黑人性暴力猛交喷水 | 亚洲а∨天堂久久精品2021 | 欧美野外疯狂做受xxxx高潮 | 综合网日日天干夜夜久久 | 精品久久久无码中文字幕 | 亚洲人成人无码网www国产 | 国产精品办公室沙发 | 欧美兽交xxxx×视频 | 国产成人精品视频ⅴa片软件竹菊 | 成年美女黄网站色大免费视频 | 亚洲爆乳精品无码一区二区三区 | 黑人巨大精品欧美黑寡妇 | 日本饥渴人妻欲求不满 | 国产精品久久久久久无码 | 熟女少妇在线视频播放 | 99久久精品无码一区二区毛片 | 国产精品永久免费视频 | 欧美丰满熟妇xxxx性ppx人交 | 少妇无码吹潮 | 色综合视频一区二区三区 | 久久无码中文字幕免费影院蜜桃 | 国产区女主播在线观看 | 国产婷婷色一区二区三区在线 | 中文字幕日韩精品一区二区三区 | 内射老妇bbwx0c0ck | 精品无码成人片一区二区98 | 亚洲午夜福利在线观看 | 亚洲精品久久久久中文第一幕 | 曰韩无码二三区中文字幕 | 日韩视频 中文字幕 视频一区 | 狠狠色欧美亚洲狠狠色www | 妺妺窝人体色www在线小说 | 性欧美疯狂xxxxbbbb | 好爽又高潮了毛片免费下载 | 国产免费久久精品国产传媒 | 无码人妻丰满熟妇区毛片18 | 少妇人妻偷人精品无码视频 | 亚洲欧美综合区丁香五月小说 | 无码一区二区三区在线 | 国产极品美女高潮无套在线观看 | 99视频精品全部免费免费观看 | 波多野结衣乳巨码无在线观看 | 亚洲一区二区三区在线观看网站 | 人妻人人添人妻人人爱 | 亚洲人成影院在线无码按摩店 | 九九久久精品国产免费看小说 | а天堂中文在线官网 | 99精品视频在线观看免费 | 亚洲人亚洲人成电影网站色 | 中文字幕色婷婷在线视频 | 久久久国产一区二区三区 | 水蜜桃av无码 | 欧美国产日产一区二区 | 99麻豆久久久国产精品免费 | 免费人成在线视频无码 | 亚洲欧美中文字幕5发布 | 装睡被陌生人摸出水好爽 | 欧美第一黄网免费网站 | 麻豆av传媒蜜桃天美传媒 | 国产suv精品一区二区五 | 国产亚洲精品久久久久久国模美 | 99er热精品视频 | 十八禁视频网站在线观看 | 亚洲精品一区二区三区在线 | 久久午夜无码鲁丝片午夜精品 | 一本久久a久久精品vr综合 | 无码人妻出轨黑人中文字幕 | 欧美日韩一区二区免费视频 | 日韩无码专区 | 久久综合网欧美色妞网 | 九九在线中文字幕无码 | 玩弄中年熟妇正在播放 | 熟女俱乐部五十路六十路av | 啦啦啦www在线观看免费视频 | 久久 国产 尿 小便 嘘嘘 | 亚洲中文字幕在线观看 | 国产精品毛片一区二区 | 久久久久久久女国产乱让韩 | 最新国产麻豆aⅴ精品无码 | 欧美日韩人成综合在线播放 | 免费人成网站视频在线观看 | 亚洲区欧美区综合区自拍区 | 成人一在线视频日韩国产 | 美女扒开屁股让男人桶 | 国产精品99爱免费视频 | 成人欧美一区二区三区黑人免费 | 午夜性刺激在线视频免费 | 欧美日本免费一区二区三区 | 成人无码影片精品久久久 | 久久久久99精品成人片 | 国产精品手机免费 | 国产成人精品视频ⅴa片软件竹菊 | 国产乱人无码伦av在线a | 亚洲欧美国产精品专区久久 | 国产人妻人伦精品1国产丝袜 | 中文字幕无线码 | 精品夜夜澡人妻无码av蜜桃 | 亚洲毛片av日韩av无码 | 国产精品无码一区二区桃花视频 | 狠狠色丁香久久婷婷综合五月 | 亚洲成色www久久网站 | 亚洲欧美综合区丁香五月小说 | 国产内射老熟女aaaa | 日韩精品一区二区av在线 | 亚洲の无码国产の无码步美 | 久久久中文久久久无码 | 精品日本一区二区三区在线观看 | 乱码午夜-极国产极内射 | 中文字幕无码日韩欧毛 | 国产乱子伦视频在线播放 | 大屁股大乳丰满人妻 | 久久精品人人做人人综合 | 国产国语老龄妇女a片 | 国产亚洲精品精品国产亚洲综合 | 久9re热视频这里只有精品 | yw尤物av无码国产在线观看 | 国产综合色产在线精品 | 精品偷拍一区二区三区在线看 | 少女韩国电视剧在线观看完整 | 妺妺窝人体色www在线小说 | 无套内射视频囯产 | 久久久中文久久久无码 | 国产69精品久久久久app下载 | 1000部夫妻午夜免费 | 中文无码精品a∨在线观看不卡 | 国产成人精品久久亚洲高清不卡 | 无码乱肉视频免费大全合集 | 国产尤物精品视频 | 欧美性猛交xxxx富婆 | 国产精品成人av在线观看 | 中文字幕无码免费久久9一区9 | 香蕉久久久久久av成人 | av无码久久久久不卡免费网站 | 婷婷五月综合激情中文字幕 | 丰腴饱满的极品熟妇 | 久久精品国产精品国产精品污 | 最新国产乱人伦偷精品免费网站 | 欧美日本精品一区二区三区 | 国产精品免费大片 | 国产成人无码午夜视频在线观看 | 99re在线播放 | 西西人体www44rt大胆高清 | 国产绳艺sm调教室论坛 | 国产精品igao视频网 | 中国大陆精品视频xxxx | 亚洲国产精品毛片av不卡在线 | 精品人妻中文字幕有码在线 | 98国产精品综合一区二区三区 | 一本一道久久综合久久 | 一本加勒比波多野结衣 | 玩弄人妻少妇500系列视频 | 亚洲欧美国产精品专区久久 | 国产精品成人av在线观看 | 亚洲人成影院在线无码按摩店 | 日产精品高潮呻吟av久久 | 天堂亚洲2017在线观看 | 中文字幕+乱码+中文字幕一区 | 亚洲国产日韩a在线播放 | 国产97人人超碰caoprom | 国产口爆吞精在线视频 | 国内精品久久久久久中文字幕 | 精品aⅴ一区二区三区 | 99久久精品日本一区二区免费 | 国产九九九九九九九a片 | 帮老师解开蕾丝奶罩吸乳网站 | 综合激情五月综合激情五月激情1 | 国产一区二区三区四区五区加勒比 | 久久精品国产大片免费观看 | 人人妻人人澡人人爽欧美一区九九 | 狂野欧美性猛交免费视频 | 无码一区二区三区在线 | 麻豆md0077饥渴少妇 | 久久aⅴ免费观看 | 强开小婷嫩苞又嫩又紧视频 | 男女猛烈xx00免费视频试看 | 国产香蕉尹人综合在线观看 | 久久精品国产一区二区三区肥胖 | 亚洲人亚洲人成电影网站色 | 欧美丰满老熟妇xxxxx性 | 国产真人无遮挡作爱免费视频 | 国内老熟妇对白xxxxhd | 麻豆精产国品 | 精品国产成人一区二区三区 | 欧美zoozzooz性欧美 | 中文字幕+乱码+中文字幕一区 | 97色伦图片97综合影院 | 美女毛片一区二区三区四区 | 97se亚洲精品一区 | 婷婷五月综合缴情在线视频 | 欧美熟妇另类久久久久久多毛 | 国产精品久久精品三级 | 西西人体www44rt大胆高清 | 亚洲日韩精品欧美一区二区 | 欧美日本免费一区二区三区 | 欧美丰满熟妇xxxx性ppx人交 | av在线亚洲欧洲日产一区二区 | 麻豆人妻少妇精品无码专区 | 免费人成在线观看网站 | 国产97色在线 | 免 | 亚洲爆乳精品无码一区二区三区 | 亚洲成av人片在线观看无码不卡 | 国产人妻精品一区二区三区 | 国产真实乱对白精彩久久 | 高潮毛片无遮挡高清免费视频 | 亚洲狠狠色丁香婷婷综合 | 少妇人妻大乳在线视频 | 正在播放老肥熟妇露脸 | 日本乱偷人妻中文字幕 | 三级4级全黄60分钟 | 无遮挡啪啪摇乳动态图 | 好男人www社区 | 国产精品嫩草久久久久 | 好爽又高潮了毛片免费下载 | 国产精品.xx视频.xxtv | 成人欧美一区二区三区 | 亚洲の无码国产の无码步美 | 成 人影片 免费观看 | 人妻少妇精品无码专区动漫 | 免费观看又污又黄的网站 | 老熟妇乱子伦牲交视频 | 国产亚洲tv在线观看 | 国产网红无码精品视频 | 奇米影视7777久久精品人人爽 | 精品偷拍一区二区三区在线看 | 亚洲国产精品美女久久久久 | 男女爱爱好爽视频免费看 | 免费乱码人妻系列无码专区 | 国产真实伦对白全集 | 亚洲国产精品毛片av不卡在线 | 荫蒂添的好舒服视频囗交 | 婷婷综合久久中文字幕蜜桃三电影 | 亚洲国产精品无码久久久久高潮 | 欧美日韩人成综合在线播放 | 在线播放亚洲第一字幕 | 高潮毛片无遮挡高清免费 | 久久久久免费精品国产 | 午夜福利一区二区三区在线观看 | 欧美 日韩 亚洲 在线 | 色五月五月丁香亚洲综合网 | 在线天堂新版最新版在线8 | 成人无码精品1区2区3区免费看 | 少妇人妻av毛片在线看 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 又湿又紧又大又爽a视频国产 | 午夜精品一区二区三区在线观看 | 亚洲色欲色欲欲www在线 | 亚洲色欲色欲天天天www | 4hu四虎永久在线观看 | 麻豆av传媒蜜桃天美传媒 | 又大又黄又粗又爽的免费视频 | 色欲av亚洲一区无码少妇 | 久久精品人人做人人综合 | 亚洲欧洲日本综合aⅴ在线 | 一本色道久久综合亚洲精品不卡 | aⅴ亚洲 日韩 色 图网站 播放 | 国产福利视频一区二区 | 给我免费的视频在线观看 | 中文字幕乱码中文乱码51精品 | 国产成人无码a区在线观看视频app | 日本丰满护士爆乳xxxx | 2020最新国产自产精品 | 激情内射亚州一区二区三区爱妻 | 十八禁真人啪啪免费网站 | 一个人免费观看的www视频 | 午夜精品久久久内射近拍高清 | 人妻aⅴ无码一区二区三区 | 日日碰狠狠丁香久燥 | 无码精品国产va在线观看dvd | 久久99精品久久久久久 | 精品无人国产偷自产在线 | 国产精品久久久久久久9999 | 国产三级精品三级男人的天堂 | 亚洲精品久久久久久一区二区 | 亚洲成在人网站无码天堂 | 少妇激情av一区二区 | 国产亚洲精品久久久久久久久动漫 | 97精品国产97久久久久久免费 | 麻豆国产人妻欲求不满 | 无码成人精品区在线观看 | 2019nv天堂香蕉在线观看 | 亚洲va欧美va天堂v国产综合 | 天堂а√在线地址中文在线 | 天天拍夜夜添久久精品大 | 日韩少妇白浆无码系列 | 老熟妇仑乱视频一区二区 | 久久无码人妻影院 | 又色又爽又黄的美女裸体网站 | 成在人线av无码免费 | 少妇太爽了在线观看 | 国产精品国产三级国产专播 | 精品国产一区av天美传媒 | 真人与拘做受免费视频一 | 亚洲天堂2017无码 | 四虎永久在线精品免费网址 | 人人妻人人澡人人爽人人精品浪潮 | 丝袜足控一区二区三区 | 国产午夜福利亚洲第一 | 国产精品久久久 | 四虎国产精品免费久久 | 亚洲欧洲无卡二区视頻 | 人人妻人人澡人人爽欧美一区 | 亚洲精品久久久久久一区二区 | 国产一区二区三区影院 | 狠狠色欧美亚洲狠狠色www | 色欲综合久久中文字幕网 | 亚洲精品中文字幕乱码 | 亚洲欧美国产精品专区久久 | 亚洲综合无码久久精品综合 | 欧美激情综合亚洲一二区 | 久久午夜无码鲁丝片 | 人妻aⅴ无码一区二区三区 | 2020最新国产自产精品 | 久久久久久a亚洲欧洲av冫 | 婷婷五月综合激情中文字幕 | 无码一区二区三区在线 | 国产麻豆精品精东影业av网站 | 日本熟妇大屁股人妻 | 亚洲va中文字幕无码久久不卡 | 一区二区三区乱码在线 | 欧洲 | 久久久久免费看成人影片 | 国产综合久久久久鬼色 | 蜜桃视频韩日免费播放 | 天天摸天天碰天天添 | 永久免费观看美女裸体的网站 | 噜噜噜亚洲色成人网站 | 日韩欧美群交p片內射中文 | 国产精品久久久久久亚洲影视内衣 | 午夜福利一区二区三区在线观看 | 欧洲欧美人成视频在线 | 一本大道伊人av久久综合 | 久久视频在线观看精品 | 中文字幕av日韩精品一区二区 | 国产超级va在线观看视频 | 少妇性荡欲午夜性开放视频剧场 | 国产精品亚洲一区二区三区喷水 | 青青久在线视频免费观看 | 国产一区二区三区日韩精品 | 国产精品欧美成人 | 无码国内精品人妻少妇 | 亚洲小说春色综合另类 | 成年女人永久免费看片 | 精品国产一区av天美传媒 | 国产精品成人av在线观看 | 国产网红无码精品视频 | 亚洲国产精品无码一区二区三区 | 亚洲日韩精品欧美一区二区 | 亚洲国产欧美日韩精品一区二区三区 | 久久久久亚洲精品中文字幕 | 男女作爱免费网站 | 人妻少妇精品无码专区动漫 | 无套内射视频囯产 | 国产黄在线观看免费观看不卡 | 国产亚洲人成a在线v网站 | 国产精品久久久av久久久 | 99国产欧美久久久精品 | 国产精品爱久久久久久久 | 高清国产亚洲精品自在久久 | 国产精品久久久久9999小说 | 久久伊人色av天堂九九小黄鸭 | 乱码av麻豆丝袜熟女系列 | 久久国产精品萌白酱免费 | 日韩欧美成人免费观看 | 伊人久久婷婷五月综合97色 | 沈阳熟女露脸对白视频 | 2020最新国产自产精品 | 久久精品丝袜高跟鞋 | 久久久久久国产精品无码下载 | 88国产精品欧美一区二区三区 | 国产一区二区三区四区五区加勒比 | 日韩精品成人一区二区三区 | 高清国产亚洲精品自在久久 | 国产色精品久久人妻 | 日日摸日日碰夜夜爽av | 久久亚洲中文字幕无码 | 51国偷自产一区二区三区 | 日韩视频 中文字幕 视频一区 | 99麻豆久久久国产精品免费 | 人人妻人人澡人人爽人人精品 | 久久亚洲精品中文字幕无男同 | 久久 国产 尿 小便 嘘嘘 | 亚洲精品中文字幕乱码 | 国产精品爱久久久久久久 | 国产精品资源一区二区 | 青草青草久热国产精品 | 中文字幕av无码一区二区三区电影 | 精品一区二区三区波多野结衣 | 久久久成人毛片无码 | 亚洲国产欧美国产综合一区 | 久久zyz资源站无码中文动漫 | 少妇人妻大乳在线视频 | 久久精品国产一区二区三区肥胖 | 野外少妇愉情中文字幕 | 久久久www成人免费毛片 | 日韩精品无码一本二本三本色 | 精品偷拍一区二区三区在线看 | 国产精品免费大片 | 欧美人与禽猛交狂配 | 国内少妇偷人精品视频免费 | 十八禁视频网站在线观看 | 午夜丰满少妇性开放视频 | 红桃av一区二区三区在线无码av | 日本免费一区二区三区最新 | 一本久久a久久精品vr综合 | 中文字幕乱妇无码av在线 | 亚洲成av人影院在线观看 | 欧美怡红院免费全部视频 | 精品成人av一区二区三区 | 99视频精品全部免费免费观看 | 婷婷色婷婷开心五月四房播播 | 国产精品亚洲五月天高清 | 国产亚洲精品久久久ai换 | 成人影院yy111111在线观看 | 丰满少妇高潮惨叫视频 | 性啪啪chinese东北女人 | 国产成人午夜福利在线播放 | 伊人久久大香线蕉亚洲 | 国产性生交xxxxx无码 | 国内精品久久久久久中文字幕 | 欧美日韩色另类综合 | 国产麻豆精品一区二区三区v视界 | 国产精品无码成人午夜电影 | 亚洲国产综合无码一区 | 精品国产精品久久一区免费式 | 蜜桃视频插满18在线观看 | 四虎国产精品一区二区 | 久久亚洲a片com人成 | 国产精品亚洲а∨无码播放麻豆 | 一区二区三区高清视频一 | 精品无人区无码乱码毛片国产 | 国产午夜无码视频在线观看 | 国产av久久久久精东av | 人妻互换免费中文字幕 | 亚洲熟女一区二区三区 | 日本xxxx色视频在线观看免费 | 国产精品久久久久久久9999 | 国内少妇偷人精品视频 | 亚洲区欧美区综合区自拍区 | 久久亚洲日韩精品一区二区三区 | 亚洲另类伦春色综合小说 | 国产精品久免费的黄网站 | 少妇无套内谢久久久久 | 东京热一精品无码av | 欧美自拍另类欧美综合图片区 | 人人妻人人澡人人爽精品欧美 | 成人无码影片精品久久久 | 国产亲子乱弄免费视频 | 玩弄中年熟妇正在播放 | 无码播放一区二区三区 | 国产精品二区一区二区aⅴ污介绍 | www国产精品内射老师 | 久久精品国产99精品亚洲 | 色五月丁香五月综合五月 | 日日摸天天摸爽爽狠狠97 | 国产精品久久福利网站 | 九九在线中文字幕无码 | 高清不卡一区二区三区 | 国内精品人妻无码久久久影院蜜桃 | 婷婷丁香五月天综合东京热 | 无套内谢老熟女 | 成人无码影片精品久久久 | 中文字幕久久久久人妻 | 熟妇女人妻丰满少妇中文字幕 | 呦交小u女精品视频 | 亚洲中文字幕av在天堂 | 波多野结衣乳巨码无在线观看 | 色综合久久88色综合天天 | 999久久久国产精品消防器材 | 一区二区三区乱码在线 | 欧洲 | 国产香蕉尹人视频在线 | 国产真实乱对白精彩久久 | 欧美国产日韩亚洲中文 | 午夜熟女插插xx免费视频 | 国产精品永久免费视频 | 欧美午夜特黄aaaaaa片 | 中文字幕无码热在线视频 | 国产亚洲tv在线观看 | 精品无码一区二区三区爱欲 | 精品无人国产偷自产在线 | 国产欧美亚洲精品a | 东京热男人av天堂 | 无码国产乱人伦偷精品视频 | 亚洲精品久久久久久久久久久 | 成人av无码一区二区三区 | 无码av中文字幕免费放 | 日本免费一区二区三区最新 | 久久亚洲a片com人成 | 乌克兰少妇xxxx做受 | 国产莉萝无码av在线播放 | 国产亚洲精品精品国产亚洲综合 | 小sao货水好多真紧h无码视频 | 一本大道伊人av久久综合 | 午夜精品久久久内射近拍高清 | 在线 国产 欧美 亚洲 天堂 | 久久久久久久久蜜桃 | 九一九色国产 | 亚洲精品国产a久久久久久 | 成人无码视频免费播放 | 亚洲精品久久久久avwww潮水 | 少妇性荡欲午夜性开放视频剧场 | 国产又粗又硬又大爽黄老大爷视 | 久久精品女人的天堂av | 成人影院yy111111在线观看 | 国产人成高清在线视频99最全资源 | 亚洲熟妇色xxxxx欧美老妇y | 性史性农村dvd毛片 | 18禁止看的免费污网站 | 久精品国产欧美亚洲色aⅴ大片 | 丰满人妻精品国产99aⅴ | 学生妹亚洲一区二区 | 男女下面进入的视频免费午夜 | 狠狠色欧美亚洲狠狠色www | 中文字幕色婷婷在线视频 | 福利一区二区三区视频在线观看 | 国内精品人妻无码久久久影院蜜桃 | 日日摸日日碰夜夜爽av | 欧美成人高清在线播放 | 国产内射爽爽大片视频社区在线 | 欧美成人高清在线播放 | 国产另类ts人妖一区二区 | 男人的天堂2018无码 | 成人无码视频在线观看网站 | 精品国偷自产在线视频 | 日日麻批免费40分钟无码 | 欧美野外疯狂做受xxxx高潮 | 中文字幕无码免费久久99 | 国产在线一区二区三区四区五区 | 性欧美牲交xxxxx视频 | 女高中生第一次破苞av | 人人妻人人澡人人爽人人精品浪潮 | 欧美肥老太牲交大战 | 久久久久久a亚洲欧洲av冫 | 黑人巨大精品欧美黑寡妇 | 日韩av无码中文无码电影 | 日韩欧美成人免费观看 | 好屌草这里只有精品 | 精品厕所偷拍各类美女tp嘘嘘 | 国产乱人伦av在线无码 | 在线精品亚洲一区二区 | 激情五月综合色婷婷一区二区 | 久久国产精品偷任你爽任你 | 在线精品国产一区二区三区 | 中文字幕色婷婷在线视频 | 久久国产36精品色熟妇 | 中文字幕无码av波多野吉衣 | 欧美性猛交xxxx富婆 | 欧美老妇交乱视频在线观看 | 熟妇人妻无码xxx视频 | 天天av天天av天天透 | 欧美激情一区二区三区成人 | 噜噜噜亚洲色成人网站 | 亚洲中文字幕无码一久久区 | 午夜成人1000部免费视频 | 妺妺窝人体色www在线小说 | 色婷婷久久一区二区三区麻豆 | 亚洲а∨天堂久久精品2021 | 国产av一区二区三区最新精品 | av无码久久久久不卡免费网站 | 中文毛片无遮挡高清免费 | 大肉大捧一进一出视频出来呀 | 在线亚洲高清揄拍自拍一品区 | 国产亚洲视频中文字幕97精品 | 熟女少妇人妻中文字幕 | 欧洲极品少妇 | 亚洲熟妇色xxxxx亚洲 | 免费观看的无遮挡av | 久热国产vs视频在线观看 | 久久亚洲中文字幕精品一区 | 国产区女主播在线观看 | 国产精品人妻一区二区三区四 | 久久婷婷五月综合色国产香蕉 | 国产av一区二区精品久久凹凸 | 久久精品一区二区三区四区 | 少妇无套内谢久久久久 | 激情国产av做激情国产爱 | 精品亚洲韩国一区二区三区 | 成人片黄网站色大片免费观看 | 色一情一乱一伦 | 麻豆蜜桃av蜜臀av色欲av | 久久久久国色av免费观看性色 | 特级做a爰片毛片免费69 | 国产美女极度色诱视频www | 日本熟妇人妻xxxxx人hd | 在线看片无码永久免费视频 | 亚洲成在人网站无码天堂 | 香港三级日本三级妇三级 | 综合人妻久久一区二区精品 | 人人爽人人澡人人高潮 | 精品亚洲成av人在线观看 | 老熟妇仑乱视频一区二区 | 成人无码精品1区2区3区免费看 | 日韩精品无码一区二区中文字幕 | 亚洲码国产精品高潮在线 | 亚洲一区二区三区偷拍女厕 | 亚洲一区二区三区四区 | 免费无码的av片在线观看 | 中文精品无码中文字幕无码专区 | 一本色道婷婷久久欧美 | 中文字幕久久久久人妻 | 欧美freesex黑人又粗又大 | 久久综合激激的五月天 | 国产精品人人爽人人做我的可爱 | 无码精品国产va在线观看dvd | 久精品国产欧美亚洲色aⅴ大片 | 亚洲日韩一区二区 | 精品国产国产综合精品 | 人妻少妇被猛烈进入中文字幕 | 国产精品亚洲五月天高清 | 免费看少妇作爱视频 | 亚洲人成网站色7799 | 日韩精品无码免费一区二区三区 | 国产99久久精品一区二区 | 国产成人午夜福利在线播放 | 疯狂三人交性欧美 | 99久久精品午夜一区二区 | 精品成人av一区二区三区 | 亚洲欧美综合区丁香五月小说 | 亚洲国产精品无码久久久久高潮 | 精品人妻中文字幕有码在线 | 国产在线aaa片一区二区99 | 乱码午夜-极国产极内射 | 中文字幕人成乱码熟女app | 国产无套粉嫩白浆在线 | 无码av岛国片在线播放 | 波多野结衣av一区二区全免费观看 | 18禁黄网站男男禁片免费观看 | 两性色午夜免费视频 | 东京热男人av天堂 | 在线观看国产一区二区三区 | 欧美 日韩 人妻 高清 中文 | 色偷偷人人澡人人爽人人模 | 亚洲欧美中文字幕5发布 | 色婷婷综合中文久久一本 | 兔费看少妇性l交大片免费 | 日韩视频 中文字幕 视频一区 | 色婷婷综合中文久久一本 | 偷窥村妇洗澡毛毛多 | 久久久久久九九精品久 | 黑人巨大精品欧美黑寡妇 | 久久久亚洲欧洲日产国码αv | 国产精华av午夜在线观看 | 国产精品久久久久9999小说 | 久久久久se色偷偷亚洲精品av | 风流少妇按摩来高潮 | 国产高清av在线播放 | 国产黑色丝袜在线播放 | 国产人妖乱国产精品人妖 | 波多野结衣av一区二区全免费观看 | 欧美性猛交xxxx富婆 | 人妻体内射精一区二区三四 | 亚洲精品中文字幕久久久久 | 纯爱无遮挡h肉动漫在线播放 | 曰韩少妇内射免费播放 | 中文无码伦av中文字幕 | 国产精品丝袜黑色高跟鞋 | 国产精品亚洲五月天高清 | 亚洲中文无码av永久不收费 | 九九热爱视频精品 | 国产免费久久精品国产传媒 | 亚洲国产av精品一区二区蜜芽 | 成熟人妻av无码专区 | 人妻天天爽夜夜爽一区二区 | √8天堂资源地址中文在线 | 久久精品国产一区二区三区 | 亚洲第一网站男人都懂 | 人人妻人人澡人人爽欧美一区 | 久久久久国色av免费观看性色 | 欧美日韩久久久精品a片 | 欧美激情内射喷水高潮 | 国产成人精品三级麻豆 | 乌克兰少妇性做爰 | 少妇太爽了在线观看 | 国产人妻精品一区二区三区 | 色诱久久久久综合网ywww | 国产亚洲精品久久久闺蜜 | 国产亚洲精品久久久ai换 | 亚洲成a人片在线观看日本 | 日本精品久久久久中文字幕 | 国产成人精品优优av | 风流少妇按摩来高潮 | 国产成人无码午夜视频在线观看 | 国产亚洲精品久久久久久国模美 | 欧美日韩亚洲国产精品 | 天天综合网天天综合色 | 亚洲人交乣女bbw | 国产成人精品视频ⅴa片软件竹菊 | 国产精品美女久久久网av | 98国产精品综合一区二区三区 | 精品aⅴ一区二区三区 | 日韩精品无码一本二本三本色 | 荡女精品导航 | 爱做久久久久久 | 亚洲中文字幕在线观看 | 亚洲国产精品无码久久久久高潮 | 台湾无码一区二区 | 亚洲乱码日产精品bd | 国产欧美熟妇另类久久久 | 精品无码成人片一区二区98 | 搡女人真爽免费视频大全 | 成 人 免费观看网站 | 粗大的内捧猛烈进出视频 | 国产精品二区一区二区aⅴ污介绍 | 亚洲中文字幕无码中文字在线 | 亚洲国产精品毛片av不卡在线 | 久久久久99精品成人片 | 人人妻人人澡人人爽欧美一区九九 | 激情内射亚州一区二区三区爱妻 | 欧美老熟妇乱xxxxx | 国精产品一品二品国精品69xx | 欧美喷潮久久久xxxxx | 久久国语露脸国产精品电影 | 狠狠色色综合网站 | 亚洲精品鲁一鲁一区二区三区 | 色综合久久中文娱乐网 | 波多野结衣一区二区三区av免费 | 久久久久亚洲精品中文字幕 | 全黄性性激高免费视频 | 日韩欧美成人免费观看 | 婷婷色婷婷开心五月四房播播 | 久久综合九色综合欧美狠狠 | 中文字幕无码人妻少妇免费 | 国产卡一卡二卡三 | 又色又爽又黄的美女裸体网站 | 免费观看激色视频网站 | 成人欧美一区二区三区 | 人妻有码中文字幕在线 | 亚洲成av人综合在线观看 | 久久久国产精品无码免费专区 | 人人妻人人澡人人爽欧美精品 | 欧美精品一区二区精品久久 | 午夜无码人妻av大片色欲 | 日韩少妇白浆无码系列 | 日本精品久久久久中文字幕 | 久久久国产精品无码免费专区 | 国产成人综合美国十次 | 欧美人与物videos另类 | 国产精品国产三级国产专播 | 亚洲精品久久久久久一区二区 | 国产艳妇av在线观看果冻传媒 | 夜夜高潮次次欢爽av女 | 国产又爽又黄又刺激的视频 | 国产xxx69麻豆国语对白 | 精品国产一区av天美传媒 | 欧洲vodafone精品性 | 色一情一乱一伦 | 99久久久无码国产aaa精品 | 日韩精品乱码av一区二区 | 一区二区传媒有限公司 | 国产情侣作爱视频免费观看 | 亚洲春色在线视频 | 亚洲啪av永久无码精品放毛片 | 又大又硬又黄的免费视频 | 人人妻人人澡人人爽欧美一区 | 无码av最新清无码专区吞精 | 一本久道高清无码视频 | 亚洲精品综合一区二区三区在线 | 精品国产av色一区二区深夜久久 | 亚洲精品久久久久久一区二区 | 国产精品久久久午夜夜伦鲁鲁 | av人摸人人人澡人人超碰下载 | 性欧美牲交在线视频 | 国产亚洲精品久久久久久国模美 | 国产精品高潮呻吟av久久4虎 | 亚洲aⅴ无码成人网站国产app | 99久久久无码国产精品免费 | 国产口爆吞精在线视频 | 亚洲va欧美va天堂v国产综合 | 大肉大捧一进一出好爽视频 | 久久久久久a亚洲欧洲av冫 | 国产两女互慰高潮视频在线观看 | 国产福利视频一区二区 | 玩弄人妻少妇500系列视频 | 正在播放东北夫妻内射 | 国产成人无码av一区二区 | а√天堂www在线天堂小说 | 国产莉萝无码av在线播放 | 一本精品99久久精品77 | 午夜福利一区二区三区在线观看 | 熟妇人妻激情偷爽文 | 毛片内射-百度 | 欧美性色19p | 激情亚洲一区国产精品 | 国产两女互慰高潮视频在线观看 | 国产精品久久福利网站 | 妺妺窝人体色www婷婷 | 丝袜 中出 制服 人妻 美腿 | 88国产精品欧美一区二区三区 | 麻豆国产人妻欲求不满谁演的 | 搡女人真爽免费视频大全 | 国产精品第一区揄拍无码 | 国内揄拍国内精品人妻 | 少妇无套内谢久久久久 | 亚洲国产成人av在线观看 | 免费人成网站视频在线观看 | 丰满人妻翻云覆雨呻吟视频 | 欧美精品无码一区二区三区 | 久久综合九色综合欧美狠狠 | aa片在线观看视频在线播放 | 久久久久99精品成人片 | 97精品国产97久久久久久免费 | 无套内射视频囯产 | 亚洲中文字幕va福利 | 婷婷色婷婷开心五月四房播播 | 天干天干啦夜天干天2017 | 老熟妇仑乱视频一区二区 | 学生妹亚洲一区二区 | 国产人成高清在线视频99最全资源 | 久久99国产综合精品 | 国产真实乱对白精彩久久 | 亚洲区欧美区综合区自拍区 | 一区二区传媒有限公司 | 熟妇女人妻丰满少妇中文字幕 | 最近的中文字幕在线看视频 | 亚洲成av人片天堂网无码】 | 男女爱爱好爽视频免费看 | 国产在线精品一区二区三区直播 | 国产性生交xxxxx无码 | 狂野欧美性猛交免费视频 | 亚洲中文字幕乱码av波多ji | 亚洲自偷自拍另类第1页 | 熟妇女人妻丰满少妇中文字幕 | 亚洲男人av天堂午夜在 | 黑人玩弄人妻中文在线 | 大胆欧美熟妇xx | 国产精品毛片一区二区 | 亚洲区小说区激情区图片区 | 久久精品国产99精品亚洲 | 强开小婷嫩苞又嫩又紧视频 | 亚洲欧美精品aaaaaa片 | 樱花草在线社区www | 精品水蜜桃久久久久久久 | 人妻人人添人妻人人爱 | 欧美人与牲动交xxxx | 色综合久久88色综合天天 | 成人片黄网站色大片免费观看 | 国产精品无码成人午夜电影 | 黑人巨大精品欧美黑寡妇 | 亚洲精品成人av在线 | 日本肉体xxxx裸交 | 亚洲中文字幕久久无码 | 欧美freesex黑人又粗又大 | 国模大胆一区二区三区 | 一本色道久久综合亚洲精品不卡 | 97夜夜澡人人爽人人喊中国片 | а√资源新版在线天堂 | 免费播放一区二区三区 | 欧美丰满熟妇xxxx | 欧美老妇交乱视频在线观看 | 图片小说视频一区二区 | 久久成人a毛片免费观看网站 | 亚洲国产精品毛片av不卡在线 | 在线视频网站www色 | 中文字幕乱码中文乱码51精品 | 久久精品中文闷骚内射 | 日本一卡二卡不卡视频查询 | 日本精品少妇一区二区三区 | 99在线 | 亚洲 | 中文无码成人免费视频在线观看 | 天堂а√在线地址中文在线 | 国产欧美精品一区二区三区 | 亚洲精品欧美二区三区中文字幕 | 狂野欧美激情性xxxx | 久久99精品久久久久久 | 国产激情无码一区二区 | 人人澡人人妻人人爽人人蜜桃 | 久久久中文字幕日本无吗 | 波多野结衣一区二区三区av免费 | 人人澡人人妻人人爽人人蜜桃 | 亚洲码国产精品高潮在线 | 欧美成人高清在线播放 | 国产精品无码久久av | 国产精品久久久 | 亚洲大尺度无码无码专区 | 国产一区二区三区日韩精品 | 九月婷婷人人澡人人添人人爽 | 日本一区二区更新不卡 | 亚洲爆乳大丰满无码专区 | 精品国产一区二区三区四区在线看 | 兔费看少妇性l交大片免费 | 欧美日韩色另类综合 | 亚洲 激情 小说 另类 欧美 | 国产在线精品一区二区高清不卡 | 国产美女精品一区二区三区 | 少妇人妻大乳在线视频 | 日产精品99久久久久久 | 亚洲精品成a人在线观看 | 亚洲国产一区二区三区在线观看 | 亲嘴扒胸摸屁股激烈网站 | 熟妇人妻无码xxx视频 | 内射后入在线观看一区 | 国产精品沙发午睡系列 | 无码帝国www无码专区色综合 | 小sao货水好多真紧h无码视频 | 色婷婷综合激情综在线播放 | 欧美黑人乱大交 | 人人澡人人妻人人爽人人蜜桃 | 国产小呦泬泬99精品 | 亚洲国产高清在线观看视频 | 亚洲成a人片在线观看日本 | 曰韩无码二三区中文字幕 | 亚洲精品久久久久avwww潮水 | 久久综合色之久久综合 | 国产精品多人p群无码 | 日本护士毛茸茸高潮 | 久久综合久久自在自线精品自 | 中文字幕亚洲情99在线 | 国内精品人妻无码久久久影院 | 久久99久久99精品中文字幕 | 久久99精品久久久久久 | 国产偷自视频区视频 | 377p欧洲日本亚洲大胆 | 色综合久久久无码网中文 | 88国产精品欧美一区二区三区 | 欧美兽交xxxx×视频 | 欧美一区二区三区视频在线观看 | 国产精品对白交换视频 | 无码一区二区三区在线 | 东京一本一道一二三区 | 国产精品无码永久免费888 | 无码免费一区二区三区 | 乱码av麻豆丝袜熟女系列 | 国产成人精品三级麻豆 | 国产猛烈高潮尖叫视频免费 | 狠狠噜狠狠狠狠丁香五月 | 成年美女黄网站色大免费全看 | 1000部啪啪未满十八勿入下载 | 99在线 | 亚洲 | 99麻豆久久久国产精品免费 | 久久综合香蕉国产蜜臀av | 又粗又大又硬又长又爽 | 成人精品天堂一区二区三区 | 成人毛片一区二区 | 无码av最新清无码专区吞精 | 国产免费久久久久久无码 | 日欧一片内射va在线影院 | 国模大胆一区二区三区 | 青青草原综合久久大伊人精品 | 成人影院yy111111在线观看 | 无码成人精品区在线观看 | 国产亚洲美女精品久久久2020 | 极品尤物被啪到呻吟喷水 | 久久视频在线观看精品 | 精品一二三区久久aaa片 | 高清国产亚洲精品自在久久 | 未满小14洗澡无码视频网站 | 久久久av男人的天堂 | 日日橹狠狠爱欧美视频 | 国产精品va在线观看无码 | 国产午夜福利100集发布 | 狠狠躁日日躁夜夜躁2020 | av无码电影一区二区三区 | 高清国产亚洲精品自在久久 | 欧美激情内射喷水高潮 | 丰满少妇人妻久久久久久 | 在线a亚洲视频播放在线观看 | 东京热无码av男人的天堂 | 久久精品无码一区二区三区 | 无码一区二区三区在线 | 婷婷色婷婷开心五月四房播播 | 久久久久久久人妻无码中文字幕爆 | 国产精品无码成人午夜电影 | 性欧美牲交xxxxx视频 | 久久久精品国产sm最大网站 | 久久综合九色综合97网 | 真人与拘做受免费视频 | 精品午夜福利在线观看 | 亚洲va中文字幕无码久久不卡 | 欧美成人家庭影院 | 久久视频在线观看精品 | 亚洲日韩av一区二区三区中文 | 欧美 丝袜 自拍 制服 另类 | 国产香蕉尹人视频在线 | 久久综合九色综合欧美狠狠 | 久久久婷婷五月亚洲97号色 | 桃花色综合影院 | 女人高潮内射99精品 | 美女黄网站人色视频免费国产 | 国产精品视频免费播放 | 日韩精品一区二区av在线 | 精品国产青草久久久久福利 | 麻豆国产丝袜白领秘书在线观看 | 国产小呦泬泬99精品 | 国产香蕉尹人综合在线观看 | 在教室伦流澡到高潮hnp视频 | 偷窥村妇洗澡毛毛多 | 东京热一精品无码av | 亚洲中文字幕av在天堂 | 日韩无套无码精品 | 亚洲精品国产精品乱码不卡 | 无人区乱码一区二区三区 | 国产欧美精品一区二区三区 | 蜜桃臀无码内射一区二区三区 | 久久国产精品_国产精品 | 国内老熟妇对白xxxxhd | www一区二区www免费 | 国产女主播喷水视频在线观看 | 一本色道久久综合亚洲精品不卡 | 骚片av蜜桃精品一区 | 国产精品怡红院永久免费 | 中文无码成人免费视频在线观看 | 日韩欧美中文字幕在线三区 | 黑人巨大精品欧美一区二区 | 国产午夜手机精彩视频 | 国产精品欧美成人 | 久久99久久99精品中文字幕 | 青春草在线视频免费观看 | 国产人成高清在线视频99最全资源 | 中国大陆精品视频xxxx | 成人动漫在线观看 | 国产香蕉97碰碰久久人人 | 日韩在线不卡免费视频一区 | 图片区 小说区 区 亚洲五月 | 亚洲爆乳无码专区 | 色综合久久久久综合一本到桃花网 | 亚洲国产欧美国产综合一区 | 国产精品无码一区二区桃花视频 | 麻豆精产国品 | аⅴ资源天堂资源库在线 | 岛国片人妻三上悠亚 | 欧美猛少妇色xxxxx | 亚洲日韩精品欧美一区二区 | 成在人线av无码免观看麻豆 | 国产麻豆精品精东影业av网站 | 波多野结衣av在线观看 | 国产综合在线观看 | 中文字幕乱码亚洲无线三区 | 日本精品人妻无码免费大全 | 久久99精品久久久久婷婷 | 亚洲无人区午夜福利码高清完整版 | 少妇愉情理伦片bd | 熟妇人妻无码xxx视频 | 亚洲区小说区激情区图片区 | 2019午夜福利不卡片在线 | 久久天天躁狠狠躁夜夜免费观看 | 亚洲日本一区二区三区在线 | 狠狠色噜噜狠狠狠狠7777米奇 | 国内精品久久久久久中文字幕 | 人妻无码αv中文字幕久久琪琪布 | 无遮挡国产高潮视频免费观看 | 欧美熟妇另类久久久久久不卡 | 老熟女重囗味hdxx69 | 青草视频在线播放 | 欧洲美熟女乱又伦 | 精品偷拍一区二区三区在线看 | 色一情一乱一伦一视频免费看 | 色狠狠av一区二区三区 | 国产免费无码一区二区视频 | 成 人影片 免费观看 | 特级做a爰片毛片免费69 | 在线视频网站www色 | 免费网站看v片在线18禁无码 | 亚洲综合在线一区二区三区 | 亚洲高清偷拍一区二区三区 | 亚洲国产精品一区二区第一页 | 粗大的内捧猛烈进出视频 | 久久亚洲中文字幕精品一区 | 精品一区二区三区波多野结衣 | 在教室伦流澡到高潮hnp视频 | 无码一区二区三区在线 | 无套内谢老熟女 | 国产午夜精品一区二区三区嫩草 | 国产美女极度色诱视频www | 97资源共享在线视频 | 欧美性猛交内射兽交老熟妇 | 日韩亚洲欧美中文高清在线 | 久久精品99久久香蕉国产色戒 | 又粗又大又硬毛片免费看 | 精品国产一区二区三区av 性色 | 丰满妇女强制高潮18xxxx | 欧美老熟妇乱xxxxx | 午夜时刻免费入口 | 国精产品一区二区三区 | 亚洲乱码国产乱码精品精 | 97se亚洲精品一区 | 最新国产乱人伦偷精品免费网站 | 中文精品久久久久人妻不卡 | 精品偷拍一区二区三区在线看 | 国产午夜福利100集发布 | 国产精品久久久av久久久 | 又大又硬又爽免费视频 | 激情人妻另类人妻伦 | 国产成人精品三级麻豆 | 狠狠色欧美亚洲狠狠色www | 精品久久久久香蕉网 | 国产高清av在线播放 | 无人区乱码一区二区三区 | 欧美xxxxx精品 | 成人精品视频一区二区三区尤物 | 亚洲国产精华液网站w | 人妻互换免费中文字幕 | 东京热一精品无码av | 婷婷丁香六月激情综合啪 | 国内揄拍国内精品人妻 | 亚洲日本va中文字幕 | 亚洲国产成人a精品不卡在线 | a国产一区二区免费入口 | 亚洲成色www久久网站 | 欧美freesex黑人又粗又大 | 国产一区二区三区日韩精品 | 精品久久久无码中文字幕 | 久久国语露脸国产精品电影 | 在线精品国产一区二区三区 | 啦啦啦www在线观看免费视频 | 国产无av码在线观看 | 天堂а√在线中文在线 | 久久天天躁狠狠躁夜夜免费观看 | 精品无码国产自产拍在线观看蜜 | 中文字幕中文有码在线 | 欧美人与禽猛交狂配 | 曰韩少妇内射免费播放 | 精品无码成人片一区二区98 | 俺去俺来也在线www色官网 | 国产精品久久国产精品99 | 精品久久综合1区2区3区激情 | 无码av免费一区二区三区试看 | 97夜夜澡人人双人人人喊 | 亚洲小说春色综合另类 | 久久99精品国产麻豆 | 亚洲色在线无码国产精品不卡 | 久久久久国色av免费观看性色 | 欧美阿v高清资源不卡在线播放 | 国产真实乱对白精彩久久 | 国产成人综合色在线观看网站 | 成人无码精品一区二区三区 | 99riav国产精品视频 | 美女黄网站人色视频免费国产 | 国产亚洲精品久久久久久久 | 国产激情综合五月久久 | 亚洲欧美日韩国产精品一区二区 | 亚洲爆乳精品无码一区二区三区 | 久久精品成人欧美大片 | 国产欧美精品一区二区三区 | 内射欧美老妇wbb | 天天拍夜夜添久久精品 | 丁香花在线影院观看在线播放 | 国产艳妇av在线观看果冻传媒 | 在线а√天堂中文官网 | 色五月五月丁香亚洲综合网 | 欧美性猛交xxxx富婆 | 国产精品无码久久av | 一本久久伊人热热精品中文字幕 | 麻豆av传媒蜜桃天美传媒 | 性史性农村dvd毛片 | 欧美国产日产一区二区 | 国产99久久精品一区二区 | 久久久久成人精品免费播放动漫 | 黑人大群体交免费视频 | 日日橹狠狠爱欧美视频 | 国内精品一区二区三区不卡 | 性生交大片免费看女人按摩摩 | 欧美日韩在线亚洲综合国产人 | 亚洲成av人片天堂网无码】 | 丝袜 中出 制服 人妻 美腿 | 在线 国产 欧美 亚洲 天堂 | 亚洲欧美日韩综合久久久 | 欧美激情一区二区三区成人 | 国产猛烈高潮尖叫视频免费 | 免费人成网站视频在线观看 | 国产乱码精品一品二品 | 亚洲自偷精品视频自拍 | 沈阳熟女露脸对白视频 | 秋霞成人午夜鲁丝一区二区三区 | 国产热a欧美热a在线视频 | 亚洲精品国偷拍自产在线观看蜜桃 | 国产一区二区不卡老阿姨 | 性欧美牲交xxxxx视频 | 天天燥日日燥 | 蜜桃无码一区二区三区 | 国产精品第一区揄拍无码 | 国产精品久久久久久亚洲影视内衣 | 国产成人一区二区三区别 | 欧美xxxx黑人又粗又长 | 国产激情无码一区二区app | 内射巨臀欧美在线视频 | 婷婷丁香五月天综合东京热 | 亚洲综合伊人久久大杳蕉 | 乱人伦人妻中文字幕无码 | 久久久精品成人免费观看 | 天天躁夜夜躁狠狠是什么心态 | 亚洲国产精品美女久久久久 | 亚洲aⅴ无码成人网站国产app | 国产人妖乱国产精品人妖 | aⅴ亚洲 日韩 色 图网站 播放 | 99久久精品国产一区二区蜜芽 | 成人亚洲精品久久久久软件 |