推荐系统(工程方向)-策略平台
生活随笔
本站收集整理的這篇文章主要介紹了《推荐系统(工程方向)-策略平台》,小編覺得挺不錯的,現在分享給大家,供大家參考。
一、背景
假設某個app的首頁推薦,有2個策略產品經理、6個算法RD、2個工程RD協同工作,如果沒有一套可視化的策略執行流程,溝通將非常低效,且對接容易出錯,達不到想要的效果。
其次,推薦系統一般可以抽象出幾個核心流程,這些核心流程可抽成公共組件,配置在流程中,從而減少很多工作量。
二、方案設計
1、設計思路
- 使用DAG(有向無環圖)構建用戶請求的處理模塊(模塊=DAG節點=策略)
- 使用一個數據流對象(DataFlow),串聯DAG所有節點
- 使用鎖控制并發
- 通過UI修改DAG配置,近實時生效
- 支持同一個DAG中多個同名模塊調用
- 支持子DAG調用
2、設計圖
簡易架構圖
DAG編輯頁面效果圖
3、核心代碼
Node類
/**
 * A vertex of the strategy DAG.
 *
 * <p>Accessors are generated by Lombok; {@code nextEdges} is left out of
 * {@code toString()}.
 */
@Setter
@Getter
@ToString(exclude = {"nextEdges"})
public class Node {

    // ---- graph-construction state ----

    /** Identifier of this node. */
    private String key;

    /** Outgoing edges, i.e. the nodes that depend on this one. */
    private List<Edge> nextEdges;

    /** In-degree of this node. */
    private int inDegree;

    /** Second in-degree counter, used for the check performed before use; handle with care. */
    private int inDegreeCheck;

    /**
     * Model interface; the already-instantiated implementation is attached at execution time.
     * {@code null} means this node is skipped during execution.
     */
    private GraphModel graphModel;

    /**
     * Creates a node with the given identifier and no outgoing edges.
     *
     * @param key identifier of the node
     */
    public Node(String key) {
        this.nextEdges = new LinkedList<>();
        this.key = key;
    }

    /** Increments the in-degree and returns the new value. */
    public int addInDegree() {
        inDegree += 1;
        return inDegree;
    }

    /** Decrements the in-degree and returns the new value. */
    public int decreaseInDegree() {
        inDegree -= 1;
        return inDegree;
    }

    /** Increments the check counter and returns the new value. */
    public int addInDegreeCheck() {
        inDegreeCheck += 1;
        return inDegreeCheck;
    }

    /** Decrements the check counter and returns the new value. */
    public int decreaseInDegreeCheck() {
        inDegreeCheck -= 1;
        return inDegreeCheck;
    }
}
Edge類
/**
 * A directed edge of the DAG. Only the target node is stored; the source node
 * is whichever node holds this edge in its outgoing-edge list.
 */
@Setter
@Getter
@ToString
public class Edge {

    /** Node this edge points to. */
    private Node endNode;

    /**
     * Creates an edge pointing at the given node.
     *
     * @param endNode target node of the edge
     */
    public Edge(Node endNode) {
        this.endNode = endNode;
    }
}
DirectedGraph類
/** 有向無環圖*/ @Setter @Getter public class DirectedGraph {private Map<String, Node> directedGraphMap;public DirectedGraph(String graphContent) {directedGraphMap = new LinkedHashMap<String, Node>();buildGraph(graphContent);}//構圖 解析文件private void buildGraph(String graphContent) {graphContent = graphContent.replaceAll("\n", "`").replaceAll("\\s*", "").replaceAll("`", "\n");String[] lines = graphContent.split("\n");Node startNode, endNode;String startNodeLabel, endNodeLabel;Edge edge;for (int i = 0; i < lines.length; i++) {String[] nodesInfo = lines[i].split(",");if (nodesInfo.length != 2) {throw new RuntimeException((i + 1) + "行包含" + nodesInfo.length + "節點,每行只能2個節點!");}startNodeLabel = nodesInfo[0];endNodeLabel = nodesInfo[1];startNode = directedGraphMap.get(startNodeLabel);if (startNode == null) {startNode = new Node(startNodeLabel);directedGraphMap.put(startNodeLabel, startNode);}endNode = directedGraphMap.get(endNodeLabel);if (endNode == null) {endNode = new Node(endNodeLabel);directedGraphMap.put(endNodeLabel, endNode);}edge = new Edge(endNode);//每讀入一行代表一條邊startNode.getNextEdges().add(edge);//每讀入一行數據,起始頂點添加一條邊endNode.addInDegree();//每讀入一行數據,終止頂點入度加1endNode.addInDegreeCheck();}}/*** 判斷圖是否規范** @return*/public boolean validate() {int count = 0;//初始化隊列Queue<Node> queue = new LinkedList<>();// 拓撲排序中用到的棧,也可用隊列.//掃描所有的頂點,將入度為0的頂點入隊列Collection<Node> nodes = directedGraphMap.values();for (Node node : nodes)if (node.getInDegreeCheck() == 0) {queue.offer(node);}//執行算法,維護隊列while (!queue.isEmpty()) {Node vistNode = queue.poll();//統計已執行的節點個數count++;//判斷依賴他的節點入度是否為零for (Edge edge : vistNode.getNextEdges()) {if (edge.getEndNode().decreaseInDegreeCheck() == 0)queue.offer(edge.getEndNode());}}if (count != nodes.size())return false;return true;} }GraphModelExecutor(執行引擎execute方法,廣度遍歷dag)
public GraphState execute(final DataFlow dataFlow) {//dag狀態記錄類final GraphState graphState = new GraphState();Stopwatch stopwatch = Stopwatch.createStarted();//abtest攔截器aBtestInterceptor.interceptor(null, dataFlow);//業務封裝project類(業務code+場景code+abtest可定位到project)final Project project = projectManager.getProjectMap().get(dataFlow.getProjectKey());Preconditions.checkNotNull(project, dataFlow.getProjectKey() + " has no project!");//pageSize攔截pageSizeInterceptor.interceptor(project, dataFlow);//count攔截,注入冗余設計countInterceptor.interceptor(project, dataFlow);//itemType攔截itemTypeInterceptor.interceptor(project, dataFlow);final Queue<List<Node>> queue = new LinkedBlockingQueue<>();// 拓撲排序中用到的棧,也可用隊列DirectedGraph directedGraph = new DirectedGraph(project.getDagScript());graphState.setDagScript(project.getDagScript());//先記錄圖腳本if (!directedGraph.validate()) {log.error("【{}】requestId={}`userId={},deviceId={},dagScript={},directedGraph is not right!", dataFlow.getProjectKey(), dataFlow.getRequestId(), dataFlow.getUserId(), dataFlow.getDeviceId(), project.getDagScript());return graphState;}//獲取所有模型,并實例化類Collection<Node> nodes = directedGraph.getDirectedGraphMap().values();for (Node node : nodes) {//1、是普通模塊 2、是DAG圖模塊【注意:如果先提交了圖腳本,代碼忘記提交,找不到model,忽略并報警】GraphModel graphModel;if (GraphModelUtil.isDagModel(node.getKey())) {graphModel = graphModelManager.getModel(GraphModelManager.getSimpleName(DAGModel.class.getSimpleName()));String bzCodeSceneCode = node.getKey().replaceAll("\\$", "");if (bzCodeSceneCode.equals(dataFlow.getBzCodeSceneCodeKey())) {log.error("構圖錯誤,不能把自己作為一個圖模塊={}", node.getKey());return graphState;}} else if (BaseCommonUtil.isMultiInstanceModel(node.getKey())) {//ModelA:a獲取ModelAString[] modelKeyArr = node.getKey().split(BaseConsts.modelInstanceSpliter, 2);graphModel = graphModelManager.getModel(modelKeyArr[0]);} else {graphModel = graphModelManager.getModel(node.getKey());}if (graphModel != null) {//判斷當前模塊熔斷狀態if (checkModelStat(project, node.getKey())) 
{node.setGraphModel(graphModel);}} else {String env = projectManager.getEnv().endsWith(Consts.env.pre) ? "預發" : "線上";log.error("className=" + node.getKey() + " has no model!");CommonUtil.sendWarningMsg(env + "環境," + node.getKey() + "模塊異常", "推薦平臺", HttpWechat.INFO_LEVEL_ERROR, dataFlow.getProjectKey() + "的" + node.getKey() + "模塊未找到實現類,可能代碼未提交或模塊實例化異常!", "");}}//掃描所有的頂點,將入度為0的頂點入隊列(可能有多個頂點)List<Node> firstNodeList = new ArrayList<>();for (Node node : nodes) {if (node.getInDegree() == 0) {//最開始的任務,入隊firstNodeList.add(node);}}queue.offer(firstNodeList);//執行算法,維護隊列while (!queue.isEmpty()) {final List<Node> visitNodeList = queue.poll();if (visitNodeList == null || visitNodeList.size() == 0) {log.error("【{}】requestId={}`userId={},deviceId={} 該層沒有任何節點!", dataFlow.getProjectKey(), dataFlow.getRequestId(), dataFlow.getUserId(), dataFlow.getDeviceId());continue;}//主線程中add下一次運行的nodeListsetNextNodeList(visitNodeList, queue);final boolean isParallel = visitNodeList.size() > 1 ? true : false;//單節點用于日志區分(不加不易區分)if (isParallel) {if (Config.getConfig().getParallelWay() == 1) {countDownLatch(project, visitNodeList, dataFlow, isParallel, graphState);} else if (Config.getConfig().getParallelWay() == 2) {completableFuture(project, visitNodeList, dataFlow, isParallel, graphState);} else {completeService(project, visitNodeList, dataFlow, isParallel, graphState);}} else {Node node = visitNodeList.get(0);long nodeTimeout = getTimeout(project, node.getKey());dealOneNode(project, visitNodeList.get(0), dataFlow, isParallel, graphState, nodeTimeout, true);}}//preject執行完,設置item_typeitemTypeAfterInterceptor.interceptor(project, dataFlow);recordFusingMsgByGraphState(project, graphState);log.info("【{}】requestId={}`userId={},deviceId={},executed all_nodes spends={}", project.getProjectKey(), dataFlow.getRequestId(), dataFlow.getUserId(), dataFlow.getDeviceId(), stopwatch.elapsed(TimeUnit.MILLISECONDS));return graphState;}DataFlow類
/*** 數據流對象*/ @Setter @Getter @ToString(exclude = {"rwLock", "version", "filterList", "rcmdList", "userProf"}) @Slf4j public class DataFlow {/***必傳字段***/private final List<Long> filterList = new FilterCollection<>(); //過濾列表private ReadWriteLock rwLock = new ReentrantReadWriteLock();//對于有競態條件的使用鎖private AtomicLong version = new AtomicLong();//記錄當前版本號/***必傳字段***/private long userId;private String deviceId;private int vercode;private String deviceType;private int appID;private long subAppId;private String bzCode;//業務對應的實體entityprivate String sceneCode;//用戶所在場景/頁面private int pageNo;//當前頁碼private int pageSize;//一頁數據private int count;//pageNumber+more_count(冗余條數,為了更好效果,額外加多的召回量)private List<ItemScore> rcmdList = new ArrayList<>(); //推薦數據private UserProfile userProf = new UserProfile(); //用戶畫像private UserProfile njUserProf; //主播側的用戶畫像private List<ItemProfile> itemExpo; //當前曝光的ITEM(如果有)private String abtest;//ab標識private boolean isDag;//標記是dag圖調用,只允許調用基礎組(A)private Map<String, Object> extraMap = new HashMap<>(); //擴展數據,模塊間傳輸數據使用private JSONObject extraJson = new JSONObject();//擴展對象,用來接收協議中extra json傳參private JSONObject resultExtraJson = new JSONObject();//擴展對象,用來接整個請求,與List<RcmdItem>同級別的擴展結果,比如直播dataversiontimeprivate boolean isOriginal;//是否只返回原生數據,比如首頁播單接口返回播單+專題+...但是有些情況下只需要返回播單private int itemType;private String requestId;//請求唯一id,用于日志排查private String hitExps;//abtest平臺命中的所有實驗,eg:exp1|exp2|exp3private Map<String, String> expParamMap = new HashMap<>();//只讀,實驗平臺獲取到的配置,key=modelName//存在并發讀寫,實驗平臺:按條件處理之后,如果模塊不啟用,請add進來private Set<String> disableModelSet = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());private String baseInfo = null;public String version() {StringBuilder sb = new StringBuilder();sb.append(toBaseInfo()).append("`version=").append(this.getVersion().incrementAndGet()).append("`filterList=").append(filterList.size()).append("`rcmdList=").append(rcmdList.size());return sb.toString();}/*** 與project配置相對應,用于每個請求,找到對應的project** @return*/public 
String getProjectKey() {String projectKey = bzCode + "_" + sceneCode;if (abtest != null && abtest.trim().length() > 0) {projectKey = bzCode + "_" + sceneCode + "_" + abtest;}return projectKey;}/*** 打印基礎數據** @return*/public String toBaseInfo() {if (baseInfo == null) {String sb = "userid=" + userId +"`deviceid=" + deviceId +"`projectKey=" + getProjectKey() +"`pageNo=" + pageNo +"`pageSize=" + pageSize +"`hitExps=" + hitExps +"`requestId=" + requestId;baseInfo = sb;}return baseInfo;}public void addExtraMap(String key, Object val) {if (extraMap.containsKey(key)) {log.warn("key already exist`key={}`oldVal={}`newVal={}", key, extraMap.get(key), val);}extraMap.put(key, val);}public <T> T getFromExtraMap(String key) {Object value = extraMap.get(key);if (value == null) {return null;}return (T) value;}public <T> T getFromExtraMap(String key, T defaultValue) {Object value = extraMap.get(key);if (value == null) {return defaultValue;}return (T) value;}/*** bzCode_sceneCode代表一個項目,abtest代表這個項目做了ab測試** @return*/public String getBzCodeSceneCodeKey() {return bzCode + "_" + sceneCode;}public List<Long> getItemIds() {List<Long> result = new ArrayList<>();for (ItemScore itemScore : rcmdList) {result.add(itemScore.getItemId());}return result;}public void setFilterList(List<Long> list) {filterList.clear();filterList.addAll(list);}/*** DAGModel模塊使用,bzCode和sceneCode和pageSize在模塊中注入* 此處只是為了構建查詢條件,萬不可 setRcmdList** @return*/public DataFlow copyForDagModel() {DataFlow dataFlow = new DataFlow();dataFlow.setRequestId(requestId);dataFlow.setUserId(userId);dataFlow.setDeviceId(deviceId);dataFlow.setVercode(vercode);dataFlow.setDeviceType(deviceType);dataFlow.setVercode(vercode);dataFlow.setPageNo(pageNo);dataFlow.setPageSize(pageSize);dataFlow.setCount(pageSize);dataFlow.setExtraJson(extraJson);//有可能用到這個dataFlow.setDag(true);//標記是dag圖調用dataFlow.setItemExpo(itemExpo);dataFlow.setExtraMap(extraMap);dataFlow.setFilterList(new FilterCollection(filterList));if (this.userProf != null) {UserProfile 
userProfile = new UserProfile(new ConcurrentHashMap<>(this.userProf.getProperty()), new ConcurrentHashMap<>(this.userProf.getOriginalProperty()));userProfile.setUserType(this.userProf.getUserType());dataFlow.setUserProf(userProfile);}if (this.njUserProf != null) {UserProfile njUserProfile = new UserProfile(new ConcurrentHashMap<>(this.njUserProf.getProperty()), new ConcurrentHashMap<>(this.njUserProf.getOriginalProperty()));dataFlow.setNjUserProf(njUserProfile);}//分層實驗參數傳遞dataFlow.setHitExps(hitExps);if (expParamMap != null) {dataFlow.setExpParamMap(new HashMap<>(expParamMap));}return dataFlow;}/*** 判斷是否命中某個實驗平臺的實驗** @param exps* @return*/public boolean containsHitExps(String... exps) {if (StringUtils.isBlank(hitExps)) {return false;}for (String exp : exps) {if (("|" + hitExps + "|").contains("|" + exp + "|")) {return true;}}return false;}/*** 獲取當前用戶的平臺信息** @return*/public String getUserPlatform() {if (deviceType.startsWith("Android")) {return PLATFORM_ANDROID;} else if (deviceType.startsWith("IOS")) {return PLATFORM_IOS;} else if (deviceType.startsWith("h5-web")) {return PLATFORM_H5;} else {return PLATFORM_UNKNOW;}}public synchronized void addRecallProfile(String key, Object value) {UserProfile recallProfile = getFromExtraMap("RECALL_PROFILE");if (recallProfile == null) {recallProfile = new UserProfile();addExtraMap("RECALL_PROFILE", recallProfile);}recallProfile.setProperty(key, value);}public synchronized <T> T getRecallProfile(String key, T defaultValue) {UserProfile recallProfile = getFromExtraMap("RECALL_PROFILE");if (recallProfile == null) {return null;}Object value = recallProfile.getProperty(key);if (value == null) {return defaultValue;}return (T) value;}public synchronized <T> T getRecallProfile(String key) {return getRecallProfile(key, null);}/*** 添加Map類型的畫像** @param key 畫像的key* @param value 畫像的值,其中key為種子的值,value為該種子對應的分數*/public void addRecallProfileMap(String key, Map<String, Float> value) {addRecallProfile(key, value);}/*** 添加JsonObject類型的召回畫像** 
@param key 畫像的key* @param value 畫像的值,格式:{"seed1":0.2,"seed2":0.1}* 其中key為種子的值,value為該種子對應的分數*/public void addRecallProfileJSONObject(String key, JSONObject value) {addRecallProfile(key, value);}/*** 添加JsonArray類型的召回畫像** @param key 畫像的key* @param value 畫像的值,格式:[{"key1":"value1","key2":"value2"}]*/public void addRecallProfileJSONArray(String key, JSONArray value) {addRecallProfile(key, value);}/*** 添加向量類型的召回畫像** @param key 畫像的key* @param value 畫像的值,格式:[0.1,-0.2,0.3,0.4]*/public void addRecallProfileVector(String key, String value) {addRecallProfile(key, value);} }?
總結
以上是生活随笔為你收集整理的推荐系统(工程方向)-策略平台的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CRM、DMP、CDP的区别
- 下一篇: 基于XGBoost的PU-Learning