GraphSAGE 模型解读与tensorflow2.0代码实现
上一篇文章,我們講了圖神經網絡:圖卷積神經網絡(GCN)理解與tensorflow2.0代碼實現,也提到GCN的缺點:GCN屬于直推式學習(Transductive Learning),它在固定的圖上學習每個節點的embedding表示,不能泛化到未加入到圖中的節點,一旦圖的結構發生改變,就需要重新訓練整個網絡。
本文提到的 GraphSAGE 屬于歸納學習(Inductive Learning),核心是學習聚合鄰居節點信息的函數(聚合函數),然后利用聚合函數來得到一個節點的表示。即使圖中加入了新的節點,建立了新的邊,那么也可以借助鄰居節點信息與聚合函數得到新節點的向量表示。
前向傳播
下圖展示了 GraphSAGE 學習目標節點(中心的紅色節點) embedding 的過程:
上圖展示了GraphSage的兩個核心思想:采樣(Sample)和聚合(Aggregate)。
整個過程的偽代碼(Algorithm 1)如下:
核心部分是第4、5行:
- 第4行:聚合節點 v 的鄰居節點(采樣后的)在第 k?1k-1k?1 層的 embedding,得到第 kkk 層的鄰居聚合特征 hN(v)kh_{N(v)}^khN(v)k?。
- 第5行:將 hN(v)kh_{N(v)}^khN(v)k? 與第 k?1k-1k?1 層節點 vvv 的 embedding 向量拼接,輸入到全連接層后得到節點 vvv 在第 kkk 層的 embedding 向量:hvkh_v^khvk?。
偽代碼中的 K 表示聚合深度,可以理解為要聚合 K 跳鄰居節點的信息。
聚合函數
在偽代碼的第 4 行,使用了聚合函數,由于在圖中頂點的鄰居是天然無序的,所以我們希望構造出的聚合函數是對稱的(即改變輸入的順序,函數的輸出結果不變),同時具有較高的表達能力。
作者給出了多種聚合函數:
1,平均聚合
也就是Algorithm 1中的聚合方法,對第 k-1 層的鄰居embedding中每個維度取平均,然后與目標節點第 k-1 層的embedding拼接后進行非線性轉換:
hN(v)k=mean?({huk?1,u∈N(v)})hvk=σ(Wk?CONCAT?(hvk?1,hN(u)k))\begin{array}{c} h_{N(v)}^{k}=\operatorname{mean}\left(\left\{h_{u}^{k-1}, u \in N(v)\right\}\right) \\ h_{v}^{k}=\sigma\left(W^{k} \cdot \operatorname{CONCAT}\left(h_{v}^{k-1}, h_{N(u)}^{k}\right)\right) \end{array} hN(v)k?=mean({huk?1?,u∈N(v)})hvk?=σ(Wk?CONCAT(hvk?1?,hN(u)k?))?
2,類GCN聚合
直接對目標節點和所有鄰居emebdding中每個維度取平均,然后再非線性轉換:
hvk=σ(Wk?mean?({hvk?1}∪{huk?1,?u∈N(v)}))h_{v}^{k}=\sigma\left(W^{k} \cdot \operatorname{mean}\left(\left\{h_{v}^{k-1}\right\} \cup\left\{h_{u}^{k-1}, \forall u \in N(v)\right\}\right)\right) hvk?=σ(Wk?mean({hvk?1?}∪{huk?1?,?u∈N(v)}))
這樣的聚合方法與GCN的聚合方式十分相似。
3,LSTM 聚合
LSTM 模型對輸入數據的表達能力更強,但是LSTM聚合函數不具有對稱性,因此在使用時,需要對輸入的序列隨機打亂順序。
4,Pooling聚合
每個鄰居節點的embedding向量都輸入到全連接神經網絡中,然后對得到的embedding進行 max pooling 操作:
hN(v)k?1=max?({σ(Wpoolhuik?1+b)},?ui∈N(v))hvk=σ(Wk?CONCAT?(hvk?1,hN(u)k?1))\begin{aligned} h_{N(v)}^{k-1} &=\max \left(\left\{\sigma\left(W_{\text {pool}} h_{u_i}^{k-1}+b\right)\right\}, \forall u_{i} \in N(v)\right) \\ h_{v}^{k} &=\sigma\left(W^{k} \cdot \operatorname{CONCAT}\left(h_{v}^{k-1}, h_{N(u)}^{k-1}\right)\right) \end{aligned} hN(v)k?1?hvk??=max({σ(Wpool?hui?k?1?+b)},?ui?∈N(v))=σ(Wk?CONCAT(hvk?1?,hN(u)k?1?))?
論文中的結果表明LSTM與Pooling聚合的方式要好一些,但是LSTM復雜度過高,因此 Pooling 是比較好的聚合器。作者也給出GraphSage的實踐中,K=2,S1*S2<=500就可以達到很高的性能,這說明一般只需要擴展到節點的2階鄰居,每次擴展約20~30個鄰居即可。
參數的學習
對于無監督學習,我們的學習目標是讓相鄰的節點擁有相似的向量表示:
其中
- zuz_uzu?是節點 u 通過模型學到的 embedding
- Q 是采樣的樣本數量
- vn~Pn(u)v_n \sim P_n{(u)}vn?~Pn?(u) 表示負采樣
- 節點 vnv_nvn? 是從節點 uuu 的負采樣分布 PnP_nPn? 中采樣的
- ε 是非常接近0的正數,是為了防止對 0 取對數
如何理解這個損失函數?
先看損失函數的藍色部分,當節點 u、v 比較接近時,那么其 embedding 向量zu,zvz_u, z_vzu?,zv?的距離應該比較近,因此二者的內積應該很大,經過σ函數后是接近1的數,因此取對數后的數值接近于0。
再看看紫色的部分,當節點 u、v 比較遠時,那么其 embedding 向量zu,zvz_u, z_vzu?,zv?的距離應該比較遠,在理想情況下,二者的內積應該是很大的負數,乘上-1后再經過σ函數可以得到接近1的數,因此取對數后的數值接近于0。
對于有監督學習,損失函數就比較常見了,例如交叉熵。
mini-batch 采樣
前面提到的采樣算法僅適用于比較小的圖,它需要將輸入GraphSAGE的數據在整個圖中一次性采樣好,然后作為一個batch輸入到模型中。如果想要對大規模的圖進行訓練,并且使用隨機梯度下降算法進行優化,那么就需要構造 mini-batch 數據作為模型的輸入。
由于 mini-batch 中的節點只是圖的局部數據,因此采樣方法與之前提到的方法略有不同,整體如下圖所示:
上圖中第 2~7 行是進行mini-batch采樣,以 K = 2 為例,最終得到的mini-batch數據有:
- k = 2:B2B^{2}B2 = [目標節點]
- k = 1:B1B^{1}B1 = [目標節點 + 一階鄰居]
- k = 0:B0B^{0}B0 = [目標節點 + 一階鄰居 + 二階鄰居]
再來個更具體的例子,橙色的是目標節點:
則當K=3時,每個mini-batch中的數據如下:
可以看到隨著 k 的增大,mini-batch 中的節點數量也是遞減的,當 k = K 時,mini-batch 中的數據只剩下我們最終需要計算embedding的目標節點了。反之,當 k = 0 時,mini-batch 中節點的個數最多,因為這里面存放了進行 K 次聚合要到的全部節點。
接下來看看第9~15行,聚合操作是從最外層的多跳鄰居開始向內層跳數更少的鄰居開始聚合,也就是從上圖中的下層向著上層進行聚合:B0→B1→B2B^0 \rightarrow B^1\rightarrow B^2B0→B1→B2。
這個過程實際上是:
代碼實現(tensorflow2.0)
有監督學習
mini-batch 采樣:
# 獲得目標節點數據 def _compute_diffusion_matrix(dst_nodes, neigh_dict, sample_size, max_node_id):# 對鄰居序列采樣def sample(ns):return np.random.choice(ns, min(len(ns), sample_size), replace=False)# 鄰居序列向量化,得到鄰接向量def vectorize(ns):v = np.zeros(max_node_id + 1, dtype=np.float32)v[ns] = 1return v# 對鄰居采樣,得到鄰接矩陣adj_mat_full = np.stack([vectorize(sample(neigh_dict[n])) for n in dst_nodes])# 標記哪些列非零,后面用于壓縮矩陣nonzero_cols_mask = np.any(adj_mat_full.astype(np.bool), axis=0)# 壓縮矩陣:取出不全為零的列adj_mat = adj_mat_full[:, nonzero_cols_mask]# 按行求和adj_mat_sum = np.sum(adj_mat, axis=1, keepdims=True)# 按行歸一化dif_mat = adj_mat / adj_mat_sum# 得到所有目標節點的鄰接序號src_nodes = np.arange(nonzero_cols_mask.size)[nonzero_cols_mask]# 將目標節點與鄰接節點取并集,并且升序排序dstsrc = np.union1d(dst_nodes, src_nodes)# 標記哪些節點是鄰接節點dstsrc2src = np.searchsorted(dstsrc, src_nodes)# 標記哪些節點是目標節點dstsrc2dst = np.searchsorted(dstsrc, dst_nodes)return dstsrc, dstsrc2src, dstsrc2dst, dif_mat# 根據節點構造mini-batch數據 def build_batch_from_nodes(nodes, neigh_dict, sample_sizes):"""參數:nodes: 目標節點列表neigh_dict: 鄰居節點列表sample_sizes: 每層采樣的個數"""# dst_nodes 實際上是棧,存儲了0,1,2...,K階(鄰居)節點集合dst_nodes = [nodes]dstsrc2dsts = []dstsrc2srcs = []dif_mats = []max_node_id = max(list(neigh_dict.keys()))"""以下是mini-batch采樣算法,這里以K層為例,說明一下采樣順序與dst_nodes棧內的數據:采樣順序是從K,K-1,... 1:B_K(棧底元素): 輸入目標節點集合nodes;B_K-1: 目標節點+其一階鄰居節點;B_K-2: 目標節點+其一階鄰居節點+其二階鄰居節點;...B_0(棧頂元素): 目標節點+其一階鄰居節點+二階鄰居節點+...+K階鄰居節點。"""for sample_size in reversed(sample_sizes):# _compute_diffusion_matrix:# 對目標節點dst_nodes[-1]鄰居采樣sample_size個# 當dst_nodes[-1]==nodes時,需要對nodes的鄰居# ds 是目標節點、鄰居節點并集# d2s 是ds中鄰居節點的序號# d2d 是ds中目標節點的序號ds, d2s, d2d, dm = _compute_diffusion_matrix ( dst_nodes[-1], neigh_dict, sample_size, max_node_id)dst_nodes.append(ds)dstsrc2srcs.append(d2s)dstsrc2dsts.append(d2d)dif_mats.append(dm)src_nodes = dst_nodes.pop()MiniBatchFields = ["src_nodes", "dstsrc2srcs", "dstsrc2dsts", "dif_mats"]MiniBatch = collections.namedtuple ("MiniBatch", MiniBatchFields)return MiniBatch(src_nodes, dstsrc2srcs, dstsrc2dsts, dif_mats)平均值聚合器:
# 平均值聚合器 class MeanAggregator(tf.keras.layers.Layer):def __init__(self, src_dim, dst_dim, activ=True, **kwargs):"""src_dim: 輸入維度dst_dim: 輸出維度"""super().__init__(**kwargs)self.activ_fn = tf.nn.relu if activ else tf.identityself.w = self.add_weight( name = kwargs["name"] + "_weight", shape = (src_dim*2, dst_dim), dtype = tf.float32, initializer = init_fn, trainable = True)def call(self, dstsrc_features, dstsrc2src, dstsrc2dst, dif_mat):"""dstsrc_features: 第 K-1 層所有節點的 embeddingdstsrc2dst: 當前層的目標節點dstsrc2src: 當前層的鄰居節點dif_mat: 歸一化矩陣"""# 從當前batch所有節點中取出目標節點dst_features = tf.gather(dstsrc_features, dstsrc2dst)# 從當前batch所有節點中取出鄰居節點src_features = tf.gather(dstsrc_features, dstsrc2src)# 對鄰居節點加權求和,得到鄰居節點embedding之和的均值# (batch_size, num_neighbors) x (num_neighbors, src_dim)aggregated_features = tf.matmul(dif_mat, src_features)# 將第k-1層的embedding與聚合結果進行拼接concatenated_features = tf.concat([aggregated_features, dst_features], 1)# 乘上權重矩陣 w x = tf.matmul(concatenated_features, self.w)return self.activ_fn(x)有監督 GraphSage 模型:
class GraphSageBase(tf.keras.Model):def __init__(self, raw_features, internal_dim, num_layers, last_has_activ):assert num_layers > 0, 'illegal parameter "num_layers"'assert internal_dim > 0, 'illegal parameter "internal_dim"'super().__init__()self.input_layer = RawFeature(raw_features, name="raw_feature_layer")self.seq_layers = []for i in range (1, num_layers + 1):layer_name = "agg_lv" + str(i)input_dim = internal_dim if i > 1 else raw_features.shape[-1]has_activ = last_has_activ if i == num_layers else Trueaggregator_layer = MeanAggregator ( input_dim, internal_dim, name=layer_name, activ = has_activ)self.seq_layers.append(aggregator_layer)def call(self, minibatch):# 取出當前batch中用到的所有節點x = self.input_layer(tf.squeeze(minibatch.src_nodes))for aggregator_layer in self.seq_layers:# 逐層聚合x = aggregator_layer ( x, minibatch.dstsrc2srcs.pop(), minibatch.dstsrc2dsts.pop(), minibatch.dif_mats.pop())return x # shape: (batch_size, src_dim)class GraphSageSupervised(GraphSageBase):def __init__(self, raw_features, internal_dim, num_layers, num_classes):super().__init__(raw_features, internal_dim, num_layers, True)self.classifier = tf.keras.layers.Dense ( num_classes, activation = tf.nn.softmax, use_bias = False, kernel_initializer = init_fn, name = "classifier")def call(self, minibatch):return self.classifier( super().call(minibatch) )無監督學習
mini-batch 采樣:
def _get_neighbors(nodes, neigh_dict):return np.unique(np.concatenate([neigh_dict[n] for n in nodes]))# 無監督學習時,根據邊得到 mini-batch 數據 def build_batch_from_edges(edges, nodes, neigh_dict, sample_sizes, neg_size):# batchA 目標節點列表# batchB 與目標節點對應的鄰居節點列表batchA, batchB = edges.transpose()# 從 nodes 中去除 batchA、batchA節點鄰居,batchB、batchB節點鄰居# 執行過程:((((nodes-batchA)-neighbor_A)-batchB) - neighbor_B)# 得到所有可能的負樣本possible_negs = reduce ( np.setdiff1d, ( nodes, batchA, _get_neighbors(batchA, neigh_dict), batchB, _get_neighbors(batchB, neigh_dict)))# 從所有負樣本中采樣出neg_size個batchN = np.random.choice ( possible_negs, min(neg_size, len(possible_negs)), replace=False)# np.unique:去重,結果已排序batch_all = np.unique(np.concatenate((batchA, batchB, batchN)))# 得到batchA、batchB在batch_all中的序號dst2batchA = np.searchsorted(batch_all, batchA)dst2batchB = np.searchsorted(batch_all, batchB)# 計算batch_all每個元素在batchN中是否出現dst2batchN = np.in1d(batch_all, batchN)# 上面已經完成了邊的采樣,并且得到邊的節點# 接下來是構造mini-batch數據minibatch_plain = build_batch_from_nodes ( batch_all, neigh_dict, sample_sizes)MiniBatchFields = [ "src_nodes", "dstsrc2srcs", "dstsrc2dsts", "dif_mats", "dst2batchA", "dst2batchB", "dst2batchN" ]MiniBatch = collections.namedtuple ("MiniBatch", MiniBatchFields)return MiniBatch ( minibatch_plain.src_nodes # 目標節點與鄰居節點集合, minibatch_plain.dstsrc2srcs # 鄰居節點集合, minibatch_plain.dstsrc2dsts # 目標節點集合, minibatch_plain.dif_mats # 歸一化矩陣, dst2batchA # 隨機采樣邊的左頂點, dst2batchB # 隨機采樣邊的右頂點, dst2batchN # 標記是否為負采樣節點的mask)無監督損失函數:
# 無監督學習的損失函數 @tf.function def compute_uloss(embeddingA, embeddingB, embeddingN, neg_weight):# 計算邊的兩個節點的內積,得到相似度# (batch_size, emb_dim) * (batch_size, emb_dim) # -> (batch_size, emb_dim) -> (batch_size, )pos_affinity = tf.reduce_sum ( tf.multiply ( embeddingA, embeddingB ), axis=1 )# 相當于每個節點都和負樣本的 embedding 計算內積,# 得到每個節點與每個負樣本的相似度# (batch_size, emb_dim) x (emb_dim, neg_size) -> (batch_size, neg_size)neg_affinity = tf.matmul ( embeddingA, tf.transpose ( embeddingN ) )# shape: (batch_size, )pos_xent = tf.nn.sigmoid_cross_entropy_with_logits ( tf.ones_like(pos_affinity), pos_affinity, "positive_xent" )# shape: (batch_size, neg_num)neg_xent = tf.nn.sigmoid_cross_entropy_with_logits ( tf.zeros_like(neg_affinity), neg_affinity, "negative_xent" )# 對neg_xent所有元素求和后乘上權重weighted_neg = tf.multiply ( neg_weight, tf.reduce_sum(neg_xent) )# 對兩個 loss 進行累加batch_loss = tf.add ( tf.reduce_sum(pos_xent), weighted_neg )# loss 除以樣本個數return tf.divide ( batch_loss, embeddingA.shape[0] )無監督 GraphSage 模型:
class GraphSageUnsupervised(GraphSageBase):def __init__(self, raw_features, internal_dim, num_layers, neg_weight):super().__init__(raw_features, internal_dim, num_layers, False)self.neg_weight = neg_weightdef call(self, minibatch):# 對 embedding 結果進行正則化embeddingABN = tf.math.l2_normalize(super().call(minibatch), 1)# 損失函數的計算self.add_loss (compute_uloss ( tf.gather(embeddingABN, minibatch.dst2batchA), tf.gather(embeddingABN, minibatch.dst2batchB), tf.boolean_mask(embeddingABN, minibatch.dst2batchN), self.neg_weight))return embeddingABN參考文章:
GraphSAGE: GCN落地必讀論文
OhMyGraphs: GraphSAGE and inductive representation learning
全面理解PinSage
GraphSAGE論文總結及源碼解讀
https://github.com/subbyte/graphsage-tf2
總結
以上是生活随笔為你收集整理的GraphSAGE 模型解读与tensorflow2.0代码实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 发票章怎么盖
- 下一篇: excel 创建数据有效性及背景颜色