Coupon Project, Part 1
For an overview of the whole project, see https://blog.csdn.net/wenjieyatou/article/details/80190886
Study notes for branch one:
Branch 1.1
1. Fix duplicate coupon codes. The old approach fetched every existing coupon from the database and compared the new code against them; once the table reached millions of rows this became extremely slow and coupon generation frequently failed. This version drops the random-number scheme and uses Twitter's Snowflake algorithm to guarantee uniqueness, while still keeping the coupon prefix and suffix.
2. Replace the raw threads used for asynchronous work with a thread pool, because under high concurrency a large number of threads takes up memory.
3. Replace the per-row for-loop coupon generation with batch generation, inserting the coupons in split batches; a batch is currently set to 5,000.
4. Add a message queue (RabbitMQ). When a batch fails to insert, the failed coupons are pushed onto the queue and recovered from there, achieving high availability and avoiding re-importing large numbers of coupons from scratch. Messages whose handling fails are rejected and returned to the queue; two consumers are configured so that if one service goes down, message processing does not fall into an infinite loop.
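Point 1 keeps a coupon prefix and suffix around the unique id. One way such a code might be composed, shown purely as a hypothetical sketch (the base-36 encoding and all names here are my assumptions, not the project's real format):

```java
public class CouponCodeSketch {
    // Compose a coupon code as prefix + base-36 Snowflake id + suffix.
    // The encoding and the prefix/suffix values are illustrative only;
    // the project's actual scheme is not shown in these notes.
    public static String buildCouponCode(String prefix, long snowflakeId, String suffix) {
        return prefix + Long.toString(snowflakeId, 36).toUpperCase() + suffix;
    }

    public static void main(String[] args) {
        System.out.println(buildCouponCode("CP", 1234567890123L, "A"));
    }
}
```

Because the Snowflake id is unique, any fixed prefix/suffix wrapped around it stays unique as well, so no database lookup is needed.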
1. The first problem: against a table with millions of rows, the duplicate-check query can be very slow or even time out. So instead of checking the database, we simply generate a unique coupon value directly. The code:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GenerateOnlyIdUtils {
    private final long twepoch = 1288834974657L;
    private final long workerIdBits = 5L;
    private final long datacenterIdBits = 5L;
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
    private final long sequenceBits = 12L;
    private final long workerIdShift = sequenceBits;
    private final long datacenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);
    private long workerId;
    private long datacenterId;
    private long sequence = 0L;
    private long lastTimestamp = -1L;

    public GenerateOnlyIdUtils(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    public synchronized long nextId() {
        long timestamp = timeGen();
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }
        // Same millisecond: step the sequence to distinguish ids within it
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = timestamp;
        return ((timestamp - twepoch) << timestampLeftShift)
                | (datacenterId << datacenterIdShift)
                | (workerId << workerIdShift)
                | sequence;
    }

    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    protected long timeGen() {
        return System.currentTimeMillis();
    }

    public static void main(String[] args) {
        final GenerateOnlyIdUtils idGe = new GenerateOnlyIdUtils(1, 1);
        // Generate 100 global ids concurrently on a thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long id = idGe.nextId();
                    System.out.println(id);
                }
            });
        }
        executorService.shutdown();
    }
}

2. Replace the raw threads used for asynchronous work with a thread pool, because under high concurrency a large number of threads takes up memory.
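Before looking at a full thread-pool class, here is a minimal sketch of point 2: replacing raw threads with an explicitly sized pool. The core/max sizes and the queue bound are illustrative assumptions, not the project's settings:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CouponPoolDemo {
    // Build an explicitly sized pool: the sizes are illustrative. A bounded
    // queue plus CallerRunsPolicy keeps a burst of coupon tasks from piling
    // up unbounded threads or queue entries in memory.
    public static ThreadPoolExecutor newCouponPool() {
        return new ThreadPoolExecutor(
                4, 8, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Run `tasks` no-op jobs through the pool and return how many completed.
    public static int runTasks(int tasks) {
        ThreadPoolExecutor pool = newCouponPool();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(100) + " tasks completed");
    }
}
```

Unlike spawning one thread per request, the pool reuses a fixed number of worker threads, which is exactly the memory concern point 2 raises.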
A custom thread pool can be modeled on the following code (Spring's ThreadPoolTaskExecutor, decompiled):
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.springframework.scheduling.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
    private final Object poolSizeMonitor = new Object();
    // Personally I think the thread-count settings here are questionable
    // (comments welcome): with only one active thread (corePoolSize = 1),
    // the pool seems to lose its point.
    private int corePoolSize = 1;
    private int maxPoolSize = 2147483647;
    private int keepAliveSeconds = 60;
    private int queueCapacity = 2147483647;
    private boolean allowCoreThreadTimeOut = false;
    private ThreadPoolExecutor threadPoolExecutor;

    public ThreadPoolTaskExecutor() {
    }

    public void setCorePoolSize(int corePoolSize) {
        synchronized (this.poolSizeMonitor) {
            this.corePoolSize = corePoolSize;
            if (this.threadPoolExecutor != null) {
                this.threadPoolExecutor.setCorePoolSize(corePoolSize);
            }
        }
    }

    public int getCorePoolSize() {
        synchronized (this.poolSizeMonitor) {
            return this.corePoolSize;
        }
    }

    public void setMaxPoolSize(int maxPoolSize) {
        synchronized (this.poolSizeMonitor) {
            this.maxPoolSize = maxPoolSize;
            if (this.threadPoolExecutor != null) {
                this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
            }
        }
    }

    public int getMaxPoolSize() {
        synchronized (this.poolSizeMonitor) {
            return this.maxPoolSize;
        }
    }

    public void setKeepAliveSeconds(int keepAliveSeconds) {
        synchronized (this.poolSizeMonitor) {
            this.keepAliveSeconds = keepAliveSeconds;
            if (this.threadPoolExecutor != null) {
                this.threadPoolExecutor.setKeepAliveTime((long) keepAliveSeconds, TimeUnit.SECONDS);
            }
        }
    }

    public int getKeepAliveSeconds() {
        synchronized (this.poolSizeMonitor) {
            return this.keepAliveSeconds;
        }
    }

    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
        this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
    }

    protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long) this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }
        this.threadPoolExecutor = executor;
        return executor;
    }

    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        return (BlockingQueue) (queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue());
    }

    public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
        Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
        return this.threadPoolExecutor;
    }

    public int getPoolSize() {
        return this.threadPoolExecutor == null ? this.corePoolSize : this.threadPoolExecutor.getPoolSize();
    }

    public int getActiveCount() {
        return this.threadPoolExecutor == null ? 0 : this.threadPoolExecutor.getActiveCount();
    }

    public void execute(Runnable task) {
        ThreadPoolExecutor executor = this.getThreadPoolExecutor();
        try {
            executor.execute(task);
        } catch (RejectedExecutionException var4) {
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
        }
    }

    public void execute(Runnable task, long startTimeout) {
        this.execute(task);
    }

    public Future<?> submit(Runnable task) {
        ThreadPoolExecutor executor = this.getThreadPoolExecutor();
        try {
            return executor.submit(task);
        } catch (RejectedExecutionException var4) {
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
        }
    }

    public <T> Future<T> submit(Callable<T> task) {
        ThreadPoolExecutor executor = this.getThreadPoolExecutor();
        try {
            return executor.submit(task);
        } catch (RejectedExecutionException var4) {
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
        }
    }

    public ListenableFuture<?> submitListenable(Runnable task) {
        ThreadPoolExecutor executor = this.getThreadPoolExecutor();
        try {
            ListenableFutureTask<Object> future = new ListenableFutureTask(task, (Object) null);
            executor.execute(future);
            return future;
        } catch (RejectedExecutionException var4) {
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
        }
    }

    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        ThreadPoolExecutor executor = this.getThreadPoolExecutor();
        try {
            ListenableFutureTask<T> future = new ListenableFutureTask(task);
            executor.execute(future);
            return future;
        } catch (RejectedExecutionException var4) {
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
        }
    }

    public boolean prefersShortLivedTasks() {
        return true;
    }
}

The coupon-making process involves the thread pool; see the following code:
@Override
public int makingCoupon(Long vendorId, String taskCode) {
    try {
        final CpMakingTask tempMakingTask = makingTaskManager.getMakingTask(vendorId, taskCode);
        if (tempMakingTask == null) {
            log.error("No coupon-making task was saved");
            return 2;
        }
        if (makingTaskManager.isRepeatByMaikingCoupon(vendorId, taskCode)) {
            log.error("Coupon making is already in progress; do not submit again");
            return 3;
        }
        tempMakingTask.setState(SysConstants.CPMAKINGTASTSTATE.UNDERWAY);
        tempMakingTask.setTaskBeginTime(new Date());
        makingTaskDao.update(tempMakingTask);
        long start1 = System.currentTimeMillis();
        final List<Coupon> tempCoupons = getTempCoupons(tempMakingTask);
        long start2 = System.currentTimeMillis();
        log.info("Time spent generating coupons: " + (start2 - start1) + " ms");
        if (CollectionUtils.isNotEmpty(tempCoupons)) {
            makingTaskManager.insertCacheByMakingConpon(vendorId, taskCode);
            taskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    boolean result = couponManager.insertCoupons(tempCoupons);
                    if (result) {
                        tempMakingTask.setState(SysConstants.CPMAKINGTASTSTATE.COMPLETED);
                        tempMakingTask.setTaskEndTime(new Date());
                        makingTaskDao.update(tempMakingTask);
                    }
                }
            });
        }
    } catch (Exception e) {
        log.error("Asynchronous coupon making threw an exception", e);
        return 0;
    }
    return 1;
}

3. Coupon generation was changed from a per-row for loop to batch generation, with the coupons inserted in split batches, currently 5,000 per batch. Reference code:
This was the initial for-loop version of coupon making:
/**
 * Batch insert (about 3,000 ms for 5,000 records, clearly more efficient than a row-by-row for loop)
 */
@Test
public void insertCpActivityForBatchUnitTests() {
    long start = System.currentTimeMillis();
    CpActivity cpActivity = new CpActivity();
    cpActivity.setVendorId(1432L);
    List<CpApplyLimitdt> applyLimitdts = new ArrayList<>();
    List<CpUseLimitdt> useLimitdts = new ArrayList<>();
    for (int i = 0; i < 5000; i++) {
        CpApplyLimitdt applyLimitdt = new CpApplyLimitdt();
        applyLimitdt.setId(1L);
        applyLimitdt.setVendorId(1433L);
        applyLimitdt.setCreateDate(new Date());
        applyLimitdt.setDetailName("ASCDFDRFCF");
        applyLimitdt.setOwnRecordCode("" + i);
        applyLimitdt.setDetailCode("" + i);
        applyLimitdt.setModifyDate(new Date());
        applyLimitdt.setMemo("memo");
        applyLimitdt.setApplyScopeType(1);
        applyLimitdts.add(applyLimitdt);
    }
    for (int i = 0; i < 5000; i++) {
        CpUseLimitdt useLimitdt = new CpUseLimitdt();
        useLimitdt.setStoreName("gas station");
        useLimitdt.setCreator("lp");
        useLimitdt.setModifyDate(new Date());
        useLimitdt.setOwnRecordCode("12345678");
        useLimitdt.setOrganName("" + i);
        useLimitdt.setMemo("memo");
        useLimitdt.setOwnRecordType(1);
        useLimitdt.setVendorId(1433L);
        useLimitdt.setCreateDate(new Date());
        useLimitdt.setStoreCode("" + i);
        useLimitdts.add(useLimitdt);
    }
    boolean result = activityManager.insertCpActivity(cpActivity, applyLimitdts, useLimitdts);
    long end = System.currentTimeMillis();
    System.out.println("Total time: " + (end - start) + " ms");
}

// Insert records 5,000 at a time; the DAO call is a batch insert
@Override
public boolean insertCoupons(final List<Coupon> coupons) {
    final long start1 = System.currentTimeMillis();
    TransactionTemplate template = new TransactionTemplate(transactionManager);
    return template.execute(new TransactionCallback<Boolean>() {
        @Override
        public Boolean doInTransaction(TransactionStatus transactionStatus) {
            try {
                if (coupons.size() <= 5000) {
                    couponDao.insertBatchCoupons(coupons);
                }
                if (coupons.size() > 5000) {
                    List<List<Coupon>> tempCoupons = ListUtil.splitList(coupons, 5000);
                    for (List<Coupon> item : tempCoupons) {
                        try {
                            couponDao.insertBatchCoupons(item);
                        } catch (Exception e) {
                            String data = JsonUtil.g.toJson(item);
                            mqSenderHandler.sendMessage("spring.makeCoupons.queueKey", data);
                            continue;
                        }
                    }
                }
                long end1 = System.currentTimeMillis();
                log.info("Inserting " + coupons.size() + " coupons took " + (end1 - start1) + " ms");
                return true;
            } catch (Exception e) {
                log.error("Exception while inserting coupons: " + e);
                transactionStatus.setRollbackOnly();
                return false;
            }
        }
    });
}
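insertCoupons leans on a ListUtil.splitList helper that is not shown in these notes; a hypothetical reimplementation of such a partitioner might look like this (not the project's actual ListUtil):

```java
import java.util.ArrayList;
import java.util.List;

public class ListSplitDemo {
    // Split a list into consecutive chunks of at most `size` elements,
    // mirroring what a call like ListUtil.splitList(coupons, 5000) would do.
    public static <T> List<List<T>> splitList(List<T> list, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            // Copy the view so each chunk is independent of the source list
            chunks.add(new ArrayList<>(list.subList(i, Math.min(i + size, list.size()))));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 12000; i++) data.add(i);
        List<List<Integer>> batches = splitList(data, 5000);
        System.out.println(batches.size() + " batches, last has " + batches.get(batches.size() - 1).size());
    }
}
```

With 12,000 coupons and a batch size of 5,000, this yields three batches of 5,000, 5,000, and 2,000.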
For bulk-insert implementations, the following write-up is also worth a look; it can provide some ideas for future work.
Efficiently inserting large volumes of data into a database table
http://yakyang.com/?p=592
Problem: import a batch of 100,000 records into a database table through a system written in Java, with the import progress visible in real time. Design the system, focusing in particular on import efficiency!
The scenario here is simple!
The problem states two important requirements:
1. See the import progress in real time;
2. Import efficiently.
The first requirement simply means being able to watch the import as it happens. Progress is a rough figure rather than a precise number, though a really solid implementation would report the true count of successfully imported rows, and the possible designs vary quite a bit accordingly. The second requirement is open-ended: anything that improves import efficiency can go into the design, so it tests the answerer's breadth of thinking and technical experience.
Here is my design, which is by no means perfect.
1. First, 100,000 records is a sizeable volume. The problem doesn't say what medium the data is stored on, so I won't consider how the file is read. Importing 100,000 rows one at a time is certainly not optimal. The rows are independent of one another, so split them into batches of 10,000 and hand the ten batches to ten threads; that will surely beat a single importer thread. Call it a small piece of distributed computing!
2. Now consider how a single thread can import quickly. Whenever database operations are involved they deserve the closest attention, and here the operation is an insert into a table. The first idea that comes to mind:
for (Object o : list) {
    insert into table value ( o );
}
This clearly concatenates SQL and inserts rows one at a time. Most people would write it this way, and it works, but it is far from efficient: each iteration sends the SQL to the database, the database compiles it, and then executes it. For a bulk insert we should use JDBC batching instead.
String sql = "insert into table(...) values (?, ?, ?)";
Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(sql);
for (Object o : list) {
    ps.setString(1, o.get...);
    ps.setString(2, o.get...);
    ps.setString(3, o.get...);
    ps.addBatch();
}
ps.executeBatch();
ps.close();
connection.close();
This is much faster than the first version, and the SQL statement only needs to be compiled once.
3. Is the above enough? Clearly not: we haven't considered transactions yet! Committing ten thousand rows in a single transaction is unreasonable and puts heavy pressure on the database, so the transaction granularity must be controlled, for example committing once every 100 rows.
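The commit-every-100-rows idea can be checked without a real database by faking the flush points; a sketch in which counters stand in for ps.executeBatch() and connection.commit():

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkedCommitDemo {
    // Count how many commits "commit every chunkSize rows" produces for a
    // given row count. In real code each flush point would call
    // ps.executeBatch() followed by connection.commit(); here the database
    // is faked so only the control flow is exercised.
    public static int commitCount(int rows, int chunkSize) {
        List<Integer> buffer = new ArrayList<>();
        int commits = 0;
        for (int i = 0; i < rows; i++) {
            buffer.add(i);                    // ps.addBatch() would go here
            if (buffer.size() == chunkSize) { // flush point
                commits++;                    // executeBatch() + commit()
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            commits++;                        // commit the final partial batch
        }
        return commits;
    }

    public static void main(String[] args) {
        System.out.println(commitCount(10000, 100) + " commits for 10000 rows");
    }
}
```

The final flush matters: without it, a row count that is not a multiple of the chunk size silently loses the last partial batch.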
4. Next comes the database itself. I often ask people: what difference does an index make on a table? Most say that indexes speed up queries, but there is another side: they slow down inserts! So the table being loaded should carry no indexes during the import; drop any existing indexes and rebuild them once the import completes.
5. Efficient insertion is about settled; now for the second requirement, real-time progress. Many people's first reaction is select count(1) from table. The problem with that is the polling frequency: poll often and the figure is fairly accurate, poll rarely and it is not (checking every 5 minutes is much worse than every minute), so that design is not very sound. Instead, keep a variable n in memory that tracks the number of successful inserts. But since point 1 has several threads inserting concurrently, two threads could increment it at the same time and corrupt the count, so n has to be protected by a lock.
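Rather than an explicit lock around n, java.util.concurrent's AtomicLong gives the same protection against lost updates; a minimal sketch (the class and method names are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ImportProgressDemo {
    // Shared progress counter: AtomicLong makes concurrent increments safe
    // without wrapping "n" in an explicit lock.
    static final AtomicLong imported = new AtomicLong();

    // Simulate `threads` importer threads each reporting `perThread` rows.
    public static long simulate(int threads, int perThread) {
        imported.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    imported.incrementAndGet(); // one row imported
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return imported.get();
    }

    public static void main(String[] args) {
        System.out.println(simulate(10, 10000) + " rows counted");
    }
}
```

A progress endpoint can then read imported.get() at any time without blocking the importer threads.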
That completes the design. Is it optimal? There is certainly room for improvement, but this is good enough.
4. Add a message queue (RabbitMQ). When a batch fails to insert, the failed coupons are pushed onto the queue and recovered from there, achieving high availability and avoiding re-importing large numbers of coupons from scratch. Messages whose handling fails are rejected and returned to the queue; two consumers are configured so that if one service goes down, message processing does not fall into an infinite loop.
I haven't fully understood this part yet, since I don't know the message-queue mechanism well. Still, the coupon-making code can serve as a reference:
Consumer 1:
Consumer 2:
package com.peiyu.mem.rabbitmq.consumers;

import com.google.gson.reflect.TypeToken;
import com.migr.common.util.JsonUtil;
import com.migr.common.util.StringUtils;
import com.peiyu.mem.dao.CouponDao;
import com.peiyu.mem.domian.entity.Coupon;
import com.peiyu.mem.rabbitmq.Gson2JsonMessageConverter;
import com.rabbitmq.client.Channel;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Created by Administrator on 2016/12/12.
 */
@Component
public class MakeCouponsHandler2 implements ChannelAwareMessageListener {
    private Logger log = Logger.getLogger(MakeCouponsHandler2.class);
    @Autowired
    private CouponDao couponDao;
    @Autowired
    private Gson2JsonMessageConverter jsonMessageConverter;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            channel.basicQos(1);
            if (message == null || message.getBody() == null) {
                return;
            }
            String data = jsonMessageConverter.fromMessage(message).toString();
            if (StringUtils.isNotBlank(data)) {
                List<Coupon> coupons = JsonUtil.g.fromJson(data, new TypeToken<List<Coupon>>() {}.getType());
                couponDao.insertBatchCoupons(coupons);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            }
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
            log.error("Exception while handling coupon-making message: " + e);
        }
    }
}

That is roughly all there is to this branch.