diff options
| author | ericmarin <maarin.eric@gmail.com> | 2026-03-31 11:14:57 +0200 |
|---|---|---|
| committer | ericmarin <maarin.eric@gmail.com> | 2026-03-31 16:43:27 +0200 |
| commit | d1b25fbde6b01529fd1bcfdd5778b6cb378eb865 (patch) | |
| tree | 0e121831cad6af60415eccccfbdb4c337d841c3a | |
| parent | dbfc224384cd38b26c3e32d4c4dd8be7bb6d5bdc (diff) | |
| download | vein-d1b25fbde6b01529fd1bcfdd5778b6cb378eb865.tar.gz vein-d1b25fbde6b01529fd1bcfdd5778b6cb378eb865.zip | |
added ONNX Add and ONNX Sub
| -rw-r--r-- | nneq.py | 108 | ||||
| -rw-r--r-- | proof.md | 28 | ||||
| -rw-r--r-- | proof.norg | 28 |
3 files changed, 142 insertions, 22 deletions
@@ -16,6 +16,7 @@ import re import numpy as np import subprocess import onnx +import onnx.shape_inference from onnx import numpy_helper from typing import List, Dict, Optional, Tuple import os @@ -144,7 +145,7 @@ def inpla_export(model: onnx.ModelProto, bounds: Optional[Dict[str, List[float]] for i in range(in_dim): weight = float(alpha * W[j, i]) if weight != 0: - v = var_gen.next() + v = wire_gen.next() interactions[node.input[0]][i].append(f"Mul({v}, Concrete({weight}))") neuron_terms.append(v) @@ -153,35 +154,92 @@ def inpla_export(model: onnx.ModelProto, bounds: Optional[Dict[str, List[float]] neuron_terms.append(f"Concrete({bias_val})") balance_add(neuron_terms, sink) - yield from [] def op_matmul(node): - return op_gemm(node, override_attrs={"alpha": 1.0, "beta": 0.0, "transB": 0}) + op_gemm(node, override_attrs={"alpha": 1.0, "beta": 0.0, "transB": 0}) def op_relu(node): out_name, in_name = node.output[0], node.input[0] - if out_name in interactions: - dim = len(interactions[out_name]) - if in_name not in interactions: - interactions[in_name] = [[] for _ in range(dim)] - for i in range(dim): - sink = flatten_nest("Dup", interactions[out_name][i]) - v = var_gen.next() - interactions[in_name][i].append(f"ReLU({v})") - script.append(f"{v} ~ {sink};") - yield from [] + dim = get_dim(out_name) or 1 + + if in_name not in interactions: + interactions[in_name] = [[] for _ in range(dim)] + + out_terms = interactions.get(node.output[0]) or [[f"Materialize(result{j})"] for j in range(dim)] + + for i in range(dim): + sink = flatten_nest("Dup", out_terms[i]) + v = wire_gen.next() + interactions[in_name][i].append(f"ReLU({v})") + script.append(f"{v} ~ {sink};") def op_flatten(node): out_name, in_name = node.output[0], node.input[0] if out_name in interactions: interactions[in_name] = interactions[out_name] - yield from [] def op_reshape(node): - return op_flatten(node) + op_flatten(node) + + def op_add(node): + out_name = node.output[0] + in_a, in_b = node.input[0], node.input[1] + + dim = get_dim(out_name) or get_dim(in_a) or get_dim(in_b) or 1 + + if in_a not in interactions: interactions[in_a] = [[] for _ in range(dim)] + if in_b not in interactions: interactions[in_b] = [[] for _ in range(dim)] + + out_terms = interactions.get(out_name) or [[f"Materialize(result{j})"] for j in range(dim)] + + b_const = initializers.get(in_b) + a_const = initializers.get(in_a) + + for i in range(dim): + sink = flatten_nest("Dup", out_terms[i]) + if b_const is not None: + val = float(b_const.flatten()[i % b_const.size]) + interactions[in_a][i].append(f"Add({sink}, Concrete({val}))") + elif a_const is not None: + val = float(a_const.flatten()[i % a_const.size]) + interactions[in_b][i].append(f"Add({sink}, Concrete({val}))") + else: + v_b = wire_gen.next() + interactions[in_a][i].append(f"Add({sink}, {v_b})") + interactions[in_b][i].append(f"{v_b}") + + def op_sub(node): + out_name = node.output[0] + in_a, in_b = node.input[0], node.input[1] + + dim = get_dim(out_name) or get_dim(in_a) or get_dim(in_b) or 1 + + if out_name not in interactions: + interactions[out_name] = [[f"Materialize(result{i})"] for i in range(dim)] + + out_terms = interactions.get(out_name) or [[f"Materialize(result{j})"] for j in range(dim)] + + if in_a not in interactions: interactions[in_a] = [[] for _ in range(dim)] + if in_b not in interactions: interactions[in_b] = [[] for _ in range(dim)] + + b_const = initializers.get(in_b) + a_const = initializers.get(in_a) + + for i in range(dim): + sink = flatten_nest("Dup", out_terms[i]) + if b_const is not None: + val = float(b_const.flatten()[i % b_const.size]) + interactions[in_a][i].append(f"Add({sink}, Concrete({-val}))") + elif a_const is not None: + val = float(a_const.flatten()[i % a_const.size]) + interactions[in_b][i].append(f"Add({sink}, Concrete({-val}))") + else: + v_b = wire_gen.next() + interactions[in_a][i].append(f"Add({sink}, Mul({v_b}, Concrete(-1.0)))") + interactions[in_b][i].append(f"{v_b}") graph, initializers = model.graph, get_initializers(model.graph) - var_gen, wire_gen = NameGen("v"), NameGen("w") + wire_gen = NameGen("w") interactions: Dict[str, List[List[str]]] = {} script = [] ops = { @@ -189,7 +247,9 @@ def inpla_export(model: onnx.ModelProto, bounds: Optional[Dict[str, List[float]] "Relu": op_relu, "Flatten": op_flatten, "Reshape": op_reshape, - "MatMul": op_matmul + "MatMul": op_matmul, + "Add": op_add, + "Sub": op_sub } if graph.output: @@ -200,12 +260,13 @@ def inpla_export(model: onnx.ModelProto, bounds: Optional[Dict[str, List[float]] for node in reversed(graph.node): if node.op_type in ops: - for _ in ops[node.op_type](node): pass + ops[node.op_type](node) else: raise RuntimeError(f"Unsupported ONNX operator: {node.op_type}") - if graph.input and graph.input[0].name in interactions: - for i, terms in enumerate(interactions[graph.input[0].name]): + if graph.input: + input = "input" if "input" in interactions else graph.input[0].name + for i, terms in enumerate(interactions[input]): sink = flatten_nest("Dup", terms) script.append(f"{sink} ~ Linear(Symbolic(X_{i}), 1.0, 0.0);") @@ -218,7 +279,7 @@ def inpla_run(model: str) -> str: f.write(f"{rules}\n{model}") temp_path = f.name try: - res = subprocess.run(["./inpla", "-f", temp_path], capture_output=True, text=True) + res = subprocess.run(["./inpla", "-f", temp_path, "-foptimise-tail-calls"], capture_output=True, text=True) if res.stderr: print(res.stderr) return res.stdout finally: @@ -279,7 +340,9 @@ def net(model: onnx.ModelProto, X, bounds: Optional[Dict[str, List[float]]] = No cache_key = (model_hash, bounds_key) if cache_key not in _INPLA_CACHE: - _INPLA_CACHE[cache_key] = inpla_run(inpla_export(model, bounds)) + exported = inpla_export(model, bounds) + reduced = inpla_run(exported) + _INPLA_CACHE[cache_key] = reduced res = z3_evaluate(_INPLA_CACHE[cache_key], X) return res if res is not None else [] @@ -309,6 +372,7 @@ class Solver(z3.Solver): def load_onnx(self, file, name=None): model = onnx.load(file) + model = onnx.shape_inference.infer_shapes(model) self.pending_nets.append((model, name)) def _process_nets(self): @@ -46,6 +46,34 @@ Equal to Gemm with  + +The translation defines the interactions: +\sim&space;a_i) + +By definition this interaction is equal to: + + +By grouping the operations we get: + + +### Sub +ONNX Sub node is defined as: + + +The translation defines the interactions: +\sim&space;a_i) +)\sim&space;b_i) + +By definition this interaction is equal to: + + + +By grouping the operations we get: + + ## Soundness of Interaction Rules ### Materialize The Materialize agent transforms a Linear agent into a tree of explicit mathematical operations @@ -56,6 +56,34 @@ version: 1.1.1 *** Reshape Just identity mapping. +*** Add + ONNX Add node is defined as: + $C = A + B$ + + The translation defines the interactions: + $Add(c_i, b_i) ~ a_i$ + + By definition this interaction is equal to: + $c_i = a_i + b_i$ + + By grouping the operations we get: + $C = A + B$ + +*** Sub + ONNX Sub node is defined as: + $C = A - B$ + + The translation defines the interactions: + $Add(c_i, neg_b_i) ~ a_i$ + $Mul(neg_b_i, Concrete(-1)) ~ b_i$ + + By definition this interaction is equal to: + $c_i = a_i + neg_b_i$ + $neg_b_i = -1 * b_i$ + + By grouping the operations we get: + $C = A - B$ + ** Soundness of Interaction Rules *** Materialize The Materialize agent transforms a Linear agent into a tree of explicit mathematical operations |
