Introduction to the DTW algorithm. Suppose we have two time series Q and C, of length n and m respectively:
Q = q_1, q_2, ..., q_n
C = c_1, c_2, ..., c_m
To align the two series, we construct an n x m grid matrix in which element (i, j) holds the distance d(q_i, c_j) between the points q_i and c_j (that is, the similarity between each point of Q and each point of C; the smaller the distance, the higher the similarity — ignore ordering for now). The Euclidean distance d(q_i, c_j) = (q_i - c_j)^2 is typically used (it can also be read as a distortion measure). Each matrix element (i, j) thus represents an alignment of the points q_i and c_j. The DP formulation reduces to finding a path through the grid points of this matrix; the grid points the path passes through are the aligned point pairs over which the two series are compared.
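As a minimal sketch of this grid (not from the original notes; numpy is assumed):

import numpy as np

def cost_matrix(Q, C):
    """Grid of pointwise squared distances: entry (i, j) is d(q_i, c_j) = (q_i - c_j)^2."""
    Q = np.asarray(Q, dtype=float)
    C = np.asarray(C, dtype=float)
    # Broadcasting an n-vector against an m-vector yields the n x m grid.
    return (Q[:, None] - C[None, :]) ** 2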
So how do we find this path, and which path is best? This is exactly the question posed above: what kind of warping is optimal?
First, denote the warping path by W = w_1, w_2, ..., w_K, where each w_k is a grid point. The path is not chosen arbitrarily; it must satisfy the following constraints:
1) Boundary condition: w_1 = (1, 1) and w_K = (m, n). The speed of any utterance may vary, but the order of its parts cannot change, so the path must start at the bottom-left corner and end at the top-right corner.
2) Continuity: if w_{k-1} = (a', b'), then the next point on the path w_k = (a, b) must satisfy (a - a') <= 1 and (b - b') <= 1. In other words, the path cannot skip over a point to make a match; it can only step to adjacent points. This guarantees that every index of Q and of C appears in W.
3) Monotonicity: if w_{k-1} = (a', b'), then the next point w_k = (a, b) must satisfy 0 <= (a - a') and 0 <= (b - b'). This forces the points on W to advance monotonically in time, ensuring that the alignment lines (the dashed lines in the original figure) do not cross.
Combining the continuity and monotonicity constraints, each grid point has only three possible successors. For example, if the path has already passed through grid point (i, j), the next grid point can only be (i+1, j), (i, j+1), or (i+1, j+1).
Exponentially many paths satisfy these constraints; the best path is the one with the minimum cumulative distance along it, and it can be found with dynamic programming. For an intuitive worked example of the search process, see: link
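The cumulative distance γ(i, j) computed by the code below satisfies the recurrence γ(i, j) = d(q_i, c_j) + min{ γ(i-1, j-1), γ(i-1, j), γ(i, j-1) }, with γ(0, 0) = 0; the DTW distance between Q and C is then γ(n, m).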
import math

def DTWDistance(s1, s2, w):
    """DTW distance between s1 and s2 with a Sakoe-Chiba window of half-width w."""
    DTW = {}
    # The window must be at least as wide as the length difference,
    # otherwise the end cell (len(s1)-1, len(s2)-1) is unreachable.
    w = max(w, abs(len(s1) - len(s2)))
    for i in range(-1, len(s1)):
        for j in range(-1, len(s2)):
            DTW[(i, j)] = float('inf')
    DTW[(-1, -1)] = 0
    for i in range(len(s1)):
        # Only fill cells with |i - j| <= w (note the +1: range is exclusive).
        for j in range(max(0, i - w), min(len(s2), i + w + 1)):
            dist = (s1[i] - s2[j]) ** 2
            DTW[(i, j)] = dist + min(DTW[(i - 1, j)], DTW[(i, j - 1)], DTW[(i - 1, j - 1)])
    return math.sqrt(DTW[(len(s1) - 1, len(s2) - 1)])
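A quick sanity check with illustrative values (not from the original notes):

a = [1, 2, 3, 4, 3, 2, 1]
b = [1, 1, 2, 3, 4, 3, 2, 1]       # same shape, slightly stretched
print(DTWDistance(a, b, 2))        # small: the warping absorbs the stretch
print(DTWDistance(a, [5] * 8, 2))  # much larger for a dissimilar series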
Next, a DGL example: download two CSV files (an edge list and per-graph properties) and build a graph-classification dataset from them.

import urllib.request

import pandas as pd

urllib.request.urlretrieve(
    "https://data.dgl.ai/tutorial/dataset/graph_edges.csv", "./graph_edges.csv"
)
urllib.request.urlretrieve(
    "https://data.dgl.ai/tutorial/dataset/graph_properties.csv",
    "./graph_properties.csv",
)
edges = pd.read_csv("./graph_edges.csv")
properties = pd.read_csv("./graph_properties.csv")
edges.head()
properties.head()
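(Judging from how process() consumes these tables below, graph_edges.csv is expected to carry the columns graph_id, src, dst, and graph_properties.csv the columns graph_id, label, num_nodes.)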
import dgl
import torch
from dgl.data import DGLDataset

class SyntheticDataset(DGLDataset):
    def __init__(self):
        super().__init__(name="synthetic")

    def process(self):
        edges = pd.read_csv("./graph_edges.csv")
        properties = pd.read_csv("./graph_properties.csv")
        self.graphs = []
        self.labels = []

        # Create a graph for each graph ID from the edges table.
        # First process the properties table into two dictionaries with graph IDs as keys.
        # The label and number of nodes are values.
        label_dict = {}
        num_nodes_dict = {}
        for _, row in properties.iterrows():
            label_dict[row["graph_id"]] = row["label"]
            num_nodes_dict[row["graph_id"]] = row["num_nodes"]

        # For the edges, first group the table by graph IDs.
        edges_group = edges.groupby("graph_id")

        # For each graph ID...
        for graph_id in edges_group.groups:
            # Find the edges as well as the number of nodes and its label.
            edges_of_id = edges_group.get_group(graph_id)
            src = edges_of_id["src"].to_numpy()
            dst = edges_of_id["dst"].to_numpy()
            num_nodes = num_nodes_dict[graph_id]
            label = label_dict[graph_id]

            # Create a graph and add it to the list of graphs and labels.
            g = dgl.graph((src, dst), num_nodes=num_nodes)
            self.graphs.append(g)
            self.labels.append(label)

        # Convert the label list to tensor for saving.
        self.labels = torch.LongTensor(self.labels)

    def __getitem__(self, i):
        return self.graphs[i], self.labels[i]

    def __len__(self):
        return len(self.graphs)
dataset = SyntheticDataset()
graph, label = dataset[0]
print(graph, label)
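Not shown in the original notes, but the natural next step is batching: DGL's GraphDataLoader collates such a dataset into batched graphs for graph classification (a minimal sketch):

from dgl.dataloading import GraphDataLoader

loader = GraphDataLoader(dataset, batch_size=5, shuffle=True)
for batched_graph, labels in loader:
    # batched_graph is a single dgl.batch-ed graph containing 5 component graphs.
    print(batched_graph.batch_size, labels)
    break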
import torch
import dgl
src = torch.LongTensor(
[0, 0, 0, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5, 6, 7, 7, 8, 9, 10,
1, 2, 3, 3, 3, 4, 5, 5, 6, 5, 8, 6, 8, 9, 8, 11, 11, 10, 11])
dst = torch.LongTensor(
[1, 2, 3, 3, 3, 4, 5, 5, 6, 5, 8, 6, 8, 9, 8, 11, 11, 10, 11,
0, 0, 0, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5, 6, 7, 7, 8, 9, 10])
g = dgl.graph((src, dst))
# Fan-outs per GNN layer: sample 1, 2, and 3 neighbors respectively.
sampler = dgl.dataloading.NeighborSampler([1, 2, 3])
dataloader = dgl.dataloading.DataLoader(
    g, [5], sampler,  # node 5 is the only seed node
    batch_size=1, shuffle=True, drop_last=False, num_workers=4)

for input_nodes, output_nodes, blocks in dataloader:
    print(input_nodes)   # IDs of all nodes needed as input to the first layer
    print(output_nodes)  # IDs of the seed nodes whose outputs are computed
    print(blocks)
    for i in range(len(blocks)):
        left, right = blocks[i].edges()
        # Map block-local endpoint indices back to original-graph node IDs.
        src_ids = blocks[i].srcdata[dgl.NID]
        dst_ids = blocks[i].dstdata[dgl.NID]
        u = [int(src_ids[l]) for l in left]
        v = [int(dst_ids[r]) for r in right]
        print("u_v", u, v)
        print("source nodes of this block", blocks[i].srcdata)
        print("destination nodes of this block", blocks[i].dstdata)
        print("*" * 10)
    print("=" * 10)
# Note: the destination nodes of a block are always a prefix of its source nodes.
# mfg_0_src = blocks[0].srcdata[dgl.NID]
# mfg_0_dst = blocks[0].dstdata[dgl.NID]
# print(mfg_0_src)
# print(mfg_0_dst)
# print(torch.equal(mfg_0_src[: blocks[0].num_dst_nodes()], mfg_0_dst))
import dgl

dataloader = dgl.dataloading.EdgeDataLoader(
    g, train_seeds, sampler,
    negative_sampler=dgl.dataloading.negative_sampler.Uniform(5),  # 5 negative samples per positive edge
    batch_size=4,
    shuffle=True,
    drop_last=False,
    pin_memory=True,
    num_workers=0)
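With a negative sampler attached, each minibatch yields the positive and negative edge subgraphs alongside the blocks. A sketch of the iteration, assuming g, train_seeds, and sampler are defined as above:

for input_nodes, pos_graph, neg_graph, blocks in dataloader:
    # pos_graph holds the 4 sampled training edges; neg_graph holds
    # 5 sampled negative (non-)edges for each of them.
    print(pos_graph.num_edges(), neg_graph.num_edges())  # 4, 20
    break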
import torch.nn.functional as F

def train(g, model):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    best_val_acc = 0
    best_test_acc = 0
    features = g.ndata["feat"]
    labels = g.ndata["label"]
    train_mask = g.ndata["train_mask"]
    val_mask = g.ndata["val_mask"]
    test_mask = g.ndata["test_mask"]
    for e in range(100):
        # Forward
        logits = model(g, features)
        # Compute prediction
        pred = logits.argmax(1)
        # Compute loss
        # Note that you should only compute the losses of the nodes in the training set.
        loss = F.cross_entropy(logits[train_mask], labels[train_mask])
        # Compute accuracy on training/validation/test
        train_acc = (pred[train_mask] == labels[train_mask]).float().mean()
        val_acc = (pred[val_mask] == labels[val_mask]).float().mean()
        test_acc = (pred[test_mask] == labels[test_mask]).float().mean()
        # Save the best validation accuracy and the corresponding test accuracy.
        if best_val_acc < val_acc:
            best_val_acc = val_acc
            best_test_acc = test_acc
        # Backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if e % 5 == 0:
            print(
                "In epoch {}, loss: {:.3f}, val acc: {:.3f} (best {:.3f}), test acc: {:.3f} (best {:.3f})".format(
                    e, loss, val_acc, best_val_acc, test_acc, best_test_acc
                )
            )
import torch.nn as nn
from dgl.nn import GraphConv

class GCN(nn.Module):
    def __init__(self, in_feats, h_feats, num_classes):
        super(GCN, self).__init__()
        self.conv1 = GraphConv(in_feats, h_feats)
        self.conv2 = GraphConv(h_feats, num_classes)

    def forward(self, g, in_feat):
        h = self.conv1(g, in_feat)
        h = F.relu(h)
        h = self.conv2(g, h)
        return h
model = GCN(g.ndata["feat"].shape[1], 16, dataset.num_classes)
train(g, model)
import dgl.function as fn

class SAGEConv(nn.Module):
    """Graph convolution module used by the GraphSAGE model.

    Parameters
    ----------
    in_feat : int
        Input feature size.
    out_feat : int
        Output feature size.
    """

    def __init__(self, in_feat, out_feat):
        super(SAGEConv, self).__init__()
        # A linear submodule for projecting the input and neighbor feature to the output.
        self.linear = nn.Linear(in_feat * 2, out_feat)

    def forward(self, g, h):
        """Forward computation

        Parameters
        ----------
        g : Graph
            The input graph.
        h : Tensor
            The input node feature.
        """
        with g.local_scope():
            g.ndata["h"] = h
            # update_all is a message passing API.
            g.update_all(
                message_func=fn.copy_u("h", "m"),
                reduce_func=fn.mean("m", "h_N"),
            )
            h_N = g.ndata["h_N"]
            h_total = torch.cat([h, h_N], dim=1)
            return self.linear(h_total)
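A quick smoke test (the toy graph and sizes are illustrative, not from the original notes):

conv = SAGEConv(in_feat=4, out_feat=2)
toy_g = dgl.graph(([0, 1, 2], [1, 2, 0]))  # a 3-node directed cycle
out = conv(toy_g, torch.randn(3, 4))
print(out.shape)  # torch.Size([3, 2])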
class Model(nn.Module):
    def __init__(self, in_feats, h_feats, num_classes):
        super(Model, self).__init__()
        self.conv1 = SAGEConv(in_feats, h_feats)
        self.conv2 = SAGEConv(h_feats, num_classes)

    def forward(self, g, in_feat):
        h = self.conv1(g, in_feat)
        h = F.relu(h)
        h = self.conv2(g, h)
        return h
import dgl.data
dataset = dgl.data.CoraGraphDataset()
g = dataset[0]
def train(g, model):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    all_logits = []
    best_val_acc = 0
    best_test_acc = 0
    features = g.ndata["feat"]
    labels = g.ndata["label"]
    train_mask = g.ndata["train_mask"]
    val_mask = g.ndata["val_mask"]
    test_mask = g.ndata["test_mask"]
    for e in range(200):
        # Forward
        logits = model(g, features)
        # Compute prediction
        pred = logits.argmax(1)
        # Compute loss
        # Note that we should only compute the losses of the nodes in the training set,
        # i.e. with train_mask 1.
        loss = F.cross_entropy(logits[train_mask], labels[train_mask])
        # Compute accuracy on training/validation/test
        train_acc = (pred[train_mask] == labels[train_mask]).float().mean()
        val_acc = (pred[val_mask] == labels[val_mask]).float().mean()
        test_acc = (pred[test_mask] == labels[test_mask]).float().mean()
        # Save the best validation accuracy and the corresponding test accuracy.
        if best_val_acc < val_acc:
            best_val_acc = val_acc
            best_test_acc = test_acc
        # Backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        all_logits.append(logits.detach())
        if e % 5 == 0:
            print(
                "In epoch {}, loss: {:.3f}, val acc: {:.3f} (best {:.3f}), test acc: {:.3f} (best {:.3f})".format(
                    e, loss, val_acc, best_val_acc, test_acc, best_test_acc
                )
            )
model = Model(g.ndata["feat"].shape[1], 16, dataset.num_classes)
train(g, model)
class WeightedSAGEConv(nn.Module):
    """Graph convolution module used by the GraphSAGE model with edge weights.

    Parameters
    ----------
    in_feat : int
        Input feature size.
    out_feat : int
        Output feature size.
    """

    def __init__(self, in_feat, out_feat):
        super(WeightedSAGEConv, self).__init__()
        # A linear submodule for projecting the input and neighbor feature to the output.
        self.linear = nn.Linear(in_feat * 2, out_feat)

    def forward(self, g, h, w):
        """Forward computation

        Parameters
        ----------
        g : Graph
            The input graph.
        h : Tensor
            The input node feature.
        w : Tensor
            The edge weight.
        """
        with g.local_scope():
            g.ndata["h"] = h
            g.edata["w"] = w
            g.update_all(
                message_func=fn.u_mul_e("h", "w", "m"),
                reduce_func=fn.mean("m", "h_N"),
            )
            h_N = g.ndata["h_N"]
            h_total = torch.cat([h, h_N], dim=1)
            return self.linear(h_total)
class Model(nn.Module):
    def __init__(self, in_feats, h_feats, num_classes):
        super(Model, self).__init__()
        self.conv1 = WeightedSAGEConv(in_feats, h_feats)
        self.conv2 = WeightedSAGEConv(h_feats, num_classes)

    def forward(self, g, in_feat):
        h = self.conv1(g, in_feat, torch.ones(g.num_edges(), 1).to(g.device))
        h = F.relu(h)
        h = self.conv2(g, h, torch.ones(g.num_edges(), 1).to(g.device))
        return h
model = Model(g.ndata["feat"].shape[1], 16, dataset.num_classes)
train(g, model)
def u_mul_e_udf(edges):
    # Equivalent to the built-in fn.u_mul_e("h", "w", "m").
    return {"m": edges.src["h"] * edges.data["w"]}

def mean_udf(nodes):
    # Equivalent to the built-in fn.mean("m", "h_N").
    return {"h_N": nodes.mailbox["m"].mean(1)}