Skip to content

Commit

Permalink
Merge pull request #4221 from tybug/atheris-tcs
Browse files Browse the repository at this point in the history
Add and use `BytestringProvider` in `fuzz_one_input`
  • Loading branch information
tybug authored Jan 18, 2025
2 parents 50d2707 + 703ae81 commit 2ce4344
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 8 deletions.
3 changes: 3 additions & 0 deletions hypothesis-python/RELEASE.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
RELEASE_TYPE: patch

:ref:`fuzz_one_input <fuzz_one_input>` is now implemented using an :ref:`alternative backend <alternative-backends>`. This brings the interpretation of the fuzzer-provided bytestring closer to the fuzzer mutations, allowing the mutations to work more reliably. We hope to use this backend functionality to improve fuzzing integration (see e.g. https://github.com/google/atheris/issues/20) in the future!
12 changes: 10 additions & 2 deletions hypothesis-python/src/hypothesis/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
ensure_free_stackframes,
gc_cumulative_time,
)
from hypothesis.internal.conjecture.providers import BytestringProvider
from hypothesis.internal.conjecture.shrinker import sort_key_ir
from hypothesis.internal.entropy import deterministic_PRNG
from hypothesis.internal.escalation import (
Expand Down Expand Up @@ -1866,7 +1867,13 @@ def fuzz_one_input(
if isinstance(buffer, io.IOBase):
buffer = buffer.read(BUFFER_SIZE)
assert isinstance(buffer, (bytes, bytearray, memoryview))
data = ConjectureData.for_buffer(buffer)
data = ConjectureData(
max_length=BUFFER_SIZE,
prefix=b"",
random=None,
provider=BytestringProvider,
provider_kw={"bytestring": buffer},
)
try:
state.execute_once(data)
except (StopTest, UnsatisfiedAssumption):
Expand All @@ -1880,7 +1887,8 @@ def fuzz_one_input(
settings.database.save(database_key, ir_to_bytes(data.choices))
minimal_failures[data.interesting_origin] = data.ir_nodes
raise
return bytes(data.buffer)
assert isinstance(data.provider, BytestringProvider)
return bytes(data.provider.drawn)

fuzz_one_input.__doc__ = HypothesisHandle.fuzz_one_input.__doc__
return fuzz_one_input
Expand Down
16 changes: 12 additions & 4 deletions hypothesis-python/src/hypothesis/internal/conjecture/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1606,11 +1606,20 @@ def __init__(
provider: Union[type, PrimitiveProvider] = HypothesisProvider,
ir_prefix: Optional[Sequence[Union[NodeTemplate, ChoiceT]]] = None,
max_length_ir: Optional[int] = None,
provider_kw: Optional[dict[str, Any]] = None,
) -> None:
from hypothesis.internal.conjecture.engine import BUFFER_SIZE_IR

if observer is None:
observer = DataObserver()
if provider_kw is None:
provider_kw = {}
elif not isinstance(provider, type):
raise InvalidArgument(
f"Expected {provider=} to be a class since {provider_kw=} was "
"passed, but got an instance instead."
)

assert isinstance(observer, DataObserver)
self._bytes_drawn = 0
self.observer = observer
Expand All @@ -1621,9 +1630,6 @@ def __init__(
self.__prefix = bytes(prefix)
self.__random = random

if ir_prefix is None:
assert random is not None or max_length <= len(prefix)

self.buffer: "Union[bytes, bytearray]" = bytearray()
self.index = 0
self.length_ir = 0
Expand All @@ -1644,9 +1650,11 @@ def __init__(
self.has_discards = False

self.provider: PrimitiveProvider = (
provider(self) if isinstance(provider, type) else provider
provider(self, **provider_kw) if isinstance(provider, type) else provider
)
assert isinstance(self.provider, PrimitiveProvider)
if ir_prefix is None and isinstance(self.provider, HypothesisProvider):
assert random is not None or max_length <= len(prefix)

self.__result: "Optional[ConjectureResult]" = None

Expand Down
182 changes: 182 additions & 0 deletions hypothesis-python/src/hypothesis/internal/conjecture/providers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import math
from typing import Optional

from hypothesis.internal.compat import int_from_bytes
from hypothesis.internal.conjecture.data import (
BYTE_MASKS,
COLLECTION_DEFAULT_MAX_SIZE,
ConjectureData,
PrimitiveProvider,
bits_to_bytes,
)
from hypothesis.internal.conjecture.floats import lex_to_float
from hypothesis.internal.conjecture.utils import many
from hypothesis.internal.floats import make_float_clamper
from hypothesis.internal.intervalsets import IntervalSet


class BytestringProvider(PrimitiveProvider):
    """Provider that derives every choice from a fixed, caller-supplied bytestring.

    Bytes are consumed strictly left to right.  ``drawn`` accumulates the
    (masked) bytes that were actually used, so callers can recover the
    canonical form of the input that was interpreted.
    """

    # NOTE(review): presumably tells the engine to create one provider per
    # test case -- confirm against the PrimitiveProvider documentation.
    lifetime = "test_case"

    def __init__(
        self, conjecturedata: Optional["ConjectureData"], /, *, bytestring: bytes
    ):
        """Store ``bytestring`` as the sole source of data for this provider."""
        super().__init__(conjecturedata)
        self.bytestring = bytestring
        # Offset of the next unread byte in ``self.bytestring``.
        self.index = 0
        # Every byte consumed so far, after masking.
        self.drawn = bytearray()

    def _draw_bits(self, n: int) -> int:
        """Consume ``bits_to_bytes(n)`` bytes and return them as an integer.

        The first consumed byte is masked with ``BYTE_MASKS[n % 8]`` so that
        bits beyond the requested width do not leak into the result
        (assumes ``BYTE_MASKS[0]`` keeps the whole byte -- TODO confirm).
        If the bytestring does not hold enough bytes, the test case is
        marked as overrun, which is expected to abort the draw.
        """
        if n == 0:  # pragma: no cover
            return 0
        n_bytes = bits_to_bytes(n)
        if self.index + n_bytes > len(self.bytestring):
            self._cd.mark_overrun()
        buf = bytearray(self.bytestring[self.index : self.index + n_bytes])
        self.index += n_bytes

        buf[0] &= BYTE_MASKS[n % 8]
        buf = bytes(buf)
        self.drawn += buf
        return int_from_bytes(buf)

    def draw_boolean(
        self,
        p: float = 0.5,
        *,
        forced: Optional[bool] = None,
        fake_forced: bool = False,
    ) -> bool:
        """Draw a boolean that is True with probability (approximately) ``p``.

        ``forced`` short-circuits the draw.  ``p <= 0`` and ``p >= 1`` return
        a constant without consuming any bytes; every other ``p`` consumes
        exactly one byte.
        """
        if forced is not None:
            return forced

        if p <= 0:
            return False
        if p >= 1:
            return True

        # always use one byte for booleans to maintain constant draw size.
        # If a probability requires more than 8 bits to represent precisely,
        # the result will be slightly biased, but not badly.
        bits = 8
        size = 2**bits
        # For 0 < p < 1 both outcomes must stay reachable.  ``max(1, ...)``
        # keeps at least one byte value mapping to False; ``min(size - 1, ...)``
        # keeps at least one byte value mapping to True.  Without the upper
        # clamp a very small p (e.g. 1e-99) rounds ``size * (1 - p)`` to
        # ``size`` and True could never be drawn.
        falsey = min(size - 1, max(1, math.floor(size * (1 - p))))
        n = self._draw_bits(bits)
        return n >= falsey

    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        weights: Optional[dict[int, float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        fake_forced: bool = False,
    ) -> int:
        """Draw an integer in ``[min_value, max_value]`` (both inclusive).

        Missing bounds are replaced by a finite window: 128 bits wide when
        both are missing, ``2**64`` wide when only one is.  ``weights`` and
        ``shrink_towards`` are accepted but ignored.
        """
        if forced is not None:
            return forced

        assert self._cd is not None

        # we explicitly ignore integer weights for now, as they are likely net
        # negative on fuzzer performance.

        if min_value is None and max_value is None:
            min_value = -(2**127)
            max_value = 2**127 - 1
        elif min_value is None:
            assert max_value is not None
            min_value = max_value - 2**64
        elif max_value is None:
            assert min_value is not None
            max_value = min_value + 2**64

        if min_value == max_value:
            return min_value

        # Draw an *offset* from min_value that is just wide enough to cover
        # the range.  Comparing the raw draw against the absolute bounds would
        # make any range that does not start at 0 (e.g. [10, 12]) impossible
        # to satisfy and would never produce negative values.
        bits = (max_value - min_value).bit_length()
        value = min_value + self._draw_bits(bits)
        # The offset can exceed the range width when the width is not of the
        # form 2**k - 1; reject and redraw until it fits (or we overrun).
        while value > max_value:
            value = min_value + self._draw_bits(bits)
        return value

    def draw_float(
        self,
        *,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float,
        forced: Optional[float] = None,
        fake_forced: bool = False,
    ) -> float:
        """Draw a float and clamp it into the permitted range.

        65 bits are consumed: the top bit selects the sign and the low 64
        bits are decoded with ``lex_to_float``.  The result is then passed
        through ``make_float_clamper`` built from the given constraints.
        """
        if forced is not None:
            return forced

        # 64 bits of magnitude plus one sign bit.  Drawing only 64 bits would
        # leave bit 64 permanently clear, so the sign could never be negative.
        n = self._draw_bits(65)
        sign = -1 if n >> 64 else 1
        f = sign * lex_to_float(n & ((1 << 64) - 1))
        clamper = make_float_clamper(
            min_value,
            max_value,
            smallest_nonzero_magnitude=smallest_nonzero_magnitude,
            allow_nan=allow_nan,
        )
        return clamper(f)

    def _draw_collection(
        self, min_size: int, max_size: int, *, alphabet_size: int
    ) -> list[int]:
        """Draw a list of integers, each in ``[0, alphabet_size - 1]``.

        The list length is governed by ``many`` with the given size bounds
        and an average size derived from them.
        """
        average_size = min(
            max(min_size * 2, min_size + 5),
            0.5 * (min_size + max_size),
        )
        elements = many(
            self._cd,
            min_size=min_size,
            max_size=max_size,
            average_size=average_size,
            observe=False,
        )
        values = []
        while elements.more():
            values.append(self.draw_integer(0, alphabet_size - 1))
        return values

    def draw_string(
        self,
        intervals: IntervalSet,
        *,
        min_size: int = 0,
        max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
        forced: Optional[str] = None,
        fake_forced: bool = False,
    ) -> str:
        """Draw a string whose characters are picked by index from ``intervals``."""
        if forced is not None:
            return forced
        values = self._draw_collection(min_size, max_size, alphabet_size=len(intervals))
        # Each drawn integer is an index into ``intervals``; indexing yields a
        # codepoint, which ``chr`` turns into the character.
        return "".join(chr(intervals[v]) for v in values)

    def draw_bytes(
        self,
        min_size: int = 0,
        max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
        *,
        forced: Optional[bytes] = None,
        fake_forced: bool = False,
    ) -> bytes:
        """Draw a bytestring with between ``min_size`` and ``max_size`` bytes."""
        if forced is not None:
            return forced
        values = self._draw_collection(min_size, max_size, alphabet_size=2**8)
        return bytes(values)
11 changes: 11 additions & 0 deletions hypothesis-python/tests/conjecture/test_alt_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,14 @@ def test_function(x):
with pytest.raises(AssertionError) as ctx:
test_function()
assert (msg in ctx.value.__notes__) == (provider is UnsoundVerifierProvider)


def test_invalid_provider_kw():
    """``provider_kw`` combined with an already-built provider must be rejected."""
    # Keyword arguments can only be forwarded when a provider *class* is
    # given; here an instance is supplied instead, so construction must fail.
    prebuilt_provider = TrivialProvider(None)
    extra_kwargs = {"one": "two"}

    with pytest.raises(InvalidArgument, match="got an instance instead"):
        ConjectureData(
            max_length=0,
            prefix=b"",
            random=None,
            provider=prebuilt_provider,
            provider_kw=extra_kwargs,
        )
71 changes: 71 additions & 0 deletions hypothesis-python/tests/conjecture/test_provider_contract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

from hypothesis import example, given, strategies as st
from hypothesis.errors import StopTest
from hypothesis.internal.conjecture.choice import (
choice_equal,
choice_from_index,
choice_permitted,
)
from hypothesis.internal.conjecture.data import ConjectureData
from hypothesis.internal.conjecture.engine import BUFFER_SIZE
from hypothesis.internal.conjecture.providers import BytestringProvider
from hypothesis.internal.intervalsets import IntervalSet

from tests.conjecture.common import float_kw, integer_kw, ir_types_and_kwargs, string_kw


@example(b"\x00" * 100, [("integer", integer_kw())])
@example(b"\x00" * 100, [("integer", integer_kw(0, 2))])
@example(b"\x00" * 100, [("integer", integer_kw(0, 0))])
@example(b"\x00" * 100, [("integer", integer_kw(min_value=0))])
@example(b"\x00" * 100, [("integer", integer_kw(max_value=2))])
@example(b"\x00" * 100, [("integer", integer_kw(0, 2, weights={0: 0.1}))])
@example(b"\x00" * 100, [("boolean", {"p": 1.0})])
@example(b"\x00" * 100, [("boolean", {"p": 0.0})])
@example(b"\x00" * 100, [("boolean", {"p": 1e-99})])
@example(b"\x00" * 100, [("string", string_kw(IntervalSet.from_string("a")))])
@example(b"\x00" * 100, [("float", float_kw())])
@example(b"\x00" * 100, [("bytes", {"min_size": 0, "max_size": 10})])
@given(st.binary(min_size=200), st.lists(ir_types_and_kwargs()))
def test_provider_contract_bytestring(bytestring, ir_type_and_kwargs):
    """Check that BytestringProvider draws permitted values and honours ``forced``.

    For each requested draw: the unforced value must be permitted by its
    kwargs, and a subsequent forced draw must return exactly the forced value.
    The test stops early once the bytestring is exhausted (``StopTest``).
    """
    data = ConjectureData(
        BUFFER_SIZE,
        prefix=b"",
        random=None,
        observer=None,
        provider=BytestringProvider,
        provider_kw={"bytestring": bytestring},
    )

    for ir_type, kwargs in ir_type_and_kwargs:
        draw = getattr(data, f"draw_{ir_type}")
        try:
            value = draw(**kwargs)
        except StopTest:
            # Ran out of bytes; nothing further can be checked.
            return

        # choice_permitted is currently limited to what *could* be produced
        # from the buffer; that restriction can go once everything is on the
        # TCS. Until then BytestringProvider can in theory yield values that
        # are not forceable to a buffer, but only with an enormous
        # shrink_towards -- rare enough that we simply exempt that case here.
        if ir_type == "integer" and kwargs["shrink_towards"] is not None:
            integer_edge_case = kwargs["shrink_towards"].bit_length() > 100
        else:
            integer_edge_case = False
        assert choice_permitted(value, kwargs) or integer_edge_case

        # Force the simplest choice for these kwargs and verify it round-trips.
        forced_value = choice_from_index(0, ir_type, kwargs)
        kwargs["forced"] = forced_value
        assert choice_equal(forced_value, draw(**kwargs))
4 changes: 2 additions & 2 deletions hypothesis-python/tests/cover/test_fuzz_one_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_fuzz_one_input(buffer_type):
@settings(database=db, phases=[Phase.reuse, Phase.shrink])
def test(s):
seen.append(s)
assert "\0" not in s, repr(s)
assert len(s) < 5, repr(s)

# Before running fuzz_one_input, there's nothing in `db`, and so the test passes
# (because example generation is disabled by the custom settings)
Expand Down Expand Up @@ -67,7 +67,7 @@ def test(s):
# reproduce it, *and shrink to a minimal example*.
with pytest.raises(AssertionError):
test()
assert seen[-1] == "\0"
assert seen[-1] == "0" * 5


def test_can_fuzz_with_database_eq_None():
Expand Down

0 comments on commit 2ce4344

Please sign in to comment.