servomexlib

Top-level re-exports. This page documents whatever is currently exported.

servomexlib — async Python driver for Servomex SERVOPRO 4000-series gas analysers.

One protocol-neutral, channel-oriented API decodes the analyser's three communication modes — Continuous ASCII (unsolicited broadcast), Modbus RTU, and Modbus ASCII — into a single set of typed models.

The public API is semantic and protocol-neutral: a caller asks for poll(), read_channel("I1"), snapshot(), identify(), stream(), or start_calibration(group), and the session dispatches to the per-protocol client that was selected (or sniffed via AUTO) at open time.

The core is async (built on anyio); a thin sync facade lives at :mod:servomexlib.sync for scripts, notebooks, and REPL use.

servomexlib is a member of the *lib instrument-driver family; family harmony is defined at the boundary (entry point, frozen models, error hierarchy, streaming/sinks/sync/CLI conventions, tooling).

This module re-exports the public surface.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    samples_emitted=0,
    samples_late=0,
    max_drift_ms=0.0,
    tick_duration_ms_p50=0.0,
    tick_duration_ms_p99=0.0,
    disconnects=0,
)

Per-run summary, owned and mutated by the recorder (sole writer).

Counters update in place during the run so progress-polling consumers see live values; consumers treat it as read-only. finished_at and the percentile fields are populated on context-manager exit.

Attributes:

started_at (datetime): Wall-clock at the first scheduled tick.
finished_at (datetime | None): Wall-clock at producer shutdown, or None while running.
samples_emitted (int): Per-tick batches pushed onto the stream (a tick whose reads all failed still counts as one emitted batch).
samples_late (int): Ticks that missed their slot (overrun, overflow drop, or a reconnect gap).
max_drift_ms (float): Largest positive drift of an emitted batch from its target.
tick_duration_ms_p50 (float): Median poll_samples duration (set on exit).
tick_duration_ms_p99 (float): 99th-percentile poll_samples duration (on exit).
disconnects (int): ServomexConnectionError events absorbed under auto_reconnect; 0 when it was off.
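
To illustrate how percentile fields such as tick_duration_ms_p50 and tick_duration_ms_p99 could be derived on exit, here is a minimal sketch. It assumes the recorder keeps a list of per-tick durations. The helper name percentile_ms and the nearest-rank method are assumptions for illustration, not the library's implementation.

```python
import math


def percentile_ms(durations_ms: list, pct: float) -> float:
    """Nearest-rank percentile; 0.0 for an empty run (same as the field default)."""
    if not durations_ms:
        return 0.0
    ordered = sorted(durations_ms)
    # Nearest rank: the smallest value with at least pct% of samples at or below it.
    rank = max(1, math.ceil(pct / 100.0 * len(ordered)))
    return ordered[rank - 1]
```

With five recorded ticks, one slow outlier leaves the median untouched but dominates the 99th percentile, which is why both are reported.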

AnalyserStatus dataclass

AnalyserStatus(fault, maintenance, cal_groups, clock)

Analyser-level status.

Analyzer

Analyzer(client, *, device='', identify_on_enter=False)

Protocol-neutral async-context-manager facade over one analyser.

Source code in src/servomexlib/devices/analyzer.py
def __init__(
    self,
    client: ProtocolClient,
    *,
    device: str = "",
    identify_on_enter: bool = False,
) -> None:
    self._client = client
    self._device = device
    self._session = Session(client)
    self._identify_on_enter = identify_on_enter
    self._info: DeviceInfo | None = None

capabilities property

capabilities

The active client's capability set.

dropped_frames property

dropped_frames

Count of frames dropped for parse/checksum failures (continuous resync).

info property

info

The cached :class:DeviceInfo, if identify has run.

protocol property

protocol

The active wire protocol.

analyser_status async

analyser_status(*, timeout=None)

Return the analyser-level status.

Source code in src/servomexlib/devices/analyzer.py
async def analyser_status(self, *, timeout: float | None = None) -> AnalyserStatus:
    """Return the analyser-level status."""
    return await self._session.analyser_status(timeout=timeout)

calibration_status async

calibration_status(group=1, *, timeout=None)

Return autocalibration progress for group (Modbus only).

Source code in src/servomexlib/devices/analyzer.py
async def calibration_status(
    self, group: int = 1, *, timeout: float | None = None
) -> CalibrationProgress:
    """Return autocalibration progress for ``group`` (Modbus only)."""
    return await self._session.calibration_status(group, timeout=timeout)

identify async

identify(*, timeout=None)

Return device identity, caching it on first call.

Source code in src/servomexlib/devices/analyzer.py
async def identify(self, *, timeout: float | None = None) -> DeviceInfo:
    """Return device identity, caching it on first call."""
    self._info = await self._session.identify(timeout=timeout)
    return self._info

poll async

poll(*, wait_fresh=False, timeout=None)

Return one frame — all channels + analyser status, one tick.

wait_fresh=True waits for the next continuous broadcast instead of returning the cached latest; it is a no-op for the polled Modbus path.

Source code in src/servomexlib/devices/analyzer.py
async def poll(self, *, wait_fresh: bool = False, timeout: float | None = None) -> Frame:
    """Return one frame — all channels + analyser status, one tick.

    ``wait_fresh=True`` waits for the *next* continuous broadcast instead of
    returning the cached latest; it is a no-op for the polled Modbus path.
    """
    return await self._session.read_frame(wait_fresh=wait_fresh, timeout=timeout)

poll_samples async

poll_samples(*, names=None, timeout=None)

Poll one frame and fan it out into long-format samples (PollSource).

Makes the analyser a :class:~servomexlib.streaming.poll_source.PollSource the recorder can drive. names is ignored for a solo analyser (it is the manager's device-subset selector).

Source code in src/servomexlib/devices/analyzer.py
async def poll_samples(
    self, *, names: Sequence[str] | None = None, timeout: float | None = None
) -> list[Sample]:
    """Poll one frame and fan it out into long-format samples (``PollSource``).

    Makes the analyser a :class:`~servomexlib.streaming.poll_source.PollSource`
    the recorder can drive. ``names`` is ignored for a solo analyser (it is the
    manager's device-subset selector).
    """
    del names
    frame = await self._session.read_frame(timeout=timeout)
    return frame.as_samples(device=self._device)

read_all async

read_all(*, timeout=None)

Return every channel's latest reading keyed by id.

Source code in src/servomexlib/devices/analyzer.py
async def read_all(self, *, timeout: float | None = None) -> dict[ChannelId, Reading]:
    """Return every channel's latest reading keyed by id."""
    frame = await self._session.read_frame(timeout=timeout)
    return {reading.channel: reading for reading in frame.readings}

read_channel async

read_channel(channel, *, timeout=None)

Return one channel's latest reading.

Source code in src/servomexlib/devices/analyzer.py
async def read_channel(
    self, channel: ChannelId | str, *, timeout: float | None = None
) -> Reading:
    """Return one channel's latest reading."""
    return await self._session.read_channel(_coerce_channel(channel), timeout=timeout)

snapshot

snapshot()

Return the cached latest frame without any I/O.

Source code in src/servomexlib/devices/analyzer.py
def snapshot(self) -> Frame:
    """Return the cached latest frame without any I/O."""
    return self._session.snapshot()

start_calibration async

start_calibration(group, *, confirm=False, timeout=None)

Start autocalibration for group (Modbus only; requires confirm=True).

Source code in src/servomexlib/devices/analyzer.py
async def start_calibration(
    self, group: int, *, confirm: bool = False, timeout: float | None = None
) -> None:
    """Start autocalibration for ``group`` (Modbus only; requires ``confirm=True``)."""
    await self._session.start_calibration(group, confirm=confirm, timeout=timeout)

status async

status(channel, *, timeout=None)

Return one channel's latest status.

Source code in src/servomexlib/devices/analyzer.py
async def status(
    self, channel: ChannelId | str, *, timeout: float | None = None
) -> ChannelStatus:
    """Return one channel's latest status."""
    return await self._session.status(_coerce_channel(channel), timeout=timeout)

stop_calibration async

stop_calibration(*, confirm=False, timeout=None)

Stop all autocalibration (Modbus only; requires confirm=True).

Source code in src/servomexlib/devices/analyzer.py
async def stop_calibration(
    self, *, confirm: bool = False, timeout: float | None = None
) -> None:
    """Stop all autocalibration (Modbus only; requires ``confirm=True``)."""
    await self._session.stop_calibration(confirm=confirm, timeout=timeout)

stream

stream(*, mode=None, rate_hz=None)

Stream samples — AUTOPRINT (continuous) or POLL (Modbus).

Defaults the mode per protocol. Continuous mode subscribes passively to the unsolicited broadcast (rate_hz is ignored — the analyser sets the cadence). Modbus mode drives the drift-free :func:~servomexlib.streaming.recorder.record loop at rate_hz (default 1.0) inside a task group the returned session owns.

Use it as an async context manager so the poll loop starts and stops with the session::

async with anz.stream(rate_hz=2) as samples:
    async for sample in samples:
        ...

Source code in src/servomexlib/devices/analyzer.py
def stream(
    self, *, mode: StreamMode | None = None, rate_hz: float | None = None
) -> StreamingSession:
    """Stream samples — ``AUTOPRINT`` (continuous) or ``POLL`` (Modbus).

    Defaults the mode per protocol. Continuous mode subscribes passively to the
    unsolicited broadcast (``rate_hz`` is ignored — the analyser sets the
    cadence). Modbus mode drives the drift-free
    :func:`~servomexlib.streaming.recorder.record` loop at ``rate_hz``
    (default ``1.0``) inside a task group the returned session owns.

    Use it as an async context manager so the poll loop starts and stops with
    the session::

        async with anz.stream(rate_hz=2) as samples:
            async for sample in samples:
                ...
    """
    client_stream = getattr(self._client, "stream", None)
    if callable(client_stream):  # continuous: passive broadcast subscribe
        return cast("StreamingSession", client_stream(mode=mode))
    resolved = mode if mode is not None else StreamMode.POLL
    if resolved is not StreamMode.POLL:
        raise ServomexValidationError(
            f"{self.protocol.value} mode only supports POLL streaming, not {resolved.value}",
        )
    return self._poll_stream(rate_hz if rate_hz is not None else 1.0)
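
The "drift-free" wording above can be read as absolute scheduling: each tick's target is computed from the start time, not from when the previous tick finished. The sketch below shows that idea only. The function names tick_targets and drift_ms are hypothetical, and the real record loop is not reproduced here.

```python
def tick_targets(start_s: float, rate_hz: float, count: int) -> list:
    """Absolute target times: start + k / rate_hz, never 'previous tick + period'."""
    if rate_hz <= 0:
        raise ValueError("rate_hz must be positive")
    period = 1.0 / rate_hz
    return [start_s + k * period for k in range(count)]


def drift_ms(target_s: float, actual_s: float) -> float:
    """Positive drift of an emitted tick from its target, in milliseconds."""
    return max(0.0, (actual_s - target_s) * 1000.0)
```

Because every target is derived from start_s, a slow tick delays only itself; later targets do not shift.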

Availability

Bases: StrEnum

Whether a capability is known-present, known-absent, or unprobed.

CalGroupState dataclass

CalGroupState(group, calibrating, cal_gas)

One cal-group's autocalibration state.

Decoded from the continuous-frame autocal field (S/C + 1/2 per group) or the Modbus analyser-status cal-group discretes.

CalPhase

Bases: StrEnum

The phase of an autocalibration cycle for one cal-group.

CalibrationProgress dataclass

CalibrationProgress(group, active, phase)

Autocalibration progress for one cal-group.

Capability

Bases: Flag

What a :class:ProtocolClient can do.

A small :class:enum.Flag: the continuous client advertises read/identify only; the Modbus client advertises everything. The session gates each op against the active client's set.
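
A minimal sketch of capability gating with enum.Flag follows. The member names READ, IDENTIFY and CALIBRATE and the helper require are assumptions for illustration; the library's actual members are not listed on this page. PermissionError stands in for ServomexCapabilityError.

```python
from enum import Flag, auto


class Cap(Flag):
    """Hypothetical capability flags (not the library's real member names)."""
    READ = auto()
    IDENTIFY = auto()
    CALIBRATE = auto()


CONTINUOUS = Cap.READ | Cap.IDENTIFY              # read/identify only
MODBUS = Cap.READ | Cap.IDENTIFY | Cap.CALIBRATE  # everything


def require(active: Cap, needed: Cap) -> None:
    """Raise if the active client's set does not include the needed capability."""
    if needed not in active:
        raise PermissionError(f"{needed.name} is not available on this client")
```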

ChannelId

Bases: StrEnum

A channel slot on the analyser.

I* are measured transducers, D* are derived channels, E* are external analogue (mA) inputs. The wire id matches the enum value ("I1").

ChannelInfo dataclass

ChannelInfo(channel, kind, name, unit)

Identity of one populated channel slot (part of :class:DeviceInfo).

ChannelKind

Bases: StrEnum

Classification of a channel slot.

Drives the per-kind status-bit exceptions: external inputs map bit 0 to Invalid rather than Fault; derived channels carry copies of their parent transducer's flags.

ChannelStatus dataclass

ChannelStatus(
    fault, maintenance, calibrating, warming_up, alarms
)

Per-channel status flags.

ok property

ok

True when the channel reports no fault and no active alarm.

DeviceInfo dataclass

DeviceInfo(
    model, channels, protocol, address, serial_settings
)

Identity snapshot produced by Analyzer.identify.

DeviceResult dataclass

DeviceResult(value, error)

Per-device result — value or error, never both.

ok property

ok

True when the device produced a value (error is None).

failure classmethod

failure(error)

Build a failure result wrapping error.

Source code in src/servomexlib/manager.py
@classmethod
def failure(cls, error: ServomexError) -> Self:
    """Build a failure result wrapping ``error``."""
    return cls(value=None, error=error)

success classmethod

success(value)

Build a success result wrapping value.

Source code in src/servomexlib/manager.py
@classmethod
def success(cls, value: T) -> Self:
    """Build a success result wrapping ``value``."""
    return cls(value=value, error=None)
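
The value-or-error shape is convenient when a caller wants to separate good readings from failures after a sweep. The sketch below uses a stand-in Result class, not the library's DeviceResult, and a hypothetical helper split.

```python
from dataclasses import dataclass
from typing import Dict, Generic, Optional, Tuple, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class Result(Generic[T]):
    """Stand-in for a value-or-error result (illustrative only)."""
    value: Optional[T]
    error: Optional[Exception]

    @property
    def ok(self) -> bool:
        return self.error is None


def split(results: Dict[str, Result]) -> Tuple[dict, dict]:
    """Partition a name -> Result mapping into values and errors."""
    values = {name: r.value for name, r in results.items() if r.ok}
    errors = {name: r.error for name, r in results.items() if not r.ok}
    return values, errors
```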

ErrorContext dataclass

ErrorContext(
    port=None,
    protocol=None,
    address=None,
    channel=None,
    register=None,
    function_code=None,
    request=None,
    response=None,
    elapsed_s=None,
    extra=_empty_extra(),
)

Structured context attached to every :class:ServomexError.

Fields are best-effort — missing data is None rather than raising.

extra accepts any Mapping and is always frozen into a read-only :class:types.MappingProxyType at construction so the shared empty sentinel can never be mutated through error.context.extra[k] = v.

merged

merged(**updates)

Return a new context with updates overlaid. Unknown keys go to extra.

Source code in src/servomexlib/errors.py
def merged(self, **updates: Any) -> Self:
    """Return a new context with ``updates`` overlaid. Unknown keys go to ``extra``."""
    known: dict[str, Any] = {}
    extra_updates: dict[str, Any] = {}
    for key, value in updates.items():
        if key in _CONTEXT_KNOWN_FIELDS:
            known[key] = value
        else:
            extra_updates[key] = value

    new_extra: Mapping[str, Any] = (
        MappingProxyType({**self.extra, **extra_updates}) if extra_updates else self.extra
    )
    return replace(self, **known, extra=new_extra)
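
The overlay behaviour can be tried in isolation with a cut-down stand-in. Ctx below is hypothetical and carries only two known fields; it mirrors the idea of routing unknown keys to a read-only extra mapping, and is not a copy of ErrorContext.

```python
from dataclasses import dataclass, field, fields, replace
from types import MappingProxyType
from typing import Any, Mapping, Optional


@dataclass(frozen=True)
class Ctx:
    """Cut-down stand-in: two known fields plus a read-only extra mapping."""
    port: Optional[str] = None
    channel: Optional[str] = None
    extra: Mapping[str, Any] = field(default_factory=lambda: MappingProxyType({}))

    def __post_init__(self) -> None:
        # Freeze whatever mapping was passed so callers cannot mutate it later.
        object.__setattr__(self, "extra", MappingProxyType(dict(self.extra)))

    def merged(self, **updates: Any) -> "Ctx":
        known_names = {f.name for f in fields(self)} - {"extra"}
        known = {k: v for k, v in updates.items() if k in known_names}
        unknown = {k: v for k, v in updates.items() if k not in known_names}
        return replace(self, **known, extra={**self.extra, **unknown})
```

The original context is left unchanged, and assigning into extra on the result raises TypeError because it is a mapping proxy.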

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every result and — if any failed — raises an :class:ExceptionGroup after the task group joins. Under :attr:RETURN, each device yields a :class:DeviceResult to inspect.

FakeTransport

FakeTransport(script=None, *, label='fake://test')

Bases: ByteStreamTransport

Scripted, in-process :class:ByteStreamTransport.

Source code in src/servomexlib/transport/fake.py
def __init__(
    self,
    script: Mapping[bytes, ScriptedReply] | None = None,
    *,
    label: str = "fake://test",
) -> None:
    super().__init__(label=label)
    self._script: dict[bytes, ScriptedReply] = dict(script or {})
    self._writes: list[bytes] = []
    self._inbound = bytearray()
    self._event = anyio.Event()
    self._closed = False

writes property

writes

Every payload written through :meth:send, in order.

add_script

add_script(request, reply)

Register or overwrite the canned reply for request.

Source code in src/servomexlib/transport/fake.py
def add_script(self, request: bytes, reply: ScriptedReply) -> None:
    """Register or overwrite the canned reply for ``request``."""
    self._script[bytes(request)] = reply

emit async

emit(frames, *, interval=0.0)

Feed frames one at a time, optionally interval seconds apart.

Run as a background task to simulate a continuous broadcaster: task_group.start_soon(lambda: fake.emit(frames, interval=0.05)).

Source code in src/servomexlib/transport/fake.py
async def emit(self, frames: Sequence[bytes], *, interval: float = 0.0) -> None:
    """Feed ``frames`` one at a time, optionally ``interval`` seconds apart.

    Run as a background task to simulate a continuous broadcaster:
    ``task_group.start_soon(lambda: fake.emit(frames, interval=0.05))``.
    """
    for frame in frames:
        if interval:
            await anyio.sleep(interval)
        self.feed(frame)

feed

feed(data)

Push unsolicited bytes into the inbound buffer.

Source code in src/servomexlib/transport/fake.py
def feed(self, data: bytes) -> None:
    """Push unsolicited bytes into the inbound buffer."""
    self._inbound += data
    self._wake()

Frame dataclass

Frame(
    readings,
    analyser,
    protocol,
    received_at,
    monotonic_ns,
    raw,
)

One continuous frame, or one Modbus sweep — a timestamped channel set.

as_samples

as_samples(*, device='')

Fan the frame out into one long-format :class:Sample per reading.

Source code in src/servomexlib/devices/models.py
def as_samples(self, *, device: str = "") -> list[Sample]:
    """Fan the frame out into one long-format :class:`Sample` per reading."""
    return [
        Sample(
            device=device,
            channel=reading.channel,
            reading=reading,
            protocol=self.protocol,
            monotonic_ns=self.monotonic_ns,
            received_at=self.received_at,
        )
        for reading in self.readings
    ]

channel

channel(cid)

Return the :class:Reading for cid.

Raises:

ServomexValidationError: cid is not present in this frame.

Source code in src/servomexlib/devices/models.py
def channel(self, cid: ChannelId) -> Reading:
    """Return the :class:`Reading` for ``cid``.

    Raises:
        ServomexValidationError: ``cid`` is not present in this frame.
    """
    for reading in self.readings:
        if reading.channel == cid:
            return reading
    raise ServomexValidationError(
        f"channel {getattr(cid, 'value', cid)} not present in frame",
    )

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks the response.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer (default). Silent drops are surprising in acquisition, so the recorder blocks rather than discarding.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the batch about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch and enqueue the newest. For real-time monitoring where the latest reading matters most. Each eviction is late.
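
The two drop policies can be sketched on a plain bounded deque. This is an illustration under assumptions: the Overflow enum and enqueue helper are hypothetical, and BLOCK is left out because it needs an awaiting producer, not a synchronous queue operation.

```python
from collections import deque
from enum import Enum


class Overflow(Enum):
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


def enqueue(buffer: deque, batch, capacity: int, policy: Overflow) -> bool:
    """Add batch to a bounded buffer; return True if a batch was dropped (late)."""
    if len(buffer) < capacity:
        buffer.append(batch)
        return False
    if policy is Overflow.DROP_OLDEST:
        buffer.popleft()      # evict the oldest queued batch
        buffer.append(batch)  # keep the newest
    # DROP_NEWEST: leave the buffer untouched and discard the incoming batch.
    return True
```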

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

poll_samples async

poll_samples(*, names=None, timeout=None)

Read this tick's samples across every channel (and managed device).

Parameters:

names (Sequence[str] | None, default None): Subset of device names to poll (manager only); None polls everything. A solo analyser ignores this.
timeout (float | None, default None): Per-poll I/O ceiling, or None for the source default.

Returns:

Sequence[Sample]: A flat sequence of :class:Sample; empty when every read failed.

Source code in src/servomexlib/streaming/poll_source.py
async def poll_samples(
    self, *, names: Sequence[str] | None = None, timeout: float | None = None
) -> Sequence[Sample]:
    """Read this tick's samples across every channel (and managed device).

    Args:
        names: Subset of device names to poll (manager only); ``None`` polls
            everything. A solo analyser ignores this.
        timeout: Per-poll I/O ceiling, or ``None`` for the source default.

    Returns:
        A flat sequence of :class:`Sample` — empty when every read failed.
    """
    ...

ProtocolKind

Bases: StrEnum

Which communication mode the analyser speaks.

All three modes are mutually exclusive on the wire and selected on the analyser's front panel. AUTO is only valid at open_device call time; by the time a session exists it has resolved to one of the others.

Reading dataclass

Reading(
    channel,
    kind,
    name,
    value,
    unit,
    status,
    protocol,
    received_at,
    monotonic_ns,
    raw,
)

One decoded channel reading.

value is None on over-range / invalid; name is None when the slot is unlabelled (continuous ||||||). name and unit have been routed through the display charset.

__format__

__format__(format_spec)

Delegate non-empty format specs to :attr:value.

f"{r:.4f}" formats the value; f"{r}" prints the dataclass repr. An over-range reading (value is None) formats as "None" for any numeric spec rather than raising.

Source code in src/servomexlib/devices/models.py
def __format__(self, format_spec: str) -> str:
    """Delegate non-empty format specs to :attr:`value`.

    ``f"{r:.4f}"`` formats the value; ``f"{r}"`` prints the dataclass repr.
    An over-range reading (``value`` is ``None``) formats as ``"None"`` for
    any numeric spec rather than raising.
    """
    if format_spec == "":
        return str(self)
    if self.value is None:
        return "None"
    return format(self.value, format_spec)
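
The delegation can be tried on a two-field stand-in. MiniReading is hypothetical and keeps only channel and value; the __format__ body follows the logic shown above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class MiniReading:
    """Two-field stand-in for Reading, just enough to show format delegation."""
    channel: str
    value: Optional[float]

    def __format__(self, format_spec: str) -> str:
        if format_spec == "":
            return str(self)      # plain f"{r}" falls back to the dataclass repr
        if self.value is None:
            return "None"         # over-range: never raise on a numeric spec
        return format(self.value, format_spec)
```

This keeps log lines such as f"{r:.2f}" safe to write even when a channel goes over-range mid-run.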

as_dict

as_dict()

Flatten the reading into a row-shaped dict for tabular sinks.

Content-only — timing provenance lives on the surrounding :class:Sample. Booleans render as 0 / 1 so SQLite picks INTEGER affinity and CSV / JSONL round-trip cleanly.

Source code in src/servomexlib/devices/models.py
def as_dict(self) -> dict[str, float | int | str | bool | None]:
    """Flatten the reading into a row-shaped dict for tabular sinks.

    Content-only — timing provenance lives on the surrounding
    :class:`Sample`. Booleans render as ``0`` / ``1`` so SQLite picks
    INTEGER affinity and CSV / JSONL round-trip cleanly.
    """
    return {
        "channel": self.channel.value,
        "kind": self.kind.value,
        "name": self.name,
        "value": self.value,
        "unit": self.unit.value,
        "fault": int(self.status.fault),
        "maintenance": int(self.status.maintenance),
        "calibrating": int(self.status.calibrating),
        "warming_up": int(self.status.warming_up),
        "alarm1": int(self.status.alarms[0]),
        "alarm2": int(self.status.alarms[1]),
        "alarm3": int(self.status.alarms[2]),
        "alarm4": int(self.status.alarms[3]),
        "ok": int(self.status.ok),
        "protocol": self.protocol.value,
    }
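
Row-shaped dicts of this kind can go straight into the standard library's csv.DictWriter. The sketch below assumes rows that share the same keys. The helper rows_to_csv is hypothetical, and the sample rows in the usage note are invented for illustration, not captured analyser output.

```python
import csv
import io


def rows_to_csv(rows: list) -> str:
    """Write row-shaped dicts as CSV text, taking the header from the first row."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(rows[0]), lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()
```

A None value (an over-range reading) is written as an empty cell, and the 0 / 1 flags stay numeric.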

Recording dataclass

Recording(stream, summary, rate_hz)

Container yielded by :func:record — stream + live summary + rate.

Shares the cross-family shape (stream / summary / rate_hz) so downstream consumers are vendor-agnostic. For servomexlib the payload is Recording[Sequence[Sample]] — per-tick batches.

Attributes:

stream (AsyncIterator[T]): Async iterator of per-tick :class:Sample batches.
summary (AcquisitionSummary): Live :class:AcquisitionSummary (recorder mutates, consumer reads); summary.finished_at is set on CM exit.
rate_hz (float): The cadence captured at record() entry.

SafetyTier

Bases: StrEnum

How disruptive an operation is.

STATEFUL operations (autocalibration) require confirm=True and are gated before any byte is sent.
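
"Gated before any byte is sent" means the confirm check runs ahead of the transport write. The sketch below shows that ordering with stand-ins. Tier.READ_ONLY is a hypothetical member name, ConfirmationRequired stands in for ServomexConfirmationRequiredError, and the sent list stands in for the wire.

```python
from enum import Enum


class Tier(Enum):
    READ_ONLY = "read_only"   # hypothetical member name
    STATEFUL = "stateful"


class ConfirmationRequired(Exception):
    """Stand-in for ServomexConfirmationRequiredError."""


def gated_send(tier: Tier, payload: bytes, sent: list, *, confirm: bool = False) -> None:
    """Check the gate first, so a refused operation never reaches the wire."""
    if tier is Tier.STATEFUL and not confirm:
        raise ConfirmationRequired("stateful operation requires confirm=True")
    sent.append(payload)  # stands in for the transport write
```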

Sample dataclass

Sample(
    device,
    channel,
    reading,
    protocol,
    monotonic_ns,
    received_at,
    requested_at=None,
    latency_s=None,
    metadata=_empty_metadata(),
    error=None,
)

Long-format row — one channel reading with streaming provenance.

requested_at / latency_s are None in passive continuous mode (we did not ask). error is set when a frame was dropped/corrupt; reading is then None (the two are mutually exclusive) and channel is None because a dropped frame is not tied to one channel.

SerialSettings dataclass

SerialSettings(
    port,
    baudrate=MAX_BAUD,
    bytesize=ByteSize.EIGHT,
    parity=Parity.NONE,
    stopbits=StopBits.ONE,
    rtscts=False,
    xonxoff=False,
    exclusive=True,
)

Frozen serial framing descriptor. Default 19200 / 8-N-1.

ServomexCapabilityError

ServomexCapabilityError(message='', *, context=None)

Bases: ServomexError

An operation is not available on this device / mode.

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexChecksumError

ServomexChecksumError(message='', *, context=None)

Bases: ServomexProtocolError

A continuous-frame checksum did not match the recomputed value.

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexConfigurationError

ServomexConfigurationError(message='', *, context=None)

Bases: ServomexError

Configuration-level error (bad args, wrong confirm flag, etc.).

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexConfirmationRequiredError

ServomexConfirmationRequiredError(
    message="", *, context=None
)

Bases: ServomexConfigurationError

A SafetyTier.STATEFUL op was attempted without confirm=True.

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexConnectionError

ServomexConnectionError(message='', *, context=None)

Bases: ServomexTransportError

Could not open / lost the connection (or no recognised protocol on AUTO).

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexError

ServomexError(message='', *, context=None)

Bases: Exception

Base class for every :mod:servomexlib exception.

Carries a typed :class:ErrorContext. The message is the human-readable summary; the context is the machine-readable detail.

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

with_context

with_context(**updates)

Return a copy of this error with its context updated.

Useful when an inner layer raises and an outer layer wants to enrich the context (for instance adding port or elapsed_s).

Source code in src/servomexlib/errors.py
def with_context(self, **updates: Any) -> Self:
    """Return a copy of this error with its context updated.

    Useful when an inner layer raises and an outer layer wants to enrich
    the context (for instance adding ``port`` or ``elapsed_s``).
    """
    cls = type(self)
    new = cls.__new__(cls)
    new.args = self.args
    try:
        new.__dict__.update(self.__dict__)
    except AttributeError:  # pragma: no cover — no slotted subclass today
        for slot in getattr(cls, "__slots__", ()):
            if hasattr(self, slot):
                object.__setattr__(new, slot, getattr(self, slot))
    new.context = self.context.merged(**updates)
    new.__cause__ = self.__cause__
    new.__context__ = self.__context__
    new.__traceback__ = self.__traceback__
    return new

ServomexFrameError

ServomexFrameError(message='', *, context=None)

Bases: ServomexProtocolError

Structural frame error (wrong field count, truncated block).

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexManager

ServomexManager(*, error_policy=ErrorPolicy.RAISE)

Coordinator for many analysers across one or more serial ports.

Usage::

async with ServomexManager() as mgr:
    await mgr.add("a1", "COM11", address=1)
    await mgr.add("a2", "COM11", address=2)
    samples = await mgr.poll_samples()

Source code in src/servomexlib/manager.py
def __init__(self, *, error_policy: ErrorPolicy = ErrorPolicy.RAISE) -> None:
    self._error_policy = error_policy
    self._devices: dict[str, _DeviceEntry] = {}
    self._ports: dict[str, _PortEntry] = {}
    self._state_lock = anyio.Lock()
    self._closed = False

closed property

closed

True once :meth:close has been called.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed analyser names.

add async

add(
    name,
    source,
    *,
    protocol=ProtocolKind.MODBUS_RTU,
    address=1,
    serial_settings=None,
    timeout=1.0,
)

Register an analyser under name and return it.

source discriminates lifecycle ownership: a pre-built :class:Analyzer (caller-owned, tracked only), a str port path (the manager opens and shares a transport across the bus), or a :class:Transport (bound, not owned). Continuous-ASCII / AUTO are refused — the manager is a Modbus multidrop coordinator.

Raises:

ServomexValidationError: duplicate name, a non-Modbus protocol, or a protocol clash with an existing device on the same port.
ServomexConnectionError: the manager is closed.

Source code in src/servomexlib/manager.py
async def add(
    self,
    name: str,
    source: Analyzer | str | Transport,
    *,
    protocol: ProtocolKind = ProtocolKind.MODBUS_RTU,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
) -> Analyzer:
    """Register an analyser under ``name`` and return it.

    ``source`` discriminates lifecycle ownership: a pre-built :class:`Analyzer`
    (caller-owned, tracked only), a ``str`` port path (the manager opens and
    shares a transport across the bus), or a :class:`Transport` (bound, not
    owned). Continuous-ASCII / ``AUTO`` are refused — the manager is a Modbus
    multidrop coordinator.

    Raises:
        ServomexValidationError: duplicate ``name``, a non-Modbus protocol, or
            a protocol clash with an existing device on the same port.
        ServomexConnectionError: the manager is closed.
    """
    async with self._state_lock:
        self._check_open()
        if name in self._devices:
            raise ServomexValidationError(f"manager: name {name!r} already in use")

        if isinstance(source, Analyzer):
            self._devices[name] = _DeviceEntry(name=name, analyzer=source, port_key=None)
            _logger.info("manager.add device=%s port=prebuilt", name)
            return source

        if protocol not in _MODBUS_KINDS:
            raise ServomexValidationError(
                f"manager: only Modbus protocols can be grouped (got {protocol.value}); "
                "a continuous-ASCII analyser is a single broadcaster, not a multidrop peer",
            )

        port_key, port_entry = await self._resolve_port(
            source, protocol=protocol, serial_settings=serial_settings
        )
        analyzer = self._build_analyzer(
            port_entry, name=name, protocol=protocol, address=address, timeout=timeout
        )
        self._devices[name] = _DeviceEntry(name=name, analyzer=analyzer, port_key=port_key)
        port_entry.refs.add(name)
        _logger.info(
            "manager.add device=%s port=%s protocol=%s address=%s",
            name,
            port_key,
            protocol.value,
            address,
        )
        return analyzer

close async

close()

Tear down every managed analyser and port (LIFO).

Source code in src/servomexlib/manager.py
async def close(self) -> None:
    """Tear down every managed analyser and port (LIFO)."""
    await self._close(suppress_errors=False)

execute_each async

execute_each(op, names=None)

Run op(analyzer) on every (or named) analyser concurrently across ports.

Source code in src/servomexlib/manager.py
async def execute_each[T](
    self,
    op: Callable[[Analyzer], Awaitable[T]],
    names: Sequence[str] | None = None,
) -> dict[str, DeviceResult[T]]:
    """Run ``op(analyzer)`` on every (or named) analyser concurrently across ports."""
    groups = self._group_by_port(self._resolve_names(names))
    results: dict[str, DeviceResult[T]] = {}
    errors: list[ServomexError] = []
    result_lock = anyio.Lock()

    async def _run_group(port_key: str, members: list[str]) -> None:
        lock = self._lock_for(port_key)
        async with lock:
            for member in members:
                try:
                    value = await op(self._devices[member].analyzer)
                except ServomexError as err:
                    async with result_lock:
                        results[member] = DeviceResult.failure(err)
                        errors.append(err)
                else:
                    async with result_lock:
                        results[member] = DeviceResult[T].success(value)

    async with anyio.create_task_group() as tg:
        for port_key, members in groups.items():
            _ = tg.start_soon(_run_group, port_key, members)

    if self._error_policy is ErrorPolicy.RAISE and errors:
        raise ExceptionGroup("manager.execute_each: one or more analysers failed", errors)
    return results
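
The shape of the listing above (one task per port, one lock acquisition per port group, same-port devices visited in turn) can be sketched without the library. This is a minimal illustration that uses asyncio rather than anyio; `DEVICE_PORTS`, `run_grouped`, and `fake_read` are hypothetical names, and the plain tuples stand in for `DeviceResult`.

```python
import asyncio
from collections import defaultdict

# Hypothetical registry: device name -> port key (not part of servomexlib).
DEVICE_PORTS = {"a1": "COM1", "a2": "COM1", "b1": "COM2"}


async def run_grouped(op, device_ports):
    """Run ``op(name)`` for every device: ports in parallel, same-port devices in turn."""
    groups = defaultdict(list)
    for name, port in device_ports.items():
        groups[port].append(name)

    locks = {port: asyncio.Lock() for port in groups}
    results = {}

    async def run_group(port, members):
        # One lock acquisition per port group keeps same-port traffic serialised.
        async with locks[port]:
            for member in members:
                try:
                    results[member] = ("ok", await op(member))
                except Exception as err:  # the library narrows this to ServomexError
                    results[member] = ("error", err)

    # Each port group runs as its own concurrent task.
    await asyncio.gather(*(run_group(p, m) for p, m in groups.items()))
    return results


async def fake_read(name):
    """Pretend device read; ``a2`` fails to show per-device error capture."""
    await asyncio.sleep(0.01)
    if name == "a2":
        raise RuntimeError("no reply")
    return name.upper()


results = asyncio.run(run_grouped(fake_read, DEVICE_PORTS))
```

A failure on one device does not stop the remaining devices on the same port, which matches the per-member `try` in the listing.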

get

get(name)

Return the analyser registered under name.

Source code in src/servomexlib/manager.py
def get(self, name: str) -> Analyzer:
    """Return the analyser registered under ``name``."""
    try:
        return self._devices[name].analyzer
    except KeyError:
        raise ServomexValidationError(f"manager: no analyser named {name!r}") from None

poll async

poll(names=None, *, timeout=None)

Read one :class:Frame per analyser (all, or only those named), keyed by name.

Cross-port concurrent, same-port serialised. Always returns a complete mapping; per-device failures land in :attr:DeviceResult.error.

Source code in src/servomexlib/manager.py
async def poll(
    self, names: Sequence[str] | None = None, *, timeout: float | None = None
) -> Mapping[str, DeviceResult[Frame]]:
    """Read one :class:`Frame` per analyser (all, or only those named), keyed by name.

    Cross-port concurrent, same-port serialised. Always returns a complete
    mapping; per-device failures land in :attr:`DeviceResult.error`.
    """
    groups = self._group_by_port(self._resolve_names(names))
    results: dict[str, DeviceResult[Frame]] = {}
    result_lock = anyio.Lock()

    async def _run_group(port_key: str, members: list[str]) -> None:
        lock = self._lock_for(port_key)
        async with lock:
            for member in members:
                try:
                    frame = await self._devices[member].analyzer.poll(timeout=timeout)
                except ServomexError as err:
                    async with result_lock:
                        results[member] = DeviceResult.failure(err)
                else:
                    async with result_lock:
                        results[member] = DeviceResult[Frame].success(frame)

    async with anyio.create_task_group() as tg:
        for port_key, members in groups.items():
            _ = tg.start_soon(_run_group, port_key, members)
    return results
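
Because `poll` always returns a complete mapping, the caller separates successes from failures after the fact instead of catching exceptions. The sketch below shows one way to do that. The `DeviceResult` class here is a stand-in: only `success`, `failure`, and `error` appear in the source above, while the `value` attribute and the `split_results` helper are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class DeviceResult(Generic[T]):
    """Stand-in result wrapper; the ``value`` field name is an assumption."""

    value: Optional[T] = None
    error: Optional[Exception] = None

    @classmethod
    def success(cls, value):
        return cls(value=value)

    @classmethod
    def failure(cls, error):
        return cls(error=error)


def split_results(results):
    """Split a name -> DeviceResult mapping into (values, errors) dicts."""
    ok = {name: r.value for name, r in results.items() if r.error is None}
    failed = {name: r.error for name, r in results.items() if r.error is not None}
    return ok, failed


# Hypothetical poll outcome: one device answered, one timed out.
polled = {
    "stack": DeviceResult.success({"I1": 20.9}),
    "inlet": DeviceResult.failure(TimeoutError("no reply")),
}
ok, failed = split_results(polled)
```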

poll_samples async

poll_samples(*, names=None, timeout=None)

Poll every (or named) analyser concurrently across ports → flat samples.

One :class:Sample per channel read. Failed devices are dropped and logged at WARN (the recorder never sees them). Same-port devices serialise on the shared port lock, acquired once per port-group so a coherent snapshot is not interleaved. Satisfies :class:PollSource.

Source code in src/servomexlib/manager.py
async def poll_samples(
    self, *, names: Sequence[str] | None = None, timeout: float | None = None
) -> list[Sample]:
    """Poll every (or named) analyser concurrently across ports → flat samples.

    One :class:`Sample` per channel read. Failed devices are dropped and logged
    at WARN (the recorder never sees them). Same-port devices serialise on the
    shared port lock, acquired once per port-group so a coherent snapshot is
    not interleaved. Satisfies :class:`PollSource`.
    """
    groups = self._group_by_port(self._resolve_names(names))
    result_lock = anyio.Lock()
    all_samples: list[Sample] = []

    async def _run_group(port_key: str, members: list[str]) -> None:
        local: list[Sample] = []
        lock = self._lock_for(port_key)
        async with lock:
            for member in members:
                try:
                    local.extend(
                        await self._devices[member].analyzer.poll_samples(timeout=timeout)
                    )
                except ServomexError as err:
                    _logger.warning("manager.poll_failed device=%s error=%r", member, err)
        async with result_lock:
            all_samples.extend(local)

    async with anyio.create_task_group() as tg:
        for port_key, members in groups.items():
            _ = tg.start_soon(_run_group, port_key, members)
    return all_samples

remove async

remove(name)

Unregister name, closing the shared transport on the last ref.

Source code in src/servomexlib/manager.py
async def remove(self, name: str) -> None:
    """Unregister ``name``, closing the shared transport on the last ref."""
    async with self._state_lock:
        self._check_open()
        if name not in self._devices:
            raise ServomexValidationError(f"manager: no analyser named {name!r}")
        await self._teardown_device(self._devices.pop(name))
        _logger.info("manager.remove device=%s", name)
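
Taken together, `add` (which records the device name in the port entry's `refs` set) and `remove` (which closes the shared transport on the last ref) amount to reference counting on ports. A minimal sketch of that bookkeeping follows. `PortRegistry` and its `closed` list are hypothetical; the manager's real teardown helper is not shown in this section.

```python
class PortRegistry:
    """Track which device names hold a reference to each shared port."""

    def __init__(self):
        self._refs = {}   # port key -> set of device names using it
        self.closed = []  # ports closed so far (illustration only)

    def add(self, port, name):
        # First device on a port creates the entry; later ones share it.
        self._refs.setdefault(port, set()).add(name)

    def remove(self, port, name):
        refs = self._refs[port]
        refs.discard(name)
        if not refs:
            # Last reference gone: this is where the transport would be closed.
            del self._refs[port]
            self.closed.append(port)


registry = PortRegistry()
registry.add("COM1", "a1")
registry.add("COM1", "a2")
registry.remove("COM1", "a1")
closed_after_first = list(registry.closed)
registry.remove("COM1", "a2")
closed_after_second = list(registry.closed)
```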

ServomexModbusError

ServomexModbusError(message='', *, context=None)

Bases: ServomexProtocolError

A Modbus exception response or engine-level Modbus failure.

Rooted under :class:ServomexProtocolError (single MRO path) so the inherited __init__ and :meth:~ServomexError.with_context resolve unambiguously; the per-exception-code subclasses below add no competing __init__ / __slots__.

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexParseError

ServomexParseError(message='', *, context=None)

Bases: ServomexProtocolError

A frame could not be parsed (bad header, unparsable field).

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexProtocolError

ServomexProtocolError(message='', *, context=None)

Bases: ServomexError

Protocol-level error (framing, parsing, checksum, mode mismatch).

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexProtocolUnsupportedError

ServomexProtocolUnsupportedError(
    message="", *, context=None
)

Bases: ServomexProtocolError

The active protocol cannot perform this operation (e.g. autocal in continuous mode).

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexTimeoutError

ServomexTimeoutError(message='', *, context=None)

Bases: ServomexTransportError

A transport read or write timed out (or a Modbus request got no reply).

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexTransportError

ServomexTransportError(message='', *, context=None)

Bases: ServomexError

I/O-layer error from the serial transport.

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

ServomexValidationError

ServomexValidationError(message='', *, context=None)

Bases: ServomexConfigurationError

Request validation failed before I/O (bad channel, group out of range).

Source code in src/servomexlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT
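
The "Bases" lines above describe a tree, so a handler can catch at whatever depth it needs. The sketch below rebuilds the tree with empty stand-in classes and maps each branch to a coarse action. The base of `ServomexConfigurationError` is not shown in this section and is assumed here to be `ServomexError`; `classify` and its return strings are hypothetical.

```python
class ServomexError(Exception):
    """Stand-in root of the hierarchy."""


class ServomexProtocolError(ServomexError): ...
class ServomexModbusError(ServomexProtocolError): ...
class ServomexParseError(ServomexProtocolError): ...
class ServomexProtocolUnsupportedError(ServomexProtocolError): ...
class ServomexTransportError(ServomexError): ...
class ServomexTimeoutError(ServomexTransportError): ...
class ServomexConfigurationError(ServomexError): ...  # base class assumed
class ServomexValidationError(ServomexConfigurationError): ...


def classify(err):
    """Map an error to a coarse action, checking the most specific class first."""
    if isinstance(err, ServomexTimeoutError):
        return "retry"
    if isinstance(err, ServomexTransportError):
        return "reconnect"
    if isinstance(err, ServomexProtocolError):
        return "protocol"
    if isinstance(err, ServomexConfigurationError):
        return "fix-request"
    return "unknown"


actions = [
    classify(ServomexTimeoutError("no reply")),
    classify(ServomexModbusError("exception response")),
    classify(ServomexValidationError("bad channel")),
]
```

The timeout check comes before the transport check because a timeout is also a transport error; reversing the order would hide it.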

StreamMode

Bases: StrEnum

How a :meth:Analyzer.stream session sources samples.

AUTOPRINT is inherited from the family vocabulary (Sartorius SBI) and reused verbatim for boundary harmony; for the 4100 it denotes a passive subscription to the analyser's unsolicited broadcast.

StreamingSession

StreamingSession(
    receiver, *, mode, on_close=None, producer=None
)

Async-iterable context manager over one subscriber's :class:Sample stream.

Two backing shapes share this one interface:

  • Passive AUTOPRINT (continuous): receiver is fed by the client's already-running background loop; on_close unsubscribes.
  • Active POLL (Modbus): a producer coroutine is supplied. It is run in a task group this session owns — started on __aenter__ and cancelled on close — so the recorder's lifetime is strictly nested in the session.

Source code in src/servomexlib/streaming/stream_session.py
def __init__(
    self,
    receiver: MemoryObjectReceiveStream[Sample],
    *,
    mode: StreamMode,
    on_close: Callable[[], None] | None = None,
    producer: Callable[[], Coroutine[Any, Any, None]] | None = None,
) -> None:
    self._receiver = receiver
    self._mode = mode
    self._on_close = on_close
    self._producer = producer
    self._task_group: TaskGroup | None = None

mode property

mode

The mode this session is streaming in.

aclose async

aclose()

Stop the owned producer (if any), unsubscribe, and close the stream.

Source code in src/servomexlib/streaming/stream_session.py
async def aclose(self) -> None:
    """Stop the owned producer (if any), unsubscribe, and close the stream."""
    if self._task_group is not None:
        self._task_group.cancel()
        await self._task_group.__aexit__(None, None, None)
        self._task_group = None
    if self._on_close is not None:
        self._on_close()
        self._on_close = None
    await self._receiver.aclose()
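
The active POLL shape, where the session starts its producer on entry and cancels it on close, can be sketched with plain asyncio. This is an illustration under stated assumptions, not the library's implementation: `OwnedProducerSession` is a hypothetical name, an `asyncio.Queue` stands in for the anyio receive stream, and a single task stands in for the owned task group.

```python
import asyncio


class OwnedProducerSession:
    """Async-iterable context manager that owns its producer task."""

    def __init__(self, queue, producer=None, on_close=None):
        self._queue = queue
        self._producer = producer
        self._on_close = on_close
        self._task = None

    async def __aenter__(self):
        if self._producer is not None:
            # The producer's lifetime starts here, inside the session.
            self._task = asyncio.create_task(self._producer())
        return self

    async def __aexit__(self, *exc_info):
        await self.aclose()

    async def aclose(self):
        # Stop the producer first, then run the unsubscribe hook once.
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        if self._on_close is not None:
            self._on_close()
            self._on_close = None

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self._queue.get()


async def demo():
    queue = asyncio.Queue()
    closed = []

    async def producer():
        n = 0
        while True:  # runs until the session cancels it
            await queue.put(n)
            n += 1
            await asyncio.sleep(0.001)

    seen = []
    async with OwnedProducerSession(queue, producer, lambda: closed.append(True)) as session:
        async for item in session:
            seen.append(item)
            if len(seen) == 3:
                break
    return seen, closed


seen, closed = asyncio.run(demo())
```

Clearing `_task` and `_on_close` after use makes a second `aclose()` call a no-op, mirroring the resets in the listing above.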

Transport

Bases: Protocol

Structural interface the rest of the library depends on.

Every concrete transport is also an anyio.abc.ByteStream (it exposes the receive / send / send_eof / aclose methods), so it can be handed straight to anymodbus.Bus while still exposing the framing helpers the continuous path uses.

Unit

Bases: StrEnum

A measurement unit. The value is the canonical display string.

UnitKind

Bases: StrEnum

Coarse classification of a :class:Unit.

open_continuous async

open_continuous(
    port, *, device="", timeout=1.0, serial_settings=None
)

Build a continuous-mode :class:Analyzer (explicit, no sniff).

A thin convenience over :func:open_device with protocol=CONTINUOUS_ASCII and identify=False — kept for callers and tests that want the continuous path without the AUTO ladder.

Source code in src/servomexlib/devices/factory.py
async def open_continuous(
    port: str | Transport,
    *,
    device: str = "",
    timeout: float = 1.0,
    serial_settings: SerialSettings | None = None,
) -> Analyzer:
    """Build a continuous-mode :class:`Analyzer` (explicit, no sniff).

    A thin convenience over :func:`open_device` with ``protocol=CONTINUOUS_ASCII``
    and ``identify=False`` — kept for callers and tests that want the continuous
    path without the AUTO ladder.
    """
    transport = await resolve_transport(port, serial_settings)
    label = device or transport.label
    client = ContinuousClient(transport, device=label, timeout=timeout)
    return Analyzer(client, device=label)

open_device async

open_device(
    port,
    *,
    protocol=ProtocolKind.AUTO,
    address=1,
    serial_settings=None,
    timeout=1.0,
    identify=True,
)

Open an analyser and return a (not-yet-entered) :class:Analyzer.

Parameters:

Name Type Description Default
port str | Transport

Serial path ("COM11") or a pre-built transport.

required
protocol ProtocolKind

Wire protocol, or AUTO to sniff.

AUTO
address int

Modbus slave address (RS485 multidrop).

1
serial_settings SerialSettings | None

Overrides the default 19200 / 8-N-1 for a str port.

None
timeout float

Default per-call/first-frame timeout.

1.0
identify bool

Cache :class:DeviceInfo on __aenter__ (continuous waits for the first frame).

True

Returns:

Type Description
Analyzer

An :class:Analyzer; enter it as an async context manager to start I/O.

Source code in src/servomexlib/devices/factory.py
async def open_device(
    port: str | Transport,
    *,
    protocol: ProtocolKind = ProtocolKind.AUTO,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    timeout: float = 1.0,
    identify: bool = True,
) -> Analyzer:
    """Open an analyser and return a (not-yet-entered) :class:`Analyzer`.

    Args:
        port: Serial path (``"COM11"``) or a pre-built transport.
        protocol: Wire protocol, or ``AUTO`` to sniff.
        address: Modbus slave address (RS485 multidrop).
        serial_settings: Overrides the default ``19200 / 8-N-1`` for a ``str`` port.
        timeout: Default per-call/first-frame timeout.
        identify: Cache :class:`DeviceInfo` on ``__aenter__`` (continuous waits for
            the first frame).

    Returns:
        An :class:`Analyzer`; enter it as an async context manager to start I/O.
    """
    transport = await resolve_transport(port, serial_settings)
    resolved = protocol
    if resolved is ProtocolKind.AUTO:
        from servomexlib.protocol.detect import detect_protocol

        resolved = await detect_protocol(
            transport, address=address, listen_timeout=max(timeout, 2.0)
        )
    client = build_client(transport, resolved, address=address, timeout=timeout)
    return Analyzer(client, device=transport.label, identify_on_enter=identify)

record async

record(
    source,
    *,
    rate_hz,
    duration=None,
    names=None,
    timeout=None,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    auto_reconnect=False,
    reconnect_factory=None,
)

Record polled samples into a receive stream at an absolute cadence.

Usage::

async with record(analyzer, rate_hz=2, duration=10) as rec:
    async for batch in rec.stream:
        for sample in batch:
            print(sample.channel, sample.value)

Parameters:

Name Type Description Default
source PollSource

Any :class:PollSource (an :class:Analyzer or a :class:ServomexManager).

required
rate_hz float

Target cadence; target[n] = start + n / rate_hz. Must be > 0.

required
duration float | None

Total acquisition seconds, or None for "until the caller exits the CM".

None
names Sequence[str] | None

Subset of device names to poll (manager only); None polls all.

None
timeout float | None

Per-poll I/O ceiling passed to source.poll_samples.

None
overflow OverflowPolicy

Backpressure policy when the buffer is full.

BLOCK
buffer_size int

Receive-stream capacity, in per-tick batches.

64
auto_reconnect bool

Treat :class:ServomexConnectionError from the source as a transient drop: log, back off, optionally rebuild via reconnect_factory, and keep going (missed ticks count as late).

False
reconnect_factory Callable[[], Awaitable[PollSource]] | None

Rebuilds the source after a disconnect when supplied.

None

Yields:

Type Description
AsyncGenerator[Recording[Sequence[Sample]]]

A :class:Recording[Sequence[Sample]].

Raises:

Type Description
ValueError

rate_hz <= 0, duration <= 0, or buffer_size < 1.
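
The absolute cadence described for `rate_hz` means every tick target is computed from the start time, so a slow tick does not push later ticks back. The sketch below reproduces the two formulas that appear in the source (`target[n] = start + n / rate_hz` and `max(1, round(duration * rate_hz))`). The function names `tick_targets` and `drift_ms` are hypothetical helpers, not library API.

```python
def tick_targets(start, rate_hz, duration):
    """Return the absolute target time of every tick in a fixed-length run."""
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration <= 0:
        raise ValueError(f"duration must be > 0, got {duration!r}")
    # At least one tick is scheduled, even for very short durations.
    total_ticks = max(1, round(duration * rate_hz))
    # Each target derives from ``start``, never from the previous tick.
    return [start + n / rate_hz for n in range(total_ticks)]


def drift_ms(actual, target):
    """Milliseconds by which a tick ran after (positive) or before its target."""
    return (actual - target) * 1000.0


targets = tick_targets(start=0.0, rate_hz=2, duration=2)
short_run = tick_targets(start=0.0, rate_hz=2, duration=0.1)
late_by = drift_ms(actual=1.25, target=targets[2])
```

With `rate_hz=2` and `duration=2` this schedules four ticks at 0.0, 0.5, 1.0, and 1.5 seconds; a tick emitted at 1.25 s against the 1.0 s target has drifted by 250 ms.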

Source code in src/servomexlib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    timeout: float | None = None,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    auto_reconnect: bool = False,
    reconnect_factory: Callable[[], Awaitable[PollSource]] | None = None,
) -> AsyncGenerator[Recording[Sequence[Sample]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(analyzer, rate_hz=2, duration=10) as rec:
            async for batch in rec.stream:
                for sample in batch:
                    print(sample.channel, sample.value)

    Args:
        source: Any :class:`PollSource` (an :class:`Analyzer` or a
            :class:`ServomexManager`).
        rate_hz: Target cadence; ``target[n] = start + n / rate_hz``. Must be > 0.
        duration: Total acquisition seconds, or ``None`` for "until the caller
            exits the CM".
        names: Subset of device names to poll (manager only); ``None`` polls all.
        timeout: Per-poll I/O ceiling passed to ``source.poll_samples``.
        overflow: Backpressure policy when the buffer is full.
        buffer_size: Receive-stream capacity, in per-tick batches.
        auto_reconnect: Treat :class:`ServomexConnectionError` from the source as
            a transient drop: log, back off, optionally rebuild via
            ``reconnect_factory``, and keep going (missed ticks count as late).
        reconnect_factory: Rebuilds the source after a disconnect when supplied.

    Yields:
        A :class:`Recording[Sequence[Sample]]`.

    Raises:
        ValueError: ``rate_hz <= 0``, ``duration <= 0``, or ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Sequence[Sample]](
        max_buffer_size=buffer_size,
    )
    # Producer-side clone for DROP_OLDEST eviction, off the consumer's iterator.
    drop_rx = receive_stream.clone()

    started_at = datetime.now(UTC)
    summary = AcquisitionSummary(started_at=started_at)
    tick_durations_ms: list[float] = []
    _logger.info(
        "recorder.start rate_hz=%s duration_s=%s overflow=%s buffer_size=%s names=%s",
        rate_hz,
        duration,
        overflow.value,
        buffer_size,
        list(names) if names is not None else None,
    )

    try:
        async with anyio.create_task_group() as tg, receive_stream:

            async def _producer_entrypoint() -> None:
                await _run_producer(
                    source,
                    send_stream,
                    drop_rx,
                    names,
                    timeout,
                    period,
                    total_ticks,
                    overflow,
                    summary,
                    tick_durations_ms,
                    auto_reconnect=auto_reconnect,
                    reconnect_factory=reconnect_factory,
                )

            _ = tg.start_soon(_producer_entrypoint)
            try:
                yield Recording(stream=receive_stream, summary=summary, rate_hz=rate_hz)
            finally:
                tg.cancel()
    except BaseExceptionGroup as eg:
        # A lone producer failure (e.g. ServomexConnectionError without
        # auto_reconnect) surfaces as a single-member group from the task group;
        # collapse it so callers catch the concrete error, not the wrapper.
        raise _collapse(eg) from None
    finally:
        summary.finished_at = datetime.now(UTC)
        summary.tick_duration_ms_p50, summary.tick_duration_ms_p99 = _tick_percentiles(
            tick_durations_ms
        )
        _logger.info(
            "recorder.stop emitted=%s late=%s max_drift_ms=%.3f tick_p50_ms=%.3f tick_p99_ms=%.3f",
            summary.samples_emitted,
            summary.samples_late,
            summary.max_drift_ms,
            summary.tick_duration_ms_p50,
            summary.tick_duration_ms_p99,
        )
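
The summary's `tick_duration_ms_p50` and `tick_duration_ms_p99` fields are filled from the collected tick durations when the context manager exits. The body of `_tick_percentiles` is not shown in this section, so the following is only one plausible way to compute such values: a nearest-rank percentile over the sorted durations, returning zeros for an empty run (an assumption that matches the dataclass defaults). Both function names are hypothetical.

```python
def nearest_rank(sorted_values, pct):
    """Nearest-rank percentile of a non-empty sorted list; ``pct`` is an integer 1..100."""
    # Integer ceiling of pct * n / 100 avoids floating-point rounding surprises.
    rank = max(1, (pct * len(sorted_values) + 99) // 100)
    return sorted_values[rank - 1]


def tick_percentiles(durations_ms):
    """Return (p50, p99) of the tick durations, or zeros when nothing was recorded."""
    if not durations_ms:
        return 0.0, 0.0
    ordered = sorted(durations_ms)
    return nearest_rank(ordered, 50), nearest_rank(ordered, 99)


p50, p99 = tick_percentiles([float(n) for n in range(1, 101)])
small_p50, small_p99 = tick_percentiles([5.0, 1.0, 3.0])
empty = tick_percentiles([])
```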