Skip to content

FanoutCache

FanoutCache

FanoutCache(
    directory: str | PathLike[str] | None = None,
    disk_type: (
        type[DiskProtocol]
        | Callable[..., DiskProtocol]
        | None
    ) = None,
    disk_args: Mapping[str, Any] | None = None,
    timeout: float = 60,
    shard_size: int = 8,
    **kwargs: Unpack[SettingsKwargs]
)

Bases: CacheProtocol

Disk and file backed cache.

FanoutCache is a cache that uses multiple shards to store key-value pairs.

Parameters:

Name Type Description Default

directory

str | PathLike[str] | None

directory for cache. Default is None.

None

disk_type

type[DiskProtocol] | Callable[..., DiskProtocol] | None

DiskProtocol class or callable. Default is None.

None

disk_args

Mapping[str, Any] | None

keyword arguments for disk_type. Default is None.

None

timeout

float

connection timeout. Default is 60 seconds.

60

shard_size

int

number of shards. Default is 8.

8

**kwargs

Unpack[SettingsKwargs]

additional keyword arguments for Settings.

{}
Source code in src/typed_diskcache/implement/cache/fanout/main.py
def __init__(
    self,
    directory: str | PathLike[str] | None = None,
    disk_type: type[DiskProtocol] | Callable[..., DiskProtocol] | None = None,
    disk_args: Mapping[str, Any] | None = None,
    timeout: float = 60,
    shard_size: int = 8,
    **kwargs: Unpack[SettingsKwargs],
) -> None:
    """Create the fanout cache and open one ``Shard`` per sub-directory.

    Args:
        directory: directory for cache. When None, a temporary directory
            prefixed with ``typed-diskcache-`` is created.
        disk_type: DiskProtocol class or callable, forwarded to every shard.
        disk_args: keyword arguments for ``disk_type``.
        timeout: connection timeout in seconds, forwarded to every shard.
        shard_size: number of shards. Must be at least 1.
        **kwargs: additional keyword arguments for Settings. ``size_limit``
            is divided by ``shard_size`` before being forwarded.

    Raises:
        ValueError: if ``shard_size`` is smaller than 1.
    """
    # Fail early with a clear message instead of a ZeroDivisionError from the
    # size-limit split or a late error from the thread pool.
    if shard_size < 1:
        error_msg = f"shard_size must be a positive integer, got {shard_size!r}"
        raise ValueError(error_msg)

    if directory is None:
        directory = tempfile.mkdtemp(prefix="typed-diskcache-")
    # Expand "~" and environment variables *before* the directory is ensured,
    # so the helper works on the real location rather than on a path that
    # still contains a literal "~" or "$VAR" component.
    directory = Path(expandvars(Path(directory).expanduser()))
    directory = cache_utils.ensure_cache_directory(directory)

    # Every shard receives an equal (floor-divided) share of the size limit.
    size_limit = kwargs.get("size_limit", DEFAULT_SIZE_LIMIT)
    kwargs["size_limit"] = size_limit // shard_size

    constructor = partial(
        Shard, disk_type=disk_type, disk_args=disk_args, timeout=timeout, **kwargs
    )
    # The root directory gets its own Shard, built with the same arguments as
    # the numbered shards below.
    self._cache = constructor(directory)
    # Open the numbered shards ("000", "001", ...) concurrently; ``map`` keeps
    # the results in index order.
    with ThreadPoolExecutor(shard_size) as pool:
        futures = pool.map(
            constructor, (directory / f"{index:03d}" for index in range(shard_size))
        )
        self._shards = tuple(futures)

directory property

directory: Path

Directory for cache.

timeout property

timeout: float

Timeout for cache operations.

conn property

conn: Connection

Database connection.

disk property

Disk object.

settings property writable

settings: Settings

Settings for cache.

get

get(
    key: Any, default: _AnyT, *, retry: bool = ...
) -> Container[Any | _AnyT]
get(
    key: Any, default: Any = ..., *, retry: bool = ...
) -> Container[Any]
get(
    key: Any, default: Any = None, *, retry: bool = True
) -> Container[Any]

Retrieve value from cache.

If key is missing, return container with default.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

default

Any

Value to return if key is missing.

None

retry

bool

Retry if database timeout occurs.

True

Returns:

Type Description
Container[Any]

Container with cached value or default if key not found.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def get(
    self, key: Any, default: Any = None, *, retry: bool = True
) -> Container[Any]:
    """Retrieve the value for ``key`` from the shard selected for it.

    Args:
        key: Key for item.
        default: Value to return if key is missing.
        retry: Retry if database timeout occurs.

    Returns:
        Container returned by the shard, or ``default`` wrapped by
        ``cache_utils.wrap_default`` when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        found = owner.get(key, default=default, retry=retry)
    except (OperationalError, TypedDiskcacheError):
        # A failing shard is reported to the caller as the default value.
        return cache_utils.wrap_default(default)
    return found

aget async

aget(
    key: Any, default: _AnyT, *, retry: bool = ...
) -> Container[Any | _AnyT]
aget(
    key: Any, default: Any = ..., *, retry: bool = ...
) -> Container[Any]
aget(
    key: Any, default: Any = None, *, retry: bool = True
) -> Container[Any]

Asynchronously retrieve value from cache.

If key is missing, return container with default.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

default

Any

Value to return if key is missing.

None

retry

bool

Retry if database timeout occurs.

True

Returns:

Type Description
Container[Any]

Container with cached value or default if key not found.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def aget(
    self, key: Any, default: Any = None, *, retry: bool = True
) -> Container[Any]:
    """Asynchronously retrieve the value for ``key`` from its shard.

    Args:
        key: Key for item.
        default: Value to return if key is missing.
        retry: Retry if database timeout occurs.

    Returns:
        Container returned by the shard, or ``default`` wrapped by
        ``cache_utils.wrap_default`` when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        found = await owner.aget(key, default=default, retry=retry)
    except (OperationalError, TypedDiskcacheError):
        # A failing shard is reported to the caller as the default value.
        return cache_utils.wrap_default(default)
    return found

set

set(
    key: Any,
    value: Any,
    *,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False
) -> bool

Set key and value in cache.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

value

Any

Value for item.

required

expire

float | None

Seconds until item expires.

None

tags

str | Iterable[str] | None

Tags to associate with key.

None

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
bool

True if item was set.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def set(
    self,
    key: Any,
    value: Any,
    *,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False,
) -> bool:
    """Store ``value`` under ``key`` in the shard selected for the key.

    Args:
        key: Key for item.
        value: Value for item.
        expire: Seconds until item expires.
        tags: Tags to associate with key.
        retry: Retry if database timeout occurs.

    Returns:
        The shard's result, or False when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        stored = owner.set(key, value, expire=expire, tags=tags, retry=retry)
    except (OperationalError, TypedDiskcacheError):
        # Shard failures are reported as "not set" rather than raised.
        return False
    return stored

aset async

aset(
    key: Any,
    value: Any,
    *,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False
) -> bool

Asynchronously set key and value in cache.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

value

Any

Value for item.

required

expire

float | None

Seconds until item expires.

None

tags

str | Iterable[str] | None

Tags to associate with key.

None

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
bool

True if item was set.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def aset(
    self,
    key: Any,
    value: Any,
    *,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False,
) -> bool:
    """Asynchronously store ``value`` under ``key`` in its shard.

    Args:
        key: Key for item.
        value: Value for item.
        expire: Seconds until item expires.
        tags: Tags to associate with key.
        retry: Retry if database timeout occurs.

    Returns:
        The shard's result, or False when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        stored = await owner.aset(
            key, value, expire=expire, tags=tags, retry=retry
        )
    except (OperationalError, TypedDiskcacheError):
        # Shard failures are reported as "not set" rather than raised.
        return False
    return stored

delete

delete(key: Any, *, retry: bool = False) -> bool

Delete corresponding item for key from cache.

Missing keys are ignored.

Parameters:

Name Type Description Default

key

Any

Key matching item.

required

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
bool

True if item was deleted.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def delete(self, key: Any, *, retry: bool = False) -> bool:
    """Delete the item for ``key`` from the shard selected for the key.

    Args:
        key: Key matching item.
        retry: Retry if database timeout occurs.

    Returns:
        The shard's result, or False when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        removed = owner.delete(key, retry=retry)
    except (OperationalError, TypedDiskcacheError):
        # Shard failures are reported as "nothing deleted".
        return False
    return removed

adelete async

adelete(key: Any, *, retry: bool = False) -> bool

Asynchronously delete corresponding item for key from cache.

Missing keys are ignored.

Parameters:

Name Type Description Default

key

Any

Key matching item.

required

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
bool

True if item was deleted.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def adelete(self, key: Any, *, retry: bool = False) -> bool:
    """Asynchronously delete the item for ``key`` from its shard.

    Args:
        key: Key matching item.
        retry: Retry if database timeout occurs.

    Returns:
        The shard's result, or False when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        removed = await owner.adelete(key, retry=retry)
    except (OperationalError, TypedDiskcacheError):
        # Shard failures are reported as "nothing deleted".
        return False
    return removed

clear

clear(*, retry: bool = False) -> int

Remove all items from cache.

Removing items is iterative. Each iteration removes a subset of items. Concurrent writes may occur.

If a TimeoutError occurs, the first element of the exception's args attribute will be the number of items removed before the exception.

Parameters:

Name Type Description Default

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Count of rows removed.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def clear(self, *, retry: bool = False) -> int:
    """Remove all items from every shard.

    Args:
        retry: Retry if database timeout occurs.

    Returns:
        Running count produced by feeding each shard's ``clear`` through
        ``fanout_utils.loop_total``, in shard order.
    """
    removed = 0
    for member in self._shards:
        # loop_total receives the running count and returns the updated one.
        removed = fanout_utils.loop_total(removed, member.clear, retry=retry)
    return removed

aclear async

aclear(*, retry: bool = False) -> int

Asynchronously remove all items from cache.

Removing items is iterative. Each iteration removes a subset of items. Concurrent writes may occur.

If a TimeoutError occurs, the first element of the exception's args attribute will be the number of items removed before the exception.

Parameters:

Name Type Description Default

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Count of rows removed.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def aclear(self, *, retry: bool = False) -> int:
    """Asynchronously remove all items from every shard.

    Shards are processed one after another, not concurrently.

    Args:
        retry: Retry if database timeout occurs.

    Returns:
        Running count produced by feeding each shard's ``aclear`` through
        ``fanout_utils.async_loop_total``, in shard order.
    """
    removed = 0
    for member in self._shards:
        # async_loop_total receives the running count and returns the new one.
        removed = await fanout_utils.async_loop_total(
            removed, member.aclear, retry=retry
        )
    return removed

volume

volume() -> int

Return estimated total size of cache on disk.

Returns:

Type Description
int

Size in bytes.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def volume(self) -> int:
    return sum(shard.volume() for shard in self._shards)

avolume async

avolume() -> int

Asynchronously return estimated total size of cache on disk.

Returns:

Type Description
int

Size in bytes.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def avolume(self) -> int:
    total = 0
    for shard in self._shards:
        total += await shard.avolume()
    return total

stats

stats(*, enable: bool = True, reset: bool = False) -> Stats

Return cache statistics hits and misses.

Parameters:

Name Type Description Default

enable

bool

Enable collecting statistics.

True

reset

bool

Reset hits and misses to 0.

False

Returns:

Type Description
Stats

(hits, misses)

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def stats(self, *, enable: bool = True, reset: bool = False) -> Stats:
    """Return hit and miss counts summed over all shards.

    Args:
        enable: Enable collecting statistics.
        reset: Reset hits and misses to 0.

    Returns:
        ``Stats`` holding the summed hits and misses.
    """
    per_shard = [
        member.stats(enable=enable, reset=reset) for member in self._shards
    ]
    # Index 0 of each shard result is its hits, index 1 its misses.
    total_hits = sum(entry[0] for entry in per_shard)
    total_misses = sum(entry[1] for entry in per_shard)

    # Record the requested statistics flag on this cache's own settings too.
    self.update_settings(statistics=enable)
    return Stats(hits=total_hits, misses=total_misses)

astats async

astats(
    *, enable: bool = True, reset: bool = False
) -> Stats

Asynchronously return cache statistics hits and misses.

Parameters:

Name Type Description Default

enable

bool

Enable collecting statistics.

True

reset

bool

Reset hits and misses to 0.

False

Returns:

Type Description
Stats

(hits, misses)

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def astats(self, *, enable: bool = True, reset: bool = False) -> Stats:
    """Asynchronously return hit and miss counts summed over all shards.

    Each shard is queried in its own task of an anyio task group.

    Args:
        enable: Enable collecting statistics.
        reset: Reset hits and misses to 0.

    Returns:
        ``Stats`` holding the summed hits and misses.
    """
    validate_installed("anyio", "Consider installing extra `asyncio`.")
    import anyio  # noqa: PLC0415

    collected: list[Any] = []

    async def collect(member: Shard) -> None:
        # Each task appends its shard's result; totals are summed afterwards.
        collected.append(await member.astats(enable=enable, reset=reset))

    async with anyio.create_task_group() as task_group:
        for member in self._shards:
            task_group.start_soon(collect, member)

    # Index 0 of each shard result is its hits, index 1 its misses.
    total_hits = sum(entry[0] for entry in collected)
    total_misses = sum(entry[1] for entry in collected)

    # Record the requested statistics flag on this cache's own settings too.
    await self.aupdate_settings(statistics=enable)
    return Stats(hits=total_hits, misses=total_misses)

close

close() -> None

Close database connection.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def close(self) -> None:
    self.conn.close()
    for shard in self._shards:
        shard.close()

aclose async

aclose() -> None

Asynchronously close database connection.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def aclose(self) -> None:
    """Asynchronously close this cache's connection and every shard.

    The cache-level connection is closed first; the shards are then closed
    in separate tasks of an anyio task group.
    """
    validate_installed("anyio", "Consider installing extra `asyncio`.")
    import anyio  # noqa: PLC0415

    await self.conn.aclose()
    async with anyio.create_task_group() as closers:
        for member in self._shards:
            closers.start_soon(member.aclose)

touch

touch(
    key: Any,
    *,
    expire: float | None = None,
    retry: bool = False
) -> bool

Touch key in cache and update expire time.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

expire

float | None

Seconds until item expires. If None, no expiry.

None

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
bool

True if key was touched.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def touch(
    self, key: Any, *, expire: float | None = None, retry: bool = False
) -> bool:
    """Touch ``key`` in its shard and update the expire time.

    Args:
        key: Key for item.
        expire: Seconds until item expires. If None, no expiry.
        retry: Retry if database timeout occurs.

    Returns:
        The shard's result, or False when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        touched = owner.touch(key, expire=expire, retry=retry)
    except (OperationalError, TypedDiskcacheError):
        # Shard failures are reported as "not touched".
        return False
    return touched

atouch async

atouch(
    key: Any,
    *,
    expire: float | None = None,
    retry: bool = False
) -> bool

Asynchronously touch key in cache and update expire time.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

expire

float | None

Seconds until item expires. If None, no expiry.

None

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
bool

True if key was touched.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def atouch(
    self, key: Any, *, expire: float | None = None, retry: bool = False
) -> bool:
    """Asynchronously touch ``key`` in its shard and update the expire time.

    Args:
        key: Key for item.
        expire: Seconds until item expires. If None, no expiry.
        retry: Retry if database timeout occurs.

    Returns:
        The shard's result, or False when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        touched = await owner.atouch(key, expire=expire, retry=retry)
    except (OperationalError, TypedDiskcacheError):
        # Shard failures are reported as "not touched".
        return False
    return touched

add

add(
    key: Any,
    value: Any,
    *,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False
) -> bool

Add key and value item to cache.

Similar to set, but only add to cache if key not present.

Operation is atomic. Only one concurrent add operation for a given key will succeed.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

value

Any

Value for item.

required

expire

float | None

Seconds until the key expires. If None, no expiry.

None

tags

str | Iterable[str] | None

Tags to associate with key.

None

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
bool

True if item was added.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def add(
    self,
    key: Any,
    value: Any,
    *,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False,
) -> bool:
    """Add ``key``/``value`` to the shard selected for the key.

    Args:
        key: Key for item.
        value: Value for item.
        expire: Seconds until the key expires. If None, no expiry.
        tags: Tags to associate with key.
        retry: Retry if database timeout occurs.

    Returns:
        The shard's result, or False when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        added = owner.add(key, value, expire=expire, tags=tags, retry=retry)
    except (OperationalError, TypedDiskcacheError):
        # Shard failures are reported as "not added".
        return False
    return added

aadd async

aadd(
    key: Any,
    value: Any,
    *,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False
) -> bool

Asynchronously add key and value item to cache.

Similar to set, but only add to cache if key not present.

Operation is atomic. Only one concurrent add operation for a given key will succeed.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

value

Any

Value for item.

required

expire

float | None

Seconds until the key expires. If None, no expiry.

None

tags

str | Iterable[str] | None

Tags to associate with key.

None

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
bool

True if item was added.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def aadd(
    self,
    key: Any,
    value: Any,
    *,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False,
) -> bool:
    """Asynchronously add ``key``/``value`` to the shard selected for the key.

    Args:
        key: Key for item.
        value: Value for item.
        expire: Seconds until the key expires. If None, no expiry.
        tags: Tags to associate with key.
        retry: Retry if database timeout occurs.

    Returns:
        The shard's result, or False when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        added = await owner.aadd(
            key, value, expire=expire, tags=tags, retry=retry
        )
    except (OperationalError, TypedDiskcacheError):
        # Shard failures are reported as "not added".
        return False
    return added

pop

pop(
    key: Any, default: _AnyT, *, retry: bool = ...
) -> Container[Any | _AnyT]
pop(
    key: Any, default: Any = ..., *, retry: bool = ...
) -> Container[Any]
pop(
    key: Any, default: Any = None, *, retry: bool = True
) -> Container[Any]

Remove corresponding item for key from cache and return value.

If key is missing, return default.

Operation is atomic. Concurrent operations will be serialized.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

default

Any

Value to return if key is missing.

None

retry

bool

Retry if database timeout occurs.

True

Returns:

Type Description
Container[Any]

Container with cached value or default if key not found.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def pop(
    self, key: Any, default: Any = None, *, retry: bool = True
) -> Container[Any]:
    """Remove the item for ``key`` from its shard and return its value.

    Args:
        key: Key for item.
        default: Value to return if key is missing.
        retry: Retry if database timeout occurs.

    Returns:
        Container returned by the shard, or ``default`` wrapped by
        ``cache_utils.wrap_default`` when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        popped = owner.pop(key, default=default, retry=retry)
    except (OperationalError, TypedDiskcacheError):
        # A failing shard is reported to the caller as the default value.
        return cache_utils.wrap_default(default)
    return popped

apop async

apop(
    key: Any, default: _AnyT, *, retry: bool = ...
) -> Container[Any | _AnyT]
apop(
    key: Any, default: Any = ..., *, retry: bool = ...
) -> Container[Any]
apop(
    key: Any, default: Any = None, *, retry: bool = True
) -> Container[Any]

Asynchronously remove corresponding item for key from cache and return value.

If key is missing, return default.

Operation is atomic. Concurrent operations will be serialized.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

default

Any

Value to return if key is missing.

None

retry

bool

Retry if database timeout occurs.

True

Returns:

Type Description
Container[Any]

Container with cached value or default if key not found.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def apop(
    self, key: Any, default: Any = None, *, retry: bool = True
) -> Container[Any]:
    """Asynchronously remove the item for ``key`` and return its value.

    Args:
        key: Key for item.
        default: Value to return if key is missing.
        retry: Retry if database timeout occurs.

    Returns:
        Container returned by the shard, or ``default`` wrapped by
        ``cache_utils.wrap_default`` when the shard raises
        ``OperationalError`` or ``TypedDiskcacheError``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    try:
        popped = await owner.apop(key, default=default, retry=retry)
    except (OperationalError, TypedDiskcacheError):
        # A failing shard is reported to the caller as the default value.
        return cache_utils.wrap_default(default)
    return popped

filter

filter(
    tags: str | Iterable[str],
    *,
    method: FilterMethodLiteral | FilterMethod = OR
) -> Generator[Any, None, None]

Filter by tags.

Parameters:

Name Type Description Default

tags

str | Iterable[str]

Tags to filter by.

required

method

FilterMethodLiteral | FilterMethod

'and' or 'or' filter method.

OR

Yields:

Type Description
Any

Key of item matching tags.

Warning

This method is unstable and will be improved in the future.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def filter(
    self,
    tags: str | Iterable[str],
    *,
    method: FilterMethodLiteral | FilterMethod = FilterMethod.OR,
) -> Generator[Any, None, None]:
    """Filter by tags across all shards.

    Args:
        tags: Tags to filter by. A single string is treated as one tag.
        method: 'and' or 'or' filter method.

    Yields:
        Key of item matching tags, shard by shard in shard order.
    """
    # Materialize the tags once, as ``evict`` does: a one-shot iterable
    # (e.g. a generator) would otherwise be exhausted by the first shard and
    # every later shard would be queried with no tags at all.
    tag_list = [tags] if isinstance(tags, str) else list(tags)

    for shard in self._shards:
        yield from shard.filter(tag_list, method=method)

afilter async

Asynchronously filter by tags.

Parameters:

Name Type Description Default

tags

str | Iterable[str]

Tags to filter by.

required

method

FilterMethodLiteral | FilterMethod

'and' or 'or' filter method.

OR

Yields:

Type Description
AsyncGenerator[Any, None]

Key of item matching tags.

Warning

This method is unstable and will be improved in the future.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def afilter(
    self,
    tags: str | Iterable[str],
    *,
    method: FilterMethodLiteral | FilterMethod = FilterMethod.OR,
) -> AsyncGenerator[Any, None]:
    """Asynchronously filter by tags across all shards.

    Args:
        tags: Tags to filter by. A single string is treated as one tag.
        method: 'and' or 'or' filter method.

    Yields:
        Key of item matching tags, shard by shard in shard order.
    """
    # Materialize the tags once, as ``aevict`` does: a one-shot iterable
    # (e.g. a generator) would otherwise be exhausted by the first shard and
    # every later shard would be queried with no tags at all.
    tag_list = [tags] if isinstance(tags, str) else list(tags)

    for shard in self._shards:
        async for value in shard.afilter(tag_list, method=method):
            yield value

incr

incr(
    key: Any,
    delta: int = 1,
    default: int | None = 0,
    *,
    retry: bool = False
) -> int

Increment value by delta for item with key.

If key is missing and default is None then raise KeyError. Else if key is missing and default is not None then use default for value.

Operation is atomic. All concurrent increment operations will be counted individually.

Assumes value may be stored in a SQLite column. Most builds that target machines with 64-bit pointer widths will support 64-bit signed integers.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

delta

int

Amount to increment.

1

default

int | None

Value if key is missing.

0

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Incremented value or default if key not found.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def incr(
    self, key: Any, delta: int = 1, default: int | None = 0, *, retry: bool = False
) -> int:
    """Increment the value for ``key`` by ``delta`` in its shard.

    Unlike ``get``/``set``, nothing is caught here: whatever the shard
    raises propagates to the caller.

    Args:
        key: Key for item.
        delta: Amount to increment.
        default: Value if key is missing.
        retry: Retry if database timeout occurs.

    Returns:
        The value returned by the shard's ``incr``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    updated = owner.incr(key, delta=delta, default=default, retry=retry)
    return updated

aincr async

aincr(
    key: Any,
    delta: int = 1,
    default: int | None = 0,
    *,
    retry: bool = False
) -> int

Asynchronously increment value by delta for item with key.

If key is missing and default is None then raise KeyError. Else if key is missing and default is not None then use default for value.

Operation is atomic. All concurrent increment operations will be counted individually.

Assumes value may be stored in a SQLite column. Most builds that target machines with 64-bit pointer widths will support 64-bit signed integers.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

delta

int

Amount to increment.

1

default

int | None

Value if key is missing.

0

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Incremented value or default if key not found.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def aincr(
    self, key: Any, delta: int = 1, default: int | None = 0, *, retry: bool = False
) -> int:
    """Asynchronously increment the value for ``key`` by ``delta``.

    Unlike ``aget``/``aset``, nothing is caught here: whatever the shard
    raises propagates to the caller.

    Args:
        key: Key for item.
        delta: Amount to increment.
        default: Value if key is missing.
        retry: Retry if database timeout occurs.

    Returns:
        The value returned by the shard's ``aincr``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    updated = await owner.aincr(key, delta=delta, default=default, retry=retry)
    return updated

decr

decr(
    key: Any,
    delta: int = 1,
    default: int | None = 0,
    *,
    retry: bool = False
) -> int

Decrement value by delta for item with key.

If key is missing and default is None then raise KeyError. Else if key is missing and default is not None then use default for value.

Operation is atomic. All concurrent decrement operations will be counted individually.

Unlike Memcached, negative values are supported. Value may be decremented below zero.

Assumes value may be stored in a SQLite column. Most builds that target machines with 64-bit pointer widths will support 64-bit signed integers.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

delta

int

Amount to decrement.

1

default

int | None

Value if key is missing.

0

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Decremented value or default if key not found.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def decr(
    self, key: Any, delta: int = 1, default: int | None = 0, *, retry: bool = False
) -> int:
    """Decrement the value for ``key`` by ``delta`` in its shard.

    Unlike ``get``/``set``, nothing is caught here: whatever the shard
    raises propagates to the caller.

    Args:
        key: Key for item.
        delta: Amount to decrement.
        default: Value if key is missing.
        retry: Retry if database timeout occurs.

    Returns:
        The value returned by the shard's ``decr``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    updated = owner.decr(key, delta=delta, default=default, retry=retry)
    return updated

adecr async

adecr(
    key: Any,
    delta: int = 1,
    default: int | None = 0,
    *,
    retry: bool = False
) -> int

Asynchronously decrement value by delta for item with key.

If key is missing and default is None then raise KeyError. Else if key is missing and default is not None then use default for value.

Operation is atomic. All concurrent decrement operations will be counted individually.

Unlike Memcached, negative values are supported. Value may be decremented below zero.

Assumes value may be stored in a SQLite column. Most builds that target machines with 64-bit pointer widths will support 64-bit signed integers.

Parameters:

Name Type Description Default

key

Any

Key for item.

required

delta

int

Amount to decrement.

1

default

int | None

Value if key is missing.

0

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Decremented value or default if key not found.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def adecr(
    self, key: Any, delta: int = 1, default: int | None = 0, *, retry: bool = False
) -> int:
    """Asynchronously decrement the value for ``key`` by ``delta``.

    Unlike ``aget``/``aset``, nothing is caught here: whatever the shard
    raises propagates to the caller.

    Args:
        key: Key for item.
        delta: Amount to decrement.
        default: Value if key is missing.
        retry: Retry if database timeout occurs.

    Returns:
        The value returned by the shard's ``adecr``.
    """
    owner = fanout_utils.get_shard(key, self.disk, self._shards)
    updated = await owner.adecr(key, delta=delta, default=default, retry=retry)
    return updated

evict

evict(
    tags: str | Iterable[str],
    *,
    method: FilterMethodLiteral | FilterMethod = OR,
    retry: bool = False
) -> int

Remove items with matching tag from cache.

Removing items is an iterative process. In each iteration, a subset of items is removed. Concurrent writes may occur between iterations.

If a TimeoutError occurs, the first element of the exception's args attribute will be the number of items removed before the exception occurred.

Parameters:

Name Type Description Default

tags

str | Iterable[str]

Tags identifying items.

required

method

FilterMethodLiteral | FilterMethod

'and' or 'or' filter method.

OR

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Count of rows removed.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def evict(
    self,
    tags: str | Iterable[str],
    *,
    method: FilterMethodLiteral | FilterMethod = FilterMethod.OR,
    retry: bool = False,
) -> int:
    """Remove items with matching tags from every shard.

    Args:
        tags: Tags identifying items. A single string is treated as one tag.
        method: 'and' or 'or' filter method.
        retry: Retry if database timeout occurs.

    Returns:
        Running count produced by feeding each shard's ``evict`` through
        ``fanout_utils.loop_total``, in shard order.
    """
    # Build one list up front so every shard receives the same tags.
    tag_list = [tags] if isinstance(tags, str) else list(tags)

    removed = 0
    for member in self._shards:
        removed = fanout_utils.loop_total(
            removed, member.evict, tag_list, method=method, retry=retry
        )
    return removed

aevict async

aevict(
    tags: str | Iterable[str],
    *,
    method: FilterMethodLiteral | FilterMethod = OR,
    retry: bool = False
) -> int

Asynchronously remove items with matching tag from cache.

Removing items is an iterative process. In each iteration, a subset of items is removed. Concurrent writes may occur between iterations.

If a TimeoutError occurs, the first element of the exception's args attribute will be the number of items removed before the exception occurred.

Parameters:

Name Type Description Default

tags

str | Iterable[str]

Tags identifying items.

required

method

FilterMethodLiteral | FilterMethod

'and' or 'or' filter method.

OR

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Count of rows removed.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def aevict(
    self,
    tags: str | Iterable[str],
    *,
    method: FilterMethodLiteral | FilterMethod = FilterMethod.OR,
    retry: bool = False,
) -> int:
    """Asynchronously remove items with matching tags from every shard.

    Shards are processed one after another, not concurrently.

    Args:
        tags: Tags identifying items. A single string is treated as one tag.
        method: 'and' or 'or' filter method.
        retry: Retry if database timeout occurs.

    Returns:
        Running count produced by feeding each shard's ``aevict`` through
        ``fanout_utils.async_loop_total``, in shard order.
    """
    # Build one list up front so every shard receives the same tags.
    tag_list = [tags] if isinstance(tags, str) else list(tags)

    removed = 0
    for member in self._shards:
        removed = await fanout_utils.async_loop_total(
            removed, member.aevict, tag_list, method=method, retry=retry
        )
    return removed

expire

expire(
    now: float | None = None, *, retry: bool = False
) -> int

Remove expired items from cache.

Removing items is an iterative process. In each iteration, a subset of items is removed. Concurrent writes may occur between iterations.

If a TimeoutError occurs, the first element of the exception's args attribute will be the number of items removed before the exception occurred.

Parameters:

Name Type Description Default

now

float | None

Current time. If None, use time.time().

None

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Count of items removed.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def expire(self, now: float | None = None, *, retry: bool = False) -> int:
    """Remove expired items from every shard.

    Args:
        now: Current time. If None, use ``time.time()``. An explicit value
            of 0 is honoured as a timestamp.
        retry: Retry if database timeout occurs.

    Returns:
        Running count produced by feeding each shard's ``expire`` through
        ``fanout_utils.loop_total``, in shard order.
    """
    # Compare against None explicitly: ``now or time.time()`` would silently
    # replace a legitimate timestamp of 0 with the current time.
    if now is None:
        now = time.time()

    total = 0
    # The same timestamp is handed to every shard.
    for shard in self._shards:
        total = fanout_utils.loop_total(total, shard.expire, now, retry=retry)
    return total

aexpire async

aexpire(
    now: float | None = None, *, retry: bool = False
) -> int

Asynchronously remove expired items from cache.

Removing items is an iterative process. In each iteration, a subset of items is removed. Concurrent writes may occur between iterations.

If a TimeoutError occurs, the first element of the exception's args attribute will be the number of items removed before the exception occurred.

Parameters:

Name Type Description Default

now

float | None

Current time. If None, use time.time().

None

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Count of items removed.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def aexpire(self, now: float | None = None, *, retry: bool = False) -> int:
    """Asynchronously remove expired items from every shard.

    Shards are processed one after another, not concurrently.

    Args:
        now: Current time. If None, use ``time.time()``. An explicit value
            of 0 is honoured as a timestamp.
        retry: Retry if database timeout occurs.

    Returns:
        Running count produced by feeding each shard's ``aexpire`` through
        ``fanout_utils.async_loop_total``, in shard order.
    """
    # Compare against None explicitly: ``now or time.time()`` would silently
    # replace a legitimate timestamp of 0 with the current time.
    if now is None:
        now = time.time()

    total = 0
    # The same timestamp is handed to every shard.
    for shard in self._shards:
        total = await fanout_utils.async_loop_total(
            total, shard.aexpire, now, retry=retry
        )
    return total

cull

cull(*, retry: bool = False) -> int

Cull items from cache until volume is less than size limit.

Removing items is an iterative process. In each iteration, a subset of items is removed. Concurrent writes may occur between iterations.

If a TimeoutError occurs, the first element of the exception's args attribute will be the number of items removed before the exception occurred.

Parameters:

Name Type Description Default

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Count of items removed.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def cull(self, *, retry: bool = False) -> int:
    """Cull items from every shard.

    Args:
        retry: Retry if database timeout occurs.

    Returns:
        Running count produced by feeding each shard's ``cull`` through
        ``fanout_utils.loop_total``, in shard order.
    """
    removed = 0
    for member in self._shards:
        # loop_total receives the running count and returns the updated one.
        removed = fanout_utils.loop_total(removed, member.cull, retry=retry)
    return removed

acull async

acull(*, retry: bool = False) -> int

Asynchronously cull items from cache until volume is less than size limit.

Removing items is an iterative process. In each iteration, a subset of items is removed. Concurrent writes may occur between iterations.

If a TimeoutError occurs, the first element of the exception's args attribute will be the number of items removed before the exception occurred.

Parameters:

Name Type Description Default

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
int

Count of items removed.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def acull(self, *, retry: bool = False) -> int:
    """Asynchronously cull items from every shard.

    Shards are processed one after another, not concurrently.

    Args:
        retry: Retry if database timeout occurs.

    Returns:
        Running count produced by feeding each shard's ``acull`` through
        ``fanout_utils.async_loop_total``, in shard order.
    """
    removed = 0
    for member in self._shards:
        # async_loop_total receives the running count and returns the new one.
        removed = await fanout_utils.async_loop_total(
            removed, member.acull, retry=retry
        )
    return removed

check

check(
    *, fix: bool = False, retry: bool = False
) -> list[WarningMessage]

Check database and file system consistency.

Intended for use in testing and post-mortem error analysis.

While checking the Cache table for consistency, a writer lock is held on the database. The lock blocks other cache clients from writing to the database. For caches with many file references, the lock may be held for a long time. For example, local benchmarking shows that a cache with 1,000 file references takes ~60ms to check.

Parameters:

Name Type Description Default

fix

bool

Correct inconsistencies.

False

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
list[WarningMessage]

List of warnings.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def check(self, *, fix: bool = False, retry: bool = False) -> list[WarningMessage]:
    warnings = (shard.check(fix=fix, retry=retry) for shard in self._shards)
    return list(chain.from_iterable(warnings))

acheck async

acheck(
    *, fix: bool = False, retry: bool = False
) -> list[WarningMessage]

Async check database and file system consistency.

Intended for use in testing and post-mortem error analysis.

While checking the Cache table for consistency, a writer lock is held on the database. The lock blocks other cache clients from writing to the database. For caches with many file references, the lock may be held for a long time. For example, local benchmarking shows that a cache with 1,000 file references takes ~60ms to check.

Parameters:

Name Type Description Default

fix

bool

Correct inconsistencies.

False

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
list[WarningMessage]

List of warnings.

Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def acheck(
    self, *, fix: bool = False, retry: bool = False
) -> list[WarningMessage]:
    result: list[WarningMessage] = []
    for shard in self._shards:
        warnings = await shard.acheck(fix=fix, retry=retry)
        result.extend(warnings)
    return result

iterkeys

iterkeys(
    *, reverse: bool = False
) -> Generator[Any, None, None]

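Iterate cache keys shard by shard. Within each shard, keys follow database sort order; the combined sequence is not necessarily globally sorted.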

Parameters:

Name Type Description Default

reverse

bool

Reverse sort order.

False

Yields:

Type Description
Any

Key of item.

Examples:

import typed_diskcache


def main() -> None:
    cache = typed_diskcache.Cache()
    for key in [4, 1, 3, 0, 2]:
        cache[key] = key
    print(list(cache.iterkeys()))
    # [0, 1, 2, 3, 4]
    print(list(cache.iterkeys(reverse=True)))
    # [4, 3, 2, 1, 0]
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def iterkeys(self, *, reverse: bool = False) -> Generator[Any, None, None]:
    shards = reversed(self._shards) if reverse else self._shards
    for shard in shards:
        yield from shard.iterkeys(reverse=reverse)

aiterkeys async

aiterkeys(
    *, reverse: bool = False
) -> AsyncGenerator[Any, None]

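Async iterate cache keys shard by shard. Within each shard, keys follow database sort order; the combined sequence is not necessarily globally sorted.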

Parameters:

Name Type Description Default

reverse

bool

Reverse sort order.

False

Yields:

Type Description
Any

Key of item.

Examples:

import typed_diskcache


async def main() -> None:
    cache = typed_diskcache.Cache()
    for key in [4, 1, 3, 0, 2]:
        cache[key] = key
    print([x async for x in cache.aiterkeys()])
    # [0, 1, 2, 3, 4]
    print([x async for x in cache.aiterkeys(reverse=True)])
    # [4, 3, 2, 1, 0]
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def aiterkeys(self, *, reverse: bool = False) -> AsyncGenerator[Any, None]:
    shards = reversed(self._shards) if reverse else self._shards
    for shard in shards:
        async for key in shard.aiterkeys(reverse=reverse):
            yield key

push

push(
    value: Any,
    *,
    prefix: str | None = None,
    side: QueueSideLiteral | QueueSide = BACK,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False
) -> NoReturn

Push value onto side of queue identified by prefix in cache.

When prefix is None, integer keys are used. Otherwise, string keys are used.

Operation is atomic. Concurrent operations will be serialized.

See also pull.

Parameters:

Name Type Description Default

value

Any

Value for item.

required

prefix

str | None

Key prefix. If None, key is integer.

None

side

QueueSideLiteral | QueueSide

Either 'back' or 'front'.

BACK

expire

float | None

Seconds until the key expires. If None, no expiry.

None

tags

str | Iterable[str] | None

Tags to associate with key.

None

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
NoReturn

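Never returns: FanoutCache does not support push and always raises TypedDiskcacheNotImplementedError. (On caches that support queues, the result is the key of the pushed item.)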

Examples:

import typed_diskcache


def main() -> None:
    cache = typed_diskcache.Cache()
    print(cache.push("first value"))
    # 500000000000000
    print(cache.get(500000000000000))
    # first value
    print(cache.push("second value"))
    # 500000000000001
    print(cache.push("third value", side="front"))
    # 499999999999999
    print(cache.push(1234, prefix="userids"))
    # userids-500000000000000
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def push(
    self,
    value: Any,
    *,
    prefix: str | None = None,
    side: QueueSideLiteral | QueueSide = QueueSide.BACK,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False,
) -> NoReturn:
    """Reject the queue push operation.

    All arguments are accepted for signature compatibility and ignored.

    Raises:
        TypedDiskcacheNotImplementedError: Always.
    """
    message = "FanoutCache does not support push operation"
    raise te.TypedDiskcacheNotImplementedError(message)

apush async

apush(
    value: Any,
    *,
    prefix: str | None = None,
    side: QueueSideLiteral | QueueSide = BACK,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False
) -> NoReturn

Async push value onto side of queue identified by prefix in cache.

When prefix is None, integer keys are used. Otherwise, string keys are used.

Operation is atomic. Concurrent operations will be serialized.

See also apull.

Parameters:

Name Type Description Default

value

Any

Value for item.

required

prefix

str | None

Key prefix. If None, key is integer.

None

side

QueueSideLiteral | QueueSide

Either 'back' or 'front'.

BACK

expire

float | None

Seconds until the key expires. If None, no expiry.

None

tags

str | Iterable[str] | None

Tags to associate with key.

None

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
NoReturn

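Never returns: FanoutCache does not support push and always raises TypedDiskcacheNotImplementedError. (On caches that support queues, the result is the key of the pushed item.)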

Examples:

import typed_diskcache


async def main() -> None:
    cache = typed_diskcache.Cache()
    print(await cache.apush("first value"))
    # 500000000000000
    print(await cache.aget(500000000000000))
    # first value
    print(await cache.apush("second value"))
    # 500000000000001
    print(await cache.apush("third value", side="front"))
    # 499999999999999
    print(await cache.apush(1234, prefix="userids"))
    # userids-500000000000000
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def apush(
    self,
    value: Any,
    *,
    prefix: str | None = None,
    side: QueueSideLiteral | QueueSide = QueueSide.BACK,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
    retry: bool = False,
) -> NoReturn:
    """Reject the async queue push operation.

    All arguments are accepted for signature compatibility and ignored.

    Raises:
        TypedDiskcacheNotImplementedError: Always, once the coroutine runs.
    """
    message = "FanoutCache does not support push operation"
    raise te.TypedDiskcacheNotImplementedError(message)

pull

pull(
    *,
    prefix: str | None = None,
    default: tuple[Any, Any] | None = None,
    side: QueueSideLiteral | QueueSide = FRONT,
    retry: bool = False
) -> NoReturn

Pull key and value item pair from side of queue in cache.

When prefix is None, integer keys are used. Otherwise, string keys are used.

If queue is empty, return default.

Operation is atomic. Concurrent operations will be serialized.

See also push and get.

Parameters:

Name Type Description Default

prefix

str | None

Key prefix. If None, key is integer.

None

default

tuple[Any, Any] | None

Value to return if key is missing.

None

side

QueueSideLiteral | QueueSide

Either 'back' or 'front'.

FRONT

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
NoReturn

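Never returns: FanoutCache does not support pull and always raises TypedDiskcacheNotImplementedError. (On caches that support queues, the result is a container with the cached value, or the default if the queue is empty.)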

Examples:

import typed_diskcache


def main() -> None:
    cache = typed_diskcache.Cache()
    print(cache.pull())
    # Container(default=True, expire_time=None, tags=None)
    for letter in "abc":
        print(cache.push(letter))
    # 500000000000000
    # 500000000000001
    # 500000000000002
    container = cache.pull()
    print(container.key)
    # 500000000000000
    print(container.value)
    # a
    container = cache.pull(side="back")
    print(container.value)
    # c
    print(cache.push(1234, prefix="userids"))
    # userids-500000000000000
    container = cache.pull(prefix="userids")
    print(container.value)
    # 1234
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def pull(
    self,
    *,
    prefix: str | None = None,
    default: tuple[Any, Any] | None = None,
    side: QueueSideLiteral | QueueSide = QueueSide.FRONT,
    retry: bool = False,
) -> NoReturn:
    """Reject the queue pull operation.

    All arguments are accepted for signature compatibility and ignored.

    Raises:
        TypedDiskcacheNotImplementedError: Always.
    """
    message = "FanoutCache does not support pull operation"
    raise te.TypedDiskcacheNotImplementedError(message)

apull async

apull(
    *,
    prefix: str | None = None,
    default: tuple[Any, Any] | None = None,
    side: QueueSideLiteral | QueueSide = FRONT,
    retry: bool = False
) -> NoReturn

Async pull key and value item pair from side of queue in cache.

When prefix is None, integer keys are used. Otherwise, string keys are used.

If queue is empty, return default.

Operation is atomic. Concurrent operations will be serialized.

See also apush and aget.

Parameters:

Name Type Description Default

prefix

str | None

Key prefix. If None, key is integer.

None

default

tuple[Any, Any] | None

Value to return if key is missing.

None

side

QueueSideLiteral | QueueSide

Either 'back' or 'front'.

FRONT

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
NoReturn

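Never returns: FanoutCache does not support pull and always raises TypedDiskcacheNotImplementedError. (On caches that support queues, the result is a container with the cached value, or the default if the queue is empty.)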

Examples:

import typed_diskcache


async def main() -> None:
    cache = typed_diskcache.Cache()
    print(await cache.apull())
    # Container(default=True, expire_time=None, tags=None)
    for letter in "abc":
        print(await cache.apush(letter))
    # 500000000000000
    # 500000000000001
    # 500000000000002
    container = await cache.apull()
    print(container.key)
    # 500000000000000
    print(container.value)
    # a
    container = await cache.apull(side="back")
    print(container.value)
    # c
    print(await cache.apush(1234, prefix="userids"))
    # userids-500000000000000
    container = await cache.apull(prefix="userids")
    print(container.value)
    # 1234
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def apull(
    self,
    *,
    prefix: str | None = None,
    default: tuple[Any, Any] | None = None,
    side: QueueSideLiteral | QueueSide = QueueSide.FRONT,
    retry: bool = False,
) -> NoReturn:
    """Reject the async queue pull operation.

    All arguments are accepted for signature compatibility and ignored.

    Raises:
        TypedDiskcacheNotImplementedError: Always, once the coroutine runs.
    """
    message = "FanoutCache does not support pull operation"
    raise te.TypedDiskcacheNotImplementedError(message)

peek

peek(
    *,
    prefix: str | None = None,
    default: tuple[Any, Any] | None = None,
    side: QueueSideLiteral | QueueSide = BACK,
    retry: bool = False
) -> NoReturn

Peek at key and value item pair from side of queue in cache.

When prefix is None, integer keys are used. Otherwise, string keys are used.

If queue is empty, return default.

Expired items are deleted from cache. Operation is atomic. Concurrent operations will be serialized.

See also pull and push.

Parameters:

Name Type Description Default

prefix

str | None

Key prefix. If None, key is integer.

None

default

tuple[Any, Any] | None

Value to return if key is missing.

None

side

QueueSideLiteral | QueueSide

Either 'back' or 'front'.

BACK

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
NoReturn

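Never returns: FanoutCache does not support peek and always raises TypedDiskcacheNotImplementedError. (On caches that support queues, the result is a container with the cached value, or the default if the queue is empty.)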

Examples:

import typed_diskcache


def main() -> None:
    cache = typed_diskcache.Cache()
    for letter in "abc":
        print(cache.push(letter))
    # 500000000000000
    # 500000000000001
    # 500000000000002
    container = cache.peek()
    print(container.key)
    # 500000000000002
    print(container.value)
    # c
    container = cache.peek(side="front")
    print(container.key)
    # 500000000000000
    print(container.value)
    # a
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def peek(
    self,
    *,
    prefix: str | None = None,
    default: tuple[Any, Any] | None = None,
    side: QueueSideLiteral | QueueSide = QueueSide.BACK,
    retry: bool = False,
) -> NoReturn:
    """Reject the queue peek operation.

    All arguments are accepted for signature compatibility and ignored.

    Raises:
        TypedDiskcacheNotImplementedError: Always.
    """
    message = "FanoutCache does not support peek operation"
    raise te.TypedDiskcacheNotImplementedError(message)

apeek async

apeek(
    *,
    prefix: str | None = None,
    default: tuple[Any, Any] | None = None,
    side: QueueSideLiteral | QueueSide = BACK,
    retry: bool = False
) -> NoReturn

Async peek at key and value item pair from side of queue in cache.

When prefix is None, integer keys are used. Otherwise, string keys are used.

If queue is empty, return default.

Expired items are deleted from cache. Operation is atomic. Concurrent operations will be serialized.

See also apull and apush.

Parameters:

Name Type Description Default

prefix

str | None

Key prefix. If None, key is integer.

None

default

tuple[Any, Any] | None

Value to return if key is missing.

None

side

QueueSideLiteral | QueueSide

Either 'back' or 'front'.

BACK

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
NoReturn

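Never returns: FanoutCache does not support peek and always raises TypedDiskcacheNotImplementedError. (On caches that support queues, the result is a container with the cached value, or the default if the queue is empty.)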

Examples:

import typed_diskcache


async def main() -> None:
    cache = typed_diskcache.Cache()
    for letter in "abc":
        print(await cache.apush(letter))
    # 500000000000000
    # 500000000000001
    # 500000000000002
    container = await cache.apeek()
    print(container.key)
    # 500000000000002
    print(container.value)
    # c
    container = await cache.apeek(side="front")
    print(container.key)
    # 500000000000000
    print(container.value)
    # a
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def apeek(
    self,
    *,
    prefix: str | None = None,
    default: tuple[Any, Any] | None = None,
    side: QueueSideLiteral | QueueSide = QueueSide.BACK,
    retry: bool = False,
) -> NoReturn:
    """Reject the async queue peek operation.

    All arguments are accepted for signature compatibility and ignored.

    Raises:
        TypedDiskcacheNotImplementedError: Always, once the coroutine runs.
    """
    message = "FanoutCache does not support peek operation"
    raise te.TypedDiskcacheNotImplementedError(message)

peekitem

peekitem(
    *, last: bool = True, retry: bool = False
) -> NoReturn

Peek at key and value item pair in cache based on iteration order.

Expired items are deleted from cache. Operation is atomic. Concurrent operations will be serialized.

Parameters:

Name Type Description Default

last

bool

Last item in iteration order.

True

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
NoReturn

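Never returns: FanoutCache does not support peekitem and always raises TypedDiskcacheNotImplementedError. (On caches that support it, the result is a container with the cached value.)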

Examples:

import typed_diskcache


def main() -> None:
    cache = typed_diskcache.Cache()
    for num, letter in enumerate("abc"):
        cache[letter] = num
    container = cache.peekitem()
    print(container.key, container.value)
    # ('c', 2)
    container = cache.peekitem(last=False)
    print(container.key, container.value)
    # ('a', 0)
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def peekitem(self, *, last: bool = True, retry: bool = False) -> NoReturn:
    """Reject the peekitem operation.

    Both arguments are accepted for signature compatibility and ignored.

    Raises:
        TypedDiskcacheNotImplementedError: Always.
    """
    message = "FanoutCache does not support peekitem operation"
    raise te.TypedDiskcacheNotImplementedError(message)

apeekitem async

apeekitem(
    *, last: bool = True, retry: bool = False
) -> NoReturn

Async peek at key and value item pair in cache based on iteration order.

Expired items are deleted from cache. Operation is atomic. Concurrent operations will be serialized.

Parameters:

Name Type Description Default

last

bool

Last item in iteration order.

True

retry

bool

Retry if database timeout occurs.

False

Returns:

Type Description
NoReturn

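Never returns: FanoutCache does not support peekitem and always raises TypedDiskcacheNotImplementedError. (On caches that support it, the result is a container with the cached value.)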

Examples:

import typed_diskcache


async def main() -> None:
    cache = typed_diskcache.Cache()
    for num, letter in enumerate("abc"):
        cache[letter] = num
    container = await cache.apeekitem()
    print(container.key, container.value)
    # ('c', 2)
    container = await cache.apeekitem(last=False)
    print(container.key, container.value)
    # ('a', 0)
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def apeekitem(self, *, last: bool = True, retry: bool = False) -> NoReturn:
    """Reject the async peekitem operation.

    Both arguments are accepted for signature compatibility and ignored.

    Raises:
        TypedDiskcacheNotImplementedError: Always, once the coroutine runs.
    """
    message = "FanoutCache does not support peekitem operation"
    raise te.TypedDiskcacheNotImplementedError(message)

update_settings

update_settings(
    settings: Settings | SettingsKwargs | None = None,
    **kwargs: Unpack[SettingsKwargs]
) -> None

Update cache settings.

Parameters:

Name Type Description Default

settings

Settings | SettingsKwargs | None

New settings.

None

**kwargs

Unpack[SettingsKwargs]

Additional settings.

{}
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
def update_settings(
    self,
    settings: Settings | SettingsKwargs | None = None,
    **kwargs: Unpack[SettingsKwargs],
) -> None:
    """Apply new settings to every shard and to the top-level cache.

    Args:
        settings: New settings.
        **kwargs: Additional settings, merged with ``settings`` by
            ``cache_utils.combine_settings``.
    """
    merged = cache_utils.combine_settings(settings, kwargs)
    # Shards are updated first, the top-level cache last.
    for target in (*self._shards, self._cache):
        target.update_settings(merged)

aupdate_settings async

aupdate_settings(
    settings: Settings | SettingsKwargs | None = None,
    **kwargs: Unpack[SettingsKwargs]
) -> None

Async update cache settings.

Asynchronous version of update_settings.

Parameters:

Name Type Description Default

settings

Settings | SettingsKwargs | None

New settings.

None

**kwargs

Unpack[SettingsKwargs]

Additional settings.

{}
Source code in src/typed_diskcache/implement/cache/fanout/main.py
@override
async def aupdate_settings(
    self,
    settings: Settings | SettingsKwargs | None = None,
    **kwargs: Unpack[SettingsKwargs],
) -> None:
    # Async counterpart of update_settings: merges `settings` with `kwargs`
    # and schedules an aupdate_settings call on every shard and on the
    # top-level cache inside one anyio task group.
    #
    # anyio is an optional dependency, so its presence is checked first
    # (presumably raising with the given hint when it is missing; confirm
    # against validate_installed) and it is imported locally.
    validate_installed("anyio", "Consider installing extra `asyncio`.")
    import anyio  # noqa: PLC0415

    settings = cache_utils.combine_settings(settings, kwargs)
    # Unlike the sync version's sequential loop, the updates are started as
    # separate tasks in a shared task group (anyio task-group semantics:
    # the block is left once the started tasks have finished).
    async with anyio.create_task_group() as task_group:
        for shard in self._shards:
            task_group.start_soon(shard.aupdate_settings, settings)
        task_group.start_soon(self._cache.aupdate_settings, settings)