pytorch geometric的GNN、GCN的节点分类方式
作者:zhangztSky
这篇文章主要介绍了pytorch geometric的GNN、GCN的节点分类方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
pytorch geometric的GNN、GCN节点分类
# -*- coding: utf-8 -*- import os import torch import torch.nn.functional as F import torch.nn as nn from torch_geometric.datasets import Planetoid import torch_geometric.nn as pyg_nn import torch_geometric.transforms as T # load dataset def get_data(folder="node_classify/cora", data_name="cora"): # dataset = Planetoid(root=folder, name=data_name) dataset = Planetoid(root=folder, name=data_name, transform=T.NormalizeFeatures()) return dataset # create the graph cnn model class GraphCNN(nn.Module): def __init__(self, in_c, hid_c, out_c): super(GraphCNN, self).__init__() self.conv1 = pyg_nn.GCNConv(in_channels=in_c, out_channels=hid_c) self.conv2 = pyg_nn.GCNConv(in_channels=hid_c, out_channels=out_c) def forward(self, data): # data.x data.edge_index x = data.x # [N, C] edge_index = data.edge_index # [2 ,E] hid = self.conv1(x=x, edge_index=edge_index) # [N, D] hid = F.relu(hid) out = self.conv2(x=hid, edge_index=edge_index) # [N, out_c] out = F.log_softmax(out, dim=1) # [N, out_c] return out class OwnGCN(nn.Module): def __init__(self, in_c, hid_c, out_c): super(OwnGCN, self).__init__() self.in_ = pyg_nn.SGConv(in_c, hid_c, K=2) self.conv1 = pyg_nn.APPNP(K=2, alpha=0.1) self.conv2 = pyg_nn.APPNP(K=2, alpha=0.1) self.out_ = pyg_nn.SGConv(hid_c, out_c, K=2) def forward(self, data): x, edge_index = data.x, data.edge_index x = self.in_(x, edge_index) x = F.dropout(x, p=0.1, training=self.training) x = F.relu(self.conv1(x, edge_index)) x = F.dropout(x, p=0.1, training=self.training) x = F.relu(self.conv2(x, edge_index)) x = F.dropout(x, p=0.1, training=self.training) x = self.out_(x, edge_index) return F.log_softmax(x, dim=1) # todo list class YourOwnGCN(nn.Module): pass def analysis_data(dataset): print("Basic Info: ", dataset[0]) print("# Nodes: ", dataset[0].num_nodes) print("# Features: ", dataset[0].num_features) print("# Edges: ", dataset[0].num_edges) print("# Classes: ", dataset.num_classes) print("# Train samples: ", dataset[0].train_mask.sum().item()) print("# Valid samples: ", dataset[0].val_mask.sum().item()) print("# Test samples: ", dataset[0].test_mask.sum().item()) print("Undirected: ", dataset[0].is_undirected()) def main(): os.environ["CUDA_VISIBLE_DEVICES"] = "0" cora_dataset = get_data() # todo list # my_net = GraphCNN(in_c=cora_dataset.num_features, hid_c=150, out_c=cora_dataset.num_classes) my_net = OwnGCN(in_c=cora_dataset.num_features, hid_c=300, out_c=cora_dataset.num_classes) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") my_net = my_net.to(device) data = cora_dataset[0].to(device) optimizer = torch.optim.Adam(my_net.parameters(), lr=1e-2, weight_decay=1e-3) """ # model train my_net.train() for epoch in range(500): optimizer.zero_grad() output = my_net(data) loss = F.nll_loss(output[data.train_mask], data.y[data.train_mask]) loss.backward() optimizer.step() _, prediction = output.max(dim=1) valid_correct = prediction[data.val_mask].eq(data.y[data.val_mask]).sum().item() valid_number = data.val_mask.sum().item() valid_acc = valid_correct / valid_number print("Epoch: {:03d}".format(epoch + 1), "Loss: {:.04f}".format(loss.item()), "Valid Accuracy:: {:.4f}".format(valid_acc)) """ # model test my_net = torch.load("node_classify/best.pth") my_net.eval() _, prediction = my_net(data).max(dim=1) target = data.y test_correct = prediction[data.test_mask].eq(target[data.test_mask]).sum().item() test_number = data.test_mask.sum().item() train_correct = prediction[data.train_mask].eq(target[data.train_mask]).sum().item() train_number = data.train_mask.sum().item() print("==" * 20) print("Accuracy of Train Samples: {:.04f}".format(train_correct / train_number)) print("Accuracy of Test Samples: {:.04f}".format(test_correct / test_number)) def test_main(): os.environ["CUDA_VISIBLE_DEVICES"] = "0" device = torch.device("cuda" if torch.cuda.is_available() else "cpu") cora_dataset = get_data() data = cora_dataset[0].to(device) my_net = torch.load("node_classify/best.pth") my_net.eval() _, prediction = my_net(data).max(dim=1) target = data.y test_correct = prediction[data.test_mask].eq(target[data.test_mask]).sum().item() test_number = data.test_mask.sum().item() train_correct = prediction[data.train_mask].eq(target[data.train_mask]).sum().item() train_number = data.train_mask.sum().item() print("==" * 20) print("Accuracy of Train Samples: {:.04f}".format(train_correct / train_number)) print("Accuracy of Test Samples: {:.04f}".format(test_correct / test_number)) if __name__ == '__main__': test_main() # main() # dataset = get_data() # analysis_data(dataset)
pytorch下GCN代码解读
def main(): print("hello world") main() import os.path as osp import argparse import torch import torch.nn.functional as F from torch_geometric.datasets import Planetoid import torch_geometric.transforms as T from torch_geometric.nn import GCNConv, ChebConv # noqa #GCN用于提取图的特征参数然后用于分类 #数据集初始化部分 parser = argparse.ArgumentParser() parser.add_argument('--use_gdc', action='store_true', help='Use GDC preprocessing.') args = parser.parse_args()#是否使用GDC优化 dataset = 'CiteSeer'#训练用的数据集 path = osp.join(osp.dirname(osp.realpath(__file__)), '..', 'data', dataset)#数据集存放位置 dataset = Planetoid(path, dataset, transform=T.NormalizeFeatures())#数据初始化类,其dataset的基类是一个torch.utils.data.Dataset对象 data = dataset[0]#只有一个图作为训练数据 #print(data) #预处理和模型定义 if args.use_gdc: gdc = T.GDC(self_loop_weight=1, normalization_in='sym', normalization_out='col', diffusion_kwargs=dict(method='ppr', alpha=0.05), sparsification_kwargs=dict(method='topk', k=128, dim=0), exact=True) data = gdc(data)#图扩散卷积用于预处理 #搭建模型 class Net(torch.nn.Module): #放置参数层(一般为可学习层,不可学习层也可放置,若不放置,则在forward中用functional实现) def __init__(self): super(Net, self).__init__()#在不覆盖Module的Init函数的情况下设置Net的init函数 self.conv1 = GCNConv(dataset.num_features, 16, cached=True, normalize=not args.use_gdc)#第一层GCN卷积运算输入特征向量大小为num_features输出大小为16 #GCNConv的def init需要in_channel和out_channel(卷积核的数量)的参数,并对in_channel和out_channel调用linear函数,而该函数的作用为构建全连接层 self.conv2 = GCNConv(16, dataset.num_classes, cached=True, normalize=not args.use_gdc)#第二层GCN卷积运算输入为16(第一层的输出)输出为num_class # self.conv1 = ChebConv(data.num_features, 16, K=2) # self.conv2 = ChebConv(16, data.num_features, K=2) #实现模型的功能各个层之间的连接关系(具体实现) def forward(self): x, edge_index, edge_weight = data.x, data.edge_index, data.edge_attr#赋值data.x特征向量edge_index图的形状,edge_attr权重矩阵 x = F.relu(self.conv1(x, edge_index, edge_weight))#第一层用非线性激活函数relu #x,edge_index,edge_weight特征矩阵,邻接矩阵,权重矩阵组成GCN核心公式 x = F.dropout(x, training=self.training)#用dropout函数防止过拟合 x = self.conv2(x, edge_index, edge_weight)#第二层输出 return F.log_softmax(x, dim=1)#log_softmax激活函数用于最后一层返回分类结果 #Z=log_softmax(A*RELU(A*X*W0)*W1)A连接关系X特征矩阵W参数矩阵 #得到Z后即可用于分类 #softmax(dim=1)行和为1再取log x为节点的embedding device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')#指定设备 model, data = Net().to(device), data.to(device)#copy model,data到device上 #优化算法 optimizer = torch.optim.Adam([ dict(params=model.conv1.parameters(), weight_decay=5e-4),#权重衰减避免过拟合 dict(params=model.conv2.parameters(), weight_decay=0)#需要优化的参数 ], lr=0.01) # Only perform weight-decay on first convolution. #lr步长因子或者是学习率 #模型训练 def train(): model.train()#设置成train模式 optimizer.zero_grad()#清空所有被优化的变量的梯度 F.nll_loss(model()[data.train_mask], data.y[data.train_mask]).backward()#损失函数训练参数用于节点分类 optimizer.step()#步长 @torch.no_grad()#不需要计算梯度,也不进行反向传播 #测试 def test(): model.eval()#设置成evaluation模式 logits, accs = model(), [] for _, mask in data('train_mask', 'val_mask', 'test_mask'):#mask矩阵,掩膜作用与之相应的部分不会被更新 pred = logits[mask].max(1)[1]#mask对应点的输出向量最大值并取序号1 acc = pred.eq(data.y[mask]).sum().item() / mask.sum().item()#判断是否相等计算准确度 accs.append(acc) return accs best_val_acc = test_acc = 0 #执行 for epoch in range(1, 201): train() train_acc, val_acc, tmp_test_acc = test()#训练准确率,实际输入的准确率,测试准确率 if val_acc > best_val_acc: best_val_acc = val_acc test_acc = tmp_test_acc log = 'Epoch: {:03d}, Train: {:.4f}, Val: {:.4f}, Test: {:.4f}'#类型及保留位数 print(log.format(epoch, train_acc, best_val_acc, test_acc))#输出格式化函数'''
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。