Skip to content

Commit

Permalink
chore: tweak threadpool configuration (#483)
Browse files Browse the repository at this point in the history
* feat: separate ydb read and write threadpools

* fix: broken import
  • Loading branch information
BobTheBuidler authored Jan 10, 2024
1 parent d6c9ccf commit 819c22b
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 28 deletions.
6 changes: 3 additions & 3 deletions y/_db/utils/contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@

from brownie import chain
from y._db.utils._ep import _get_get_token
from y._db.utils.decorators import a_sync_db_session
from y._db.utils.decorators import a_sync_read_db_session, a_sync_write_db_session
from y._db.utils.utils import ensure_block


logger = logging.getLogger(__name__)

@a_sync_db_session
@a_sync_read_db_session
def get_deploy_block(address: str) -> Optional[int]:
get_token = _get_get_token()
if deploy_block := get_token(address, sync=True).deploy_block:
logger.debug('%s deploy block from cache: %s', address, deploy_block.number)
return deploy_block.number
logger.debug('%s deploy block not cached, fetching from chain', address)

@a_sync_db_session
@a_sync_write_db_session
def set_deploy_block(address: str, deploy_block: int) -> None:
from y._db.utils._ep import _get_get_token
get_token = _get_get_token()
Expand Down
28 changes: 24 additions & 4 deletions y/_db/utils/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
logger = logging.getLogger(__name__)


token_attr_threads = PruningThreadPoolExecutor(32)
ydb_read_threads = PruningThreadPoolExecutor(32)
ydb_write_threads = PruningThreadPoolExecutor(16)

def retry_locked(callable: Callable[_P, _T]) -> Callable[_P, _T]:
@wraps(callable)
Expand All @@ -39,19 +40,38 @@ def retry_locked_wrap(*args: _P.args, **kwargs: _P.kwargs) -> _T:
raise e
return retry_locked_wrap

a_sync_db_session = lambda fn: a_sync(default='async', executor=token_attr_threads)(
a_sync_read_db_session = lambda fn: a_sync(default='async', executor=ydb_write_threads)(
db_session(
retry_locked(
fn
)
)
)
a_sync_db_session_cached = lambda fn: a_sync(default='async', executor=token_attr_threads)(

a_sync_write_db_session = lambda fn: a_sync(default='async', executor=ydb_read_threads)(
db_session(
retry_locked(
fn
)
)
)

a_sync_read_db_session_cached = lambda fn: a_sync(default='async', executor=ydb_read_threads)(
retry_locked(
lru_cache(maxsize=None)(
db_session(
fn
)
)
)
)

a_sync_write_db_session_cached = lambda fn: a_sync(default='async', executor=ydb_read_threads)(
retry_locked(
lru_cache(maxsize=None)(
db_session(
fn
)
)
)
)
)
6 changes: 3 additions & 3 deletions y/_db/utils/price.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@

from y import constants
from y._db.entities import Price, insert
from y._db.utils.decorators import a_sync_db_session
from y._db.utils.decorators import a_sync_read_db_session, a_sync_write_db_session
from y._db.utils.token import ensure_token
from y._db.utils.utils import ensure_block


logger = logging.getLogger(__name__)

@a_sync_db_session
@a_sync_read_db_session
def get_price(address: str, block: int) -> Optional[Decimal]:
ensure_block(block, sync=True)
ensure_token(address)
Expand All @@ -33,7 +33,7 @@ async def set_price(address: str, block: int, price: Decimal) -> None:
await t
_tasks.remove(t)

@a_sync_db_session
@a_sync_write_db_session
def _set_price(address: str, block: int, price: Decimal) -> None:
with suppress(InvalidOperation): # happens with really big numbers sometimes. nbd, we can just skip the cache in this case.
ensure_block(block, sync=True)
Expand Down
20 changes: 10 additions & 10 deletions y/_db/utils/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
from y import constants
from y._db.entities import Address, Token, insert
from y._db.utils._ep import _get_get_token
from y._db.utils.decorators import a_sync_db_session
from y._db.utils.decorators import a_sync_read_db_session, a_sync_write_db_session
from y._db.utils.utils import ensure_chain
from y.erc20 import decimals

logger = logging.getLogger(__name__)

@a_sync_db_session
@a_sync_read_db_session
def get_token(address: str) -> Token:
ensure_chain()
address = convert.to_address(address)
Expand All @@ -35,7 +35,7 @@ def ensure_token(address: str) -> None:
get_token = _get_get_token()
get_token(address, sync=True)

@a_sync_db_session
@a_sync_read_db_session
def get_bucket(address: str) -> Optional[str]:
if address == constants.EEE_ADDRESS:
return
Expand All @@ -45,37 +45,37 @@ def get_bucket(address: str) -> Optional[str]:
logger.debug("found %s bucket %s in ydb", address, bucket)
return bucket

@a_sync_db_session
@a_sync_write_db_session
def set_bucket(address: str, bucket: str) -> None:
if address == constants.EEE_ADDRESS:
return
get_token = _get_get_token()
get_token(address, sync=True).bucket = bucket
logger.debug("updated %s bucket in ydb: %s", address, bucket)

@a_sync_db_session
@a_sync_read_db_session
def get_symbol(address: str) -> Optional[str]:
get_token = _get_get_token()
symbol = get_token(address, sync=True).symbol
if symbol:
logger.debug("found %s symbol %s in ydb", address, symbol)
return symbol

@a_sync_db_session
@a_sync_write_db_session
def set_symbol(address: str, symbol: str) -> None:
get_token = _get_get_token()
get_token(address, sync=True).symbol = symbol
logger.debug("updated %s symbol in ydb: %s", address, symbol)

@a_sync_db_session
@a_sync_read_db_session
def get_name(address: str) -> Optional[str]:
get_token = _get_get_token()
name = get_token(address, sync=True).name
if name:
logger.debug("found %s name %s in ydb", address, name)
return name

@a_sync_db_session
@a_sync_write_db_session
def set_name(address: str, name: str) -> None:
get_token = _get_get_token()
get_token(address, sync=True).name = name
Expand All @@ -89,15 +89,15 @@ async def get_decimals(address: str) -> int:
asyncio.create_task(coro=set_decimals(address, d), name=f"set_decimals {address}")
return d

@a_sync_db_session
@a_sync_read_db_session
def _get_token_decimals(address: str) -> Optional[int]:
get_token = _get_get_token()
decimals = get_token(address, sync=True).decimals
logger.debug("found %s decimals %s in ydb", address, decimals)
if decimals:
return decimals

@a_sync_db_session
@a_sync_write_db_session
def set_decimals(address: str, decimals: int) -> None:
get_token = _get_get_token()
get_token(address, sync=True).decimals = decimals
Expand Down
4 changes: 2 additions & 2 deletions y/_db/utils/traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
from y._db.common import DiskCache, Filter, _clean_addresses, filter_threads
from y._db.entities import Chain, Trace, TraceCacheInfo, insert
from y._db.utils._ep import _get_get_block
from y._db.utils.decorators import a_sync_db_session
from y._db.utils.decorators import a_sync_write_db_session
from y.utils.dank_mids import dank_w3
from y.utils.middleware import BATCH_SIZE

logger = logging.getLogger(__name__)

@a_sync_db_session
@a_sync_write_db_session
def insert_trace(trace: dict) -> None:
get_block = _get_get_block()
kwargs = {
Expand Down
12 changes: 6 additions & 6 deletions y/_db/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@

from y._db.entities import Block, Chain, insert
from y._db.utils._ep import _get_get_block
from y._db.utils.decorators import a_sync_db_session, a_sync_db_session_cached
from y._db.utils.decorators import a_sync_read_db_session, a_sync_write_db_session, a_sync_write_db_session_cached

logger = logging.getLogger(__name__)


@a_sync_db_session
@a_sync_read_db_session
def get_chain() -> Chain:
return Chain.get(id=chain.id) or insert(type=Chain, id=chain.id) or Chain[chain.id]

Expand All @@ -23,19 +23,19 @@ def ensure_chain() -> None:
"""ensures that the chain object for the connected chain has been inserted to the db"""
get_chain(sync=True)

@a_sync_db_session
@a_sync_read_db_session
def get_block(number: int) -> Block:
ensure_chain()
if block := Block.get(chain=chain.id, number=number):
return block
return insert(type=Block, chain=chain.id, number=number) or get_block(number, sync=True)

@a_sync_db_session_cached
@a_sync_write_db_session_cached
def ensure_block(number: int) -> None:
get_block = _get_get_block()
get_block(number, sync=True)

@a_sync_db_session
@a_sync_read_db_session
def get_block_timestamp(number: int) -> Optional[int]:
get_block = _get_get_block()
block = get_block(number, sync=True)
Expand All @@ -47,7 +47,7 @@ def get_block_timestamp(number: int) -> Optional[int]:
logger.debug("got %s.timestamp from cache: %s, %s", block, unix, ts)
return unix

@a_sync_db_session
@a_sync_write_db_session
def set_block_timestamp(block: int, timestamp: int) -> None:
get_block = _get_get_block()
block = get_block(block, sync=True)
Expand Down

0 comments on commit 819c22b

Please sign in to comment.