#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@File : exptool.py
@Ver : 1.0
@Desc : None
@Author : Claude,refined by YangTianxi
@Time : 2025/08/09 05:21:17
@Dev Software: Vscode
'''
import os
import torch
import onnx
import funcs
import onnx.helper as helper
import numpy as np
import spconv.pytorch as spconv
import cumm.tensorview as tv
from spconv.core import ConvAlgo
from pathlib import Path
from getModuleTypes import *  # debug

# Global trace state shared by all registered forward hooks.
avoid_reuse_container = []  # keeps traced objects alive so their id() values stay unique
obj_to_tensor_id = {}       # maps __obj_to_id(obj) -> string tensor id
nodes = []                  # ONNX nodes collected during tracing
initializers = []           # ONNX initializers (weights/biases) collected during tracing
enable_trace = False        # tracing is only active inside export_onnx()


def register_node(fn):
    """Return a decorator that monkey-patches the callable at dotted path ``fn``
    (e.g. ``"torch.nn.ReLU.forward"``) so that, while ``enable_trace`` is on,
    each call is forwarded to the original implementation and then reported to
    the decorated symbolic function as ``bind_fn(self, ilayer, y, *args)``.

    ``ilayer`` is a per-op call counter used to generate unique node names.
    """
    fnnames = fn.split(".")
    # NOTE: eval on a trusted, hard-coded dotted path (module attribute lookup).
    fn_module = eval(".".join(fnnames[:-1]))
    fn_name = fnnames[-1]
    oldfn = getattr(fn_module, fn_name)

    def make_hook(bind_fn):
        ilayer = 0

        def internal_forward(self, *args, **kwargs):
            global enable_trace
            if not enable_trace:
                return oldfn(self, *args, **kwargs)

            global avoid_reuse_container
            nonlocal ilayer
            # Disable tracing around the real call so operators invoked
            # internally by the op itself are not recorded.
            enable_trace = False
            try:
                y = oldfn(self, *args, **kwargs)
                try:
                    bind_fn(self, ilayer, y, *args, **kwargs)
                except TypeError:
                    # Fallback: some bind_fn functions may not accept **kwargs
                    bind_fn(self, ilayer, y, *args)
            finally:
                # Fix: always restore tracing, even if the op or the hook
                # raised; previously a non-TypeError left enable_trace False
                # and silently disabled all subsequent tracing.
                enable_trace = True
            avoid_reuse_container.extend(list(args) + [y])
            ilayer += 1
            return y

        setattr(fn_module, fn_name, internal_forward)
    return make_hook


@register_node("spconv.conv.SparseConvolution.forward")
def symbolic_sparse_convolution(self, ilayer, y, x):
    """Record a spconv SparseConvolution call as a custom "SparseConvolution"
    ONNX node carrying all geometry/config attributes needed by the runtime."""
    register_tensor(y)
    print(f" --> SparseConvolution{ilayer}[{'subm' if self.subm else 'conv'}] -> Input {get_tensor_id(x)}, Output {get_tensor_id(y)}")

    inputs = [
        get_tensor_id(x),
        append_initializer(self.weight.data, f"spconv{ilayer}.weight"),
    ]
    if self.bias is not None:
        inputs.append(append_initializer(self.bias.data, f"spconv{ilayer}.bias"))

    # Map spconv fused-activation enum to the string the runtime expects.
    act_type_name = {
        tv.gemm.Activation.ReLU: "ReLU",
        tv.gemm.Activation.None_: "None",
        tv.gemm.Activation.Sigmoid: "Sigmoid",
        tv.gemm.Activation.LeakyReLU: "LeakyReLU",
    }

    # Upper bound on the number of output voxels the runtime pre-allocates.
    output_bound = 200000
    if hasattr(self, "output_bound"):
        output_bound = self.output_bound

    nodes.append(
        helper.make_node(
            "SparseConvolution", inputs, [get_tensor_id(y)], f"conv{ilayer}",
            ndim=self.ndim,
            input_spatial_shape=x.spatial_shape,
            output_spatial_shape=y.spatial_shape,
            in_channels=self.in_channels,
            out_channels=self.out_channels,
            kernel_size=self.kernel_size,
            output_bound=output_bound,
            stride=self.stride,
            dilation=self.dilation,
            padding=self.padding,
            transposed=self.transposed,
            inverse=self.inverse,
            output_padding=self.output_padding,
            groups=self.groups,
            subm=self.subm,
            rulebook=self.indice_key if self.indice_key else "",
            # Fix: tolerate layers without an act_type attribute or with an
            # unmapped value instead of raising AttributeError/KeyError.
            activation=act_type_name.get(getattr(self, 'act_type', tv.gemm.Activation.None_), "None"),
            # Fix: pass plain lists instead of torch.Size objects.
            input_shape=list(x.features.shape),
            output_shape=list(y.features.shape),
        )
    )


@register_node("torch.nn.ReLU.forward")
def symbolic_relu(self, ilayer, y, x):
    """Record nn.ReLU.forward as a standard ONNX Relu node."""
    register_tensor(y)
    print(f" --> ReLU{ilayer} -> Input {get_tensor_id(x)}, Output {get_tensor_id(y)}")
    nodes.append(
        helper.make_node(
            "Relu", [get_tensor_id(x)], [get_tensor_id(y)], f"relu{ilayer}"
        )
    )


@register_node("torch.nn.functional.relu")
def symbolic_functional_relu(x, ilayer, y, inplace=False):
    """Record F.relu as an ONNX Relu node (the input tensor arrives as the
    first positional argument of the patched function)."""
    register_tensor(y)
    print(f" --> F.ReLU{ilayer} -> Input {get_tensor_id(x)}, Output {get_tensor_id(y)}")
    nodes.append(
        helper.make_node(
            "Relu", [get_tensor_id(x)], [get_tensor_id(y)], f"f_relu{ilayer}"
        )
    )


@register_node("torch.Tensor.__add__")
def symbolic_add(a, ilayer, y, b):
    """Record tensor addition (a + b) as an ONNX Add node."""
    register_tensor(y)
    print(f" --> Add{ilayer} -> Input {get_tensor_id(a)} + {get_tensor_id(b)}, Output {get_tensor_id(y)}")
    nodes.append(
        helper.make_node(
            "Add", [get_tensor_id(a), get_tensor_id(b)], [get_tensor_id(y)], f"add{ilayer}"
        )
    )


@register_node("spconv.core.SparseConvTensor.dense")
def node_sparse_conv_tensor_dense(self, ilayer, y):
    """Record SparseConvTensor.dense() as a custom "ScatterDense" node and
    print diagnostics about the sparse tensor being densified."""
    register_tensor(y)
    print(f" --> ToDense{ilayer}[{self.spatial_shape}][{list(y.size())}] -> Input {get_tensor_id(self)}, Output {get_tensor_id(y)}")

    # Detailed sparse tensor diagnostics (debug aid for export validation).
    print(f" Sparse tensor features range: [{self.features.min().item():.6f}, {self.features.max().item():.6f}]")
    print(f" Sparse tensor features mean: {self.features.mean().item():.6f}")
    print(f" Sparse tensor non-zero features: {(self.features.abs() > 1e-6).sum().item()}/{self.features.numel()}")
    if hasattr(self, 'indices'):
        print(f" Sparse tensor indices range: [{self.indices.min().item()}, {self.indices.max().item()}]")
        print(f" Sparse tensor indices shape: {self.indices.shape}")

    # Check whether every coordinate lies inside the spatial volume.
    if hasattr(self, 'indices') and hasattr(self, 'spatial_shape'):
        valid_indices = (self.indices[:, 1:] < torch.tensor(self.spatial_shape, device=self.indices.device)).all(dim=1)
        print(f" Valid indices ratio: {valid_indices.float().mean().item():.4f}")

    nodes.append(
        helper.make_node(
            "ScatterDense", [get_tensor_id(self)], [get_tensor_id(y)], f"scatter{ilayer}",
            input_spatial_shape=list(self.spatial_shape),
            format="zyx",
            output_shape=list(y.size())
        )
    )


@register_node("torch.Tensor.view")
def node_view(self, ilayer, y, *dims):
    """Record Tensor.view as an ONNX Reshape node (target dims as attribute)."""
    register_tensor(y)
    print(f" --> Reshape{ilayer}[{dims}] -> Input {get_tensor_id(self)}, Output {get_tensor_id(y)}")
    nodes.append(
        helper.make_node(
            "Reshape", [get_tensor_id(self)], [get_tensor_id(y)], f"reshape{ilayer}",
            dims=dims
        )
    )


@register_node("torch.Tensor.permute")
def node_permute(self, ilayer, y, *dims):
    """Record Tensor.permute as an ONNX Transpose node."""
    register_tensor(y)
    print(f" --> Permute{ilayer}[{dims}][{list(y.shape)}] -> Input {get_tensor_id(self)}, Output {get_tensor_id(y)}")
    nodes.append(
        helper.make_node(
            "Transpose", [get_tensor_id(self)], [get_tensor_id(y)], f"transpose{ilayer}",
            perm=dims  # ONNX uses 'perm' instead of 'dims'
        )
    )


# Add support for batch normalization (though it should be fused)
@register_node("torch.nn.BatchNorm1d.forward")
def symbolic_batchnorm1d(self, ilayer, y, x):
    """Record nn.BatchNorm1d.forward as an ONNX BatchNormalization node.
    BatchNorm is expected to be fused into the convolution for deployment;
    this hook exists so un-fused models still export."""
    register_tensor(y)
    print(f" --> BatchNorm1d{ilayer} -> Input {get_tensor_id(x)}, Output {get_tensor_id(y)}")
    print(" WARNING: BatchNorm should be fused for better performance!")
    inputs = [
        get_tensor_id(x),
        append_initializer(self.weight.data, f"bn{ilayer}.weight"),
        append_initializer(self.bias.data, f"bn{ilayer}.bias"),
        append_initializer(self.running_mean.data, f"bn{ilayer}.running_mean"),
        append_initializer(self.running_var.data, f"bn{ilayer}.running_var")
    ]
    nodes.append(
        helper.make_node(
            "BatchNormalization", inputs, [get_tensor_id(y)], f"bn{ilayer}",
            epsilon=self.eps,
            momentum=self.momentum
        )
    )


# Add support for concatenation (often used in multi-scale features)
@register_node("torch.cat")
def symbolic_cat(tensors, ilayer, y, dim=0):
    """Record torch.cat as an ONNX Concat node (dim maps to axis)."""
    register_tensor(y)
    input_ids = [get_tensor_id(t) for t in tensors]
    print(f" --> Concat{ilayer}[dim={dim}] -> Inputs {input_ids}, Output {get_tensor_id(y)}")
    nodes.append(
        helper.make_node(
            "Concat", input_ids, [get_tensor_id(y)], f"concat{ilayer}",
            axis=dim
        )
    )


def append_initializer(value, name):
    """Store ``value`` (a tensor) as an FP16 raw ONNX initializer named
    ``name`` and return the name so it can be used as a node input.

    NOTE: all initializers are forced to float16 to match the FP16 graph
    inputs/outputs this exporter declares.
    """
    initializers.append(
        helper.make_tensor(
            name=name,
            data_type=helper.TensorProto.DataType.FLOAT16,
            dims=list(value.shape),
            vals=value.cpu().data.numpy().astype(np.float16).tobytes(),
            raw=True
        )
    )
    return name


def __obj_to_id(obj):
    """Identity key for the tensor-id map; a SparseConvTensor is keyed by its
    .features tensor so the sparse wrapper and its features share one id."""
    idd = id(obj)
    if isinstance(obj, spconv.SparseConvTensor):
        idd = id(obj.features)
    return idd


def set_obj_idd_assame(a_already_has_idd, b_no_idd):
    """Alias object B to the tensor id already assigned to object A."""
    global obj_to_tensor_id
    aidd = __obj_to_id(a_already_has_idd)
    bidd = __obj_to_id(b_no_idd)

    assert aidd in obj_to_tensor_id, "A is not in tensor map"
    assert bidd not in obj_to_tensor_id, "B is already in tensor map"
    obj_to_tensor_id[bidd] = obj_to_tensor_id[aidd]


def register_tensor(obj):
    """Assign the next sequential string tensor id to ``obj``."""
    global obj_to_tensor_id
    obj_to_tensor_id[__obj_to_id(obj)] = str(len(obj_to_tensor_id))


def get_tensor_id(obj):
    """Look up the tensor id previously assigned by register_tensor().
    Raises if the object was never registered, i.e. some operator on the
    path to this tensor is not traced by a registered hook."""
    idd = __obj_to_id(obj)
    assert idd in obj_to_tensor_id, f"ops!!!😮 Cannot find the tensorid of this object. this means that some operators are not being traced. You need to confirm it. Object type: {type(obj)}"
    return obj_to_tensor_id[idd]


def create_input_info(voxels, name="input_voxels"):
    """
    Create input value info with proper type and shape
    """
    return helper.make_value_info(
        name=name,
        type_proto=helper.make_tensor_type_proto(
            elem_type=helper.TensorProto.DataType.FLOAT16 if voxels.dtype == torch.float16 else helper.TensorProto.DataType.FLOAT,
            shape=list(voxels.size())
        )
    )


def create_output_info(output_tensor, name):
    """
    Create output value info with proper type and shape
    """
    return helper.make_value_info(
        name=name,
        type_proto=helper.make_tensor_type_proto(
            elem_type=helper.TensorProto.DataType.FLOAT16 if output_tensor.dtype == torch.float16 else helper.TensorProto.DataType.FLOAT,
            shape=list(output_tensor.size())
        )
    )


def export_onnx(model, voxels, coors, batch_size, spatial_shape, args):
    """
    ONNX export function for different datasets and backbones.

    Args:
        model: The PyTorch model to export
        voxels: Voxel features tensor [N, C]
        coors: Voxel coordinates tensor [N, 4] (batch_idx, z, y, x)
        batch_size: Batch size
        spatial_shape: Spatial shape tuple
        args: args parameters:
            dataset_type: 'nuscenes', 'kitti', 'waymo'
            backbone_type: 'VoxelResBackBone8x', 'VoxelBackBone8x', 'SpMiddleResNetFHD'
    """
    global avoid_reuse_container, obj_to_tensor_id, nodes, initializers, enable_trace
    # Clear global state
    avoid_reuse_container = []
    obj_to_tensor_id = {}
    nodes = []
    initializers = []

    # Input diagnostics
    print(f"Input voxels shape: {voxels.shape}")
    print(f"Input coors shape: {coors.shape}")
    print(f"Input coors range: [{coors.min().item()}, {coors.max().item()}]")
    print(f"Batch size: {batch_size}")
    print(f"Spatial shape: {spatial_shape}")

    # Check whether all coordinates fall inside the spatial volume.
    valid_coors = (coors[:, 1:] < torch.tensor(spatial_shape, device=coors.device)).all(dim=1)
    print(f"Valid coors ratio: {valid_coors.float().mean().item():.4f}")

    with torch.no_grad():
        register_tensor(voxels)
        enable_trace = True

        # Dataset-specific configurations
        if args.dataset_type == 'nuscenes':
            # nuScenes: batch_dict format with specific keys
            batch_dict = {
                'voxel_features': voxels,
                'voxel_coords': coors,
                'batch_size': batch_size,
            }
            y = model(batch_dict)
        elif args.dataset_type == 'kitti':
            # KITTI: batch_dict format with different spatial shape
            batch_dict = {
                'voxel_features': voxels,
                'voxel_coords': coors,
                'batch_size': batch_size,
            }
            y = model(batch_dict)
        elif args.dataset_type == 'waymo':
            # Waymo: batch_dict format with waymo-specific keys
            batch_dict = {
                'voxel_features': voxels,
                'voxel_coords': coors,
                'voxel_num_points': torch.ones(voxels.shape[0], dtype=torch.int32, device=voxels.device),
                'batch_size': batch_size,
                'spatial_shape': spatial_shape,  # typically larger than KITTI
                'use_lead_xyz': True
            }
            y = model(batch_dict)
        else:
            raise ValueError(f"Unsupported dataset type: {args.dataset_type}")

        # y_bev holds the final BEV (dense, height-compressed) feature map.
        # Fix: initialize to None and fall back to `y` below; previously it
        # was only assigned in the VoxelResBackBone8x/VoxelBackBone8x branch,
        # causing a NameError for every other backbone type.
        y_bev = None

        # Backbone-specific output handling
        if args.backbone_type in ['VoxelResBackBone8x', 'VoxelBackBone8x']:
            # These backbones typically return dict with spatial_features
            if isinstance(y, dict):
                if 'encoded_spconv_tensor' in y:
                    y = y['encoded_spconv_tensor']
                elif 'multi_scale_3d_features' in y:
                    y = y['multi_scale_3d_features']
                else:
                    # Take first tensor value
                    y = next(v for v in y.values() if isinstance(v, torch.Tensor))
            if hasattr(y, 'dense'):
                y_dense = y.dense()
                if verify_sparse_to_dense_conversion(y, y_dense):
                    print("Scatter operation verified successfully")
                else:
                    print("WARNING: Scatter operation verification failed")
                # height compression
                N, C, D, H, W = y_dense.shape
                y_bev = y_dense.view(N, C * D, H, W)
        elif args.backbone_type == 'SECOND':
            # SECOND backbone may return SparseConvTensor
            if hasattr(y, 'dense'):
                y = y.dense()
            elif isinstance(y, dict) and 'out' in y:
                y = y['out']
                if hasattr(y, 'dense'):
                    y = y.dense()
        elif args.backbone_type == 'SpMiddleResNetFHD':
            # this backbone has height compression in forward
            # SpMiddleResNetFHD specific handling
            if isinstance(y, dict):
                # Common keys for SpMiddleResNetFHD
                if 'encoded_spconv_tensor' in y:
                    y = y['encoded_spconv_tensor']
                elif 'backbone_features' in y:
                    y = y['backbone_features']
                elif 'out' in y:
                    y = y['out']
                # If it's a SparseConvTensor, convert to dense
                if hasattr(y, 'dense'):
                    y = y.dense()
                else:
                    # Take first tensor value
                    y = next(v for v in y.values() if isinstance(v, torch.Tensor))
            # Handle SparseConvTensor output directly
            elif hasattr(y, 'dense'):
                y = y.dense()
        else:
            # Generic handling for other backbones
            if isinstance(y, dict):
                y = next(v for v in y.values() if isinstance(v, torch.Tensor))
            elif hasattr(y, 'dense'):
                y = y.dense()

        if y_bev is None:
            # Branches above that end with a dense tensor use it directly
            # as the BEV feature map.
            y_bev = y

        enable_trace = False

        ckpt_path = Path(args.ckpt)
        export_dir = ckpt_path.parent
        args.save_onnx = os.path.join(export_dir, 'centerpoint_scn.onnx')

        # Save tensors for verification if requested
        if args.save_tensor is not None:
            print("> Saving tensors for verification...")
            funcs.save_tensor(voxels, export_dir / f"{args.save_tensor}.voxels")
            funcs.save_tensor(coors, export_dir / f"{args.save_tensor}.coors")
            funcs.save_tensor(y_bev, export_dir / f"{args.save_tensor}.output")
            funcs.save_tensor([batch_size] + list(spatial_shape), export_dir / f"{args.save_tensor}.info")

        # Create ONNX graph: a single FP16 voxel-features input named "0"
        # (the id register_tensor() assigned to `voxels`) and the BEV output.
        inputs = [
            helper.make_value_info(
                name="0",
                type_proto=helper.make_tensor_type_proto(
                    elem_type=helper.TensorProto.DataType.FLOAT16,
                    shape=list(voxels.size())
                )
            )
        ]
        outputs = [
            helper.make_value_info(
                name=get_tensor_id(y_bev),
                type_proto=helper.make_tensor_type_proto(
                    elem_type=helper.TensorProto.DataType.FLOAT16,
                    shape=list(y_bev.size())
                )
            )
        ]

        graph = helper.make_graph(
            name=f"{args.dataset_type}_{args.backbone_type}",
            inputs=inputs,
            outputs=outputs,
            nodes=nodes,
            initializer=initializers
        )

        # Create model
        model_proto = helper.make_model(
            graph,
            opset_imports=[helper.make_operatorsetid("ai.onnx", 11)],
            producer_name=f"{args.dataset_type}_export",
            producer_version="1.0"
        )

        # Add metadata
        model_proto.metadata_props.extend([
            onnx.StringStringEntryProto(key="dataset_type", value=args.dataset_type),
            onnx.StringStringEntryProto(key="backbone_type", value=args.backbone_type),
            onnx.StringStringEntryProto(key="spatial_shape", value=str(spatial_shape)),
            onnx.StringStringEntryProto(key="input_channels", value=str(voxels.shape[1]))
        ])

        # Validate and save (validation failure is reported but non-fatal,
        # because custom ops like SparseConvolution are unknown to the checker)
        try:
            onnx.checker.check_model(model_proto)
            print("✅ ONNX model validation passed")
        except Exception as e:
            print(f"⚠️ ONNX model validation warning: {e}")

        onnx.save_model(model_proto, args.save_onnx)

    # Clean up
    avoid_reuse_container = []
    obj_to_tensor_id = {}
    nodes = []
    initializers = []


def verify_sparse_to_dense_conversion(sparse_tensor, dense_tensor):
    """Verify that the sparse-to-dense conversion is correct by re-scattering
    the sparse features manually and comparing against ``dense_tensor``.

    Returns a truthy value when the total absolute difference is below 1e-3.
    """
    # Manually rebuild the dense tensor from the sparse representation.
    manual_dense = torch.zeros(dense_tensor.shape, device=dense_tensor.device, dtype=dense_tensor.dtype)

    if hasattr(sparse_tensor, 'indices') and hasattr(sparse_tensor, 'features'):
        indices = sparse_tensor.indices
        features = sparse_tensor.features
        # Vectorized scatter (replaces the original per-voxel Python loop):
        # keep only rows whose (b, z, y, x) fall inside the dense volume,
        # matching the original upper-bound check.
        b = indices[:, 0].long()
        z = indices[:, 1].long()
        yy = indices[:, 2].long()
        x = indices[:, 3].long()
        valid = (
            (b < manual_dense.shape[0]) & (z < manual_dense.shape[2]) &
            (yy < manual_dense.shape[3]) & (x < manual_dense.shape[4])
        )
        manual_dense[b[valid], :, z[valid], yy[valid], x[valid]] = features[valid].to(manual_dense.dtype)

    # Compare the manual scatter against the library result.
    diff = torch.abs(manual_dense - dense_tensor).sum()
    print(f"Scatter verification - Total difference: {diff.item()}")
    if diff > 1e-3:
        print("WARNING: Manual scatter differs from PyTorch scatter!")
        print(f"Manual dense range: [{manual_dense.min().item():.6f}, {manual_dense.max().item():.6f}]")
        print(f"PyTorch dense range: [{dense_tensor.min().item():.6f}, {dense_tensor.max().item():.6f}]")
    return diff < 1e-3