Skip to content

Semaphore

SyncSemaphore

SyncSemaphore(
    cache: CacheProtocol,
    key: Any,
    value: int = 1,
    *,
    timeout: float = DEFAULT_LOCK_TIMEOUT,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None
)

Bases: SyncSemaphoreProtocol

Synchronous semaphore implementation using spin-lock algorithm.

Assumes the key will not be evicted. Set the eviction policy to 'none' on the cache to guarantee the key is not evicted.

Parameters:

Name Type Description Default

cache

CacheProtocol

Cache to use for semaphore.

required

key

Any

Key for semaphore.

required

value

int

Initial value of the semaphore counter, i.e. the maximum number of acquisitions that can be held at once.

1

timeout

float

Maximum time, in seconds, to wait while acquiring the semaphore.

DEFAULT_LOCK_TIMEOUT

expire

float | None

Expiration time for semaphore.

None

tags

str | Iterable[str] | None

Tags for semaphore.

None

Examples:

import typed_diskcache


def main() -> None:
    # A semaphore with value=2 can be acquired twice before it is exhausted.
    cache = typed_diskcache.Cache()
    semaphore = typed_diskcache.SyncSemaphore(cache, "some-key", value=2)
    # Each acquire() decrements the stored counter.
    semaphore.acquire()
    semaphore.acquire()
    # release() increments the stored counter again.
    semaphore.release()
    # Context-manager form; presumably acquires on entry and releases on exit.
    with semaphore:
        pass
Source code in src/typed_diskcache/implement/sync/semaphore.py
def __init__(  # noqa: PLR0913
    self,
    cache: CacheProtocol,
    key: Any,
    value: int = 1,
    *,
    timeout: float = DEFAULT_LOCK_TIMEOUT,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
) -> None:
    """Store the semaphore configuration.

    Args:
        cache: Cache to use for semaphore.
        key: Key for semaphore.
        value: Value for semaphore; used as the counter when the key is not
            yet stored in the cache.
        timeout: Timeout for semaphore.
        expire: Expiration time for semaphore.
        tags: Tags for semaphore. A single string is treated as one tag.
    """
    self._cache = cache
    self._key = key
    self._value = value
    self._timeout = timeout
    self._expire = expire
    # A bare string is itself an iterable of characters; wrap it so that
    # "tag" becomes {"tag"} instead of {"t", "a", "g"}.
    if tags is None:
        self._tags = frozenset()
    elif isinstance(tags, str):
        self._tags = frozenset((tags,))
    else:
        self._tags = frozenset(tags)

key property

key: Any

Key for semaphore.

value property

value: int

Value for semaphore.

timeout property

timeout: float

Timeout for semaphore.

expire property

expire: float | None

Expiration time for semaphore.

tags property

tags: frozenset[str]

Tags for semaphore.

acquire

acquire() -> None

Acquire semaphore by decrementing value using spin-lock algorithm.

Source code in src/typed_diskcache/implement/sync/semaphore.py
@context
@override
def acquire(self) -> None:
    """Acquire the semaphore, spinning until the counter is positive.

    Each attempt reads the counter stored under ``self.key`` (falling back
    to the semaphore's own value when the key is absent) and, if it is
    positive, stores the counter decremented by one and returns.

    Raises:
        TypedDiskcacheTimeoutError: If the elapsed time reaches
            ``self.timeout`` before the semaphore is acquired.
    """
    began = time.monotonic()
    elapsed = 0
    with ExitStack() as outer:
        session = outer.enter_context(self._cache.conn.session())
        # Per-attempt resources live on their own stack so they can be torn
        # down between attempts while the session itself stays open.
        attempt = outer.enter_context(ExitStack())
        while elapsed < self.timeout:
            attempt.enter_context(transact(session))
            ctx = attempt.enter_context(self._cache.conn.enter_session(session))
            stored = ctx.run(self._cache.get, self.key, default=self._value)
            remaining = validate_semaphore_value(stored.value)
            if remaining <= 0:
                # Nothing to take: close this attempt, back off, then retry.
                attempt.close()
                time.sleep(SPIN_LOCK_SLEEP)
                elapsed = time.monotonic() - began
                continue
            ctx.run(
                self._cache.set,
                self.key,
                remaining - 1,
                expire=self.expire,
                tags=self.tags,
            )
            return

    raise te.TypedDiskcacheTimeoutError("lock acquire timeout")

release

release() -> None

Release semaphore by incrementing value.

Source code in src/typed_diskcache/implement/sync/semaphore.py
@context
@override
def release(self) -> None:
    """Release the semaphore by storing the counter incremented by one.

    Raises:
        TypedDiskcacheRuntimeError: If the stored counter is already at or
            above the semaphore's own value, i.e. nothing is held.
    """
    with ExitStack() as stack:
        session = stack.enter_context(self._cache.conn.session())
        stack.enter_context(transact(session))
        ctx = stack.enter_context(self._cache.conn.enter_session(session))
        stored = ctx.run(self._cache.get, self.key, default=self._value)
        current = validate_semaphore_value(stored.value)
        if current < self._value:
            # At least one acquisition is outstanding: give one back.
            ctx.run(
                self._cache.set,
                self.key,
                current + 1,
                expire=self.expire,
                tags=self.tags,
            )
            return
        logger.error(
            "cannot release un-acquired semaphore, value: %d, container: %d",
            self._value,
            current,
        )
        raise te.TypedDiskcacheRuntimeError("cannot release un-acquired semaphore")

AsyncSemaphore

AsyncSemaphore(
    cache: CacheProtocol,
    key: Any,
    value: int = 1,
    *,
    timeout: float = DEFAULT_LOCK_TIMEOUT,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None
)

Bases: AsyncSemaphoreProtocol

Asynchronous semaphore implementation using spin-lock algorithm.

Warning

If the current Python interpreter is free-threading (without GIL), using AsyncSemaphore may lead to unexpected behavior. Consider using SyncSemaphore instead in such cases.

Assumes the key will not be evicted. Set the eviction policy to 'none' on the cache to guarantee the key is not evicted.

Asynchronous version of SyncSemaphore.

Parameters:

Name Type Description Default

cache

CacheProtocol

Cache to use for semaphore.

required

key

Any

Key for semaphore.

required

value

int

Initial value of the semaphore counter, i.e. the maximum number of acquisitions that can be held at once.

1

timeout

float

Maximum time, in seconds, to wait while acquiring the semaphore.

DEFAULT_LOCK_TIMEOUT

expire

float | None

Expiration time for semaphore.

None

tags

str | Iterable[str] | None

Tags for semaphore.

None

Warns:

Type Description
RuntimeWarning

If the current Python interpreter is free-threading (without GIL), using AsyncSemaphore may lead to unexpected behavior. Consider using SyncSemaphore instead in such cases.

Examples:

import typed_diskcache


async def main() -> None:
    # A semaphore with value=2 can be acquired twice before it is exhausted.
    cache = typed_diskcache.Cache()
    semaphore = typed_diskcache.AsyncSemaphore(cache, "some-key", value=2)
    # Each acquire() decrements the stored counter.
    await semaphore.acquire()
    await semaphore.acquire()
    # release() increments the stored counter again.
    await semaphore.release()
    # Async context-manager form; presumably acquires on entry and releases
    # on exit.
    async with semaphore:
        pass
Source code in src/typed_diskcache/implement/sync/semaphore.py
def __init__(  # noqa: PLR0913
    self,
    cache: CacheProtocol,
    key: Any,
    value: int = 1,
    *,
    timeout: float = DEFAULT_LOCK_TIMEOUT,
    expire: float | None = None,
    tags: str | Iterable[str] | None = None,
) -> None:
    """Store the semaphore configuration.

    Args:
        cache: Cache to use for semaphore.
        key: Key for semaphore.
        value: Value for semaphore; used as the counter when the key is not
            yet stored in the cache.
        timeout: Timeout for semaphore.
        expire: Expiration time for semaphore.
        tags: Tags for semaphore. A single string is treated as one tag.

    Warns:
        RuntimeWarning: If ``IS_FREE_THREAD`` is true.
    """
    if IS_FREE_THREAD:
        # Point users at the semaphore's synchronous counterpart, not the lock.
        message = (
            "The current Python interpreter is free-threading (without GIL). "
            "However, AsyncSemaphore does not support free-threading mode. "
            "Consider using SyncSemaphore instead."
        )
        warnings.warn(message, RuntimeWarning, stacklevel=2)

    self._cache = cache
    self._key = key
    self._value = value
    self._timeout = timeout
    self._expire = expire
    # A bare string is itself an iterable of characters; wrap it so that
    # "tag" becomes {"tag"} instead of {"t", "a", "g"}.
    if tags is None:
        self._tags = frozenset()
    elif isinstance(tags, str):
        self._tags = frozenset((tags,))
    else:
        self._tags = frozenset(tags)

key property

key: Any

Key for semaphore.

value property

value: int

Value for semaphore.

timeout property

timeout: float

Timeout for semaphore.

expire property

expire: float | None

Expiration time for semaphore.

tags property

tags: frozenset[str]

Tags for semaphore.

acquire async

acquire() -> None

Acquire semaphore by decrementing value using spin-lock algorithm.

Source code in src/typed_diskcache/implement/sync/semaphore.py
@context
@override
async def acquire(self) -> None:
    """Acquire semaphore by decrementing value using spin-lock algorithm.

    Each attempt reads the counter stored under ``self.key`` (falling back
    to the semaphore's own value when the key is absent) and, if it is
    positive, stores the counter decremented by one and returns. Otherwise
    it sleeps for ``SPIN_LOCK_SLEEP`` and retries. The whole loop runs
    under ``anyio.fail_after(self.timeout)``.

    Raises:
        TypedDiskcacheTimeoutError: If the semaphore is not acquired before
            the timeout fires.
    """
    validate_installed("anyio", "Consider installing extra `asyncio`.")
    import anyio  # noqa: PLC0415

    try:
        async with AsyncExitStack() as stack:
            stack.enter_context(anyio.fail_after(self.timeout))
            session = await stack.enter_async_context(self._cache.conn.asession())
            # Per-attempt resources go on ``sub_stack`` so they can be torn
            # down between attempts while the session itself stays open.
            sub_stack = await stack.enter_async_context(AsyncExitStack())
            while True:
                await sub_stack.enter_async_context(transact(session))
                # Register on the per-attempt stack so this context is
                # released by ``sub_stack.aclose()`` before the next retry
                # instead of piling up on the outer stack.
                context = sub_stack.enter_context(
                    self._cache.conn.enter_session(session)
                )
                container = await context.run(
                    self._cache.aget, self.key, default=self._value
                )
                container_value = validate_semaphore_value(container.value)
                if container_value > 0:
                    await context.run(
                        self._cache.aset,
                        self.key,
                        container_value - 1,
                        expire=self.expire,
                        tags=self.tags,
                    )
                    return
                await sub_stack.aclose()
                await anyio.sleep(SPIN_LOCK_SLEEP)
    except TimeoutError as exc:
        raise te.TypedDiskcacheTimeoutError("lock acquire timeout") from exc

release async

release() -> None

Release semaphore by incrementing value.

Source code in src/typed_diskcache/implement/sync/semaphore.py
@context
@override
async def release(self) -> None:
    """Release the semaphore by storing the counter incremented by one.

    Raises:
        TypedDiskcacheRuntimeError: If the stored counter is already at or
            above the semaphore's own value, i.e. nothing is held.
    """
    async with AsyncExitStack() as stack:
        session = await stack.enter_async_context(self._cache.conn.asession())
        await stack.enter_async_context(transact(session))
        ctx = stack.enter_context(self._cache.conn.enter_session(session))
        stored = await ctx.run(self._cache.aget, self.key, default=self._value)
        current = validate_semaphore_value(stored.value)
        if current < self._value:
            # At least one acquisition is outstanding: give one back.
            await ctx.run(
                self._cache.aset,
                self.key,
                current + 1,
                expire=self.expire,
                tags=self.tags,
            )
            return
        logger.error(
            "cannot release un-acquired semaphore, value: %d, container: %d",
            self._value,
            current,
        )
        raise te.TypedDiskcacheRuntimeError("cannot release un-acquired semaphore")