BenchmarkRunner Class — PyTorch Architecture
Architecture documentation for the BenchmarkRunner class in common.py from the PyTorch codebase.
Entity Profile
Relationship Graph
Source Code
benchmarks/dynamo/common.py lines 1781–3102
class BenchmarkRunner:
    def __init__(self):
        """Initialize benchmark state; real configuration arrives later via the
        ``args`` setter and ``setup_amp``/``init_optimizer``."""
        # Callable that runs one fwd (or fwd+bwd) iteration; set by the harness.
        self.model_iter_fn = None
        # Placeholder scaler; real GradScaler is deliberately disabled (see setup_amp).
        self.grad_scaler = DummyGradScaler()
        # No-op autocast until AMP is requested in setup_amp.
        self.autocast = contextlib.nullcontext
        self.autocast_arg = {}
        self.optimizer: Optional[torch.optim.Optimizer] = None
        # Backing store for the ``args`` property.
        self._args = None
    def setup_amp(self, current_device=None):
        """Configure ``self.autocast``/``self.autocast_arg`` from the AMP CLI flags.

        No-op for models listed in ``fp32_only_models``. When ``current_device``
        is given it takes precedence over ``args.devices`` for picking the
        autocast device type.
        """
        if self.args.only in self.fp32_only_models:
            return

        devices = [current_device] if current_device else self.args.devices
        if self.args.amp:
            # AMP training can lead to small loss values which can underflow
            # gradient values returning in zero gradients. To solve this
            # problem, PyTorch introduces GradScaler. GradScaler is a stateful
            # structure, that scales the loss values to prevent underflow. Loss
            # values are big at the beginning of training (therefore not
            # requiring scaling), while loss value tends to be small as network
            # starts getting better (requiring scaling). GradScaler manages all
            # of this fine tuning, checking the gradients are turning to inf,
            # discarding such batches.

            # Since we are not running a long iteration, default value of
            # init_scale 65536 is going to turn all gradients to inf. Therefore,
            # we just use a init_scale of 2.0 for benchmarking purpose.

            # Disabling Gradscaler because
            #  1) Benchmark setup runs 2 iterations of fwd-bwd. So, not useful.
            #  2) Current setup shares grad_scaler for eager and dynamo model,
            #  which is bad as Gradscaler has state and can adjust the scaling
            #  factor between eager and dynamo run, making accuracy check
            #  harder.
            # self.grad_scaler = torch.amp.GradScaler(device="cuda", init_scale=2.0)
            self.autocast = functools.partial(
                torch.amp.autocast, device_type=devices[0]
            )
            if self.args.amp_dtype is None:
                # No explicit dtype requested: only force bfloat16 for models
                # known to need it; otherwise leave autocast's default dtype.
                if self.args.only in self.amp_dtype_bfloat16:
                    self.autocast_arg["dtype"] = torch.bfloat16
            else:
                amp_dtype = (
                    torch.float16
                    if self.args.amp_dtype == "float16"
                    else torch.bfloat16
                )
                self.autocast_arg["dtype"] = amp_dtype
    def init_optimizer(self, name, device, params):
        """Create ``self.optimizer`` for training benchmarks.

        Only CUDA training runs for models not in ``CI_SKIP_OPTIMIZER`` get an
        optimizer (SGD for models in the SGD allowlists, capturable foreach
        Adam otherwise); every other configuration sets ``self.optimizer`` to
        ``None``.
        """
        if device == "cuda" and self.args.training and name not in CI_SKIP_OPTIMIZER:
            if (name in CI_USE_SGD and self.args.ci) or name in BENCHMARK_USE_SGD:
                self.optimizer = torch.optim.SGD(params, lr=0.01, foreach=True)
                # Disable multi_tensor_sgd for benchmarking, there isn't a large performance benefit (~1%) to compiling
                # this optimizer because it is a single foreach add, and increases compile time.
                # After autotuning and fake tensor caching lands, we can enable, because the compile time impact will be lower.
                # Fake Tensor caching: https://github.com/pytorch/pytorch/pull/113873
                # Autotuning: https://github.com/pytorch/pytorch/issues/117447
                self.optimizer.step = torch._dynamo.disable(self.optimizer.step)
            else:
                self.optimizer = torch.optim.Adam(
                    params, lr=0.01, capturable=True, foreach=True
                )
        else:
            self.optimizer = None
    @property
    def args(self):
        """Parsed CLI namespace for this run (set by the harness)."""
        return self._args

    @args.setter
    def args(self, args):
        self._args = args
    # ------------------------------------------------------------------
    # Suite-specific model sets. Subclasses override these properties to
    # opt models in or out of particular configurations; the base class
    # defaults every set to empty (nothing skipped / forced).
    # ------------------------------------------------------------------
    @property
    def skip_models(self):
        return set()

    @property
    def skip_models_for_cuda(self):
        return set()

    @property
    def skip_models_for_xpu(self):
        return set()

    @property
    def skip_models_for_cpu(self):
        return set()

    @property
    def skip_models_for_cpu_aarch64(self):
        return set()

    @property
    def skip_models_for_freezing_cpu(self):
        return set()

    @property
    def skip_models_for_freezing_cuda(self):
        return set()

    @property
    def slow_models(self):
        return set()

    @property
    def very_slow_models(self):
        return set()

    @property
    def non_deterministic_models(self):
        # Models here get lenient treatment in accuracy bookkeeping
        # (see record_status in check_accuracy).
        return set()

    @property
    def fp32_only_models(self):
        # Models that must run in float32 (setup_amp is a no-op for these).
        return set()

    @property
    def force_amp_for_fp16_bf16_models(self):
        return set()

    @property
    def force_fp16_for_bf16_models(self):
        return set()

    @property
    def amp_dtype_bfloat16(self):
        # Models that need bfloat16 autocast when no amp_dtype is specified.
        return set()

    @property
    def skip_not_suitable_for_training_models(self):
        return set()

    @property
    def failing_torchinductor_models(self):
        return set()

    @property
    def failing_fx2trt_models(self):
        return set()

    @property
    def skip_accuracy_checks_large_models_dashboard(self):
        return set()

    @property
    def skip_accuracy_check_as_eager_non_deterministic(self):
        return set()

    @property
    def skip_multiprocess_models(self):
        return set()

    @property
    def skip_models_due_to_control_flow(self):
        return set()

    @property
    def skip_models_due_to_export_not_supported(self):
        return set()

    @property
    def disable_cudagraph_models(self):
        return set()

    @property
    def guard_on_nn_module_models(self):
        return set()

    @property
    def inline_inbuilt_nn_modules_models(self):
        return set()
    def get_tolerance_and_cosine_flag(self, is_training, current_device, name):
        """Return ``(tolerance, use_cosine_similarity)`` for accuracy checking.

        Must be implemented by suite-specific subclasses.
        """
        raise NotImplementedError
@property
def equal_nan(self):
equal_nan = True
if self.args.float32:
equal_nan = False
return equal_nan
    # Per-model accuracy-check knobs; subclasses override where needed.
    def use_larger_multiplier_for_smaller_tensor(self, name):
        """Whether ``name`` gets a relaxed tolerance multiplier on small tensors."""
        return False

    def use_iou_for_bool_accuracy(self, name):
        """Whether boolean outputs of ``name`` are compared via IoU instead of equality."""
        return False

    def get_iou_threshold(self, name):
        """IoU threshold used when ``use_iou_for_bool_accuracy`` is enabled."""
        return 0.99

    def get_accuracy_check_runs(self, name):
        """Number of accuracy-check repetitions (majority vote) for flaky models."""
        return 1
    def iter_models(self, args):
        """Yield loaded model tuples for every (model name, device) pair.

        ``iter_model_names`` and ``load_model`` are supplied by subclasses.
        Models whose load raises ``NotImplementedError`` are skipped.
        """
        for model_name in self.iter_model_names(args):
            for device in args.devices:
                try:
                    yield self.load_model(
                        device,
                        model_name,
                        batch_size=args.batch_size,
                    )
                except NotImplementedError:
                    continue  # bad benchmark implementation
    def deepcopy_model(self, model):
        """Return a deep copy of ``model``."""
        return copy.deepcopy(model)
    def cast_based_on_args(self, model, example_inputs):
        """Cast model and inputs to the dtype implied by the CLI flags.

        Precedence: fp32-only models always get fp32; then ``--float16`` /
        ``--bfloat16``, with per-model overrides that fall back to AMP or fp16
        when the requested dtype is unsupported. Returns the (possibly new)
        ``(model, example_inputs)`` pair.
        """
        if self.args.float32 or self.args.only in self.fp32_only_models:
            if not self.args.float32:
                log.warning("Model %s supports float32 only", self.args.only)
            model, example_inputs = cast_to_fp32(model, example_inputs)
        elif self.args.float16:
            if self.args.only in self.force_amp_for_fp16_bf16_models:
                log.warning(
                    "Model %s does not support float16, running with amp instead",
                    self.args.only,
                )
                # Switch the whole run to AMP instead of a hard fp16 cast.
                self.args.amp = True
                self.setup_amp()
            else:
                model, example_inputs = cast_to_fp16(model, example_inputs)
        elif self.args.bfloat16:
            if self.args.only in self.force_amp_for_fp16_bf16_models:
                log.warning(
                    "Model %s does not support bfloat16, running with amp instead",
                    self.args.only,
                )
                self.args.amp = True
                self.setup_amp()
            elif self.args.only in self.force_fp16_for_bf16_models:
                log.warning(
                    "Model %s does not support bfloat16, running with float16 instead",
                    self.args.only,
                )
                model, example_inputs = cast_to_fp16(model, example_inputs)
            else:
                model, example_inputs = cast_to_bf16(model, example_inputs)

        return model, example_inputs
    def validate_model(self, model, example_inputs):
        """
        Runs the eager model with example inputs to ensure that eager passes.

        Operates on deep copies / cloned inputs so the caller's model and
        inputs are left untouched. Raises ``RuntimeError`` (chained to the
        original exception) when the eager run fails.
        """
        model = self.deepcopy_model(model)
        example_inputs = clone_inputs(example_inputs)
        model, example_inputs = self.cast_based_on_args(model, example_inputs)
        try:
            self.model_iter_fn(model, example_inputs)
        except Exception as e:
            raise RuntimeError("Eager run failed") from e
def maybe_cast(self, model, example_inputs):
model, example_inputs = self.cast_based_on_args(model, example_inputs)
return model, example_inputs
def decay_batch_exp(self, batch_size, factor=0.5, divisor=2):
out_batch_size = batch_size * factor
if out_batch_size > divisor:
out_batch_size = (out_batch_size + 1) // divisor * divisor
else:
out_batch_size = batch_size - 1
return max(0, int(out_batch_size))
    def batch_size_finder(self, device, model_name, initial_batch_size=1024):
        """Find the largest batch size (starting from ``initial_batch_size``)
        at which the model loads and runs one iteration without error.

        Decays the batch size exponentially on failure; returns 1 if nothing
        larger succeeds.
        """
        batch_size = initial_batch_size
        while batch_size >= 1:
            # NOTE(review): uses the module-level global ``current_device``,
            # not the ``device`` argument — confirm this is intentional.
            empty_gpu_cache(current_device)
            try:
                device, name, model, example_inputs, _ = self.load_model(
                    device,
                    model_name,
                    batch_size,
                )
                self.model_iter_fn(model, example_inputs)
                return batch_size
            except RuntimeError as e:
                error_str = str(e)
                # channels_last failures won't go away at smaller batch sizes.
                if "channels_last" in error_str:
                    break
            batch_size = self.decay_batch_exp(batch_size)
        return 1
def run_n_iterations(self, mod, inputs, model_iter_fn):
n = self.args.iterations
for _ in range(n - 1):
model_iter_fn(mod, inputs, collect_outputs=False)
return model_iter_fn(mod, inputs, collect_outputs=True)
    @torch._disable_dynamo(recursive=True)
    def optimizer_zero_grad(self, mod):
        """Zero gradients via the optimizer when one exists, else on the module.

        Excluded from dynamo tracing so grad-zeroing is never compiled.
        """
        if self.optimizer is not None:
            self.optimizer.zero_grad(True)
        else:
            mod.zero_grad(True)
    def optimizer_step(self):
        """Step the optimizer if one was created by ``init_optimizer``; no-op otherwise."""
        if self.optimizer is not None:
            self.optimizer.step()
def get_benchmark_indices(self, length):
start = self._args.partition_id * (length // self._args.total_partitions)
end = (
(self._args.partition_id + 1) * (length // self._args.total_partitions)
if self._args.partition_id < self._args.total_partitions - 1
else length
)
return start, end
    def get_fsdp_auto_wrap_policy(self, model_name: str):
        """Return the FSDP auto-wrap policy for ``model_name``.

        Known models get a handcrafted module-class wrap policy; everything
        else falls back to a size-based policy (wrap modules over 1e5 params).
        """
        from diffusers.models.transformer_2d import Transformer2DModel
        from torchbenchmark.models.nanogpt.model import Block
        from transformers.models.llama.modeling_llama import LlamaDecoderLayer

        from torch.distributed.fsdp.wrap import (
            ModuleWrapPolicy,
            size_based_auto_wrap_policy,
        )

        # handcrafted wrap policy
        MODEL_FSDP_WRAP = {
            "stable_diffusion_unet": (Transformer2DModel,),
            "llama_v2_7b_16h": (LlamaDecoderLayer,),
            "nanogpt": (Block,),
        }

        if model_name not in MODEL_FSDP_WRAP:
            # default to using wrap policy based on module size
            return functools.partial(
                size_based_auto_wrap_policy, recurse=True, min_num_params=int(1e5)
            )

        return ModuleWrapPolicy(MODEL_FSDP_WRAP[model_name])
    def deepcopy_and_maybe_parallelize(self, model):
        """Deep-copy ``model`` and wrap the copy in DDP or FSDP when requested.

        FSDP mixed precision follows the fp16/bf16 CLI flags; raises
        ``AssertionError`` when distributed support is unavailable.
        """
        model = self.deepcopy_model(model)
        if self.args.ddp:
            if not torch.distributed.is_available():
                raise AssertionError(
                    "Can't use DDP without a distributed enabled build"
                )

            from torch.nn.parallel import DistributedDataParallel as DDP

            model = DDP(model, find_unused_parameters=True)
        elif self.args.fsdp:
            if not torch.distributed.is_available():
                raise AssertionError(
                    "Can't use FSDP without a distributed enabled build"
                )

            from torch.distributed.fsdp import (
                FullyShardedDataParallel as FSDP,
                MixedPrecision,
            )

            if self.args.float16:
                dtype = torch.float16
            elif self.args.bfloat16:
                dtype = torch.bfloat16
            else:
                dtype = torch.float32

            mp_policy = MixedPrecision(
                param_dtype=dtype,
                # Gradient communication precision.
                reduce_dtype=dtype,
                # Buffer precision.
                buffer_dtype=dtype,
            )

            model = FSDP(
                model,
                use_orig_params=True,
                device_id=torch.cuda.current_device()
                if self.args.devices[-1] == "cuda"
                else None,
                mixed_precision=mp_policy,
                limit_all_gathers=True,
                auto_wrap_policy=self.get_fsdp_auto_wrap_policy(self.args.only),
            )
        return model
    def check_accuracy(
        self, name, model, example_inputs, optimize_ctx, experiment, tag
    ):
        """
        Checks accuracy.
        1) Collect the outputs with fp64 datatype. This is useful for error checking.
        2) Checks if eager itself has variations.

        Returns a status string (e.g. "pass", "fail_accuracy", "fail_to_run");
        every exit path goes through ``record_status`` so the result is also
        written to the output csv.
        """
        start_stats = get_dynamo_stats()

        def record_status(accuracy_status, dynamo_start_stats):
            """
            Records the status in the csv file
            """
            # Non-deterministic models get lenient treatment: eager variation
            # and accuracy failures are recorded as passes.
            if current_name in self.non_deterministic_models:
                if accuracy_status in (
                    "pass",
                    "eager_two_runs_differ",
                    "fail_accuracy",
                ):
                    accuracy_status = "pass"

            headers = ["dev", "name", "batch_size", "accuracy"]
            fields = [current_device, current_name, current_batch_size, accuracy_status]

            if tag is not None:
                headers.insert(3, "tag")
                fields.insert(3, tag)

            # Keep a copy of the base columns before dynamo stats are appended;
            # only the base columns go to the signpost.
            o_headers = list(headers)
            o_fields = list(fields)
            dynamo_stats = get_dynamo_stats()
            dynamo_stats.subtract(dynamo_start_stats)
            for k, v in dynamo_stats.items():
                headers.append(k)
                fields.append(v)
            total_wall_time = output_signpost(
                dict(zip(o_headers, o_fields)),
                self.args,
                self.suite_name,
            )
            headers.append("compilation_latency")
            fields.append(total_wall_time)
            write_outputs(output_filename, headers, fields)

            if self.args.print_compilation_time:
                print(f"Compilation time (from dynamo_timed): {total_wall_time}")
            return accuracy_status

        if name in self.skip_accuracy_checks_large_models_dashboard:
            return record_status("pass_due_to_skip", dynamo_start_stats=start_stats)

        # Skip all accuracy check for the torchao backend
        if self.args.backend == "torchao":
            return record_status("pass_due_to_skip", dynamo_start_stats=start_stats)

        with self.pick_grad(name, self.args.training):
            # Collect the fp64 reference outputs to be used later for accuracy checking.
            fp64_outputs = None
            model_fp64 = None
            inputs_fp64 = None
            try:
                model_fp64, inputs_fp64 = cast_to_fp64(
                    self.deepcopy_and_maybe_parallelize(model),
                    clone_inputs(example_inputs),
                )
                self.init_optimizer(name, current_device, model_fp64.parameters())
                fp64_outputs = self.run_n_iterations(
                    model_fp64, inputs_fp64, self.model_iter_fn
                )
                fp64_outputs = tree_map(
                    lambda x: x.to(torch.float64)
                    if isinstance(x, torch.Tensor) and x.is_floating_point()
                    else x,
                    fp64_outputs,
                )
            except Exception:
                # fp64 ref is best-effort; fall back to cosine similarity.
                log.warning(
                    "fp64 golden ref were not generated for %s. Setting accuracy check to cosine",
                    name,
                    exc_info=True,
                )
                self.args.cosine = True
                fp64_outputs = None
            finally:
                del model_fp64, inputs_fp64
                empty_gpu_cache(current_device)

            tolerance, cos_similarity = self.get_tolerance_and_cosine_flag(
                self.args.training, current_device, name
            )

            # Cast the model to float16/float32 as necessary
            model, example_inputs = self.maybe_cast(model, example_inputs)
            accuracy_status = "pass"

            # Get results of native pytorch
            reset_rng_state()
            model_copy = None
            try:
                with torch.compiler.set_stance("force_eager"):
                    model_copy = self.deepcopy_and_maybe_parallelize(model)
                    self.init_optimizer(name, current_device, model_copy.parameters())
                    correct_result = self.run_n_iterations(
                        model_copy, clone_inputs(example_inputs), self.model_iter_fn
                    )
            except Exception as e:
                accuracy_status = (
                    "eager_1st_run_OOM"
                    if isinstance(e, torch.cuda.OutOfMemoryError)
                    else "eager_1st_run_fail"
                )
                log.exception("")
                return record_status(accuracy_status, dynamo_start_stats=start_stats)
            finally:
                del model_copy
                empty_gpu_cache(current_device)

            # Rerun native pytorch
            reset_rng_state()
            model_copy = None
            try:
                with torch.compiler.set_stance("force_eager"):
                    model_copy = self.deepcopy_and_maybe_parallelize(model)
                    self.init_optimizer(name, current_device, model_copy.parameters())
                    correct_rerun_result = self.run_n_iterations(
                        model_copy, clone_inputs(example_inputs), self.model_iter_fn
                    )
            except Exception as e:
                accuracy_status = (
                    "eager_2nd_run_OOM"
                    if isinstance(e, torch.cuda.OutOfMemoryError)
                    else "eager_2nd_run_fail"
                )
                log.exception("")
                return record_status(accuracy_status, dynamo_start_stats=start_stats)
            finally:
                del model_copy
                empty_gpu_cache(current_device)

            # Two eager runs should have exactly same result, within tolerance.
            # TODO If we want the above to be true, then deterministic should be set.
            # For example, MIOpen convolutions could be implemented with non-deterministic algos.
            is_same = True
            try:
                if (
                    name not in self.skip_accuracy_check_as_eager_non_deterministic
                    and not same(
                        correct_result,
                        correct_rerun_result,
                        fp64_ref=None,
                        cos_similarity=False,
                        # ROCm gets the normal tolerance; elsewhere the two
                        # eager runs must match exactly.
                        tol=tolerance if torch.version.hip else 0,
                        equal_nan=self.equal_nan,
                        use_larger_multiplier_for_smaller_tensor=self.use_larger_multiplier_for_smaller_tensor(
                            name
                        ),
                    )
                ):
                    is_same = False
            except Exception:
                # Sometimes torch.allclose may throw RuntimeError
                is_same = False

            if not is_same:
                accuracy_status = "eager_two_runs_differ"
                return record_status(accuracy_status, dynamo_start_stats=start_stats)

            correct_rerun_result = None

            # Support multiple accuracy check runs for flaky models
            accuracy_check_runs = self.get_accuracy_check_runs(name)
            pass_count = 0

            for run_idx in range(accuracy_check_runs):
                # Run with Dynamo
                reset_rng_state()
                torch._dynamo.reset()
                torch._dynamo.utils.counters.clear()
                model_copy = None
                run_passed = True
                try:
                    model_copy = self.deepcopy_and_maybe_parallelize(model)
                    self.init_optimizer(name, current_device, model_copy.parameters())
                    if (
                        self.args.export
                        or self.args.export_aot_inductor
                        or self.args.export_nativert
                        or self.args.torchscript_jit_trace
                        or self.args.aot_precompile
                    ):
                        # apply export on module directly
                        # no need for n iterations
                        # the logic should be the same to self.model_iter_fn (forward_pass)
                        with self.autocast(**self.autocast_arg):
                            optimized_model_iter_fn = optimize_ctx(
                                model_copy, example_inputs
                            )
                            new_result = optimized_model_iter_fn(
                                model_copy, example_inputs
                            )
                    else:
                        optimized_model_iter_fn = optimize_ctx(self.model_iter_fn)
                        new_result = self.run_n_iterations(
                            model_copy, example_inputs, optimized_model_iter_fn
                        )
                except Exception as e:
                    log.exception("")
                    print(
                        "TorchDynamo optimized model failed to run because of following error"
                    )
                    accuracy_status = (
                        "OOM"
                        if isinstance(e, torch.cuda.OutOfMemoryError)
                        else "fail_to_run"
                    )
                    return record_status(
                        accuracy_status, dynamo_start_stats=start_stats
                    )
                finally:
                    del model_copy

                if name in self.skip_accuracy_check_as_eager_non_deterministic:
                    return record_status(
                        "pass_due_to_skip", dynamo_start_stats=start_stats
                    )

                # Binary-folded conv under freezing+bf16 needs the loosest
                # tolerance multiplier.
                force_max_multiplier = False
                if (
                    self.args.freezing
                    and self.args.bfloat16
                    and torch._dynamo.utils.counters["inductor"]["binary_folding_conv"]
                    > 0
                ):
                    force_max_multiplier = True

                try:
                    if self.args.training and self.args.amp:
                        if process_fn := self.get_output_amp_train_process_func.get(
                            name, None
                        ):
                            correct_result = process_fn(correct_result)
                            new_result = process_fn(new_result)
                            fp64_outputs = process_fn(fp64_outputs)

                    if (
                        self.args.save_model_outputs_to
                        and self.args.compare_model_outputs_with
                        and self.args.save_model_outputs_to
                        == self.args.compare_model_outputs_with
                    ):
                        log.warning(
                            "args.save_model_outputs_to and args.compare_model_outputs_with points to the same path."
                            "Result will be undefined."
                        )

                    if self.args.save_model_outputs_to:
                        print(
                            f"Save model outputs to: {self.args.save_model_outputs_to}"
                        )
                        torch.save(new_result, self.args.save_model_outputs_to)

                    if self.args.compare_model_outputs_with:
                        print(
                            f"Load model outputs from {self.args.compare_model_outputs_with} to compare"
                        )
                        saved_result = torch.load(
                            self.args.compare_model_outputs_with, weights_only=False
                        )
                        is_bitwise_same = bitwise_same(saved_result, new_result)
                        if not is_bitwise_same:
                            print(
                                "The result is not bitwise equivalent to the previously saved result"
                            )
                            return record_status(
                                "not_bitwise_equivalent",
                                dynamo_start_stats=start_stats,
                            )
                        print(
                            "The result is bitwise equivalent to the previously saved result"
                        )
                        del saved_result

                    if not same(
                        correct_result,
                        new_result,
                        fp64_outputs,
                        equal_nan=self.equal_nan,
                        use_larger_multiplier_for_smaller_tensor=self.use_larger_multiplier_for_smaller_tensor(
                            name
                        ),
                        cos_similarity=cos_similarity,
                        tol=tolerance,
                        force_max_multiplier=force_max_multiplier,
                        use_iou_for_bool=self.use_iou_for_bool_accuracy(name),
                        iou_threshold=self.get_iou_threshold(name),
                    ):
                        run_passed = False
                except Exception:
                    # Sometimes torch.allclose may throw RuntimeError
                    run_passed = False

                if run_passed:
                    pass_count += 1

                if accuracy_check_runs > 1:
                    log.info(
                        "Accuracy check run %d/%d: %s",
                        run_idx + 1,
                        accuracy_check_runs,
                        "passed" if run_passed else "failed",
                    )

            # Pass if majority of runs pass (more than half)
            is_same = pass_count > accuracy_check_runs // 2

            if accuracy_check_runs > 1:
                log.info(
                    "Accuracy check summary: %d/%d runs passed, %s",
                    pass_count,
                    accuracy_check_runs,
                    "PASS" if is_same else "FAIL",
                )

            if not is_same:
                if self.args.skip_accuracy_check:
                    accuracy_status = "pass_due_to_skip"
                else:
                    accuracy_status = "fail_accuracy"
                return record_status(accuracy_status, dynamo_start_stats=start_stats)

            return record_status(accuracy_status, dynamo_start_stats=start_stats)
    def check_tolerance(
        self, name, model, example_inputs, optimize_ctx, base_device="cpu"
    ):
        """
        Checks tolerance based on https://pytorch.org/docs/stable/generated/torch.allclose.html.

        Runs eager on ``base_device`` and the optimized model, then writes the
        max/mean/std of elementwise relative differences to the output csv.
        """
        tolerance_status = "pass"
        if name in self.skip_accuracy_checks_large_models_dashboard:
            tolerance_status = "pass_due_to_skip"
            return tolerance_status

        # Cast the model to float16/float32 as necessary
        model, example_inputs = self.maybe_cast(model, example_inputs)
        with self.pick_grad(name, self.args.training):
            # Get results of native pytorch
            reset_rng_state()
            model_copy = copy.deepcopy(model)
            model_copy = model_copy.to(base_device)
            example_inputs_copy = copy.deepcopy(example_inputs)
            example_inputs_copy = tree_map(
                lambda x: x.to(base_device), example_inputs_copy
            )
            self.init_optimizer(name, base_device, model_copy.parameters())
            correct_result = self.run_n_iterations(
                model_copy, example_inputs_copy, self.model_iter_fn
            )

            # Run with Dynamo
            # Sometime CI fails with random triton compilation failure which will be skipped for now
            # TODO: revisit this after switching to new Triton runtime
            reset_rng_state()
            torch._dynamo.reset()
            try:
                self.init_optimizer(name, current_device, model.parameters())
                optimized_model_iter_fn = optimize_ctx(self.model_iter_fn)
                # NOTE(review): the optimizer is initialized on ``model`` but
                # the optimized run is on ``model_copy`` (moved to base_device)
                # with the original ``example_inputs`` — confirm this pairing
                # is intentional.
                new_result = self.run_n_iterations(
                    model_copy, example_inputs, optimized_model_iter_fn
                )
            except Exception:
                log.exception("")
                print(
                    "TorchDynamo optimized model failed to run because of following error"
                )
                return "fail_to_run"

            def dump_max_mean_values(tol, ref, res):
                # Recursively collect elementwise relative differences into tol.
                if isinstance(ref, (list, tuple, torch.nn.ParameterList, torch.Size)):
                    for refi, resi in zip(ref, res):
                        dump_max_mean_values(tol, refi, resi)
                elif isinstance(ref, dict):
                    for k in ref:
                        dump_max_mean_values(tol, ref[k], res[k])
                elif isinstance(ref, torch.Tensor):
                    res = res.to(base_device)
                    t = torch.abs(ref - res) / (1 + torch.abs(ref))
                    tol.append(t.flatten().to(torch.float32))
                return tol

            tol = []
            dump_max_mean_values(tol, correct_result, new_result)
            tol = torch.cat(tol)
            # NOTE(review): torch.tensor on an existing tensor looks redundant
            # (copies it); `max` also shadows the builtin below.
            tol = torch.tensor(tol)
            max = torch.max(tol)
            mean = torch.mean(tol)
            div = torch.std(tol)
            headers = ["dev", "name", "batch_size", "max", "mean", "std"]
            fields = [
                current_device,
                current_name,
                current_batch_size,
                max.item(),
                mean.item(),
                div.item(),
            ]
            write_outputs(output_filename, headers, fields)
            return tolerance_status
def run_performance_test_non_alternate(
self, name, model, example_inputs, optimize_ctx, experiment, tag=None
):
"Run performance test in non-alternately."
if experiment.func is not latency_experiment:
raise AssertionError(
f"Must run with latency_experiment, got {experiment.func}"
)
def warmup(fn, model, example_inputs, mode, niters=10):
gc.collect()
peak_mem = 0
start_stats = get_dynamo_stats()
try:
if current_device == "cuda":
torch.cuda.reset_peak_memory_stats()
empty_gpu_cache(current_device)
elif current_device == "hpu":
torch.hpu.reset_peak_memory_stats()
t0 = time.perf_counter()
for _ in range(niters):
fn(model, example_inputs)
t1 = time.perf_counter()
latency = t1 - t0
if current_device == "cuda":
peak_mem = get_peak_memory()
elif current_device == "hpu":
peak_mem = torch.hpu.max_memory_allocated() / 10**9
elif current_device == "cpu":
total = psutil.virtual_memory().total
percentage = psutil.Process(os.getpid()).memory_percent()
peak_mem = percentage * total / 10**9
except Exception:
log.exception("Backend %s failed in warmup()", mode)
write_csv_when_exception(
self.args, current_name, "warmup_failed", current_device
)
output_signpost({}, self.args, self.suite_name, error="warmup_failed")
return sys.exit(-1)
dynamo_stats = get_dynamo_stats()
dynamo_stats.subtract(start_stats)
return latency, peak_mem, dynamo_stats
# Cast the model to float16/float32 as necessary
model, example_inputs = self.maybe_cast(model, example_inputs)
# Use distributed wrapping as necessary
model = self.deepcopy_and_maybe_parallelize(model)
if not hasattr(model, name):
model.name = name
self.init_optimizer(name, current_device, model.parameters())
# The self.autocast context is needed for the model we export with aot_compile,
# similar to what we do in the check_accuracy function
ctx = (
self.autocast(**self.autocast_arg)
if self.args.export_aot_inductor
else contextlib.nullcontext()
)
with self.pick_grad(name, self.args.training), ctx:
ok, total = Stats.reset_counters()
experiment_kwargs = {}
if tag is not None:
experiment_kwargs["tag"] = tag
results = []
with maybe_snapshot_memory(
self.args.snapshot_memory, f"eager_{self.args.only}"
):
eager_latency, eager_peak_mem, _ = warmup(
self.model_iter_fn, model, example_inputs, "eager"
)
if self.args.use_warm_peak_memory:
_, eager_peak_mem, _ = warmup(
self.model_iter_fn, model, example_inputs, "eager", niters=1
)
baseline_timings = experiment(
self.model_iter_fn,
model,
example_inputs,
mark="expected",
**experiment_kwargs,
)
# reset dynamo
torch._dynamo.reset()
if self.args.export_aot_inductor:
optimized_model_iter_fn = optimize_ctx
else:
optimized_model_iter_fn = optimize_ctx(self.model_iter_fn)
with maybe_snapshot_memory(
self.args.snapshot_memory, f"compiled_{self.args.only}"
):
dynamo_latency, dynamo_peak_mem, dynamo_stats = warmup(
optimized_model_iter_fn, model, example_inputs, "dynamo"
)
if self.args.use_warm_peak_memory:
_, dynamo_peak_mem, _ = warmup(
optimized_model_iter_fn,
model,
example_inputs,
"dynamo",
niters=1,
)
# If we use warm peak memory, the AOT model loading transient memory
# won't be present on the warm measurement. We only have to account for
# it when using cold memory.
elif self.args.export_aot_inductor:
dynamo_peak_mem -= AOTInductorModelCache.get_excess_memory(model)
if self.args.profile_dynamo_cache_lookup:
with torch.profiler.profile(
activities=[torch.profiler.ProfilerActivity.CPU]
) as prof:
warmup(optimized_model_iter_fn, model, example_inputs, "dynamo")
events = list(
filter(
lambda event: "TorchDynamo Cache Lookup" in event.key,
prof.key_averages(),
)
)
dynamo_cache_lookup_latency = events[0].self_cpu_time_total
compilation_time = dynamo_latency - eager_latency
compression_ratio = (
eager_peak_mem / dynamo_peak_mem if dynamo_peak_mem else 0.0
)
if self.args.print_memory:
print(
f"memory: eager: {eager_peak_mem:.2f} GB, "
f"dynamo: {dynamo_peak_mem:.2f} GB, "
f"ratio: {compression_ratio:.2f}"
)
if self.args.print_compilation_time:
print(f"Compilation time: {compilation_time:.2f}")
if experiment.func is speedup_experiment:
experiment_kwargs["compilation_latency"] = compilation_time
experiment_kwargs["compression_ratio"] = compression_ratio
experiment_kwargs["eager_peak_mem"] = eager_peak_mem
experiment_kwargs["dynamo_peak_mem"] = dynamo_peak_mem
experiment_kwargs["dynamo_stats"] = dynamo_stats
if self.args.profile_dynamo_cache_lookup:
experiment_kwargs["cache_lookup_latency"] = (
dynamo_cache_lookup_latency
)
backend_timings = experiment(
self.model_iter_fn,
model,
example_inputs,
mark="expected",
**experiment_kwargs,
)
timings = np.stack((baseline_timings, backend_timings), axis=1)
result_summary = latency_experiment_summary(
self.suite_name, self.args, model, timings, **experiment_kwargs
)
results.append(result_summary)
return " ".join(map(str, results))
def run_performance_test(
self,
name,
model,
example_inputs,
optimize_ctx,
experiment,
tag=None,
batch_size=None,
):
niters = 5
if getattr(self, "hf_llm", False):
# If we're benchmarking an llm, we want to use the generate function
self.model_iter_fn = self.generate
niters = 1
if self.args.xla:
with self.pick_grad(name, self.args.training):
return experiment(
self.model_iter_fn, *self.maybe_cast(model, example_inputs)
)
def warmup(fn, model, example_inputs, mode, niters=5):
gc.collect()
peak_mem = 0
start_stats = get_dynamo_stats()
try:
if current_device == "cuda":
torch.cuda.reset_peak_memory_stats()
empty_gpu_cache(current_device)
elif current_device == "hpu":
torch.hpu.reset_peak_memory_stats()
t0 = time.perf_counter()
for _ in range(niters):
fn(model, example_inputs)
t1 = time.perf_counter()
latency = t1 - t0
if current_device == "cuda":
peak_mem = get_peak_memory()
elif current_device == "hpu":
peak_mem = torch.hpu.max_memory_allocated() / 10**9
elif current_device == "cpu":
total = psutil.virtual_memory().total
percentage = psutil.Process(os.getpid()).memory_percent()
peak_mem = percentage * total / 10**9
except Exception:
log.exception("Backend %s failed in warmup()", mode)
write_csv_when_exception(
self.args, current_name, "warmup_failed", current_device
)
output_signpost({}, self.args, self.suite_name, error="warmup_failed")
return sys.exit(-1)
dynamo_stats = get_dynamo_stats()
dynamo_stats.subtract(start_stats)
return latency, peak_mem, dynamo_stats
# Cast the model to float16/float32 as necessary
model, example_inputs = self.maybe_cast(model, example_inputs)
# Use distributed wrapping as necessary
model = self.deepcopy_and_maybe_parallelize(model)
if not hasattr(model, name):
model.name = name
self.init_optimizer(name, current_device, model.parameters())
# The self.autocast context is needed for the model we export with aot_compile,
# similar to what we do in the check_accuracy function
ctx = (
self.autocast(**self.autocast_arg)
if self.args.export_aot_inductor
else contextlib.nullcontext()
)
with self.pick_grad(name, self.args.training), ctx:
ok, total = Stats.reset_counters()
experiment_kwargs = {}
experiment_kwargs["batch_size"] = batch_size
if tag is not None:
experiment_kwargs["tag"] = tag
results = []
with maybe_snapshot_memory(
self.args.snapshot_memory, f"eager_{self.args.only}"
):
with torch.compiler.set_stance("force_eager"):
eager_latency, eager_peak_mem, _ = warmup(
self.model_iter_fn,
copy.deepcopy(model),
example_inputs,
"eager",
niters=niters,
)
if self.args.use_warm_peak_memory:
_, eager_peak_mem, _ = warmup(
self.model_iter_fn,
copy.deepcopy(model),
example_inputs,
"eager",
niters=1,
)
if (
self.args.export_aot_inductor
or self.args.export_nativert
or self.args.torchscript_jit_trace
or self.args.aot_precompile
):
optimized_model_iter_fn = optimize_ctx
else:
if getattr(self, "hf_llm", False):
# If it's an llm, we want to optimize model.forward, and use
# the generate function
model = optimize_ctx(model)
optimized_model_iter_fn = self.model_iter_fn
else:
optimized_model_iter_fn = optimize_ctx(self.model_iter_fn)
with maybe_snapshot_memory(
self.args.snapshot_memory, f"compiled_{self.args.only}"
):
dynamo_latency, dynamo_peak_mem, dynamo_stats = warmup(
optimized_model_iter_fn, model, example_inputs, "dynamo"
)
if self.args.use_warm_peak_memory:
_, dynamo_peak_mem, _ = warmup(
optimized_model_iter_fn,
model,
example_inputs,
"dynamo",
niters=1,
)
# If we use warm peak memory, the AOT model loading transient memory
# won't be present on the warm measurement. We only have to account for
# it when using cold memory.
elif self.args.export_aot_inductor:
dynamo_peak_mem -= AOTInductorModelCache.get_excess_memory(model)
if self.args.profile_dynamo_cache_lookup:
with torch.profiler.profile(
activities=[torch.profiler.ProfilerActivity.CPU]
) as prof:
warmup(optimized_model_iter_fn, model, example_inputs, "dynamo")
events = list(
filter(
lambda event: "TorchDynamo Cache Lookup" in event.key,
prof.key_averages(),
)
)
dynamo_cache_lookup_latency = events[0].self_cpu_time_total
compilation_time = dynamo_latency - eager_latency
compression_ratio = (
eager_peak_mem / dynamo_peak_mem if dynamo_peak_mem else 0.0
)
if self.args.print_memory:
print(
f"memory: eager: {eager_peak_mem:.2f} GB, "
f"dynamo: {dynamo_peak_mem:.2f} GB, "
f"ratio: {compression_ratio:.2f}"
)
if self.args.print_compilation_time:
print(f"Compilation time: {compilation_time:.2f}")
if experiment.func is speedup_experiment:
experiment_kwargs["compilation_latency"] = compilation_time
experiment_kwargs["compression_ratio"] = compression_ratio
experiment_kwargs["eager_peak_mem"] = eager_peak_mem
experiment_kwargs["dynamo_peak_mem"] = dynamo_peak_mem
experiment_kwargs["dynamo_stats"] = dynamo_stats
if self.args.profile_dynamo_cache_lookup:
experiment_kwargs["cache_lookup_latency"] = (
dynamo_cache_lookup_latency
)
if experiment.func is coverage_experiment:
ok, total = Stats.reset_counters()
results = []
# run with torch._dynamo few times to populate the cache
for _ in range(3):
optimized_model_iter_fn(model, example_inputs)
_, frames_second_pass = Stats.reset_counters() # should be 0
if frames_second_pass > 0:
optimized_model_iter_fn(model, example_inputs)
_, frames_third_pass = Stats.reset_counters() # should be 0
else:
frames_third_pass = 0
results.append(
f"{ok:3}/{total:3} +{frames_third_pass} frames {compilation_time:3.0f}s"
)
experiment_kwargs["hf_llm"] = getattr(self, "hf_llm", False)
results.append(
experiment(
self.model_iter_fn, model, example_inputs, **experiment_kwargs
)
)
return " ".join(map(str, results))
    def minify_model(
        self,
        name,
        model,
        example_inputs,
        optimize_ctx,
        experiment,
        tag,
    ):
        """Re-run the accuracy check with dynamo repro env vars set, then move
        the generated ``repro.py`` into the output directory."""
        log.info("Minifying %s...", name)
        # Enable dynamo's after-dynamo minifier at its most aggressive level.
        os.environ["TORCH_COMPILE_DEBUG"] = "1"
        os.environ["TORCHDYNAMO_REPRO_AFTER"] = "dynamo"
        os.environ["TORCHDYNAMO_REPRO_LEVEL"] = "4"

        self.check_accuracy(name, model, example_inputs, optimize_ctx, experiment, tag)

        if self.args.output_directory:
            repro_dir = self.args.output_directory
        else:
            repro_dir = torch._dynamo.config.base_dir

        try:
            shutil.move("repro.py", f"{repro_dir}/{name}_repro.py")
        except OSError:
            log.error("Could not find repro script for model %s", name)
        else:
            log.info(
                "Repro script for model %s with minified graph saved to %s",
                name,
                repro_dir,
            )
    def maybe_preserve_compile_debug(self, name, status):
        """Move the dynamo debug directory into the CI test-debug location when
        ``name``/``status`` is configured for preservation in
        ``CI_PRESERVE_COMPILE_DEBUG``."""
        if (
            name in CI_PRESERVE_COMPILE_DEBUG
            and status in CI_PRESERVE_COMPILE_DEBUG[name]
        ):
            src_dir = torch._dynamo.utils.get_debug_dir()
            if os.path.isdir(src_dir):
                dbg_dir = os.path.join(
                    os.getcwd(), "test", "debug", "torch_compile_debug"
                )
                dst_dir = os.path.join(dbg_dir, os.path.basename(src_dir))
                try:
                    os.makedirs(dbg_dir, exist_ok=True)
                    os.rename(src_dir, dst_dir)
                    log.warning("Moved %s to %s", src_dir, dst_dir)
                except OSError:
                    # Best-effort: preservation failure should not fail the run.
                    log.exception("Failed to preserve %s", src_dir)
def run_one_model(
self,
name,
model,
example_inputs,
optimize_ctx,
experiment,
explain=False,
tag=None,
batch_size=None,
):
mode = "train" if self.args.training else "eval"
msg = f"{current_device:4} {mode:5} {current_name:34} "
if tag:
msg += f" {tag:26}"
print(msg, flush=True)
start_stats = get_dynamo_stats()
if self.args.accuracy:
status = self.check_accuracy(
name, model, example_inputs, optimize_ctx, experiment, tag
)
print(status)
if status == "fail_accuracy" and self.args.minify:
self.minify_model(
name, model, example_inputs, optimize_ctx, experiment, tag
)
elif self.args.tolerance:
status = self.check_tolerance(name, model, example_inputs, optimize_ctx)
print(status)
elif self.args.performance:
if self.args.backend in ["torchao", "optimus"]:
status = self.run_performance_test_non_alternate(
name, model, example_inputs, optimize_ctx, experiment, tag
)
else:
status = self.run_performance_test(
name,
model,
example_inputs,
optimize_ctx,
experiment,
tag,
batch_size=batch_size,
)
print(status)
empty_gpu_cache(current_device)
self.maybe_preserve_compile_debug(name, status)
if self.args.timing:
from torch._dynamo.utils import op_count, print_time_report
from torch.utils._stats import simple_call_counter
print_time_report()
stats = "STATS: "
stats = stats + " | ".join(
itertools.chain(
[f"call_* op count: {op_count}"],
(f"{key}:{value}" for key, value in simple_call_counter.items()),
)
)
print(stats)
stats = get_dynamo_stats()
stats.subtract(start_stats)
if explain:
print(
f"Dynamo produced {stats['unique_graphs']} graphs "
f"covering {stats['calls_captured']} ops with "
f"{stats['graph_breaks']} graph breaks ({stats['unique_graph_breaks']} unique)"
)
if explain or self.args.log_graph_breaks or self.args.print_graph_breaks:
filename = f"{output_filename.rstrip('.csv')}_graph_breaks.csv"
def add_double_quotes(x):
# Delimiter because reason could have comma
return f'"{x}"'
for graph_break in graph_break_reasons:
reason = add_double_quotes(graph_break.reason)
user_stack = add_double_quotes(
", ".join([str(x) for x in graph_break.user_stack])
)
# NB: Don't upload them to the benchmark database as they are debugging
# information. There are also around a million records a day which is
# wasteful to store
write_outputs(
filename,
["model", "reason", "user_stack"],
[current_name, reason, user_stack],
False,
)
if self.args.stats:
Stats.print_summary()
Domain
Source
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free