Files
tinygrad/test/amd/hw/test_dpp.py
George Hotz 359b1582d6 amd: EMU DPP support (#15719)
* EMU DPP support from GPT 5.4

* cleanups

* simple

* nope

* fix
2026-04-14 14:58:41 +08:00

188 lines
6.2 KiB
Python

"""Tests for DPP16 source swizzles.
These instructions trap in the default wave32 hw helper, so this file uses a
minimal wave64 lane-store harness and compares emulator vs hardware directly
when USE_HW=1.
"""
import ctypes, unittest
from tinygrad.runtime.autogen.amd.rdna3.ins import *
from tinygrad.helpers import flat_mv
from test.amd.hw.helpers import USE_HW, assemble
from test.mockgpu.amd.emu import run_asm
# Lane count used for every launch in this file: it is the local size passed to
# both the emulator and the hardware run, and the number of uint32 results read back.
WAVE64 = 64
def _wave64_code(instructions: list, out_reg: int = 1) -> bytes:
  """Assemble `instructions` wrapped in the lane-store harness.

  Before the test body runs, s[0:1] is copied into s[80:81] and v[0] into
  v[255], so the body may overwrite the low registers. After the body, a 64-bit
  value is loaded from the address held in s[80:81] into s[92:93], and each lane
  stores v[out_reg] with s[92:93] as `saddr` and v[240] = v[255] << 2 as `addr`.
  (On entry v[0] is presumably the lane index and s[0:1] the kernarg pointer;
  confirm against the launch setup.)

  Args:
    instructions: instruction objects placed between the register save and the store.
    out_reg: index of the VGPR whose value each lane writes out.

  Returns:
    The result of `assemble` for the complete sequence.
  """
  save_entry_regs = [
    s_mov_b32(s[80], s[0]),
    s_mov_b32(s[81], s[1]),
    v_mov_b32_e32(v[255], v[0]),
  ]
  store_and_exit = [
    s_load_b64(s[92:93], s[80:81], 0, soffset=NULL),
    s_waitcnt(0),
    v_lshlrev_b32_e32(v[240], 2, v[255]),  # 4 bytes per lane
    global_store_b32(addr=v[240], data=v[out_reg], saddr=s[92:93], offset=0),
    s_endpgm(),
  ]
  return assemble([*save_entry_regs, *instructions, *store_and_exit])
def _run_wave64_emu(instructions: list, out_reg: int = 1) -> list[int]:
  """Run the harnessed program through `run_asm` and return the stored words.

  Args:
    instructions: test body forwarded to `_wave64_code`.
    out_reg: VGPR index forwarded to `_wave64_code`.

  Returns:
    The WAVE64 uint32 values found in the output buffer after the run.

  Raises:
    AssertionError: if `run_asm` returns a non-zero status.
  """
  program = _wave64_code(instructions, out_reg)
  results = (ctypes.c_uint32 * WAVE64)()  # ctypes arrays start out zero-filled
  # The only kernel argument is the address of the result buffer.
  kernargs = (ctypes.c_uint64 * 1)(ctypes.addressof(results))
  # Copy the code into a ctypes buffer so there is an address to hand to run_asm;
  # the local keeps that buffer alive for the duration of the call.
  program_buf = (ctypes.c_char * len(program)).from_buffer_copy(program)
  rsrc2 = 0x19c | (128 << 15)
  scratch_size = 0x10000
  # The six integers after the code size are presumably the global and local
  # dimensions (1x1x1 and WAVE64x1x1) -- confirm against run_asm's signature.
  status = run_asm(
    ctypes.addressof(program_buf), len(program),
    1, 1, 1,
    WAVE64, 1, 1,
    ctypes.addressof(kernargs), rsrc2, scratch_size,
  )
  assert status == 0, f"run_asm failed with {status}"
  return list(results)
def _run_wave64_hw(instructions: list, out_reg: int = 1) -> list[int]:
  """Run the harnessed program on the AMD device and return the stored words.

  The assembled bytes are embedded with a `.byte` directive in a small assembly
  source that also carries the kernel descriptor and code-object metadata. That
  source is compiled with `HIPCompiler`, launched as one workgroup of WAVE64
  work-items, and the output buffer is copied back to the host.

  Args:
    instructions: test body forwarded to `_wave64_code`.
    out_reg: VGPR index forwarded to `_wave64_code`.

  Returns:
    WAVE64 integers decoded as little-endian uint32 from the output buffer.
  """
  # Imported lazily so the module can be loaded without touching the AMD runtime
  # when only the emulator path is used.
  from tinygrad.device import Device
  from tinygrad.runtime.ops_amd import AMDProgram
  from tinygrad.runtime.support.compiler_amd import HIPCompiler
  dev = Device["AMD"]
  compiler = HIPCompiler(dev.arch)  # type: ignore[attr-defined]
  code = _wave64_code(instructions, out_reg)
  byte_str = ', '.join(f'0x{b:02x}' for b in code)
  # The per-kernel metadata keys must be nested inside the `amdhsa.kernels` list
  # entry; left at column 0 they parse as top-level keys of the YAML document.
  # NOTE(review): no `.amdhsa_wavefront_size32` directive is given, so the wave
  # size in the descriptor is whatever the target default is -- confirm that
  # this really launches as wave64 as `.wavefront_size: 64` claims.
  asm_src = f""".text
.globl test
.p2align 8
.type test,@function
test:
.byte {byte_str}
.rodata
.p2align 6
.amdhsa_kernel test
.amdhsa_next_free_vgpr 256
.amdhsa_next_free_sgpr 96
.amdhsa_user_sgpr_kernarg_segment_ptr 1
.amdhsa_kernarg_size 8
.amdhsa_group_segment_fixed_size 65536
.amdhsa_private_segment_fixed_size 65536
.amdhsa_enable_private_segment 1
.end_amdhsa_kernel
.amdgpu_metadata
---
amdhsa.version:
  - 1
  - 0
amdhsa.kernels:
  - .name: test
    .symbol: test.kd
    .kernarg_segment_size: 8
    .group_segment_fixed_size: 65536
    .private_segment_fixed_size: 65536
    .kernarg_segment_align: 8
    .wavefront_size: 64
    .sgpr_count: 96
    .vgpr_count: 256
    .max_flat_workgroup_size: 1024
...
.end_amdgpu_metadata
"""
  lib = compiler.compile(asm_src)
  prg = AMDProgram(dev, "test", lib)  # type: ignore[arg-type]
  out_nbytes = WAVE64 * 4  # one uint32 per lane
  # NOTE(review): the device buffer is not zeroed before launch and is not
  # released afterwards; every lane is expected to write its own slot.
  out_gpu = dev.allocator.alloc(out_nbytes)
  prg(out_gpu, global_size=(1, 1, 1), local_size=(WAVE64, 1, 1), wait=True)
  out = bytearray(out_nbytes)
  dev.allocator._copyout(flat_mv(memoryview(out)), out_gpu)
  return [int.from_bytes(out[i*4:(i+1)*4], 'little') for i in range(WAVE64)]
def run_wave64(instructions: list, out_reg: int = 1) -> list[int]:
  """Run a test body and return one value per lane.

  The emulator is always run. When USE_HW is set the same program is also run
  on hardware and the two result lists must be equal.

  Args:
    instructions: test body to wrap in the lane-store harness.
    out_reg: index of the VGPR each lane writes out.

  Returns:
    The hardware results when USE_HW is set, otherwise the emulator results.

  Raises:
    AssertionError: if emulator and hardware results differ; the message lists
      at most the first 16 differing lanes.
  """
  emu_lanes = _run_wave64_emu(instructions, out_reg)
  if USE_HW:
    hw_lanes = _run_wave64_hw(instructions, out_reg)
    if emu_lanes != hw_lanes:
      report = []
      for lane, (emu_val, hw_val) in enumerate(zip(emu_lanes, hw_lanes)):
        if emu_val != hw_val:
          report.append(f"lane {lane}: emu=0x{emu_val:08x} hw=0x{hw_val:08x}")
      raise AssertionError("Emulator vs Hardware mismatch:\n" + '\n'.join(report[:16]))
    return hw_lanes
  return emu_lanes
class TestDPP16(unittest.TestCase):
  """DPP16 source swizzles, observed by copying a per-lane pattern through v_or_b32.

  Every test fills v[0] with 10 * v[255] + 3 (v[255] is the entry value of v[0]
  saved by the harness) and reads v[1] back, so an expected value of 10 * k + 3
  means the lane received lane k's source.
  """
  def _run_copy(self, dpp: int, *, row_mask: int = 0xf, bank_mask: int = 0xf, bc: int = 1, dst_seed: int | None = None) -> list[int]:
    """Run `v_or_b32 v1, DPP(v0), v2` with v2 = 0 and return v[1] per lane.

    Args:
      dpp: DPP control value passed to the instruction.
      row_mask: row mask passed to the instruction.
      bank_mask: bank mask passed to the instruction.
      bc: `bc` field passed to the instruction.
      dst_seed: if not None, v[1] is set to this value before the DPP instruction.
    """
    fill_pattern = [
      v_mul_u32_u24_e32(v[0], 10, v[255]),
      v_add_nc_u32_e32(v[0], 3, v[0]),
    ]
    seed_dst = [] if dst_seed is None else [v_mov_b32_e32(v[1], dst_seed)]
    swizzled_copy = [
      v_mov_b32_e32(v[2], 0),  # OR with zero leaves the swizzled source unchanged
      v_or_b32_e32(v[1], DPP, v[2], vsrc0=v[0], dpp=dpp, row_mask=row_mask, bank_mask=bank_mask, bc=bc),
    ]
    return run_wave64(fill_pattern + seed_dst + swizzled_copy)

  def _expect_lanes(self, out: list[int], expected: dict[int, int]) -> None:
    """Assert `out[lane] == value` for each entry of `expected`, in insertion order."""
    for lane, value in expected.items():
      self.assertEqual(out[lane], value)

  def test_quad_perm_reverse(self):
    """dpp=0x1b: lanes 0..3 see the sources of lanes 3..0; lane 4 sees lane 7."""
    self._expect_lanes(self._run_copy(0x1b), {0: 33, 1: 23, 2: 13, 3: 3, 4: 73})

  def test_row_shl(self):
    """dpp=0x101: each lane sees the next lane's source; lane 15 reads 0."""
    self._expect_lanes(self._run_copy(0x101), {0: 13, 7: 83, 14: 153, 15: 0, 16: 173})

  def test_row_shr(self):
    """dpp=0x111: each lane sees the previous lane's source; lanes 0 and 16 read 0."""
    self._expect_lanes(self._run_copy(0x111), {0: 0, 1: 3, 8: 73, 15: 143, 16: 0, 17: 163})

  def test_row_ror(self):
    """dpp=0x121: like the shift right by one, but lanes 0 and 16 wrap to lanes 15 and 31."""
    self._expect_lanes(self._run_copy(0x121), {0: 153, 1: 3, 15: 143, 16: 313})

  def test_row_mirror(self):
    """dpp=0x140: lane order is reversed within each group of 16 lanes."""
    self._expect_lanes(self._run_copy(0x140), {0: 153, 5: 103, 8: 73, 16: 313})

  def test_row_half_mirror(self):
    """dpp=0x141: lane order is reversed within each group of 8 lanes."""
    self._expect_lanes(self._run_copy(0x141), {0: 73, 7: 3, 8: 153, 15: 83, 16: 233})

  def test_row_mask(self):
    """row_mask=0x5: lanes 0-15 and 32-47 are written, lanes 16 and 48 keep the seed."""
    out = self._run_copy(0x101, row_mask=0x5, dst_seed=0xDEADBEEF)
    self._expect_lanes(out, {0: 13, 15: 0, 16: 0xDEADBEEF, 32: 333, 47: 0, 48: 0xDEADBEEF})

  def test_bank_mask(self):
    """bank_mask=0x5: lanes 0, 3 and 8 are written, lanes 4 and 12 keep the seed."""
    out = self._run_copy(0x101, bank_mask=0x5, dst_seed=0xDEADBEEF)
    self._expect_lanes(out, {0: 13, 3: 43, 4: 0xDEADBEEF, 8: 93, 12: 0xDEADBEEF})
class TestVOPCDPP16(unittest.TestCase):
  """A VOPC compare with a DPP16 source, checked through a per-lane 0/1 select."""
  def test_row_bcast15_materializes_vcc(self):
    """Compare v[0] with its dpp=0x142 swizzle and turn the result into 0/1 per lane.

    v[0] holds the value saved in v[255]. The v_cndmask after the compare picks
    between v[2] = 0 and v[3] = 1 (presumably on VCC, as the test name suggests).
    Lanes 0, 16, 32 and 48 are expected to produce 1; lanes 1, 15, 31, 47 and 63
    are expected to produce 0.
    """
    program = [
      v_mov_b32_e32(v[0], v[255]),
      v_cmp_eq_u32_e32(DPP, v[0], vsrc0=v[0], dpp=0x142, row_mask=0xf, bank_mask=0xf, bc=1),
      v_mov_b32_e32(v[2], 0),
      v_mov_b32_e32(v[3], 1),
      v_cndmask_b32_e32(v[1], v[2], v[3]),
    ]
    out = run_wave64(program)
    lanes_equal = (0, 16, 32, 48)
    lanes_not_equal = (1, 15, 31, 47, 63)
    # Checked in the same order as listed: all expected-1 lanes, then expected-0 lanes.
    expectations = [(lane, 1) for lane in lanes_equal] + [(lane, 0) for lane in lanes_not_equal]
    for lane, want in expectations:
      self.assertEqual(out[lane], want)