diff options
| author | ericmarin <maarin.eric@gmail.com> | 2026-04-13 19:42:39 +0200 |
|---|---|---|
| committer | ericmarin <maarin.eric@gmail.com> | 2026-04-13 21:38:16 +0200 |
| commit | fcbbc960f43137aa170b78ba0be2d89aec3bc766 (patch) | |
| tree | 15e0249bf429888d9b64f19eb0c6e2d9af0901e4 | |
| parent | 8f4f24523235965cfa2041ed00cc40fc0b4bd367 (diff) | |
| download | vein-master.tar.gz vein-master.zip | |
New ops: Slice, Squeeze, Unsqueeze
New tests based on papers:
- Wide-to-Deep, Deep-to-Wide Transformation
- Pruining of stably inactive (always negative) and active (always
positive) ReLUs
| -rw-r--r-- | docs/proof.md | 15 | ||||
| -rw-r--r-- | examples/iris/iris.py | 22 | ||||
| -rw-r--r-- | examples/iris/iris_deep_to_wide.py | 87 | ||||
| -rw-r--r-- | examples/iris/iris_stably_active.py | 71 | ||||
| -rw-r--r-- | examples/iris/iris_stably_inactive.py | 57 | ||||
| -rw-r--r-- | examples/iris/iris_wide_to_deep.py | 83 | ||||
| -rw-r--r-- | examples/mnist/mnist.py | 5 | ||||
| -rw-r--r-- | examples/mnist/mnist_deep_to_wide.py | 85 | ||||
| -rw-r--r-- | examples/mnist/mnist_stably_active.py | 69 | ||||
| -rw-r--r-- | examples/mnist/mnist_stably_inactive.py | 52 | ||||
| -rw-r--r-- | examples/mnist/mnist_wide_to_deep.py | 82 | ||||
| -rw-r--r-- | vein.py | 45 |
12 files changed, 640 insertions, 33 deletions
diff --git a/docs/proof.md b/docs/proof.md index 07717f9..99c88aa 100644 --- a/docs/proof.md +++ b/docs/proof.md @@ -37,17 +37,13 @@ By definition this interaction is equal to: By grouping the operations we get:  -### Flatten -Just identity mapping because the wires are always Flatten. +### Identiry / Flatten / Reshape / Squeeze / Unsqueeze +Just identity mapping because wires represent a single element and they are not structured as Tensors.  ### MatMul Equal to Gemm with ,  and . -### Reshape -Just identity mapping because the wires always Flatten. - - ### Add ONNX Add node is defined as:  @@ -76,6 +72,13 @@ By definition this interaction is equal to: By grouping the operations we get:  +### Slice +ONNX Slice is defined as: +}) + +The translations creates a wiring analog to the above definition: +}) + ## Soundness of Interaction Rules ### Materialize The Materialize agent transforms a Linear agent into a tree of explicit mathematical operations diff --git a/examples/iris/iris.py b/examples/iris/iris.py index db631c0..84d1ac6 100644 --- a/examples/iris/iris.py +++ b/examples/iris/iris.py @@ -1,5 +1,4 @@ -import torch -import torch.nn as nn +import torch, torch.nn as nn from sklearn.datasets import load_iris from sklearn.preprocessing import StandardScaler from torch.utils.data import DataLoader, TensorDataset @@ -29,7 +28,7 @@ def train_model(name: str, dim): optimizer = torch.optim.Adam(net.parameters(), lr=1e-2) print(f"Training {name} ({dim} neurons)...") - for epoch in range(200): + for epoch in range(100): global loss for data in trainloader: inputs, targets = data @@ -38,26 +37,13 @@ def train_model(name: str, dim): loss = loss_fn(outputs, targets) loss.backward() optimizer.step() - if (epoch + 1) % 100 == 0: + if (epoch + 1) % 10 == 0: print(f" Epoch {epoch+1}, Loss: {loss.item():.4f}") return net if __name__ == "__main__": torch_net_a = train_model("Network A", 10).eval() - torch_net_b = Iris_MLP(hidden_dim=20).eval() - - with torch.no_grad(): - torch_net_b.layers[0].weight[:10].copy_(torch_net_a.layers[0].weight) # pyright: ignore - torch_net_b.layers[0].bias[:10].copy_(torch_net_a.layers[0].bias) # pyright: ignore - torch_net_b.layers[0].weight[10:].copy_(torch_net_a.layers[0].weight) # pyright: ignore - torch_net_b.layers[0].bias[10:].copy_(torch_net_a.layers[0].bias) # pyright: ignore - - half_weights = torch_net_a.layers[2].weight / 2.0 # pyright: ignore - - torch_net_b.layers[2].weight[:, :10].copy_(half_weights) # pyright: ignore - torch_net_b.layers[2].weight[:, 10:].copy_(half_weights) # pyright: ignore - - torch_net_b.layers[2].bias.copy_(torch_net_a.layers[2].bias) # pyright: ignore + torch_net_b = train_model("Network B", 20).eval() torch.onnx.export(torch_net_a, (torch.randn(1, 4),), "iris_a.onnx") torch.onnx.export(torch_net_b, (torch.randn(1, 4),), "iris_b.onnx") diff --git a/examples/iris/iris_deep_to_wide.py b/examples/iris/iris_deep_to_wide.py new file mode 100644 index 0000000..d94cce7 --- /dev/null +++ b/examples/iris/iris_deep_to_wide.py @@ -0,0 +1,87 @@ +import torch, torch.nn as nn +from sklearn.datasets import load_iris +from sklearn.preprocessing import StandardScaler +from torch.utils.data import DataLoader, TensorDataset + +class Deep_Block(nn.Module): + def __init__(self): + super().__init__() + self.fc1 = nn.Linear(4, 1) + self.fc2 = nn.Linear(1, 3, bias=False) + + def forward(self, x_s): + x, s = x_s + z = torch.relu(self.fc1(x)) + ds = self.fc2(z) + return (x, s + ds) + +class Deep_Iris_MLP(nn.Module): + def __init__(self, num_blocks): + super().__init__() + self.blocks = nn.Sequential(*[Deep_Block() for _ in range(num_blocks)]) + self.final_bias = nn.Parameter(torch.zeros(3)) + + def forward(self, x): + s = torch.zeros(x.shape[0], 3, device=x.device) + _, final_s = self.blocks((x, s)) + return final_s + self.final_bias + +class Wide_Iris_MLP(nn.Module): + def __init__(self, deep_net): + super().__init__() + num_neurons = len(deep_net.blocks) + self.layers = nn.Sequential( + nn.Linear(4, num_neurons), + nn.ReLU(), + nn.Linear(num_neurons, 3), + ) + with torch.no_grad(): + w1_all = [] + b1_all = [] + w2_all = [] + + for block in deep_net.blocks: + w1_all.append(block.fc1.weight.data) + b1_all.append(block.fc1.bias.data) + w2_all.append(block.fc2.weight.data) + + self.layers[0].weight.copy_(torch.cat(w1_all, dim=0)) # pyright: ignore + self.layers[0].bias.copy_(torch.cat(b1_all, dim=0)) # pyright: ignore + self.layers[2].weight.copy_(torch.cat(w2_all, dim=1)) # pyright: ignore + self.layers[2].bias.copy_(deep_net.final_bias.data) # pyright: ignore + + def forward(self, x): + return self.layers(x) + +iris = load_iris() +scaler = StandardScaler() +X = scaler.fit_transform(iris.data).astype('float32') # pyright: ignore +y = iris.target.astype('int64') # pyright: ignore + +dataset = TensorDataset(torch.from_numpy(X), torch.from_numpy(y)) +trainloader = DataLoader(dataset, batch_size=16, shuffle=True) + +def train_deep_model(name: str, num_blocks): + net = Deep_Iris_MLP(num_blocks=num_blocks) + loss_fn = nn.CrossEntropyLoss() + optimizer = torch.optim.Adam(net.parameters(), lr=1e-2) + + print(f"Training {name} ({num_blocks} blocks)...") + for epoch in range(100): + global loss + for inputs, targets in trainloader: + optimizer.zero_grad() + outputs = net(inputs) + loss = loss_fn(outputs, targets) + loss.backward() + optimizer.step() + if (epoch + 1) % 10 == 0: + print(f" Epoch {epoch+1}, Loss: {loss.item():.4f}") + return net + +if __name__ == "__main__": + torch_net_a = train_deep_model("Deep Network", 10).eval() + torch_net_b = Wide_Iris_MLP(torch_net_a).eval() + + torch.onnx.export(torch_net_a, (torch.randn(1, 4),), "iris_a.onnx") + torch.onnx.export(torch_net_b, (torch.randn(1, 4),), "iris_b.onnx") diff --git a/examples/iris/iris_stably_active.py b/examples/iris/iris_stably_active.py new file mode 100644 index 0000000..51b615c --- /dev/null +++ b/examples/iris/iris_stably_active.py @@ -0,0 +1,71 @@ +import torch, torch.nn as nn +from sklearn.datasets import load_iris +from sklearn.preprocessing import StandardScaler +from torch.utils.data import DataLoader, TensorDataset + +class Iris_MLP(nn.Module): + def __init__(self, hidden_dim): + super().__init__() + self.layers = nn.Sequential( + nn.Linear(4, hidden_dim), + nn.ReLU(), + nn.Linear(hidden_dim, 3), + ) + def forward(self, x): + return self.layers(x) + +class Iris_Linear(nn.Module): + def __init__(self, weight, bias): + super().__init__() + self.fc = nn.Linear(4, 3) + self.fc.weight.data = weight + self.fc.bias.data = bias + def forward(self, x): + return self.fc(x) + +iris = load_iris() +scaler = StandardScaler() +X = scaler.fit_transform(iris.data).astype('float32') # pyright: ignore +y = iris.target.astype('int64') # pyright: ignore + +dataset = TensorDataset(torch.from_numpy(X), torch.from_numpy(y)) +trainloader = DataLoader(dataset, batch_size=16, shuffle=True) + +def train_model(name: str, dim): + net = Iris_MLP(hidden_dim=dim) + loss_fn = nn.CrossEntropyLoss() + optimizer = torch.optim.Adam(net.parameters(), lr=1e-2) + + print(f"Training {name} ({dim} neurons)...") + for epoch in range(100): + global loss + for data in trainloader: + inputs, targets = data + optimizer.zero_grad() + outputs = net(inputs) + loss = loss_fn(outputs, targets) + loss.backward() + optimizer.step() + if (epoch + 1) % 10 == 0: + print(f" Epoch {epoch+1}, Loss: {loss.item():.4f}") + return net + +if __name__ == "__main__": + torch_net_a = train_model("Base Network", 10).eval() + + with torch.no_grad(): + torch_net_a.layers[0].weight.fill_(0.1) # pyright: ignore + torch_net_a.layers[0].bias.fill_(10.0) # pyright: ignore + + W1 = torch_net_a.layers[0].weight.data + b1 = torch_net_a.layers[0].bias.data + W2 = torch_net_a.layers[2].weight.data + b2 = torch_net_a.layers[2].bias.data + + W_collapsed = torch.matmul(W2, W1) # pyright: ignore + b_collapsed = torch.matmul(W2, b1) + b2 # pyright: ignore + + torch_net_b = Iris_Linear(W_collapsed, b_collapsed).eval() + + torch.onnx.export(torch_net_a, (torch.randn(1, 4),), "iris_a.onnx") + torch.onnx.export(torch_net_b, (torch.randn(1, 4),), "iris_b.onnx") diff --git a/examples/iris/iris_stably_inactive.py b/examples/iris/iris_stably_inactive.py new file mode 100644 index 0000000..684f967 --- /dev/null +++ b/examples/iris/iris_stably_inactive.py @@ -0,0 +1,57 @@ +import torch, torch.nn as nn +from sklearn.datasets import load_iris +from sklearn.preprocessing import StandardScaler +from torch.utils.data import DataLoader, TensorDataset + +class Iris_MLP(nn.Module): + def __init__(self, hidden_dim): + super().__init__() + self.layers = nn.Sequential( + nn.Linear(4, hidden_dim), + nn.ReLU(), + nn.Linear(hidden_dim, 3), + ) + def forward(self, x): + return self.layers(x) + +iris = load_iris() +scaler = StandardScaler() +X = scaler.fit_transform(iris.data).astype('float32') # pyright: ignore +y = iris.target.astype('int64') # pyright: ignore + +dataset = TensorDataset(torch.from_numpy(X), torch.from_numpy(y)) +trainloader = DataLoader(dataset, batch_size=16, shuffle=True) + +def train_model(name: str, dim): + net = Iris_MLP(hidden_dim=dim) + loss_fn = nn.CrossEntropyLoss() + optimizer = torch.optim.Adam(net.parameters(), lr=1e-2) + + print(f"Training {name} ({dim} neurons)...") + for epoch in range(100): + global loss + for data in trainloader: + inputs, targets = data + optimizer.zero_grad() + outputs = net(inputs) + loss = loss_fn(outputs, targets) + loss.backward() + optimizer.step() + if (epoch + 1) % 10 == 0: + print(f" Epoch {epoch+1}, Loss: {loss.item():.4f}") + return net + +if __name__ == "__main__": + torch_net_a = train_model("Network A", 10).eval() + + with torch.no_grad(): + torch_net_a.layers[0].weight[9] = -1.0 # pyright: ignore + torch_net_a.layers[0].bias[9] = -10.0 # pyright: ignore + + torch_net_b = Iris_MLP(10).eval() + torch_net_b.load_state_dict(torch_net_a.state_dict()) + + torch_net_b.layers[2].weight[:, 9] = 0.0 # pyright: ignore + + torch.onnx.export(torch_net_a, (torch.randn(1, 4),), "iris_a.onnx") + torch.onnx.export(torch_net_b, (torch.randn(1, 4),), "iris_b.onnx") diff --git a/examples/iris/iris_wide_to_deep.py b/examples/iris/iris_wide_to_deep.py new file mode 100644 index 0000000..34a82ae --- /dev/null +++ b/examples/iris/iris_wide_to_deep.py @@ -0,0 +1,83 @@ +import torch, torch.nn as nn +from sklearn.datasets import load_iris +from sklearn.preprocessing import StandardScaler +from torch.utils.data import DataLoader, TensorDataset + +class Wide_Iris_MLP(nn.Module): + def __init__(self, hidden_dim): + super().__init__() + self.layers = nn.Sequential( + nn.Linear(4, hidden_dim), + nn.ReLU(), + nn.Linear(hidden_dim, 3), + ) + def forward(self, x): + return self.layers(x) + +class Deep_Block(nn.Module): + def __init__(self, w1, b1, w2): + super().__init__() + self.fc1 = nn.Linear(4, 1) + self.fc1.weight.data = w1.clone().unsqueeze(0) + self.fc1.bias.data = b1.clone().unsqueeze(0) + self.fc2 = nn.Linear(1, 3, bias=False) + self.fc2.weight.data = w2.clone().unsqueeze(1) + + def forward(self, x_s): + x, s = x_s + z = torch.relu(self.fc1(x)) + ds = self.fc2(z) + return (x, s + ds) + +class Deep_Iris_MLP(nn.Module): + def __init__(self, wide_net): + super().__init__() + w1 = wide_net.layers[0].weight.data + b1 = wide_net.layers[0].bias.data + w2 = wide_net.layers[2].weight.data + b2 = wide_net.layers[2].bias.data + num_neurons = w1.shape[0] + + self.blocks = nn.Sequential(*[ + Deep_Block(w1[j], b1[j], w2[:, j]) for j in range(num_neurons) + ]) + self.final_bias = nn.Parameter(b2.clone()) + + def forward(self, x): + s = torch.zeros(x.shape[0], 3, device=x.device) + _, final_s = self.blocks((x, s)) + return final_s + self.final_bias + +iris = load_iris() +scaler = StandardScaler() +X = scaler.fit_transform(iris.data).astype('float32') # pyright: ignore +y = iris.target.astype('int64') # pyright: ignore + +dataset = TensorDataset(torch.from_numpy(X), torch.from_numpy(y)) +trainloader = DataLoader(dataset, batch_size=16, shuffle=True) + +def train_model(name:str, dim): + net = Wide_Iris_MLP(hidden_dim=dim) + loss_fn = nn.CrossEntropyLoss() + optimizer = torch.optim.Adam(net.parameters(), lr=1e-2) + + print(f"Training {name} ({dim} neurons)...") + for epoch in range(100): + global loss + for data in trainloader: + inputs, targets = data + optimizer.zero_grad() + outputs = net(inputs) + loss = loss_fn(outputs, targets) + loss.backward() + optimizer.step() + if (epoch + 1) % 10 == 0: + print(f" Epoch {epoch+1}, Loss: {loss.item():.4f}") + return net + +if __name__ == "__main__": + torch_net_a = train_model("Wide Network", 10).eval() + torch_net_b = Deep_Iris_MLP(torch_net_a).eval() + + torch.onnx.export(torch_net_a, (torch.randn(1, 4),), "iris_a.onnx") + torch.onnx.export(torch_net_b, (torch.randn(1, 4),), "iris_b.onnx") diff --git a/examples/mnist/mnist.py b/examples/mnist/mnist.py index 0a81878..a1706be 100644 --- a/examples/mnist/mnist.py +++ b/examples/mnist/mnist.py @@ -24,7 +24,7 @@ def train_model(name: str, dim): optimizer = torch.optim.Adam(net.parameters(), lr=0.5e-4) print(f"Training {name} ({dim} neurons)...") - for epoch in range(100): + for epoch in range(10): global loss for data in trainloader: inputs, targets = data @@ -33,8 +33,7 @@ def train_model(name: str, dim): loss = loss_fn(outputs, targets) loss.backward() optimizer.step() - if (epoch + 1) % 10 == 0: - print(f" Epoch {epoch+1}, Loss: {loss.item():.4f}") + print(f" Epoch {epoch+1}, Loss: {loss.item():.4f}") return net if __name__ == "__main__": diff --git a/examples/mnist/mnist_deep_to_wide.py b/examples/mnist/mnist_deep_to_wide.py new file mode 100644 index 0000000..6c4dde9 --- /dev/null +++ b/examples/mnist/mnist_deep_to_wide.py @@ -0,0 +1,85 @@ +import torch, torch.nn as nn +from torchvision.datasets import MNIST +from torch.utils.data import DataLoader +from torchvision import transforms + +class Deep_MNIST_Block(nn.Module): + def __init__(self): + super().__init__() + self.fc1 = nn.Linear(784, 1) + self.fc2 = nn.Linear(1, 10, bias=False) + + def forward(self, x_s): + x, s = x_s + z = torch.relu(self.fc1(x)) + ds = self.fc2(z) + return (x, s + ds) + +class Deep_MNIST_MLP(nn.Module): + def __init__(self, num_blocks): + super().__init__() + self.flatten = nn.Flatten() + self.blocks = nn.Sequential(*[Deep_MNIST_Block() for _ in range(num_blocks)]) + self.final_bias = nn.Parameter(torch.zeros(10)) + + def forward(self, x): + x = self.flatten(x) + s = torch.zeros(x.shape[0], 10, device=x.device) + _, final_s = self.blocks((x, s)) + return final_s + self.final_bias + +class Wide_MNIST_MLP(nn.Module): + def __init__(self, deep_net): + super().__init__() + self.flatten = nn.Flatten() + num_neurons = len(deep_net.blocks) + self.layers = nn.Sequential( + nn.Linear(784, num_neurons), + nn.ReLU(), + nn.Linear(num_neurons, 10), + ) + with torch.no_grad(): + w1_all = [] + b1_all = [] + w2_all = [] + + for block in deep_net.blocks: + w1_all.append(block.fc1.weight.data) + b1_all.append(block.fc1.bias.data) + w2_all.append(block.fc2.weight.data) + + self.layers[0].weight.copy_(torch.cat(w1_all, dim=0)) # pyright: ignore + self.layers[0].bias.copy_(torch.cat(b1_all, dim=0)) # pyright: ignore + self.layers[2].weight.copy_(torch.cat(w2_all, dim=1)) # pyright: ignore + self.layers[2].bias.copy_(deep_net.final_bias.data) # pyright: ignore + + def forward(self, x): + x = self.flatten(x) + return self.layers(x) + +train_dataset = MNIST('./', download=True, transform=transforms.ToTensor(), train=True) +trainloader = DataLoader(train_dataset, batch_size=128, shuffle=True) + +def train_deep_model(name: str, num_blocks): + net = Deep_MNIST_MLP(num_blocks=num_blocks) + loss_fn = nn.CrossEntropyLoss() + optimizer = torch.optim.Adam(net.parameters(), lr=1e-3) + + print(f"Training {name} ({num_blocks} blocks)...") + for epoch in range(10): + global loss + for inputs, targets in trainloader: + optimizer.zero_grad() + outputs = net(inputs) + loss = loss_fn(outputs, targets) + loss.backward() + optimizer.step() + print(f" Epoch {epoch+1}, Loss: {loss.item():.4f}") + return net + +if __name__ == "__main__": + torch_net_a = train_deep_model("Deep Network", 8).eval() + torch_net_b = Wide_MNIST_MLP(torch_net_a).eval() + + torch.onnx.export(torch_net_a, (torch.randn(1, 28, 28),), "mnist_a.onnx") + torch.onnx.export(torch_net_b, (torch.randn(1, 28, 28),), "mnist_b.onnx") diff --git a/examples/mnist/mnist_stably_active.py b/examples/mnist/mnist_stably_active.py new file mode 100644 index 0000000..267d682 --- /dev/null +++ b/examples/mnist/mnist_stably_active.py @@ -0,0 +1,69 @@ +import torch, torch.nn as nn +from torchvision.datasets import MNIST +from torch.utils.data import DataLoader +from torchvision import transforms + +class MNIST_MLP(nn.Module): + def __init__(self, hidden_dim): + super().__init__() + self.flatten = nn.Flatten() + self.layers = nn.Sequential( + nn.Linear(784, hidden_dim), + nn.ReLU(), + nn.Linear(hidden_dim, 10), + ) + def forward(self, x): + x = self.flatten(x) + return self.layers(x) + +class MNIST_Linear(nn.Module): + def __init__(self, weight, bias): + super().__init__() + self.flatten = nn.Flatten() + self.fc = nn.Linear(784, 10) + self.fc.weight.data = weight + self.fc.bias.data = bias + def forward(self, x): + x = self.flatten(x) + return self.fc(x) + +train_dataset = MNIST('./', download=True, transform=transforms.ToTensor(), train=True) +trainloader = DataLoader(train_dataset, batch_size=128, shuffle=True) + +def train_model(name: str, dim): + net = MNIST_MLP(hidden_dim=dim) + loss_fn = nn.CrossEntropyLoss() + optimizer = torch.optim.Adam(net.parameters(), lr=0.5e-4) + + print(f"Training {name} ({dim} neurons)...") + for epoch in range(10): + global loss + for data in trainloader: + inputs, targets = data + optimizer.zero_grad() + outputs = net(inputs) + loss = loss_fn(outputs, targets) + loss.backward() + optimizer.step() + print(f" Epoch {epoch+1}, Loss: {loss.item():.4f}") + return net + +if __name__ == "__main__": + torch_net_a = train_model("Base Network", 6).eval() + + with torch.no_grad(): + torch_net_a.layers[0].weight.fill_(0.01) # pyright: ignore + torch_net_a.layers[0].bias.fill_(5.0) # pyright: ignore + + W1 = torch_net_a.layers[0].weight.data + b1 = torch_net_a.layers[0].bias.data + W2 = torch_net_a.layers[2].weight.data + b2 = torch_net_a.layers[2].bias.data + + W_collapsed = torch.matmul(W2, W1) # pyright: ignore + b_collapsed = torch.matmul(W2, b1) + b2 # pyright: ignore + + torch_net_b = MNIST_Linear(W_collapsed, b_collapsed).eval() + + torch.onnx.export(torch_net_a, (torch.randn(1, 28, 28),), "mnist_a.onnx") + torch.onnx.export(torch_net_b, (torch.randn(1, 28, 28),), "mnist_b.onnx") diff --git a/examples/mnist/mnist_stably_inactive.py b/examples/mnist/mnist_stably_inactive.py new file mode 100644 index 0000000..ad81461 --- /dev/null +++ b/examples/mnist/mnist_stably_inactive.py @@ -0,0 +1,52 @@ +import torch, torch.nn as nn +from torchvision.datasets import MNIST +from torch.utils.data import DataLoader +from torchvision import transforms + +class MNIST_MLP(nn.Module): + def __init__(self, hidden_dim): + super().__init__() + self.layers = nn.Sequential( + nn.Flatten(), + nn.Linear(28 * 28, hidden_dim), + nn.ReLU(), + nn.Linear(hidden_dim, 10), + ) + def forward(self, x): + return self.layers(x) + +train_dataset = MNIST('./', download=True, transform=transforms.ToTensor(), train=True) +trainloader = DataLoader(train_dataset, batch_size=128, shuffle=True) + +def train_model(name: str, dim): + net = MNIST_MLP(hidden_dim=dim) + loss_fn = nn.CrossEntropyLoss() + optimizer = torch.optim.Adam(net.parameters(), lr=0.5e-4) + + print(f"Training {name} ({dim} neurons)...") + for epoch in range(10): + global loss + for data in trainloader: + inputs, targets = data + optimizer.zero_grad() + outputs = net(inputs) + loss = loss_fn(outputs, targets) + loss.backward() + optimizer.step() + print(f" Epoch {epoch+1}, Loss: {loss.item():.4f}") + return net + +if __name__ == "__main__": + torch_net_a = train_model("Base Network", 6).eval() + + with torch.no_grad(): + torch_net_a.layers[1].weight[5] = -1.0 # pyright: ignore + torch_net_a.layers[1].bias[5] = -1.0 # pyright: ignore + + torch_net_b = MNIST_MLP(6).eval() + torch_net_b.load_state_dict(torch_net_a.state_dict()) + + torch_net_b.layers[3].weight[:, 5] = 0.0 # pyright: ignore + + torch.onnx.export(torch_net_a, (torch.randn(1, 28, 28),), "mnist_a.onnx") + torch.onnx.export(torch_net_b, (torch.randn(1, 28, 28),), "mnist_b.onnx") diff --git a/examples/mnist/mnist_wide_to_deep.py b/examples/mnist/mnist_wide_to_deep.py new file mode 100644 index 0000000..64f4ec7 --- /dev/null +++ b/examples/mnist/mnist_wide_to_deep.py @@ -0,0 +1,82 @@ +import torch, torch.nn as nn +from torchvision.datasets import MNIST +from torch.utils.data import DataLoader +from torchvision import transforms + +class Wide_MNIST_MLP(nn.Module): + def __init__(self, hidden_dim): + super().__init__() + self.flatten = nn.Flatten() + self.layers = nn.Sequential( + nn.Linear(28 * 28, hidden_dim), + nn.ReLU(), + nn.Linear(hidden_dim, 10), + ) + def forward(self, x): + x = self.flatten(x) + return self.layers(x) + +class Deep_MNIST_Block(nn.Module): + def __init__(self, w1, b1, w2): + super().__init__() + self.fc1 = nn.Linear(784, 1) + self.fc1.weight.data = w1.clone().unsqueeze(0) + self.fc1.bias.data = b1.clone().unsqueeze(0) + self.fc2 = nn.Linear(1, 10, bias=False) + self.fc2.weight.data = w2.clone().unsqueeze(1) + + def forward(self, x_s): + x, s = x_s + z = torch.relu(self.fc1(x)) + ds = self.fc2(z) + return (x, s + ds) + +class Deep_MNIST_MLP(nn.Module): + def __init__(self, wide_net): + super().__init__() + self.flatten = nn.Flatten() + + w1 = wide_net.layers[0].weight.data + b1 = wide_net.layers[0].bias.data + w2 = wide_net.layers[2].weight.data + b2 = wide_net.layers[2].bias.data + num_neurons = w1.shape[0] + + self.blocks = nn.Sequential(*[ + Deep_MNIST_Block(w1[j], b1[j], w2[:, j]) for j in range(num_neurons) + ]) + self.final_bias = nn.Parameter(b2.clone()) + + def forward(self, x): + x = self.flatten(x) + s = torch.zeros(x.shape[0], 10, device=x.device) + _, final_s = self.blocks((x, s)) + return final_s + self.final_bias + +train_dataset = MNIST('./', download=True, transform=transforms.ToTensor(), train=True) +trainloader = DataLoader(train_dataset, batch_size=128, shuffle=True) + +def train_model(name: str, dim): + net = Wide_MNIST_MLP(hidden_dim=dim) + loss_fn = nn.CrossEntropyLoss() + optimizer = torch.optim.Adam(net.parameters(), lr=0.5e-4) + + print(f"Training {name} ({dim} neurons)...") + for epoch in range(10): + global loss + for data in trainloader: + inputs, targets = data + optimizer.zero_grad() + outputs = net(inputs) + loss = loss_fn(outputs, targets) + loss.backward() + optimizer.step() + print(f" Epoch {epoch+1}, Loss: {loss.item():.4f}") + return net + +if __name__ == "__main__": + torch_net_a = train_model("Wide Network", 8).eval() + torch_net_b = Deep_MNIST_MLP(torch_net_a).eval() + + torch.onnx.export(torch_net_a, (torch.randn(1, 28, 28),), "mnist_a.onnx") + torch.onnx.export(torch_net_b, (torch.randn(1, 28, 28),), "mnist_b.onnx") @@ -178,12 +178,10 @@ def inpla_export(model: onnx.ModelProto, bounds: Optional[Dict[str, List[float]] script.append(f"{v} ~ {sink};") def op_flatten(node): - out_name, in_name = node.output[0], node.input[0] - if out_name in interactions: - interactions[in_name] = interactions[out_name] + op_identity(node) def op_reshape(node): - op_flatten(node) + op_identity(node) def op_add(node): out_name = node.output[0] @@ -242,6 +240,36 @@ def inpla_export(model: onnx.ModelProto, bounds: Optional[Dict[str, List[float]] interactions[in_a][i].append(f"Add({sink}, Mul({v_b}, Concrete(-1.0)))") interactions[in_b][i].append(f"{v_b}") + def op_slice(node): + in_name, out_name = node.input[0], node.output[0] + if out_name in interactions: + starts = initializers.get(node.input[1]) + steps = initializers.get(node.input[4]) if len(node.input) > 4 else None + + start = int(starts.flatten()[0]) if starts is not None else 0 + step = int(steps.flatten()[0]) if steps is not None else 1 + + in_dim = get_dim(in_name) or 1 + if in_name not in interactions: + interactions[in_name] = [[] for _ in range(in_dim)] + + for i, terms in enumerate(interactions[out_name]): + input_index = start + (i * step) + if input_index < in_dim: + interactions[in_name][input_index].extend(terms) + + def op_squeeze(node): + op_identity(node) + + def op_unsqueeze(node): + op_identity(node) + + def op_identity(node): + in_name, out_name = node.input[0], node.output[0] + if out_name in interactions: + interactions[in_name] = interactions[out_name] + + graph, initializers = model.graph, get_initializers(model.graph) wire_gen = NameGen("w") interactions: Dict[str, List[List[str]]] = {} @@ -253,7 +281,11 @@ def inpla_export(model: onnx.ModelProto, bounds: Optional[Dict[str, List[float]] "Reshape": op_reshape, "MatMul": op_matmul, "Add": op_add, - "Sub": op_sub + "Sub": op_sub, + "Slice": op_slice, + "Squeeze": op_squeeze, + "Unsqueeze": op_unsqueeze, + "Identity": op_identity } if graph.output: @@ -284,7 +316,8 @@ def inpla_run(model: str) -> str: temp_path = f.name try: res = subprocess.run(["./inpla", "-f", temp_path, "-foptimise-tail-calls"], capture_output=True, text=True) - if res.stderr: print(res.stderr) + if res.stderr: + raise RuntimeError(res.stderr) return res.stdout finally: if os.path.exists(temp_path): |
