Files
tinygrad/test/backend/test_randomness.py
Christopher Milan 8ddd1328df remove getenv(CI) (#16365)
gone everywhere except test_interop, because torch MPS does not work in actions
2026-05-25 20:23:33 -04:00

263 lines
11 KiB
Python

import unittest, math
from tinygrad import dtypes, Tensor, Device
from tinygrad.helpers import getenv, DEV
from tinygrad.codegen import to_program
from tinygrad.uop.ops import Ops
from tinygrad.renderer.ptx import PTXRenderer
from tinygrad.renderer.nir import NIRRenderer
from tinygrad.renderer.isa.x86 import X86Renderer
from test.helpers import not_support_multi_device, needs_second_gpu
from test.unit.test_randomness import equal_distribution, normal_test
import numpy as np
import torch
from hypothesis import given, settings, strategies as strat
settings.register_profile("my_profile", max_examples=200, deadline=None, derandomize=getenv("DERANDOMIZE_CI", False))
settings.load_profile("my_profile")
class TestRandomness(unittest.TestCase):
def test_rand(self):
self.assertFalse(normal_test(Tensor.rand))
self.assertTrue(equal_distribution(Tensor.rand, torch.rand, lambda x: np.random.rand(*x)))
def test_rand_is_lazy(self):
Tensor.manual_seed(0)
r1 = Tensor.rand(10)
self.assertFalse(r1.uop.is_realized, "rand should be lazy - tensor should not be realized")
counter = Tensor._device_rng_counters[Device.DEFAULT]
self.assertFalse(counter.uop.is_realized, "rand should be lazy - counter should not be realized")
# second rand triggers assign path
r2 = Tensor.rand(10)
self.assertFalse(r2.uop.is_realized, "rand should be lazy - tensor should not be realized after second rand")
self.assertFalse(counter.uop.is_realized, "rand should be lazy - counter should not be realized after second rand")
Tensor.realize(r1, r2)
self.assertTrue(r1.uop.is_realized, "tensor should be realized after .realize()")
self.assertTrue(r2.uop.is_realized, "tensor should be realized after .realize()")
@unittest.skipUnless(dtypes.float16 in Device[Device.DEFAULT].renderer.supported_dtypes(), "need float16 support")
def test_rand_float16(self):
N = 128
x = Tensor.rand((2, N, N), dtype=dtypes.float16)
assert x.dtype == dtypes.float16
nx = x.numpy()
# seed dependant, check output range is [0, 1)
assert nx[nx == 1].size == 0
assert nx[nx == 0].size > 0
equal_distribution(lambda *x: Tensor.rand(*x, dtype=dtypes.float16), torch.rand, lambda x: np.random.rand(*x), shape=(2, N, N))
@unittest.skipIf(DEV.interface.startswith("MOCK") and Device.DEFAULT in {"NV", "CUDA"}, "gpuocelot doesn't support certain ops needed for threefry")
def test_threefry_against_reference(self):
Tensor.manual_seed(1337)
# reference generated using
"""
key0 = 1337
key1 = 0
values = jax.extend.random.threefry_2x32((np.uint32(key1), np.uint32(key0)), np.arange(20, dtype=np.uint32))
print(f"[{', '.join(f'{v}' for v in values)}]")
"""
jr = np.array([2221762175, 1752107825, 653745012, 1967534793, 1395205442, 3840423848, 2159346757,
603508235, 3319473678, 3363866483, 3544324138, 1436466838, 2169858556, 2570072943,
2387150698, 3678370550, 2911697663, 403244401, 2560861638, 1692360114])
counts = Tensor.arange(20, dtype=dtypes.uint32)
counts0, counts1 = counts.chunk(2)
r = Tensor._threefry_random_bits(Tensor([0, 1337], dtype='uint32'), counts0, counts1).numpy()
np.testing.assert_allclose(jr, r)
@unittest.skipIf(isinstance(Device[Device.DEFAULT].renderer, (NIRRenderer, PTXRenderer)), "PTX and NIR use pointer arithmetic")
@unittest.skipIf(isinstance(Device[Device.DEFAULT].renderer, X86Renderer), "X86 callee saved registers have ulong dtype")
def test_threefry_doesnt_use_long(self):
linear = Tensor.rand(20).schedule_linear()
for call in linear.src:
ast = call.src[0]
if ast.op is Ops.SINK:
prg = to_program(ast, renderer=Device[Device.DEFAULT].renderer)
for u in tuple(prg.src[2].src):
self.assertNotIn(u.dtype, {dtypes.long, dtypes.ulong}, msg=f"long found in {prg.arg.name}")
def test_threefry_against_reference_full(self):
Tensor.manual_seed(1337)
# reference generated using
"""
key0 = 1337
key1 = int.from_bytes(hashlib.sha256(int(0).to_bytes(4)).digest(), "big") & 0xffffffff
# derive new key for the counter offset (c_low=0, c_high=0 for first call)
new_key_values = jax.extend.random.threefry_2x32((np.uint32(key1), np.uint32(key0)), np.array([0, 0], dtype=np.uint32))
new_key = (np.uint32(new_key_values[0]), np.uint32(new_key_values[1]))
values = jax.extend.random.threefry_2x32(new_key, np.arange(20, dtype=np.uint32))
values = (values >> (32 - 23)) | np.array(1, dtype=np.float32).view(np.uint32)
values = values.view(np.float32) - 1
print(f"[{', '.join(f'{v}' for v in values)}]")
"""
jr = np.array([0.45735931396484375, 0.6311527490615845, 0.15571284294128418, 0.8149417638778687, 0.7862188816070557,
0.8008807897567749, 0.568588376045227, 0.9852620363235474, 0.42314577102661133, 0.9811755418777466,
0.38059568405151367, 0.09186363220214844, 0.9497315883636475, 0.5826880931854248, 0.3796330690383911,
0.5610522031784058, 0.16122901439666748, 0.3732343912124634, 0.9795231819152832, 0.3280656337738037], dtype=np.float32)
r = Tensor.rand(20).numpy()
np.testing.assert_allclose(r, jr, atol=1e-5, rtol=1e-5)
# next 20 (c_low=20, c_high=0)
jr = np.array([0.09199333190917969, 0.9130761623382568, 0.7048608064651489, 0.22254979610443115, 0.0014830827713012695,
0.37023448944091797, 0.7790107727050781, 0.7484984397888184, 0.7524604797363281, 0.19875383377075195,
0.48537540435791016, 0.10002851486206055, 0.5369305610656738, 0.3294715881347656, 0.5246957540512085,
0.7659651041030884, 0.7949080467224121, 0.34988296031951904, 0.9798505306243896, 0.2599533796310425], dtype=np.float32)
r = Tensor.rand(20).numpy()
np.testing.assert_allclose(r, jr, atol=1e-5, rtol=1e-5)
# next 10 (c_low=40, c_high=0)
jr = np.array([0.3198714256286621, 0.7984923124313354, 0.320881724357605, 0.4716068506240845, 0.7323365211486816,
0.9663800001144409, 0.13873648643493652, 0.16062307357788086, 0.49300849437713623, 0.10077548027038574], dtype=np.float32)
r = Tensor.rand(10).numpy()
np.testing.assert_allclose(r, jr, atol=1e-5, rtol=1e-5)
@needs_second_gpu
@unittest.skipIf(not_support_multi_device(), "no multi")
def test_threefry_tensors_cnt(self):
Tensor.manual_seed(1337)
Tensor.rand(20).realize()
assert len(Tensor._device_rng_counters) == 1
assert len(Tensor._device_seeds) == 1
Tensor.rand(20, device=f"{Device.DEFAULT}:1").realize()
assert len(Tensor._device_rng_counters) == 2
assert len(Tensor._device_seeds) == 2
Tensor.manual_seed(2)
assert len(Tensor._device_rng_counters) == 0
assert len(Tensor._device_seeds) == 0
@needs_second_gpu
@unittest.skipIf(not_support_multi_device(), "no multi")
def test_threefry_same_kernels(self):
Tensor.manual_seed(0)
Tensor.rand(1).realize()
s = Tensor.rand(20).schedule_linear().src
s2 = Tensor.rand(20).schedule_linear().src
assert len(s) == len(s2), f"{len(s)} != {len(s2)}"
for x,y in zip(s, s2):
if not (x.src[0] == y.src[0]):
print(f"{x.src[0]} != {y.src[0]}")
Tensor.rand(1, device=f"{Device.DEFAULT}:1").realize()
s3 = Tensor.rand(20, device=f"{Device.DEFAULT}:1").schedule_linear().src
s4 = Tensor.rand(20, device=f"{Device.DEFAULT}:1").schedule_linear().src
assert len(s3) == len(s4), f"{len(s3)} != {len(s4)}"
assert len(s2) == len(s4), f"{len(s)} != {len(s3)}"
for x,y in zip(s3, s4):
if not (x.src[0] == y.src[0]):
print(f"{x.src[0]} != {y.src[0]}")
@unittest.skipUnless(dtypes.bfloat16 in Device[Device.DEFAULT].renderer.supported_dtypes(), "need bfloat16 support")
def test_rand_bfloat16(self):
N = 128
x = Tensor.rand((2, N, N), dtype=dtypes.bfloat16)
assert x.dtype == dtypes.bfloat16
nx = x.numpy()
assert nx[nx == 1].size == 0
assert nx[nx == 0].size > 0
equal_distribution(lambda *x: Tensor.rand(*x, dtype=dtypes.bfloat16).float(), torch.rand, lambda x: np.random.rand(*x), shape=(2, N, N))
def test_rand_like(self):
empty = Tensor.empty((80, 44))
rand = Tensor.rand_like(empty)
assert rand.shape == empty.shape
assert rand.dtype == empty.dtype
assert rand.device == empty.device
def test_randn_like(self):
empty = Tensor.empty((80, 44))
rand = Tensor.randn_like(empty)
assert rand.shape == empty.shape
assert rand.dtype == empty.dtype
assert rand.device == empty.device
def test_rand_like_zero_shape(self):
empty = Tensor.empty(0, 20)
rand = Tensor.rand_like(empty)
assert rand.shape == empty.shape
assert rand.dtype == empty.dtype
assert rand.device == empty.device
def test_rand_like_more_dims(self):
empty = Tensor.empty((1, 2, 3, 4, 5, 6))
rand = Tensor.rand_like(empty)
assert rand.shape == empty.shape
assert rand.dtype == empty.dtype
assert rand.device == empty.device
def test_rand_like_dtype(self):
empty = Tensor.empty((80, 44), dtype=dtypes.float16)
rand = Tensor.rand_like(empty)
assert rand.shape == empty.shape
assert rand.dtype == empty.dtype
assert rand.device == empty.device
empty = Tensor.empty((80, 44))
rand = Tensor.rand_like(empty, dtype=dtypes.float16)
assert rand.shape == empty.shape
assert rand.dtype == dtypes.float16
assert rand.device == empty.device
def test_randn_like_dtype(self):
empty = Tensor.empty((80, 44), dtype=dtypes.float16)
rand = Tensor.randn_like(empty)
assert rand.shape == empty.shape
assert rand.dtype == empty.dtype
assert rand.device == empty.device
empty = Tensor.empty((80, 44))
rand = Tensor.randn_like(empty, dtype=dtypes.float16)
assert rand.shape == empty.shape
assert rand.dtype == dtypes.float16
assert rand.device == empty.device
def test_randn_device(self):
self.assertEqual(Tensor.randn(3,3,device="CPU").device, "CPU")
@given(strat.sampled_from([dtypes.float, dtypes.float16, dtypes.bfloat16]))
def test_randn_finite(self, default_float):
if default_float not in Device[Device.DEFAULT].renderer.supported_dtypes(): return
old_default_float = dtypes.default_float
# low precision can result in inf from randn
dtypes.default_float = default_float
t = Tensor.randn(64, 64)
mx = t.max().numpy().item()
mn = t.min().numpy().item()
print(f"testing with {default_float=}")
assert math.isfinite(mx), mx
assert math.isfinite(mn), mn
dtypes.default_float = old_default_float
def test_random_counter_overflow(self):
device = Device.DEFAULT
Tensor.manual_seed(1337)
Tensor.rand(1).realize()
Tensor._device_rng_counters[device].assign(Tensor([dtypes.uint32.max - 5, 0], device=device, dtype=dtypes.uint32)).realize()
Tensor.rand(10).realize()
c = Tensor._device_rng_counters[device].numpy()
np.testing.assert_allclose(c, [4, 1])
Tensor.rand(10).realize()
c = Tensor._device_rng_counters[device].numpy()
np.testing.assert_allclose(c, [14, 1])
if __name__ == "__main__":
unittest.main()