Skip to content

Store

elva.store

Module holding store components.

Classes:

  • SQLiteStore

    Store component saving Y updates in an ELVA SQLite database.

SQLiteStore(ydoc, identifier, path)

Bases: Component

Store component saving Y updates in an ELVA SQLite database.

Parameters:

  • ydoc (Doc) –

    instance of the synchronized Y Document.

  • identifier (str) –

    identifier of the synchronized Y Document.

  • path (str) –

    path where to store the SQLite database.

Methods:

  • get_metadata

    Retrieve metadata from a given ELVA SQLite database.

  • set_metadata

    Set given metadata in a given ELVA SQLite database.

  • callback

    Hook called on changes in ydoc.

  • before

    Hook executed before the component sets its started signal.

  • run

    Hook writing data to the ELVA SQLite database.

  • cleanup

    Hook cancelling subscription to changes and closing the database.

  • read

    Hook to read in metadata and updates from the ELVA SQLite database and apply them.

  • write

    Queue update to be written to the ELVA SQLite database.

Attributes:

  • subscription (Subscription) –

    Object holding subscription information to changes in ydoc.

  • ydoc (Doc) –

    Instance of the synchronized Y Document.

  • identifier (str) –

    Identifier of the synchronized Y Document.

  • path (Path) –

    Path where to store the SQLite database.

  • initialized (Event) –

    Event being set when the SQLite database is ready to be read.

  • lock (Lock) –

    Object for restricted resource management.

Source code in src/elva/store.py
def __init__(self, ydoc: Doc, identifier: str, path: str):
    """
    Arguments:
        ydoc: instance of the synchronized Y Document.
        identifier: identifier of the synchronized Y Document.
        path: path where to store the SQLite database.
    """
    self.ydoc = ydoc
    self.identifier = identifier
    self.path = Path(path)
    self.initialized = None
    self.lock = Lock()

subscription instance-attribute

Object holding subscription information to changes in ydoc.

ydoc = ydoc instance-attribute

Instance of the synchronized Y Document.

identifier = identifier instance-attribute

Identifier of the synchronized Y Document.

path = Path(path) instance-attribute

Path where to store the SQLite database.

initialized = None instance-attribute

Event being set when the SQLite database is ready to be read.

lock = Lock() instance-attribute

Object for restricted resource management.

get_metadata(path) staticmethod

Retrieve metadata from a given ELVA SQLite database.

Parameters:

  • path (str) –

    path to the ELVA SQLite database.

Returns:

  • dict

    mapping of metadata keys to values.

Source code in src/elva/store.py
@staticmethod
def get_metadata(path: str) -> dict:
    """
    Retrieve metadata from a given ELVA SQLite database.

    Arguments:
        path: path to the ELVA SQLite database.

    Returns:
        mapping of metadata keys to values.
    """
    db = sqlite3.connect(path)
    cur = db.cursor()
    try:
        res = cur.execute("SELECT * FROM metadata")
    except Exception:
        res = dict()
    else:
        res = dict(res.fetchall())
    finally:
        db.close()

    return res

set_metadata(path, metadata) staticmethod

Set given metadata in a given ELVA SQLite database.

Parameters:

  • path (str) –

    path to the ELVA SQLite database.

  • metadata (dict[str, str]) –

    mapping of metadata keys to values.

Source code in src/elva/store.py
@staticmethod
def set_metadata(path: str, metadata: dict[str, str]):
    """
    Set given metadata in a given ELVA SQLite database.

    Arguments:
        path: path to the ELVA SQLite database.
        metadata: mapping of metadata keys to values.
    """
    db = sqlite3.connect(path)
    cur = db.cursor()
    try:
        try:
            cur.executemany(
                "INSERT INTO metadata VALUES (?, ?)",
                list(metadata.items()),
            )
        except Exception:  # IntegrityError, UNIQUE constraint failed
            cur.executemany(
                "UPDATE metadata SET value = ? WHERE key = ?",
                list(zip(metadata.values(), metadata.keys())),
            )
        db.commit()
    except Exception:
        db.close()
        raise
    else:
        db.close()

callback(event)

Hook called on changes in ydoc.

When called, the event data are written to the ELVA SQLite database.

Parameters:

  • event (TransactionEvent) –

    object holding event information of changes in ydoc.

Source code in src/elva/store.py
def callback(self, event: TransactionEvent):
    """
    Hook called on changes in [`ydoc`][elva.store.SQLiteStore.ydoc].

    When called, the `event` data are written to the ELVA SQLite database.

    Arguments:
        event: object holding event information of changes in [`ydoc`][elva.store.SQLiteStore.ydoc].
    """
    self._task_group.start_soon(self.write, event.update)

before() async

Hook executed before the component sets its started signal.

The ELVA SQLite database is being initialized and read. Also, the component subscribes to changes in ydoc.

Source code in src/elva/store.py
async def before(self):
    """
    Hook executed before the component sets its [`started`][elva.component.Component.started] signal.

    The ELVA SQLite database is being initialized and read.
    Also, the component subscribes to changes in [`ydoc`][elva.store.SQLiteStore.ydoc].
    """
    await self._init_db()
    await self.read()
    self.subscription = self.ydoc.observe(self.callback)

run() async

Hook writing data to the ELVA SQLite database.

Source code in src/elva/store.py
async def run(self):
    """
    Hook writing data to the ELVA SQLite database.
    """
    self.stream_send, self.stream_recv = create_memory_object_stream(
        max_buffer_size=65543
    )
    async with self.stream_send, self.stream_recv:
        async for data in self.stream_recv:
            await self._write(data)

cleanup() async

Hook cancelling subscription to changes and closing the database.

Source code in src/elva/store.py
async def cleanup(self):
    """
    Hook cancelling subscription to changes and closing the database.
    """
    self.ydoc.unobserve(self.subscription)
    if self.initialized.is_set():
        await self.db.close()
        self.log.debug("closed database")

read() async

Hook to read in metadata and updates from the ELVA SQLite database and apply them.

Source code in src/elva/store.py
async def read(self):
    """
    Hook to read in metadata and updates from the ELVA SQLite database and apply them.
    """
    await self._wait_running()

    async with self.lock:
        await self.cursor.execute("SELECT * FROM metadata")
        self.metadata = dict(await self.cursor.fetchall())
        self.log.debug("read metadata from file")

        await self.cursor.execute("SELECT yupdate FROM yupdates")
        self.log.debug("read updates from file")
        for update, *rest in await self.cursor.fetchall():
            self.ydoc.apply_update(update)
        self.log.debug("applied updates to YDoc")

write(data) async

Queue update to be written to the ELVA SQLite database.

Parameters:

  • data (bytes) –

    update to queue for writing to disk.

Source code in src/elva/store.py
async def write(self, data: bytes):
    """
    Queue `update` to be written to the ELVA SQLite database.

    Arguments:
        data: update to queue for writing to disk.
    """
    await self.stream_send.send(data)