@@ -410,6 +410,70 @@ void SplitSelector::UpdateBuilder(NodesToOptimizeIndicesBuilder& builder) const
builder.num_output_defs = 1; // set to 1 as the first output is variadic
}

// Validates that the bias DQ's scale matches input_scale * weight_scale[i] for each output channel.
// ONNX QLinearConv requires bias to be in int32 with scale = x_scale * w_scale[i].
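// Example: with x_scale = 0.05 and per-tensor w_scale = 0.02, a conformant bias DQ scale is 0.05 * 0.02 = 0.001.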
// If this condition is violated, the fused output would be silently incorrect.
// Returns false (conservative) if any scale initializer is not a constant or types are non-conformant.
static bool CheckConvBiasScale(const GraphViewer& graph_viewer,
const Node& input_dq, const Node& weight_dq, const Node& bias_dq) {
const auto* x_scale_arg = input_dq.InputDefs()[QDQ::InputIndex::SCALE_ID];
const auto* w_scale_arg = weight_dq.InputDefs()[QDQ::InputIndex::SCALE_ID];
const auto* b_scale_arg = bias_dq.InputDefs()[QDQ::InputIndex::SCALE_ID];

const auto* x_scale_proto = graph_viewer.GetConstantInitializer(x_scale_arg->Name(), true);
const auto* w_scale_proto = graph_viewer.GetConstantInitializer(w_scale_arg->Name(), true);
const auto* b_scale_proto = graph_viewer.GetConstantInitializer(b_scale_arg->Name(), true);

if (!x_scale_proto || !w_scale_proto || !b_scale_proto) {
return false; // conservative: cannot verify
}

// Input scale must be scalar (rank 0 or 1-element rank-1).
if (x_scale_proto->dims_size() != 0 &&
!(x_scale_proto->dims_size() == 1 && x_scale_proto->dims(0) == 1)) {
return false;
}

const Initializer x_scale_init{graph_viewer.GetGraph(), *x_scale_proto, graph_viewer.ModelPath()};
const Initializer w_scale_init{graph_viewer.GetGraph(), *w_scale_proto, graph_viewer.ModelPath()};
const Initializer b_scale_init{graph_viewer.GetGraph(), *b_scale_proto, graph_viewer.ModelPath()};

// All scales must be float32 for standard QLinearConv.
if (x_scale_init.data_type() != ONNX_NAMESPACE::TensorProto_DataType_FLOAT ||
w_scale_init.data_type() != ONNX_NAMESPACE::TensorProto_DataType_FLOAT ||
b_scale_init.data_type() != ONNX_NAMESPACE::TensorProto_DataType_FLOAT) {
return false;
}

const auto x_scales = x_scale_init.DataAsSpan<float>();
const auto w_scales = w_scale_init.DataAsSpan<float>();
const auto b_scales = b_scale_init.DataAsSpan<float>();

const float x_scale = x_scales[0];
const size_t num_channels = w_scales.size(); // 1 for per-tensor, C_out for per-channel
const size_t b_num = b_scales.size();

// b_scale must be scalar or match num_channels.
if (b_num != 1 && b_num != num_channels) {
return false;
}
Comment on lines +452 to +459

Copilot AI Apr 25, 2026

CheckConvBiasScale derives num_channels from w_scales.size(), which is 1 for per-tensor weight scales even when the Conv has multiple output channels. This makes the loop validate only one channel, and it also rejects valid cases where b_scale is a per-channel vector (length C_out) but w_scale is scalar (all channels share the same expected bias scale). Consider deriving the channel count from the per-channel scale vector when present (e.g., use b_scales.size() when > 1, else w_scales.size()); when w_scales.size() == 1, compare each channel against w_scales[0] while allowing b_scale to be either scalar or length C_out.
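One possible shape of that fix, sketched under the reviewer's suggestion; the names mirror the surrounding function (x_scale, atol, rtol, and the scale spans), and this revision is illustrative, not part of the PR:

// Hypothetical revision: derive the channel count from whichever scale vector is per-channel.
const size_t num_channels = (b_scales.size() > 1) ? b_scales.size() : w_scales.size();

// Each scale vector must be scalar (broadcast to all channels) or match the channel count.
if ((w_scales.size() != 1 && w_scales.size() != num_channels) ||
    (b_scales.size() != 1 && b_scales.size() != num_channels)) {
  return false;
}

for (size_t i = 0; i < num_channels; ++i) {
  const float w_scale = (w_scales.size() == 1) ? w_scales[0] : w_scales[i];
  const float b_scale = (b_scales.size() == 1) ? b_scales[0] : b_scales[i];
  const float expected = x_scale * w_scale;
  if (std::abs(b_scale - expected) > (atol + rtol * std::abs(expected))) {
    return false;
  }
}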

// Tolerance values matching convention in optimizer/utils.cc.
constexpr float atol = 1e-6f;
constexpr float rtol = 1e-2f;

for (size_t i = 0; i < num_channels; ++i) {
const float w_scale = w_scales[i];
const float b_scale = (b_num == 1) ? b_scales[0] : b_scales[i];
const float expected = x_scale * w_scale;
if (std::abs(b_scale - expected) > (atol + rtol * std::abs(expected))) {
return false;
}
}

return true;
}

bool ConvNodeGroupSelector::Check(const GraphViewer& graph_viewer, const Node& node, const Node* redundant_clip_node,
const std::vector<const Node*>& dq_nodes,
const std::vector<const Node*>& q_nodes) const {
@@ -440,6 +504,12 @@ bool ConvNodeGroupSelector::Check(const GraphViewer& graph_viewer, const Node& n
if (dt_bias != ONNX_NAMESPACE::TensorProto_DataType::TensorProto_DataType_INT32) {
return false;
}

// Verify bias scale == input_scale * weight_scale[i] per ONNX QLinearConv spec.
// If scales don't match within tolerance, skip fusion to avoid silent numerical errors.
if (!CheckConvBiasScale(graph_viewer, *dq_nodes[0], *dq_nodes[1], *dq_nodes[2])) {
return false;
}
}

// 16-bit int types must be explicitly allowed.
178 changes: 178 additions & 0 deletions onnxruntime/test/python/quantization/test_qdq.py
@@ -23,6 +23,7 @@
get_tensor_consumers_and_producers,
)

import onnxruntime as ort
from onnxruntime.quantization import QDQQuantizer, QuantFormat, QuantType, quantize_static, write_calibration_table
from onnxruntime.quantization.calibrate import CalibrationMethod, TensorData, TensorsData
from onnxruntime.quantization.quant_utils import quantize_nparray
@@ -2224,5 +2225,182 @@ def test_quantize_with_prequantized_input(self):
self.assertEqual(zp_val, zp_val_original)


class TestConvBiasScaleValidation(unittest.TestCase):
"""Tests that QDQ Conv -> QLinearConv fusion is skipped when bias scale does not match
input_scale * weight_scale, as required by the ONNX QLinearConv specification."""

@classmethod
def setUpClass(cls):
cls._tmp_model_dir = tempfile.TemporaryDirectory(prefix="ort.qdq.bias_scale_validation_")
# Note: swap with the commented line if you want to see the models in local test dir.
cls._tmp_dir_path = cls._tmp_model_dir.name
# cls._tmp_dir_path = "."

@classmethod
def tearDownClass(cls):
cls._tmp_model_dir.cleanup()

def build_qdq_conv_model(
self,
inp_shape: list[int],
weight_shape: list[int],
x_scale: float,
w_scale: float,
b_scale: float,
x_zp: int = 128,
w_zp: int = 0,
) -> onnx.ModelProto:
"""Builds a QDQ Conv model (DQ->Conv->Q) with the given quantization scales.

The bias is encoded as int32 with the provided b_scale. When b_scale != x_scale * w_scale
the model is non-conformant and fusion must be skipped.
"""
num_out_channels = weight_shape[0]
np_x_scale = np.float32(x_scale)
np_w_scale = np.float32(w_scale)
np_b_scale = np.float32(b_scale)

# --- Quantized input (graph input: uint8) ---
input_q = onnx.helper.make_tensor_value_info("input_q", onnx.TensorProto.UINT8, inp_shape)

# --- Weight: random int8 constant ---
rng = np.random.default_rng(42)
weight_quant_data = rng.integers(-5, 6, size=weight_shape, dtype=np.int8)
weight_quant = onnx.numpy_helper.from_array(weight_quant_data, "weight_q")

# --- Scales / zero-points as scalar constants ---
x_scale_tensor = onnx.numpy_helper.from_array(np.array(np_x_scale, dtype=np.float32), "x_scale")
x_zp_tensor = onnx.numpy_helper.from_array(np.array(x_zp, dtype=np.uint8), "x_zp")
w_scale_tensor = onnx.numpy_helper.from_array(np.array(np_w_scale, dtype=np.float32), "w_scale")
w_zp_tensor = onnx.numpy_helper.from_array(np.array(w_zp, dtype=np.int8), "w_zp")
b_scale_tensor = onnx.numpy_helper.from_array(np.array(np_b_scale, dtype=np.float32), "b_scale")
b_zp_tensor = onnx.numpy_helper.from_array(np.array(0, dtype=np.int32), "b_zp")

# Bias as int32 (quantized with b_scale)
bias_float = np.ones(num_out_channels, dtype=np.float32)
bias_q_data = np.round(bias_float / np_b_scale).astype(np.int32)
bias_q = onnx.numpy_helper.from_array(bias_q_data, "bias_q")

# Output scale/zp for Q node
out_scale_val = np.float32(x_scale)
out_zp_val = np.uint8(128)
out_scale_tensor = onnx.numpy_helper.from_array(np.array(out_scale_val, dtype=np.float32), "out_scale")
out_zp_tensor = onnx.numpy_helper.from_array(np.array(out_zp_val, dtype=np.uint8), "out_zp")

# Nodes: DQ(input) -> | -> Conv -> Q(output)
# DQ(weight) -/
# DQ(bias) -/
dq_input = onnx.helper.make_node(
"DequantizeLinear", ["input_q", "x_scale", "x_zp"], ["input_f"], name="DQ_input"
)
dq_weight = onnx.helper.make_node(
"DequantizeLinear", ["weight_q", "w_scale", "w_zp"], ["weight_f"], name="DQ_weight"
)
dq_bias = onnx.helper.make_node("DequantizeLinear", ["bias_q", "b_scale", "b_zp"], ["bias_f"], name="DQ_bias")
conv_node = onnx.helper.make_node("Conv", ["input_f", "weight_f", "bias_f"], ["conv_out"], name="Conv0")
q_output = onnx.helper.make_node(
"QuantizeLinear", ["conv_out", "out_scale", "out_zp"], ["output_q"], name="Q_output"
)

output_q = onnx.helper.make_tensor_value_info("output_q", onnx.TensorProto.UINT8, None)

graph = onnx.helper.make_graph(
[dq_input, dq_weight, dq_bias, conv_node, q_output],
"QDQConvBiasScaleTest",
[input_q],
[output_q],
initializer=[
weight_quant,
x_scale_tensor,
x_zp_tensor,
w_scale_tensor,
w_zp_tensor,
b_scale_tensor,
b_zp_tensor,
bias_q,
out_scale_tensor,
out_zp_tensor,
],
)
opset_imports = [onnx.helper.make_opsetid("", 21)]
model = onnx.helper.make_model(graph, opset_imports=opset_imports)
model = onnx.shape_inference.infer_shapes(model)
return model

def _run_model(self, model_path: str, inputs: dict, optimize: bool) -> list:
"""Run model with the given optimization level and return outputs."""
sess_opts = ort.SessionOptions()
if optimize:
sess_opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
else:
sess_opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL
sess = ort.InferenceSession(model_path, sess_options=sess_opts, providers=["CPUExecutionProvider"])
return sess.run(None, inputs)

def test_mismatched_bias_scale_skips_fusion(self):
"""When bias_scale != input_scale * weight_scale, fusion must be skipped.

The outputs of the optimized and unoptimized sessions should match, proving
that no silent numerical corruption occurred.
"""
inp_shape = [1, 1, 4, 4]
weight_shape = [1, 1, 1, 1]

x_scale = 0.05
w_scale = 0.02
# Intentionally wrong bias scale: 2x the correct value
correct_b_scale = x_scale * w_scale
wrong_b_scale = 2.0 * correct_b_scale

model = self.build_qdq_conv_model(inp_shape, weight_shape, x_scale, w_scale, wrong_b_scale)
model_path = os.path.join(self._tmp_dir_path, "conv_mismatched_bias_scale.qdq.onnx")
onnx.save_model(model, model_path)

# Build a representative input
rng = np.random.default_rng(0)
input_data = rng.integers(100, 200, size=inp_shape, dtype=np.uint8)
inputs = {"input_q": input_data}

out_optimized = self._run_model(model_path, inputs, optimize=True)
out_unoptimized = self._run_model(model_path, inputs, optimize=False)

# Both should produce the same uint8 output (fusion was skipped or fallback path is correct).
np.testing.assert_array_equal(
out_optimized[0],
out_unoptimized[0],
err_msg="Mismatched bias scale: optimized and unoptimized outputs differ, "
"indicating silent numerical corruption from incorrect fusion.",
)

def test_matching_bias_scale_allows_fusion(self):
"""When bias_scale == input_scale * weight_scale, fusion should be allowed.

Both optimized and unoptimized sessions must produce identical outputs.
"""
inp_shape = [1, 1, 4, 4]
weight_shape = [1, 1, 1, 1]

Comment on lines +2375 to +2382

Copilot AI Apr 25, 2026

These tests only compare optimized vs. unoptimized outputs; they don't assert whether fusion actually occurred (or was skipped). That can lead to false positives if fusion is disabled for an unrelated reason (both tests would still pass). Consider saving the optimized model (e.g., via SessionOptions.optimized_model_filepath) and asserting that QLinearConv is present for the matching-scale case and absent for the mismatched-scale case (or use existing op-count helpers), so the tests directly validate the fusion decision.
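One way to make that assertion direct, sketched with ort.SessionOptions.optimized_model_filepath and the module's existing onnx/ort imports; the helper name below is hypothetical, not part of this PR:

def _count_qlinearconv_nodes(self, model_path: str) -> int:
    """Optimize the model at L3, save the result, and count its QLinearConv nodes."""
    optimized_path = model_path + ".optimized.onnx"
    sess_opts = ort.SessionOptions()
    sess_opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    sess_opts.optimized_model_filepath = optimized_path
    # Creating the session triggers optimization and writes the optimized model.
    ort.InferenceSession(model_path, sess_options=sess_opts, providers=["CPUExecutionProvider"])
    optimized = onnx.load(optimized_path)
    return sum(node.op_type == "QLinearConv" for node in optimized.graph.node)

The mismatched-scale test could then assert the count is 0, and the matching-scale test that it is 1.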
x_scale = 0.05
w_scale = 0.02
correct_b_scale = x_scale * w_scale # exactly matching

model = self.build_qdq_conv_model(inp_shape, weight_shape, x_scale, w_scale, correct_b_scale)
model_path = os.path.join(self._tmp_dir_path, "conv_matching_bias_scale.qdq.onnx")
onnx.save_model(model, model_path)

rng = np.random.default_rng(0)
input_data = rng.integers(100, 200, size=inp_shape, dtype=np.uint8)
inputs = {"input_q": input_data}

out_optimized = self._run_model(model_path, inputs, optimize=True)
out_unoptimized = self._run_model(model_path, inputs, optimize=False)

np.testing.assert_array_equal(
out_optimized[0],
out_unoptimized[0],
err_msg="Matching bias scale: optimized and unoptimized outputs differ.",
)


if __name__ == "__main__":
unittest.main()