Skip to content

Component

elva.component

Module for a generic asynchronous app component.

Classes:

  • Component

    Generic asynchronous app component class.

Functions:

Attributes:

ComponentState = create_component_state('ComponentState') module-attribute

The default component states.

Component

Generic asynchronous app component class.

This class features graceful shutdown alongside annotated logging. It is used for writing providers, stores, parsers, renderers etc.

It supports explicit handling via the start and stop method as well as the asynchronous context manager protocol.

Methods:

  • subscribe

    Get an object to listen on for differences in component state.

  • unsubscribe

    Close and remove the memory object stream from the mapping of subscribers.

  • close

    Run unsubscribe from all subscriptions.

  • __del__

    Destructor callback.

  • __aenter__

    Asynchronous context manager enter callback.

  • __aexit__

    Asynchronous context manager exit callback.

  • start

    Start the component.

  • stop

    Stop the component by cancelling all inner task groups.

  • before

    Hook to run before the component signals that is running.

  • run

    Hook to run after the component set the RUNNING state.

  • cleanup

    Hook to run after the component's stop method

Attributes:

log instance-attribute

Logger instance to write logging messages to.

_subscribers instance-attribute

Mapping of receiving streams to their respective sending stream over which to publish state changes.

states property

Enumeration class holding all states the component can have.

state property

The current state of the component.

subscribe()

Get an object to listen on for differences in component state.

Returns:

  • MemoryObjectReceiveStream

    the receiving end of an asynchronous memory object stream emitting tuple of deleted and added states.

Source code in src/elva/component.py
def subscribe(self) -> MemoryObjectReceiveStream:
    """
    Get an object to listen on for differences in component state.

    Returns:
        the receiving end of an asynchronous memory object stream emitting
            tuple of deleted and added states.
    """
    # create a stream with a defined maximum buffer size,
    # otherwise - with default of max_buffer_size=0 - sending would block
    send, recv = create_memory_object_stream[tuple[Flag, Flag]](
        max_buffer_size=8192
    )

    # set the receiving end as key so that it can easily be unsubscribed
    self._subscribers[recv] = send
    self.log.info(f"added subscriber {id(recv)}")

    return recv

unsubscribe(recv)

Close and remove the memory object stream from the mapping of subscribers.

Parameters:

Source code in src/elva/component.py
def unsubscribe(self, recv: MemoryObjectReceiveStream):
    """
    Close and remove the memory object stream from the mapping of subscribers.

    Arguments:
        recv: the receiving end of the memory object stream as returned by
            [`subscribe`][elva.component.Component.subscribe].
    """
    send = self._subscribers.pop(recv)
    send.close()
    self.log.info(f"removed subscriber {id(recv)}")

_change_state(from_state, to_state)

Replace a state with another state within the current component state.

The special state NONE can be used as an identity, i.e. no-op, flag.

Parameters:

  • from_state (Flag) –

    the state to remove.

  • to_state (Flag) –

    the state to insert.

Source code in src/elva/component.py
def _change_state(self, from_state: Flag, to_state: Flag):
    """
    Replace a state with another state within the current component [`state`][elva.component.Component.state].

    The special state `NONE` can be used as an identity, i.e. no-op, flag.

    Arguments:
        from_state: the state to remove.
        to_state: the state to insert.
    """
    # no change in state
    if from_state == to_state:
        return

    # remove `from_state`, add `to_state`
    state = self.state & ~from_state | to_state

    # set the state from the component's states
    self._state = state
    self.log.info(f"set state to {state}")

    if from_state != self.states.NONE:
        self.log.info(f"removed state {from_state}")

    if to_state != self.states.NONE:
        self.log.info(f"added state {to_state}")

    # copy to avoid exceptions due to set changes during iteration
    subs = self._subscribers.copy()

    # send the state diff to the subscribers
    for recv, send in subs.items():
        try:
            send.send_nowait((from_state, to_state))
            self.log.debug(f"sent state change to subscriber {id(recv)}")
        except (BrokenResourceError, WouldBlock):
            # either the send stream has a respective closed receive stream
            # or the stream buffer is full, so it is not in use either way
            # and we unsubscribe ourselves
            self.unsubscribe(recv)

close()

Run unsubscribe from all subscriptions.

Source code in src/elva/component.py
def close(self):
    """
    Run [`unsubscribe`][elva.component.Component.unsubscribe] from all subscriptions.
    """
    subs = self._subscribers.copy()
    for recv in subs:
        self.unsubscribe(recv)

__del__()

Destructor callback.

It closes all subscriptions before this component gets deleted.

Source code in src/elva/component.py
def __del__(self):
    """
    Destructor callback.

    It closes all subscriptions before this component gets deleted.
    """
    self.close()

_get_start_lock()

Get a starting lock to enter the task group exclusively.

Source code in src/elva/component.py
def _get_start_lock(self) -> Lock:
    """
    Get a starting lock to enter the task group exclusively.
    """
    if self._start_lock is None:
        self._start_lock = Lock()
    return self._start_lock

__aenter__() async

Asynchronous context manager enter callback.

It starts the _run coroutine in a task group.

Source code in src/elva/component.py
async def __aenter__(self) -> Self:
    """
    Asynchronous context manager enter callback.

    It starts the [`_run`][elva.component.Component._run] coroutine
    in a task group.
    """
    if self._task_group is not None:
        raise RuntimeError(f"{self} already active")

    async with self._get_start_lock():
        # enter the asynchronous context and start the runner in it
        self._exit_stack = AsyncExitStack()
        await self._exit_stack.__aenter__()
        self._task_group = await self._exit_stack.enter_async_context(
            create_task_group()
        )

        self.log.info("starting")

        # add `ACTIVE` state
        self._change_state(self.states.NONE, self.states.ACTIVE)

        # start the main coroutine
        await self._task_group.start(self._run)

    return self

__aexit__(exc_type, exc_value, exc_tb) async

Asynchronous context manager exit callback.

It stops this component by cancelling the inner task group scope.

Parameters:

  • exc_type (None | Exception) –

    the type of the exception causing the exit.

  • exc_value (None | str) –

    the value of the exception causing the exit.

  • exc_tb (None | TracebackType) –

    the traceback of the exception causing the exit.

Returns:

  • Awaitable

    an awaitable from the exit stacks own exit callback.

Source code in src/elva/component.py
async def __aexit__(
    self,
    exc_type: None | Exception,
    exc_value: None | str,
    exc_tb: None | TracebackType,
) -> Awaitable:
    """
    Asynchronous context manager exit callback.

    It stops this component by cancelling the inner task group scope.

    Arguments:
        exc_type: the type of the exception causing the exit.
        exc_value: the value of the exception causing the exit.
        exc_tb: the traceback of the exception causing the exit.

    Returns:
        an awaitable from the exit stacks own exit callback.
    """
    await self.stop()
    return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)

_run(task_status=TASK_STATUS_IGNORED) async

Hook handling the run method gracefully.

Raises:

Parameters:

  • task_status (TaskStatus[None], default: TASK_STATUS_IGNORED ) –

    an optional task status object to call started on when the task is considered to have started.

Source code in src/elva/component.py
async def _run(self, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
    """
    Hook handling the [`run`][elva.component.Component.run] method gracefully.

    Raises:
        get_cancelled_exc_class: the exception from cancellation.

    Arguments:
        task_status: an optional task status object to call
            [`started`][anyio.abc.TaskStatus.started] on when the task
            is considered to have started.
    """
    # start runner and do a shielded cleanup on cancellation
    try:
        await self.before()

        # signal that the setup has finished
        task_status.started()
        self.log.info("started")

        # add `RUNNING` state
        self._change_state(self.states.NONE, self.states.RUNNING)

        await self.run()

        # keep the task running when `self.run()` has finished
        # so the cancellation exception can be always caught
        await sleep_forever()
    except get_cancelled_exc_class():
        self.log.info("stopping")
        with CancelScope(shield=True):
            await self.cleanup()

        self._task_group = None
        self.log.info("stopped")

        # change from current state to `NONE`
        self._change_state(self.state, self.states.NONE)

        # always re-raise a captured cancellation exception,
        # otherwise the behavior is undefined
        raise

start(task_status=TASK_STATUS_IGNORED) async

Start the component.

Parameters:

  • task_status (TaskStatus[None], default: TASK_STATUS_IGNORED ) –

    an optional task status object to call started on when the task is considered to have started.

Source code in src/elva/component.py
async def start(self, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
    """
    Start the component.

    Arguments:
        task_status: an optional task status object to call
            [`started`][anyio.abc.TaskStatus.started] on when the task
            is considered to have started.
    """
    if self._task_group is not None:
        raise RuntimeError(f"{self} already active")

    async with self._get_start_lock():
        async with create_task_group() as self._task_group:
            self.log.info("starting")

            # add `ACTIVE` state
            self._change_state(self.states.NONE, self.states.ACTIVE)

            # start the main coroutine
            await self._task_group.start(self._run)

            # signal that the coroutine has started
            task_status.started()

stop() async

Stop the component by cancelling all inner task groups.

Source code in src/elva/component.py
async def stop(self):
    """
    Stop the component by cancelling all inner task groups.
    """
    if self._task_group is None:
        raise RuntimeError(f"{self} not active")

    self._task_group.cancel_scope.cancel()
    self.log.debug("cancelled")

before() async

Hook to run before the component signals that is running.

In here, one would define initializing steps necessary for the component to run. This method must return, otherwise the component will not set the RUNNING state.

It is defined as a no-op and supposed to be implemented in the inheriting class.

Source code in src/elva/component.py
async def before(self):
    """
    Hook to run before the component signals that is running.

    In here, one would define initializing steps necessary for the component to run.
    This method must return, otherwise the component will not set the
    `RUNNING` state.

    It is defined as a no-op and supposed to be implemented in the inheriting class.
    """
    ...

run() async

Hook to run after the component set the RUNNING state.

In here, one would define the main functionality of the component. This method may run indefinitely or return. The component is kept running regardless.

It is defined as a no-op and supposed to be implemented in the inheriting class.

Source code in src/elva/component.py
async def run(self):
    """
    Hook to run after the component set the `RUNNING` state.

    In here, one would define the main functionality of the component.
    This method may run indefinitely or return.
    The component is kept running regardless.

    It is defined as a no-op and supposed to be implemented in the inheriting class.
    """
    ...

cleanup() async

Hook to run after the component's stop method has been called and before it sets its state to NONE.

In here, one would define cleanup tasks such as closing connections. This method must return, otherwise the component will not stop.

It is defined as a no-op and supposed to be implemented in the inheriting class.

Source code in src/elva/component.py
async def cleanup(self):
    """
    Hook to run after the component's [`stop`][elva.component.Component.stop] method
    has been called and before it sets its state to `NONE`.

    In here, one would define cleanup tasks such as closing connections.
    This method must return, otherwise the component will not stop.

    It is defined as a no-op and supposed to be implemented in the inheriting class.
    """
    ...

create_component_state(name, additional_states=None)

Create a Flag enumeration with the default flags NONE and RUNNING for Components.

Parameters:

  • name (str) –

    the states class name.

  • additional_states (None | Iterable[str], default: None ) –

    states to include next to the default ones.

Returns:

  • Flag

    component states as flag enumeration.

Source code in src/elva/component.py
def create_component_state(
    name: str, additional_states: None | Iterable[str] = None
) -> Flag:
    """
    Create a [`Flag`][enum.Flag] enumeration with the default flags `NONE` and `RUNNING` for [`Component`s][elva.component.Component].

    Arguments:
        name: the states class name.
        additional_states: states to include next to the default ones.

    Returns:
        component states as flag enumeration.
    """
    states = ("NONE", "ACTIVE", "RUNNING")

    if additional_states is not None:
        states += tuple(additional_states)

    return Flag(name, states, start=0)