
BenchmarkRunner Class — pytorch Architecture

Architecture documentation for the BenchmarkRunner class in benchmarks/dynamo/common.py from the pytorch codebase.
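
BenchmarkRunner is the abstract base class behind the TorchDynamo benchmark harnesses: suite-specific runners override its hooks (model loading, model-name enumeration, skip lists, tolerances) while the base class owns AMP setup, optimizer construction, accuracy checking, and performance measurement. As a hedged illustration of that contract (ToyRunner and toy_linear below are hypothetical; the real suite runners live in torchbench.py, huggingface.py, and timm_models.py alongside common.py), a minimal subclass might look like:

import torch

from common import BenchmarkRunner  # assumes running from benchmarks/dynamo

class ToyRunner(BenchmarkRunner):
    suite_name = "toy"  # hypothetical suite label used in result reporting

    def iter_model_names(self, args):
        # enumerate the models this suite provides
        yield "toy_linear"

    def load_model(self, device, model_name, batch_size=None):
        # return the 5-tuple the harness unpacks:
        # (device, name, model, example_inputs, batch_size)
        batch_size = batch_size or 4
        model = torch.nn.Linear(8, 8).to(device)
        example_inputs = (torch.randn(batch_size, 8, device=device),)
        return device, model_name, model, example_inputs, batch_size

    def get_tolerance_and_cosine_flag(self, is_training, current_device, name):
        # the base class raises NotImplementedError, so every suite supplies this
        return 1e-4, False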

Source Code

benchmarks/dynamo/common.py lines 1781–3102

class BenchmarkRunner:
    def __init__(self):
        self.model_iter_fn = None
        self.grad_scaler = DummyGradScaler()
        self.autocast = contextlib.nullcontext
        self.autocast_arg = {}
        self.optimizer: Optional[torch.optim.Optimizer] = None
        self._args = None

    def setup_amp(self, current_device=None):
        if self.args.only in self.fp32_only_models:
            return

        devices = [current_device] if current_device else self.args.devices
        if self.args.amp:
            # AMP training can lead to small loss values, which can underflow
            # gradient values, resulting in zero gradients. To solve this
            # problem, PyTorch introduces GradScaler. GradScaler is a stateful
            # structure that scales the loss values to prevent underflow. Loss
            # values are big at the beginning of training (therefore not
            # requiring scaling), while loss values tend to be small as the
            # network starts getting better (requiring scaling). GradScaler
            # manages all of this fine-tuning, checking whether gradients are
            # turning to inf and discarding such batches.

            # Since we are not running a long iteration, the default
            # init_scale of 65536 is going to turn all gradients to inf.
            # Therefore, we would just use an init_scale of 2.0 for
            # benchmarking purposes.

            # GradScaler is disabled here because
            #  1) The benchmark setup runs 2 iterations of fwd-bwd, so it is
            #  not useful.
            #  2) The current setup shares the grad_scaler between the eager
            #  and dynamo models, which is bad: GradScaler has state and can
            #  adjust the scaling factor between the eager and dynamo runs,
            #  making the accuracy check harder.
            # self.grad_scaler = torch.amp.GradScaler(device="cuda", init_scale=2.0)
            self.autocast = functools.partial(
                torch.amp.autocast, device_type=devices[0]
            )
            if self.args.amp_dtype is None:
                if self.args.only in self.amp_dtype_bfloat16:
                    self.autocast_arg["dtype"] = torch.bfloat16
            else:
                amp_dtype = (
                    torch.float16
                    if self.args.amp_dtype == "float16"
                    else torch.bfloat16
                )
                self.autocast_arg["dtype"] = amp_dtype

    def init_optimizer(self, name, device, params):
        if device == "cuda" and self.args.training and name not in CI_SKIP_OPTIMIZER:
            if (name in CI_USE_SGD and self.args.ci) or name in BENCHMARK_USE_SGD:
                self.optimizer = torch.optim.SGD(params, lr=0.01, foreach=True)
                # Disable multi_tensor_sgd for benchmarking: there isn't a large performance benefit (~1%)
                # to compiling this optimizer because it is a single foreach add, and compiling it
                # increases compile time.
                # After autotuning and fake tensor caching land, we can enable it, because the compile time impact will be lower.
                # Fake Tensor caching: https://github.com/pytorch/pytorch/pull/113873
                # Autotuning: https://github.com/pytorch/pytorch/issues/117447
                self.optimizer.step = torch._dynamo.disable(self.optimizer.step)
            else:
                self.optimizer = torch.optim.Adam(
                    params, lr=0.01, capturable=True, foreach=True
                )
        else:
            self.optimizer = None

    @property
    def args(self):
        return self._args

    @args.setter
    def args(self, args):
        self._args = args

    @property
    def skip_models(self):
        return set()

    @property
    def skip_models_for_cuda(self):
        return set()

    @property
    def skip_models_for_xpu(self):
        return set()

    @property
    def skip_models_for_cpu(self):
        return set()

    @property
    def skip_models_for_cpu_aarch64(self):
        return set()

    @property
    def skip_models_for_freezing_cpu(self):
        return set()

    @property
    def skip_models_for_freezing_cuda(self):
        return set()

    @property
    def slow_models(self):
        return set()

    @property
    def very_slow_models(self):
        return set()

    @property
    def non_deterministic_models(self):
        return set()

    @property
    def fp32_only_models(self):
        return set()

    @property
    def force_amp_for_fp16_bf16_models(self):
        return set()

    @property
    def force_fp16_for_bf16_models(self):
        return set()

    @property
    def amp_dtype_bfloat16(self):
        return set()

    @property
    def skip_not_suitable_for_training_models(self):
        return set()

    @property
    def failing_torchinductor_models(self):
        return set()

    @property
    def failing_fx2trt_models(self):
        return set()

    @property
    def skip_accuracy_checks_large_models_dashboard(self):
        return set()

    @property
    def skip_accuracy_check_as_eager_non_deterministic(self):
        return set()

    @property
    def skip_multiprocess_models(self):
        return set()

    @property
    def skip_models_due_to_control_flow(self):
        return set()

    @property
    def skip_models_due_to_export_not_supported(self):
        return set()

    @property
    def disable_cudagraph_models(self):
        return set()

    @property
    def guard_on_nn_module_models(self):
        return set()

    @property
    def inline_inbuilt_nn_modules_models(self):
        return set()

    def get_tolerance_and_cosine_flag(self, is_training, current_device, name):
        raise NotImplementedError

    @property
    def equal_nan(self):
        equal_nan = True
        if self.args.float32:
            equal_nan = False
        return equal_nan

    def use_larger_multiplier_for_smaller_tensor(self, name):
        return False

    def use_iou_for_bool_accuracy(self, name):
        return False

    def get_iou_threshold(self, name):
        return 0.99

    def get_accuracy_check_runs(self, name):
        return 1

    def iter_models(self, args):
        for model_name in self.iter_model_names(args):
            for device in args.devices:
                try:
                    yield self.load_model(
                        device,
                        model_name,
                        batch_size=args.batch_size,
                    )
                except NotImplementedError:
                    continue  # bad benchmark implementation

    def deepcopy_model(self, model):
        return copy.deepcopy(model)

    def cast_based_on_args(self, model, example_inputs):
        if self.args.float32 or self.args.only in self.fp32_only_models:
            if not self.args.float32:
                log.warning("Model %s supports float32 only", self.args.only)
            model, example_inputs = cast_to_fp32(model, example_inputs)
        elif self.args.float16:
            if self.args.only in self.force_amp_for_fp16_bf16_models:
                log.warning(
                    "Model %s does not support float16, running with amp instead",
                    self.args.only,
                )
                self.args.amp = True
                self.setup_amp()
            else:
                model, example_inputs = cast_to_fp16(model, example_inputs)
        elif self.args.bfloat16:
            if self.args.only in self.force_amp_for_fp16_bf16_models:
                log.warning(
                    "Model %s does not support bfloat16, running with amp instead",
                    self.args.only,
                )
                self.args.amp = True
                self.setup_amp()
            elif self.args.only in self.force_fp16_for_bf16_models:
                log.warning(
                    "Model %s does not support bfloat16, running with float16 instead",
                    self.args.only,
                )
                model, example_inputs = cast_to_fp16(model, example_inputs)
            else:
                model, example_inputs = cast_to_bf16(model, example_inputs)

        return model, example_inputs

    def validate_model(self, model, example_inputs):
        """
        Runs the eager model with example inputs to ensure that eager passes.
        """
        model = self.deepcopy_model(model)
        example_inputs = clone_inputs(example_inputs)
        model, example_inputs = self.cast_based_on_args(model, example_inputs)
        try:
            self.model_iter_fn(model, example_inputs)
        except Exception as e:
            raise RuntimeError("Eager run failed") from e

    def maybe_cast(self, model, example_inputs):
        model, example_inputs = self.cast_based_on_args(model, example_inputs)
        return model, example_inputs

    def decay_batch_exp(self, batch_size, factor=0.5, divisor=2):
        out_batch_size = batch_size * factor
        if out_batch_size > divisor:
            out_batch_size = (out_batch_size + 1) // divisor * divisor
        else:
            out_batch_size = batch_size - 1
        return max(0, int(out_batch_size))
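
    # Illustrative decay sequence with the defaults (factor=0.5, divisor=2):
    # 1024 -> 512 -> 256 -> ... -> 8 -> 4, then 4 -> 3 -> 2 -> 1 -> 0, since
    # once batch_size * factor is no larger than divisor, the batch size is
    # decremented by 1 instead of halved.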

    def batch_size_finder(self, device, model_name, initial_batch_size=1024):
        batch_size = initial_batch_size
        while batch_size >= 1:
            empty_gpu_cache(current_device)
            try:
                device, name, model, example_inputs, _ = self.load_model(
                    device,
                    model_name,
                    batch_size,
                )
                self.model_iter_fn(model, example_inputs)
                return batch_size
            except RuntimeError as e:
                error_str = str(e)
                if "channels_last" in error_str:
                    break
            batch_size = self.decay_batch_exp(batch_size)
        return 1

    def run_n_iterations(self, mod, inputs, model_iter_fn):
        n = self.args.iterations
        for _ in range(n - 1):
            model_iter_fn(mod, inputs, collect_outputs=False)
        return model_iter_fn(mod, inputs, collect_outputs=True)

    @torch._disable_dynamo(recursive=True)
    def optimizer_zero_grad(self, mod):
        if self.optimizer is not None:
            self.optimizer.zero_grad(True)
        else:
            mod.zero_grad(True)

    def optimizer_step(self):
        if self.optimizer is not None:
            self.optimizer.step()

    def get_benchmark_indices(self, length):
        start = self._args.partition_id * (length // self._args.total_partitions)
        end = (
            (self._args.partition_id + 1) * (length // self._args.total_partitions)
            if self._args.partition_id < self._args.total_partitions - 1
            else length
        )
        return start, end
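
    # Illustrative: with length=100 and total_partitions=3, partition ids
    # 0, 1, and 2 get index ranges [0, 33), [33, 66), and [66, 100);
    # the last partition absorbs the remainder.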

    def get_fsdp_auto_wrap_policy(self, model_name: str):
        from diffusers.models.transformer_2d import Transformer2DModel
        from torchbenchmark.models.nanogpt.model import Block
        from transformers.models.llama.modeling_llama import LlamaDecoderLayer

        from torch.distributed.fsdp.wrap import (
            ModuleWrapPolicy,
            size_based_auto_wrap_policy,
        )

        # handcrafted wrap policy
        MODEL_FSDP_WRAP = {
            "stable_diffusion_unet": (Transformer2DModel,),
            "llama_v2_7b_16h": (LlamaDecoderLayer,),
            "nanogpt": (Block,),
        }

        if model_name not in MODEL_FSDP_WRAP:
            # default to using wrap policy based on module size
            return functools.partial(
                size_based_auto_wrap_policy, recurse=True, min_num_params=int(1e5)
            )

        return ModuleWrapPolicy(MODEL_FSDP_WRAP[model_name])

    def deepcopy_and_maybe_parallelize(self, model):
        model = self.deepcopy_model(model)
        if self.args.ddp:
            if not torch.distributed.is_available():
                raise AssertionError(
                    "Can't use DDP without a distributed enabled build"
                )
            from torch.nn.parallel import DistributedDataParallel as DDP

            model = DDP(model, find_unused_parameters=True)
        elif self.args.fsdp:
            if not torch.distributed.is_available():
                raise AssertionError(
                    "Can't use FSDP without a distributed enabled build"
                )
            from torch.distributed.fsdp import (
                FullyShardedDataParallel as FSDP,
                MixedPrecision,
            )

            if self.args.float16:
                dtype = torch.float16
            elif self.args.bfloat16:
                dtype = torch.bfloat16
            else:
                dtype = torch.float32

            mp_policy = MixedPrecision(
                param_dtype=dtype,
                # Gradient communication precision.
                reduce_dtype=dtype,
                # Buffer precision.
                buffer_dtype=dtype,
            )

            model = FSDP(
                model,
                use_orig_params=True,
                device_id=torch.cuda.current_device()
                if self.args.devices[-1] == "cuda"
                else None,
                mixed_precision=mp_policy,
                limit_all_gathers=True,
                auto_wrap_policy=self.get_fsdp_auto_wrap_policy(self.args.only),
            )
        return model

    def check_accuracy(
        self, name, model, example_inputs, optimize_ctx, experiment, tag
    ):
        """
        Checks accuracy.
        1) Collects the outputs in fp64. This is useful for error checking.
        2) Checks whether eager itself varies across runs.
        """
        start_stats = get_dynamo_stats()

        def record_status(accuracy_status, dynamo_start_stats):
            """
            Records the status in the csv file
            """
            if current_name in self.non_deterministic_models:
                if accuracy_status in (
                    "pass",
                    "eager_two_runs_differ",
                    "fail_accuracy",
                ):
                    accuracy_status = "pass"

            headers = ["dev", "name", "batch_size", "accuracy"]
            fields = [current_device, current_name, current_batch_size, accuracy_status]

            if tag is not None:
                headers.insert(3, "tag")
                fields.insert(3, tag)

            o_headers = list(headers)
            o_fields = list(fields)

            dynamo_stats = get_dynamo_stats()
            dynamo_stats.subtract(dynamo_start_stats)
            for k, v in dynamo_stats.items():
                headers.append(k)
                fields.append(v)

            total_wall_time = output_signpost(
                dict(zip(o_headers, o_fields)),
                self.args,
                self.suite_name,
            )
            headers.append("compilation_latency")
            fields.append(total_wall_time)
            write_outputs(output_filename, headers, fields)

            if self.args.print_compilation_time:
                print(f"Compilation time (from dynamo_timed): {total_wall_time}")

            return accuracy_status

        if name in self.skip_accuracy_checks_large_models_dashboard:
            return record_status("pass_due_to_skip", dynamo_start_stats=start_stats)

        # Skip all accuracy checks for the torchao backend
        if self.args.backend == "torchao":
            return record_status("pass_due_to_skip", dynamo_start_stats=start_stats)

        with self.pick_grad(name, self.args.training):
            # Collect the fp64 reference outputs to be used later for accuracy checking.
            fp64_outputs = None
            model_fp64 = None
            inputs_fp64 = None
            try:
                model_fp64, inputs_fp64 = cast_to_fp64(
                    self.deepcopy_and_maybe_parallelize(model),
                    clone_inputs(example_inputs),
                )
                self.init_optimizer(name, current_device, model_fp64.parameters())
                fp64_outputs = self.run_n_iterations(
                    model_fp64, inputs_fp64, self.model_iter_fn
                )
                fp64_outputs = tree_map(
                    lambda x: x.to(torch.float64)
                    if isinstance(x, torch.Tensor) and x.is_floating_point()
                    else x,
                    fp64_outputs,
                )
            except Exception:
                log.warning(
                    "fp64 golden ref were not generated for %s. Setting accuracy check to cosine",
                    name,
                    exc_info=True,
                )
                self.args.cosine = True
                fp64_outputs = None
            finally:
                del model_fp64, inputs_fp64
                empty_gpu_cache(current_device)

            tolerance, cos_similarity = self.get_tolerance_and_cosine_flag(
                self.args.training, current_device, name
            )

            # Cast the model to float16/float32 as necessary
            model, example_inputs = self.maybe_cast(model, example_inputs)
            accuracy_status = "pass"

            # Get results of native pytorch
            reset_rng_state()
            model_copy = None
            try:
                with torch.compiler.set_stance("force_eager"):
                    model_copy = self.deepcopy_and_maybe_parallelize(model)
                    self.init_optimizer(name, current_device, model_copy.parameters())
                    correct_result = self.run_n_iterations(
                        model_copy, clone_inputs(example_inputs), self.model_iter_fn
                    )
            except Exception as e:
                accuracy_status = (
                    "eager_1st_run_OOM"
                    if isinstance(e, torch.cuda.OutOfMemoryError)
                    else "eager_1st_run_fail"
                )
                log.exception("")
                return record_status(accuracy_status, dynamo_start_stats=start_stats)
            finally:
                del model_copy
                empty_gpu_cache(current_device)

            # Rerun native pytorch
            reset_rng_state()
            model_copy = None
            try:
                with torch.compiler.set_stance("force_eager"):
                    model_copy = self.deepcopy_and_maybe_parallelize(model)
                    self.init_optimizer(name, current_device, model_copy.parameters())
                    correct_rerun_result = self.run_n_iterations(
                        model_copy, clone_inputs(example_inputs), self.model_iter_fn
                    )
            except Exception as e:
                accuracy_status = (
                    "eager_2nd_run_OOM"
                    if isinstance(e, torch.cuda.OutOfMemoryError)
                    else "eager_2nd_run_fail"
                )
                log.exception("")
                return record_status(accuracy_status, dynamo_start_stats=start_stats)
            finally:
                del model_copy
                empty_gpu_cache(current_device)

            # Two eager runs should have exactly the same result, within tolerance.
            # TODO: if we want the above to be true, then deterministic mode should be set.
            # For example, MIOpen convolutions could be implemented with non-deterministic algos.
            is_same = True
            try:
                if (
                    name not in self.skip_accuracy_check_as_eager_non_deterministic
                    and not same(
                        correct_result,
                        correct_rerun_result,
                        fp64_ref=None,
                        cos_similarity=False,
                        tol=tolerance if torch.version.hip else 0,
                        equal_nan=self.equal_nan,
                        use_larger_multiplier_for_smaller_tensor=self.use_larger_multiplier_for_smaller_tensor(
                            name
                        ),
                    )
                ):
                    is_same = False
            except Exception:
                # Sometimes torch.allclose may throw RuntimeError
                is_same = False

            if not is_same:
                accuracy_status = "eager_two_runs_differ"
                return record_status(accuracy_status, dynamo_start_stats=start_stats)

            correct_rerun_result = None

            # Support multiple accuracy check runs for flaky models
            accuracy_check_runs = self.get_accuracy_check_runs(name)
            pass_count = 0

            for run_idx in range(accuracy_check_runs):
                # Run with Dynamo
                reset_rng_state()
                torch._dynamo.reset()
                torch._dynamo.utils.counters.clear()
                model_copy = None
                run_passed = True

                try:
                    model_copy = self.deepcopy_and_maybe_parallelize(model)
                    self.init_optimizer(name, current_device, model_copy.parameters())
                    if (
                        self.args.export
                        or self.args.export_aot_inductor
                        or self.args.export_nativert
                        or self.args.torchscript_jit_trace
                        or self.args.aot_precompile
                    ):
                        # Apply export on the module directly; no need for n
                        # iterations. The logic should be the same as
                        # self.model_iter_fn (forward_pass).
                        with self.autocast(**self.autocast_arg):
                            optimized_model_iter_fn = optimize_ctx(
                                model_copy, example_inputs
                            )
                            new_result = optimized_model_iter_fn(
                                model_copy, example_inputs
                            )
                    else:
                        optimized_model_iter_fn = optimize_ctx(self.model_iter_fn)
                        new_result = self.run_n_iterations(
                            model_copy, example_inputs, optimized_model_iter_fn
                        )
                except Exception as e:
                    log.exception("")
                    print(
                        "TorchDynamo optimized model failed to run because of the following error"
                    )
                    accuracy_status = (
                        "OOM"
                        if isinstance(e, torch.cuda.OutOfMemoryError)
                        else "fail_to_run"
                    )
                    return record_status(
                        accuracy_status, dynamo_start_stats=start_stats
                    )
                finally:
                    del model_copy

                if name in self.skip_accuracy_check_as_eager_non_deterministic:
                    return record_status(
                        "pass_due_to_skip", dynamo_start_stats=start_stats
                    )

                force_max_multiplier = False
                if (
                    self.args.freezing
                    and self.args.bfloat16
                    and torch._dynamo.utils.counters["inductor"]["binary_folding_conv"]
                    > 0
                ):
                    force_max_multiplier = True

                try:
                    if self.args.training and self.args.amp:
                        if process_fn := self.get_output_amp_train_process_func.get(
                            name, None
                        ):
                            correct_result = process_fn(correct_result)
                            new_result = process_fn(new_result)
                            fp64_outputs = process_fn(fp64_outputs)

                    if (
                        self.args.save_model_outputs_to
                        and self.args.compare_model_outputs_with
                        and self.args.save_model_outputs_to
                        == self.args.compare_model_outputs_with
                    ):
                        log.warning(
                            "args.save_model_outputs_to and args.compare_model_outputs_with point to the same path. "
                            "Result will be undefined."
                        )

                    if self.args.save_model_outputs_to:
                        print(
                            f"Save model outputs to: {self.args.save_model_outputs_to}"
                        )
                        torch.save(new_result, self.args.save_model_outputs_to)

                    if self.args.compare_model_outputs_with:
                        print(
                            f"Load model outputs from {self.args.compare_model_outputs_with} to compare"
                        )
                        saved_result = torch.load(
                            self.args.compare_model_outputs_with, weights_only=False
                        )
                        is_bitwise_same = bitwise_same(saved_result, new_result)
                        if not is_bitwise_same:
                            print(
                                "The result is not bitwise equivalent to the previously saved result"
                            )
                            return record_status(
                                "not_bitwise_equivalent",
                                dynamo_start_stats=start_stats,
                            )

                        print(
                            "The result is bitwise equivalent to the previously saved result"
                        )
                        del saved_result

                    if not same(
                        correct_result,
                        new_result,
                        fp64_outputs,
                        equal_nan=self.equal_nan,
                        use_larger_multiplier_for_smaller_tensor=self.use_larger_multiplier_for_smaller_tensor(
                            name
                        ),
                        cos_similarity=cos_similarity,
                        tol=tolerance,
                        force_max_multiplier=force_max_multiplier,
                        use_iou_for_bool=self.use_iou_for_bool_accuracy(name),
                        iou_threshold=self.get_iou_threshold(name),
                    ):
                        run_passed = False
                except Exception:
                    # Sometimes torch.allclose may throw RuntimeError
                    run_passed = False

                if run_passed:
                    pass_count += 1

                if accuracy_check_runs > 1:
                    log.info(
                        "Accuracy check run %d/%d: %s",
                        run_idx + 1,
                        accuracy_check_runs,
                        "passed" if run_passed else "failed",
                    )

            # Pass if majority of runs pass (more than half)
            is_same = pass_count > accuracy_check_runs // 2

            if accuracy_check_runs > 1:
                log.info(
                    "Accuracy check summary: %d/%d runs passed, %s",
                    pass_count,
                    accuracy_check_runs,
                    "PASS" if is_same else "FAIL",
                )

            if not is_same:
                if self.args.skip_accuracy_check:
                    accuracy_status = "pass_due_to_skip"
                else:
                    accuracy_status = "fail_accuracy"
                return record_status(accuracy_status, dynamo_start_stats=start_stats)

        return record_status(accuracy_status, dynamo_start_stats=start_stats)

    def check_tolerance(
        self, name, model, example_inputs, optimize_ctx, base_device="cpu"
    ):
        """
        Checks tolerance based on https://pytorch.org/docs/stable/generated/torch.allclose.html.
        """
        tolerance_status = "pass"
        if name in self.skip_accuracy_checks_large_models_dashboard:
            tolerance_status = "pass_due_to_skip"
            return tolerance_status
        # Cast the model to float16/float32 as necessary
        model, example_inputs = self.maybe_cast(model, example_inputs)

        with self.pick_grad(name, self.args.training):
            # Get results of native pytorch
            reset_rng_state()
            model_copy = copy.deepcopy(model)
            model_copy = model_copy.to(base_device)
            example_inputs_copy = copy.deepcopy(example_inputs)
            example_inputs_copy = tree_map(
                lambda x: x.to(base_device), example_inputs_copy
            )
            self.init_optimizer(name, base_device, model_copy.parameters())
            correct_result = self.run_n_iterations(
                model_copy, example_inputs_copy, self.model_iter_fn
            )

            # Run with Dynamo
            # Sometimes CI fails with a random Triton compilation failure, which we skip for now
            # TODO: revisit this after switching to new Triton runtime
            reset_rng_state()
            torch._dynamo.reset()
            try:
                self.init_optimizer(name, current_device, model.parameters())
                optimized_model_iter_fn = optimize_ctx(self.model_iter_fn)
                new_result = self.run_n_iterations(
                    model_copy, example_inputs, optimized_model_iter_fn
                )
            except Exception:
                log.exception("")
                print(
                    "TorchDynamo optimized model failed to run because of the following error"
                )
                return "fail_to_run"

            def dump_max_mean_values(tol, ref, res):
                if isinstance(ref, (list, tuple, torch.nn.ParameterList, torch.Size)):
                    for refi, resi in zip(ref, res):
                        dump_max_mean_values(tol, refi, resi)
                elif isinstance(ref, dict):
                    for k in ref:
                        dump_max_mean_values(tol, ref[k], res[k])
                elif isinstance(ref, torch.Tensor):
                    res = res.to(base_device)
                    t = torch.abs(ref - res) / (1 + torch.abs(ref))
                    tol.append(t.flatten().to(torch.float32))
                return tol

            tol = []
            dump_max_mean_values(tol, correct_result, new_result)
            tol = torch.cat(tol)
            tol = torch.tensor(tol)
            max = torch.max(tol)
            mean = torch.mean(tol)
            div = torch.std(tol)
            headers = ["dev", "name", "batch_size", "max", "mean", "std"]
            fields = [
                current_device,
                current_name,
                current_batch_size,
                max.item(),
                mean.item(),
                div.item(),
            ]
            write_outputs(output_filename, headers, fields)
        return tolerance_status

    def run_performance_test_non_alternate(
        self, name, model, example_inputs, optimize_ctx, experiment, tag=None
    ):
        "Run performance test in non-alternately."
        if experiment.func is not latency_experiment:
            raise AssertionError(
                f"Must run with latency_experiment, got {experiment.func}"
            )

        def warmup(fn, model, example_inputs, mode, niters=10):
            gc.collect()
            peak_mem = 0
            start_stats = get_dynamo_stats()
            try:
                if current_device == "cuda":
                    torch.cuda.reset_peak_memory_stats()
                    empty_gpu_cache(current_device)
                elif current_device == "hpu":
                    torch.hpu.reset_peak_memory_stats()
                t0 = time.perf_counter()
                for _ in range(niters):
                    fn(model, example_inputs)
                t1 = time.perf_counter()
                latency = t1 - t0
                if current_device == "cuda":
                    peak_mem = get_peak_memory()
                elif current_device == "hpu":
                    peak_mem = torch.hpu.max_memory_allocated() / 10**9
                elif current_device == "cpu":
                    total = psutil.virtual_memory().total
                    percentage = psutil.Process(os.getpid()).memory_percent()
                    peak_mem = percentage * total / 10**9
            except Exception:
                log.exception("Backend %s failed in warmup()", mode)
                write_csv_when_exception(
                    self.args, current_name, "warmup_failed", current_device
                )
                output_signpost({}, self.args, self.suite_name, error="warmup_failed")
                return sys.exit(-1)
            dynamo_stats = get_dynamo_stats()
            dynamo_stats.subtract(start_stats)
            return latency, peak_mem, dynamo_stats

        # Cast the model to float16/float32 as necessary
        model, example_inputs = self.maybe_cast(model, example_inputs)

        # Use distributed wrapping as necessary
        model = self.deepcopy_and_maybe_parallelize(model)

        if not hasattr(model, "name"):
            model.name = name
        self.init_optimizer(name, current_device, model.parameters())

        # The self.autocast context is needed for the model we export with aot_compile,
        # similar to what we do in the check_accuracy function
        ctx = (
            self.autocast(**self.autocast_arg)
            if self.args.export_aot_inductor
            else contextlib.nullcontext()
        )

        with self.pick_grad(name, self.args.training), ctx:
            ok, total = Stats.reset_counters()
            experiment_kwargs = {}
            if tag is not None:
                experiment_kwargs["tag"] = tag
            results = []

            with maybe_snapshot_memory(
                self.args.snapshot_memory, f"eager_{self.args.only}"
            ):
                eager_latency, eager_peak_mem, _ = warmup(
                    self.model_iter_fn, model, example_inputs, "eager"
                )
                if self.args.use_warm_peak_memory:
                    _, eager_peak_mem, _ = warmup(
                        self.model_iter_fn, model, example_inputs, "eager", niters=1
                    )

            baseline_timings = experiment(
                self.model_iter_fn,
                model,
                example_inputs,
                mark="expected",
                **experiment_kwargs,
            )

            # reset dynamo
            torch._dynamo.reset()

            if self.args.export_aot_inductor:
                optimized_model_iter_fn = optimize_ctx
            else:
                optimized_model_iter_fn = optimize_ctx(self.model_iter_fn)

            with maybe_snapshot_memory(
                self.args.snapshot_memory, f"compiled_{self.args.only}"
            ):
                dynamo_latency, dynamo_peak_mem, dynamo_stats = warmup(
                    optimized_model_iter_fn, model, example_inputs, "dynamo"
                )
                if self.args.use_warm_peak_memory:
                    _, dynamo_peak_mem, _ = warmup(
                        optimized_model_iter_fn,
                        model,
                        example_inputs,
                        "dynamo",
                        niters=1,
                    )
                # If we use warm peak memory, the AOT model loading transient memory
                # won't be present on the warm measurement.  We only have to account for
                # it when using cold memory.
                elif self.args.export_aot_inductor:
                    dynamo_peak_mem -= AOTInductorModelCache.get_excess_memory(model)

            if self.args.profile_dynamo_cache_lookup:
                with torch.profiler.profile(
                    activities=[torch.profiler.ProfilerActivity.CPU]
                ) as prof:
                    warmup(optimized_model_iter_fn, model, example_inputs, "dynamo")

                events = list(
                    filter(
                        lambda event: "TorchDynamo Cache Lookup" in event.key,
                        prof.key_averages(),
                    )
                )
                dynamo_cache_lookup_latency = events[0].self_cpu_time_total

            compilation_time = dynamo_latency - eager_latency
            compression_ratio = (
                eager_peak_mem / dynamo_peak_mem if dynamo_peak_mem else 0.0
            )
            if self.args.print_memory:
                print(
                    f"memory: eager: {eager_peak_mem:.2f} GB, "
                    f"dynamo: {dynamo_peak_mem:.2f} GB, "
                    f"ratio: {compression_ratio:.2f}"
                )

            if self.args.print_compilation_time:
                print(f"Compilation time: {compilation_time:.2f}")

            if experiment.func is speedup_experiment:
                experiment_kwargs["compilation_latency"] = compilation_time
                experiment_kwargs["compression_ratio"] = compression_ratio
                experiment_kwargs["eager_peak_mem"] = eager_peak_mem
                experiment_kwargs["dynamo_peak_mem"] = dynamo_peak_mem
                experiment_kwargs["dynamo_stats"] = dynamo_stats
                if self.args.profile_dynamo_cache_lookup:
                    experiment_kwargs["cache_lookup_latency"] = (
                        dynamo_cache_lookup_latency
                    )

            backend_timings = experiment(
                self.model_iter_fn,
                model,
                example_inputs,
                mark="expected",
                **experiment_kwargs,
            )
            timings = np.stack((baseline_timings, backend_timings), axis=1)
            result_summary = latency_experiment_summary(
                self.suite_name, self.args, model, timings, **experiment_kwargs
            )
            results.append(result_summary)
            return " ".join(map(str, results))

    def run_performance_test(
        self,
        name,
        model,
        example_inputs,
        optimize_ctx,
        experiment,
        tag=None,
        batch_size=None,
    ):
        niters = 5
        if getattr(self, "hf_llm", False):
            # If we're benchmarking an LLM, we want to use the generate function
            self.model_iter_fn = self.generate
            niters = 1

        if self.args.xla:
            with self.pick_grad(name, self.args.training):
                return experiment(
                    self.model_iter_fn, *self.maybe_cast(model, example_inputs)
                )

        def warmup(fn, model, example_inputs, mode, niters=5):
            gc.collect()
            peak_mem = 0
            start_stats = get_dynamo_stats()
            try:
                if current_device == "cuda":
                    torch.cuda.reset_peak_memory_stats()
                    empty_gpu_cache(current_device)
                elif current_device == "hpu":
                    torch.hpu.reset_peak_memory_stats()
                t0 = time.perf_counter()
                for _ in range(niters):
                    fn(model, example_inputs)
                t1 = time.perf_counter()
                latency = t1 - t0
                if current_device == "cuda":
                    peak_mem = get_peak_memory()
                elif current_device == "hpu":
                    peak_mem = torch.hpu.max_memory_allocated() / 10**9
                elif current_device == "cpu":
                    total = psutil.virtual_memory().total
                    percentage = psutil.Process(os.getpid()).memory_percent()
                    peak_mem = percentage * total / 10**9
            except Exception:
                log.exception("Backend %s failed in warmup()", mode)
                write_csv_when_exception(
                    self.args, current_name, "warmup_failed", current_device
                )
                output_signpost({}, self.args, self.suite_name, error="warmup_failed")
                return sys.exit(-1)
            dynamo_stats = get_dynamo_stats()
            dynamo_stats.subtract(start_stats)
            return latency, peak_mem, dynamo_stats

        # Cast the model to float16/float32 as necessary
        model, example_inputs = self.maybe_cast(model, example_inputs)

        # Use distributed wrapping as necessary
        model = self.deepcopy_and_maybe_parallelize(model)

        if not hasattr(model, "name"):
            model.name = name

        self.init_optimizer(name, current_device, model.parameters())

        # The self.autocast context is needed for the model we export with aot_compile,
        # similar to what we do in the check_accuracy function
        ctx = (
            self.autocast(**self.autocast_arg)
            if self.args.export_aot_inductor
            else contextlib.nullcontext()
        )

        with self.pick_grad(name, self.args.training), ctx:
            ok, total = Stats.reset_counters()
            experiment_kwargs = {}
            experiment_kwargs["batch_size"] = batch_size
            if tag is not None:
                experiment_kwargs["tag"] = tag
            results = []
            with maybe_snapshot_memory(
                self.args.snapshot_memory, f"eager_{self.args.only}"
            ):
                with torch.compiler.set_stance("force_eager"):
                    eager_latency, eager_peak_mem, _ = warmup(
                        self.model_iter_fn,
                        copy.deepcopy(model),
                        example_inputs,
                        "eager",
                        niters=niters,
                    )
                    if self.args.use_warm_peak_memory:
                        _, eager_peak_mem, _ = warmup(
                            self.model_iter_fn,
                            copy.deepcopy(model),
                            example_inputs,
                            "eager",
                            niters=1,
                        )

            if (
                self.args.export_aot_inductor
                or self.args.export_nativert
                or self.args.torchscript_jit_trace
                or self.args.aot_precompile
            ):
                optimized_model_iter_fn = optimize_ctx
            else:
                if getattr(self, "hf_llm", False):
                    # If it's an LLM, we want to optimize model.forward and use
                    # the generate function
                    model = optimize_ctx(model)
                    optimized_model_iter_fn = self.model_iter_fn
                else:
                    optimized_model_iter_fn = optimize_ctx(self.model_iter_fn)

            with maybe_snapshot_memory(
                self.args.snapshot_memory, f"compiled_{self.args.only}"
            ):
                dynamo_latency, dynamo_peak_mem, dynamo_stats = warmup(
                    optimized_model_iter_fn, model, example_inputs, "dynamo"
                )
                if self.args.use_warm_peak_memory:
                    _, dynamo_peak_mem, _ = warmup(
                        optimized_model_iter_fn,
                        model,
                        example_inputs,
                        "dynamo",
                        niters=1,
                    )
                # If we use warm peak memory, the AOT model loading transient memory
                # won't be present on the warm measurement.  We only have to account for
                # it when using cold memory.
                elif self.args.export_aot_inductor:
                    dynamo_peak_mem -= AOTInductorModelCache.get_excess_memory(model)

            if self.args.profile_dynamo_cache_lookup:
                with torch.profiler.profile(
                    activities=[torch.profiler.ProfilerActivity.CPU]
                ) as prof:
                    warmup(optimized_model_iter_fn, model, example_inputs, "dynamo")

                events = list(
                    filter(
                        lambda event: "TorchDynamo Cache Lookup" in event.key,
                        prof.key_averages(),
                    )
                )
                dynamo_cache_lookup_latency = events[0].self_cpu_time_total

            compilation_time = dynamo_latency - eager_latency
            compression_ratio = (
                eager_peak_mem / dynamo_peak_mem if dynamo_peak_mem else 0.0
            )
            if self.args.print_memory:
                print(
                    f"memory: eager: {eager_peak_mem:.2f} GB, "
                    f"dynamo: {dynamo_peak_mem:.2f} GB, "
                    f"ratio: {compression_ratio:.2f}"
                )

            if self.args.print_compilation_time:
                print(f"Compilation time: {compilation_time:.2f}")

            if experiment.func is speedup_experiment:
                experiment_kwargs["compilation_latency"] = compilation_time
                experiment_kwargs["compression_ratio"] = compression_ratio
                experiment_kwargs["eager_peak_mem"] = eager_peak_mem
                experiment_kwargs["dynamo_peak_mem"] = dynamo_peak_mem
                experiment_kwargs["dynamo_stats"] = dynamo_stats
                if self.args.profile_dynamo_cache_lookup:
                    experiment_kwargs["cache_lookup_latency"] = (
                        dynamo_cache_lookup_latency
                    )

            if experiment.func is coverage_experiment:
                ok, total = Stats.reset_counters()
                results = []
                # run with torch._dynamo a few times to populate the cache
                for _ in range(3):
                    optimized_model_iter_fn(model, example_inputs)
                _, frames_second_pass = Stats.reset_counters()  # should be 0
                if frames_second_pass > 0:
                    optimized_model_iter_fn(model, example_inputs)
                    _, frames_third_pass = Stats.reset_counters()  # should be 0
                else:
                    frames_third_pass = 0

                results.append(
                    f"{ok:3}/{total:3} +{frames_third_pass} frames {compilation_time:3.0f}s"
                )

            experiment_kwargs["hf_llm"] = getattr(self, "hf_llm", False)

            results.append(
                experiment(
                    self.model_iter_fn, model, example_inputs, **experiment_kwargs
                )
            )
            return " ".join(map(str, results))

    def minify_model(
        self,
        name,
        model,
        example_inputs,
        optimize_ctx,
        experiment,
        tag,
    ):
        log.info("Minifying %s...", name)
        os.environ["TORCH_COMPILE_DEBUG"] = "1"
        os.environ["TORCHDYNAMO_REPRO_AFTER"] = "dynamo"
        os.environ["TORCHDYNAMO_REPRO_LEVEL"] = "4"

        self.check_accuracy(name, model, example_inputs, optimize_ctx, experiment, tag)

        if self.args.output_directory:
            repro_dir = self.args.output_directory
        else:
            repro_dir = torch._dynamo.config.base_dir

        try:
            shutil.move("repro.py", f"{repro_dir}/{name}_repro.py")
        except OSError:
            log.error("Could not find repro script for model %s", name)
        else:
            log.info(
                "Repro script for model %s with minified graph saved to %s",
                name,
                repro_dir,
            )

    def maybe_preserve_compile_debug(self, name, status):
        if (
            name in CI_PRESERVE_COMPILE_DEBUG
            and status in CI_PRESERVE_COMPILE_DEBUG[name]
        ):
            src_dir = torch._dynamo.utils.get_debug_dir()
            if os.path.isdir(src_dir):
                dbg_dir = os.path.join(
                    os.getcwd(), "test", "debug", "torch_compile_debug"
                )
                dst_dir = os.path.join(dbg_dir, os.path.basename(src_dir))
                try:
                    os.makedirs(dbg_dir, exist_ok=True)
                    os.rename(src_dir, dst_dir)
                    log.warning("Moved %s to %s", src_dir, dst_dir)
                except OSError:
                    log.exception("Failed to preserve %s", src_dir)

    def run_one_model(
        self,
        name,
        model,
        example_inputs,
        optimize_ctx,
        experiment,
        explain=False,
        tag=None,
        batch_size=None,
    ):
        mode = "train" if self.args.training else "eval"
        msg = f"{current_device:4} {mode:5} {current_name:34} "
        if tag:
            msg += f" {tag:26}"
        print(msg, flush=True)

        start_stats = get_dynamo_stats()

        if self.args.accuracy:
            status = self.check_accuracy(
                name, model, example_inputs, optimize_ctx, experiment, tag
            )
            print(status)
            if status == "fail_accuracy" and self.args.minify:
                self.minify_model(
                    name, model, example_inputs, optimize_ctx, experiment, tag
                )
        elif self.args.tolerance:
            status = self.check_tolerance(name, model, example_inputs, optimize_ctx)
            print(status)
        elif self.args.performance:
            if self.args.backend in ["torchao", "optimus"]:
                status = self.run_performance_test_non_alternate(
                    name, model, example_inputs, optimize_ctx, experiment, tag
                )
            else:
                status = self.run_performance_test(
                    name,
                    model,
                    example_inputs,
                    optimize_ctx,
                    experiment,
                    tag,
                    batch_size=batch_size,
                )
            print(status)
        empty_gpu_cache(current_device)

        self.maybe_preserve_compile_debug(name, status)

        if self.args.timing:
            from torch._dynamo.utils import op_count, print_time_report
            from torch.utils._stats import simple_call_counter

            print_time_report()
            stats = "STATS: "
            stats = stats + " | ".join(
                itertools.chain(
                    [f"call_* op count: {op_count}"],
                    (f"{key}:{value}" for key, value in simple_call_counter.items()),
                )
            )
            print(stats)
        stats = get_dynamo_stats()
        stats.subtract(start_stats)

        if explain:
            print(
                f"Dynamo produced {stats['unique_graphs']} graphs "
                f"covering {stats['calls_captured']} ops with "
                f"{stats['graph_breaks']} graph breaks ({stats['unique_graph_breaks']} unique)"
            )

        if explain or self.args.log_graph_breaks or self.args.print_graph_breaks:
            filename = f"{output_filename.rstrip('.csv')}_graph_breaks.csv"

            def add_double_quotes(x):
                # Quote the field because the reason may contain a comma, the CSV delimiter
                return f'"{x}"'

            for graph_break in graph_break_reasons:
                reason = add_double_quotes(graph_break.reason)
                user_stack = add_double_quotes(
                    ", ".join([str(x) for x in graph_break.user_stack])
                )

                # NB: Don't upload these to the benchmark database, as they are debugging
                # information. There are also around a million records a day, which would be
                # wasteful to store.
                write_outputs(
                    filename,
                    ["model", "reason", "user_stack"],
                    [current_name, reason, user_stack],
                    False,
                )

        if self.args.stats:
            Stats.print_summary()
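
The accuracy path above reduces to a pattern worth isolating: run eager twice to confirm determinism, then compare the compiled run against eager, with fp64 golden outputs informing the tolerance logic in same(). Below is a self-contained sketch of that pattern, using torch.allclose as a stand-in for the harness's richer same() helper:

import copy

import torch

def accuracy_check_sketch(model, inputs, tol=1e-4):
    # Two eager runs must agree before anything else is compared
    # (this mirrors the eager_two_runs_differ status above).
    eager1 = copy.deepcopy(model)(inputs)
    eager2 = copy.deepcopy(model)(inputs)
    if not torch.allclose(eager1, eager2, equal_nan=True):
        return "eager_two_runs_differ"

    # Compiled run compared against eager within tolerance. The harness
    # additionally passes fp64 reference outputs and a cosine-similarity
    # fallback to same(); plain allclose stands in for both here.
    new_result = torch.compile(copy.deepcopy(model))(inputs)
    if not torch.allclose(eager1, new_result, rtol=tol, atol=tol, equal_nan=True):
        return "fail_accuracy"
    return "pass"

print(accuracy_check_sketch(torch.nn.Linear(8, 8), torch.randn(4, 8)))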
