支付通道接口异常统计上报
生活随笔
收集整理的這篇文章主要介紹了
支付通道接口异常统计上报
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 接口調用失敗的上報類ReporterUtils :
- 報告失敗信息ReportInfo類:
- 監聽MQ,接收失敗報告MsgListener:
- 處理通道降級的服務類ChannelQualityService:
支付中心對接第三方通道時,會遇到第三方接口不穩定導致無法支付的問題,這就需要有個失敗統計功能,可以根據預定的閾值自動切換支付通道。
接口調用失敗的上報類ReporterUtils :
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;/*** 日志上報類。* 調用第三方通道的接口異常時,上報。* <p>* 維護一個內存隊列msgQueue,日志先放入隊列,然后由后臺線程池推送給MQ。*/ @Component public class ReporterUtils {private static final Logger log = LogManager.getLogger(ReporterUtils.class);@Autowiredprivate ReportConfig config;@Autowiredprivate MsgProducer msgProducer;private static LinkedBlockingQueue<ReportInfo> msgQueue;private static ExecutorService executorService;@PostConstructprivate void init() {log.debug("----------------- reporter init ------");msgQueue = new LinkedBlockingQueue<ReportInfo>(config.getQueueMaxSize());// 添加處理線程executorService = new ThreadPoolExecutor(config.getWorkerNum(), config.getWorkerNum(), Constant.EXECUTOR_KEEP_ALIVE_TIME, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(Constant.EXECUTOR_WORK_QUEUE_SIZE), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = Executors.defaultThreadFactory().newThread(r);t.setDaemon(true);return t;}}, new ThreadPoolExecutor.AbortPolicy());for (int i = 0; i < config.getWorkerNum(); i++) {executorService.submit(new Worker());}}private ReporterUtils() {}/*** 上報線程。*/class Worker implements Runnable {/*** 持續掃描內存隊列中的日志并上報給MQ。*/@Overridepublic void run() {try {while (true) {List<String> batchReportInfos = new ArrayList(config.getBatchSize());ReportInfo reportInfo;for (int j = 0; j < config.getBatchSize(); j++) {/*獲取并移除此隊列的頭部,如果沒有元素則等待(阻塞),直到有元素將喚醒等待線程執行該操作*/reportInfo = msgQueue.take();batchReportInfos.add(reportInfo.toString());}String jsonBatchReportInfos = JacksonHelper.toJsonString(batchReportInfos);log.debug("批量上報通訊異常日志:" + jsonBatchReportInfos);//失敗報告通過MQ發送給MsgListener類處理(也可以改為接口調用方式)msgProducer.send(Topic.HTTP_FAIL, jsonBatchReportInfos);sleep(10_000L);}} catch (Exception ex) {log.error("report fail. ", ex);}}}/*** 調用第三方接口失敗后,調用此方法,報告失敗。** @param reportInfo* @return*/public boolean httpFail(ReportInfo reportInfo) {if (null != reportInfo) {return msgQueue.offer(reportInfo);}return false;}public static String getCurrentTimestampS() {//精確到秒return TimeUtil.getCurrentDateTime();}public static long getCurrentTimestampMs() {//精確到毫秒return System.currentTimeMillis();}private void sleep(Long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {//catched}}}報告失敗信息ReportInfo類:
public class ReportInfo {private static final String SEPARATOR = ",";private String url; //請求地址private String orderId; // 交易的標識,比如訂單號private String channel; //支付通道private String currentTime; // 上報時的時間戳,單位秒private Long elapsedTimeMillis; // 實際請求耗時,單位毫秒private Integer connectTimeoutMillis; // 請求時設置的連接超時時間,單位毫秒private Integer readTimeoutMillis; // 請求時設置的服務端返回超時時間,單位毫秒private Integer hasDnsError; // 是否DNS解析錯誤,或域名(IP)不存在private Integer hasConnectTimeout; // 是否連接超時private Integer hasReadTimeout; // 是否服務端返回超時private Integer hasUnknownException; //其他未知的錯誤public ReportInfo() {}public ReportInfo(String url, String orderId, String channel,String currentTime, Long elapsedTimeMillis,Integer connectTimeoutMillis, Integer readTimeoutMillis,Boolean hasDnsError, Boolean hasConnectTimeout, Boolean hasReadTimeout,Boolean hasUnknownException) {this.orderId = orderId;this.channel = channel;this.currentTime = currentTime;this.elapsedTimeMillis = elapsedTimeMillis;this.url = url;this.connectTimeoutMillis = connectTimeoutMillis;this.readTimeoutMillis = readTimeoutMillis;this.hasDnsError = hasDnsError ? 1 : 0;this.hasConnectTimeout = hasConnectTimeout ? 1 : 0;this.hasReadTimeout = hasReadTimeout ? 1 : 0;this.hasUnknownException = hasUnknownException ? 1 : 0;}@Overridepublic String toString() {return JacksonHelper.toJsonString(this);}/*** 轉換成 csv 格式。* 每條記錄轉換為一行,都好分割。** @return*/public String toLineString() {Object[] objects = new Object[]{url, orderId, channel,currentTime, elapsedTimeMillis,connectTimeoutMillis, readTimeoutMillis,hasDnsError, hasConnectTimeout, hasReadTimeout, hasUnknownException};StringBuffer sb = new StringBuffer();for (Object obj : objects) {sb.append(obj).append(SEPARATOR);}try {return sb.toString();} catch (Exception ex) {return null;}}public String getUrl() {return url;}public void setUrl(String url) {this.url = url;}public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getChannel() {return channel;}public void setChannel(String channel) {this.channel = channel;}public String getCurrentTime() {return currentTime;}public void setCurrentTime(String currentTime) {this.currentTime = currentTime;}public Long getElapsedTimeMillis() {return elapsedTimeMillis;}public void setElapsedTimeMillis(Long elapsedTimeMillis) {this.elapsedTimeMillis = elapsedTimeMillis;}public Integer getConnectTimeoutMillis() {return connectTimeoutMillis;}public void setConnectTimeoutMillis(Integer connectTimeoutMillis) {this.connectTimeoutMillis = connectTimeoutMillis;}public Integer getReadTimeoutMillis() {return readTimeoutMillis;}public void setReadTimeoutMillis(Integer readTimeoutMillis) {this.readTimeoutMillis = readTimeoutMillis;}public Integer getHasDnsError() {return hasDnsError;}public void setHasDnsError(Integer hasDnsError) {this.hasDnsError = hasDnsError;}public Integer getHasConnectTimeout() {return hasConnectTimeout;}public void setHasConnectTimeout(Integer hasConnectTimeout) {this.hasConnectTimeout = hasConnectTimeout;}public Integer getHasReadTimeout() {return hasReadTimeout;}public void setHasReadTimeout(Integer hasReadTimeout) {this.hasReadTimeout = hasReadTimeout;}public Integer getHasUnknownException() {return hasUnknownException;}public void setHasUnknownException(Integer hasUnknownException) {this.hasUnknownException = hasUnknownException;} }監聽MQ,接收失敗報告MsgListener:
@Component public class MsgListener {private static final Logger logger = LogManager.getLogger(MsgListener.class);@Autowiredprivate QualityService channelQualityService;@RabbitListener(queues = MqConstant.QUEUE_HTTP_FAIL, priority = "10")public void onHttpFail(final String content) {logger.debug("接收到上報的通訊異常日志:" + content);List<String> reportList;try {reportList = JacksonHelper.parseJson(content, ArrayList.class);} catch (DataParseException e) {logger.error("解析通訊異常日志失敗:" + content, e);return;}for (String jsonReport : reportList) {try {ReportInfo reportInfo = JacksonHelper.parseJson(jsonReport, ReportInfo.class);channelQualityService.onHttpFail(reportInfo);} catch (Exception e) {logger.error("處理上通訊異常日志失敗", e);}}} }處理通道降級的服務類ChannelQualityService:
關鍵點是redis執行lua腳本,實現通道降級,算法參考的是spring-cloud熔斷器的思路:
/*** 支付通道 質量服務(QoS) 類。* 處理通道降級。* <p>* 統計各通道的失敗次數,根據頻率(某個時間段)、失敗類型(UnknownHostException, ConnectTimeoutException,SocketTimeoutException,Exception)等,* 確定是否需要觸發通道降級。** @author machunlin* @date 2018/4/28*/ @Service public class ChannelQualityServiceImpl implements QualityService {private static final Logger logger = LogManager.getLogger(ChannelQualityServiceImpl.class);@Autowiredprivate RedisTemplate<String, String> redisTemplate;/*** 是否開啟通道降級功能。*/@Value("${channel-downgraded.enabled}")private Boolean enabled;/*** 統計時長的滾動窗口,默認一分鐘。* 即:統計一分鐘之內所有錯誤請求的總次數。*/@Value("${channel-downgraded.rollingstats-window-seconds}")private Integer rollingstatsWindowSeconds;/*** 錯誤次數。* 超過此閥值則觸發降級。*/@Value("${channel-downgraded.error-threshold}")private Integer errorThreshold;/*** 觸發短路的時長。* 即:在指定時長內,保持“已降級”狀態。*/@Value("${channel-downgraded.sleep-window-seconds}")private Integer sleepWindowSeconds;private static DefaultRedisScript<String> errorCountScript = new DefaultRedisScript<>();private static final StringBuilder luaIncrExpire = new StringBuilder(500);private static DefaultRedisScript<Long> channelDowngradedScript = new DefaultRedisScript<>();private static final StringBuilder luaSAddExpire = new StringBuilder(200);static {/*** 錯誤次數統計:* KEYS[1] = channel* KEYS[2] = 統計時長的滾動窗口, 默認為1分鐘* KEYS[3] = 錯誤次數(閥值),默認4次*/luaIncrExpire.append(" local errorCount").append(" errorCount = redis.call('INCR', KEYS[1])").append(" errorCount = tonumber(errorCount)").append(" if errorCount == 1 then"). //如果是第一次報錯,將設置key值的有效期append(" redis.call('EXPIRE',KEYS[1], KEYS[2])").append(" return 'done: errorCount == '..errorCount").append(" elseif(errorCount <= tonumber(KEYS[3])) then").append(" return 'done: errorCount is '..errorCount").append(" else").append(" return 'downgraded on trigger: errorCount is '..errorCount").append(" end");errorCountScript.setScriptText(luaIncrExpire.toString());errorCountScript.setResultType(String.class);/*** 降級通道新增channel值。* (使用SET集合,value重復的會被忽略):* KEYS[1] = key* KEYS[2] = value* KEYS[3] = ttl seconds**/luaSAddExpire.append(" local result").append(" result = redis.call('SADD', KEYS[1],KEYS[2])").append(" if(result>0) then"). // 返回0表示value重復,不作處理append(" redis.call('EXPIRE',KEYS[1],KEYS[3])").append(" end").append(" return result");channelDowngradedScript.setScriptText(luaSAddExpire.toString());channelDowngradedScript.setResultType(Long.class);}/*** 失敗處理** @param reportInfo*/@Overridepublic void onHttpFail(final ReportInfo reportInfo) {if (!enabled) {logger.warn("通道質量服務(QoS)未開啟");return;}handleDowngraded(reportInfo);}/*** 獲取全部已降級通道** @return*/@Overridepublic Set<String> getDowngraded() {Set<String> channelsDowngraded = redisTemplate.opsForSet().members(Constant.CACHE_KEY_CHANNEL_DOWN);return channelsDowngraded;}/*** 處理通道降級。** @param reportInfo*/private void handleDowngraded(final ReportInfo reportInfo) {List<String> keysCount = new ArrayList<>();keysCount.add(Constant.CACHE_KEY_PREFIX_CHANNEL_ERR_COUNT + reportInfo.getChannel());keysCount.add(rollingstatsWindowSeconds + ""); // 統計時長的滾動窗口keysCount.add(errorThreshold + ""); // 錯誤閥值String resp = redisTemplate.execute(errorCountScript, keysCount);logger.debug("失敗請求已提交 : " + resp);if (resp.startsWith("downgraded on trigger")) {/*** 目前“被降級的通道”是直接放到redis中,集群中的其他PayService向redis查詢。* 后期可以優化:* 1、已降級的通道,放到本機內存中,然后廣播通知集群內的其他服務器更新;* 2、通道路由時,直接取本機內存中的數據。* 3、本機開啟一個后臺線程,監聽"通道降級"的廣播,并更新內存數據。*/List<String> keysChannelDown = new ArrayList<>();keysChannelDown.add(Constant.CACHE_KEY_CHANNEL_DOWN); // keykeysChannelDown.add(reportInfo.getChannel()); // valuekeysChannelDown.add(sleepWindowSeconds + ""); // 有效期Long result = redisTemplate.execute(channelDowngradedScript, keysChannelDown);if (result > 0) {logger.debug("{}通道被降級,降級時長為{}秒", reportInfo.getChannel(), sleepWindowSeconds);}}}} 與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的支付通道接口异常统计上报的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 中国十大名画,你知道几幅?(附高清全图)
- 下一篇: springboot项目集成docker