servomexlib¶
Top-level re-exports. This page documents whatever is currently exported.
servomexlib — async Python driver for Servomex SERVOPRO 4000-series gas analysers.
One protocol-neutral, channel-oriented API decodes the analyser's three communication modes — Continuous ASCII (unsolicited broadcast), Modbus RTU, and Modbus ASCII — into a single set of typed models.
The public API is semantic and protocol-neutral: a caller asks for
poll(), read_channel("I1"), snapshot(), identify(),
stream(), or start_calibration(group), and the session dispatches to the
per-protocol client that was selected (or sniffed via AUTO) at open time.
The core is async (built on anyio); a thin sync facade lives at
:mod:servomexlib.sync for scripts, notebooks, and REPL use.
servomexlib is a member of the *lib instrument-driver family; family
harmony is defined at the boundary (entry point, frozen models, error
hierarchy, streaming/sinks/sync/CLI conventions, tooling).
This module re-exports the public surface.
AcquisitionSummary
dataclass
¶
AcquisitionSummary(
started_at,
finished_at=None,
samples_emitted=0,
samples_late=0,
max_drift_ms=0.0,
tick_duration_ms_p50=0.0,
tick_duration_ms_p99=0.0,
disconnects=0,
)
Per-run summary, owned and mutated by the recorder (sole writer).
Counters update in place during the run so progress-polling consumers see
live values; consumers treat it as read-only. finished_at and the
percentile fields are populated on context-manager exit.
Attributes:
| Name | Type | Description |
|---|---|---|
| started_at | datetime | Wall-clock at the first scheduled tick. |
| finished_at | datetime \| None | Wall-clock at producer shutdown, or |
| samples_emitted | int | Per-tick batches pushed onto the stream (a tick whose reads all failed still counts as one emitted batch). |
| samples_late | int | Ticks that missed their slot (overrun, overflow drop, or a reconnect gap). |
| max_drift_ms | float | Largest positive drift of an emitted batch from its target. |
| tick_duration_ms_p50 | float | Median |
| tick_duration_ms_p99 | float | 99th-percentile |
| disconnects | int | |
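The sole-writer pattern described above can be sketched roughly as follows. `SummarySketch` and `nearest_rank` are hypothetical stand-ins, not the library's `AcquisitionSummary`; in particular, the nearest-rank percentile is an assumption, since this page does not say how the percentiles are computed.

```python
import math
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional


def nearest_rank(values: List[float], pct: float) -> float:
    """Nearest-rank percentile; returns 0.0 for an empty list."""
    if not values:
        return 0.0
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


@dataclass
class SummarySketch:
    """Hypothetical live per-run summary (illustration only)."""
    started_at: datetime
    finished_at: Optional[datetime] = None
    samples_emitted: int = 0
    samples_late: int = 0
    tick_duration_ms_p50: float = 0.0
    tick_duration_ms_p99: float = 0.0
    _durations: List[float] = field(default_factory=list, repr=False)

    def record_tick(self, duration_ms: float, late: bool = False) -> None:
        # Counters change in place so a polling reader sees live values.
        self.samples_emitted += 1
        self.samples_late += int(late)
        self._durations.append(duration_ms)

    def finish(self) -> None:
        # finished_at and the percentiles are only filled in at shutdown.
        self.finished_at = datetime.now(timezone.utc)
        self.tick_duration_ms_p50 = nearest_rank(self._durations, 50)
        self.tick_duration_ms_p99 = nearest_rank(self._durations, 99)
```

A consumer would only ever read these fields; in this sketch the writer alone calls `record_tick` and `finish`.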
AnalyserStatus
dataclass
¶
Analyser-level status.
Analyzer ¶
Protocol-neutral async-context-manager facade over one analyser.
Source code in src/servomexlib/devices/analyzer.py
dropped_frames
property
¶
Count of frames dropped for parse/checksum failures (continuous resync).
analyser_status
async
¶
calibration_status
async
¶
Return autocalibration progress for group (Modbus only).
Source code in src/servomexlib/devices/analyzer.py
identify
async
¶
Return device identity, caching it on first call.
poll
async
¶
Return one frame — all channels + analyser status, one tick.
wait_fresh=True waits for the next continuous broadcast instead of
returning the cached latest; it is a no-op for the polled Modbus path.
Source code in src/servomexlib/devices/analyzer.py
poll_samples
async
¶
Poll one frame and fan it out into long-format samples (PollSource).
Makes the analyser a :class:~servomexlib.streaming.poll_source.PollSource
the recorder can drive. names is ignored for a solo analyser (it is the
manager's device-subset selector).
Source code in src/servomexlib/devices/analyzer.py
read_all
async
¶
Return every channel's latest reading keyed by id.
Source code in src/servomexlib/devices/analyzer.py
read_channel
async
¶
Return one channel's latest reading.
snapshot ¶
start_calibration
async
¶
Start autocalibration for group (Modbus only; requires confirm=True).
Source code in src/servomexlib/devices/analyzer.py
status
async
¶
Return one channel's latest status.
stop_calibration
async
¶
Stop all autocalibration (Modbus only; requires confirm=True).
Source code in src/servomexlib/devices/analyzer.py
stream ¶
Stream samples — AUTOPRINT (continuous) or POLL (Modbus).
Defaults the mode per protocol. Continuous mode subscribes passively to the
unsolicited broadcast (rate_hz is ignored — the analyser sets the
cadence). Modbus mode drives the drift-free
:func:~servomexlib.streaming.recorder.record loop at rate_hz
(default 1.0) inside a task group the returned session owns.
Use it as an async context manager so the poll loop starts and stops with the session::
    async with anz.stream(rate_hz=2) as samples:
        async for sample in samples:
            ...
Source code in src/servomexlib/devices/analyzer.py
Availability ¶
Bases: StrEnum
Whether a capability is known-present, known-absent, or unprobed.
CalGroupState
dataclass
¶
One cal-group's autocalibration state.
Decoded from the continuous-frame autocal field (S/C + 1/2
per group) or the Modbus analyser-status cal-group discretes.
CalPhase ¶
Bases: StrEnum
The phase of an autocalibration cycle for one cal-group.
CalibrationProgress
dataclass
¶
Autocalibration progress for one cal-group.
Capability ¶
Bases: Flag
What a :class:ProtocolClient can do.
A small :class:enum.Flag: the continuous client advertises read/identify
only; the Modbus client advertises everything. The session gates each op
against the active client's set.
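A minimal sketch of capability gating with `enum.Flag` is shown below. The member names (`READ`, `IDENTIFY`, `CALIBRATE`) and the `require` helper are assumptions made for illustration; the real flag members and the error raised by the session may differ.

```python
from enum import Flag, auto


class CapabilitySketch(Flag):
    """Hypothetical capability bits; the real member names may differ."""
    READ = auto()
    IDENTIFY = auto()
    CALIBRATE = auto()


# Continuous client: read/identify only. Modbus client: everything.
CONTINUOUS_CAPS = CapabilitySketch.READ | CapabilitySketch.IDENTIFY
MODBUS_CAPS = CONTINUOUS_CAPS | CapabilitySketch.CALIBRATE


def require(active: CapabilitySketch, needed: CapabilitySketch) -> None:
    """Raise before any I/O if the active client lacks a capability."""
    if needed not in active:
        raise PermissionError(f"{needed!r} not supported by this client")
```

Because the check is a plain bitwise membership test, it costs nothing to run it on every operation before touching the wire.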
ChannelId ¶
Bases: StrEnum
A channel slot on the analyser.
I* are measured transducers, D* are derived channels, E* are
external analogue (mA) inputs. The wire id matches the enum value ("I1").
ChannelInfo
dataclass
¶
Identity of one populated channel slot (part of :class:DeviceInfo).
ChannelKind ¶
Bases: StrEnum
Classification of a channel slot.
Drives the per-kind status-bit exceptions: external
inputs map bit 0 to Invalid rather than Fault; derived channels
carry copies of their parent transducer's flags.
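The per-kind exception for bit 0 can be illustrated with a small sketch. `KindSketch`, `kind_of`, and `bit0_meaning` are hypothetical names; the sketch assumes the kind can be inferred from the id prefix (I, D, E) and only models bit 0, not the full status word.

```python
from enum import Enum


class KindSketch(Enum):
    """Hypothetical channel kinds keyed by the id prefix."""
    MEASURED = "I"
    DERIVED = "D"
    EXTERNAL = "E"


def kind_of(channel_id: str) -> KindSketch:
    """Classify a wire id such as "I1" by its leading letter."""
    return KindSketch(channel_id[0])


def bit0_meaning(kind: KindSketch) -> str:
    """External inputs read bit 0 as Invalid; other kinds as Fault."""
    return "invalid" if kind is KindSketch.EXTERNAL else "fault"
```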
ChannelStatus
dataclass
¶
Per-channel status flags.
DeviceInfo
dataclass
¶
Identity snapshot produced by Analyzer.identify.
ErrorContext
dataclass
¶
ErrorContext(
port=None,
protocol=None,
address=None,
channel=None,
register=None,
function_code=None,
request=None,
response=None,
elapsed_s=None,
extra=_empty_extra(),
)
Structured context attached to every :class:ServomexError.
Fields are best-effort — missing data is None rather than raising.
extra accepts any Mapping and is always frozen into a read-only
:class:types.MappingProxyType at construction so the shared empty
sentinel can never be mutated through error.context.extra[k] = v.
merged ¶
Return a new context with updates overlaid. Unknown keys go to extra.
Source code in src/servomexlib/errors.py
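The freezing of extra and the overlay behaviour of merged might look roughly like the sketch below. `ContextSketch` is a hypothetical, trimmed-down class with only two named fields; it is not the library's implementation, only one way to get the documented behaviour with `types.MappingProxyType` and `dataclasses.replace`.

```python
from dataclasses import dataclass, field, fields, replace
from types import MappingProxyType
from typing import Any, Mapping, Optional


@dataclass(frozen=True)
class ContextSketch:
    """Hypothetical, trimmed-down error context (two named fields only)."""
    port: Optional[str] = None
    channel: Optional[str] = None
    extra: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Copy into a read-only view so callers cannot mutate it later.
        frozen = MappingProxyType(dict(self.extra))
        object.__setattr__(self, "extra", frozen)

    def merged(self, **updates: Any) -> "ContextSketch":
        """Overlay updates; keys that are not named fields land in extra."""
        known = {f.name for f in fields(self)} - {"extra"}
        named = {k: v for k, v in updates.items() if k in known}
        unknown = {k: v for k, v in updates.items() if k not in known}
        return replace(self, extra={**self.extra, **unknown}, **named)
```

Calling `ctx.merged(channel="I1", attempt=2)` on such an object would return a new context with `channel` set and `attempt` stored under `extra`, leaving the original untouched.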
ErrorPolicy ¶
Bases: Enum
How the manager surfaces per-device failures.
Under :attr:RAISE, the manager collects every result and — if any failed —
raises an :class:ExceptionGroup after the task group joins. Under
:attr:RETURN, each device yields a :class:DeviceResult to inspect.
FakeTransport ¶
Bases: ByteStreamTransport
Scripted, in-process :class:ByteStreamTransport.
Source code in src/servomexlib/transport/fake.py
add_script ¶
emit
async
¶
Feed frames one at a time, optionally interval seconds apart.
Run as a background task to simulate a continuous broadcaster:
task_group.start_soon(lambda: fake.emit(frames, interval=0.05)).
Source code in src/servomexlib/transport/fake.py
Frame
dataclass
¶
One continuous frame, or one Modbus sweep — a timestamped channel set.
as_samples ¶
Fan the frame out into one long-format :class:Sample per reading.
Source code in src/servomexlib/devices/models.py
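Fanning a wide frame out into long-format rows can be sketched as below. `FrameSketch` and `as_rows` are hypothetical; the field names are chosen for illustration and do not mirror the real `Frame` or `Sample` models.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class FrameSketch:
    """Hypothetical wide frame: one timestamp, many channel values."""
    received_at: float
    readings: Dict[str, Optional[float]]


def as_rows(frame: FrameSketch, device: str) -> List[dict]:
    """Emit one long-format row per channel, repeating the shared fields."""
    return [
        {
            "device": device,
            "channel": cid,
            "value": value,
            "received_at": frame.received_at,
        }
        for cid, value in frame.readings.items()
    ]
```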
channel ¶
Return the :class:Reading for cid.
Raises:
| Type | Description |
|---|---|
| ServomexValidationError | |
Source code in src/servomexlib/devices/models.py
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks the response.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer (default). Silent drops are surprising in acquisition, so the recorder blocks rather than discarding.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the batch about to be enqueued. Counted as late.
DROP_OLDEST
class-attribute
instance-attribute
¶
Evict the oldest queued batch and enqueue the newest. For real-time monitoring where the latest reading matters most. Each eviction is late.
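The three policies can be compared with a small synchronous sketch over a bounded `collections.deque`. `PolicySketch` and `offer` are hypothetical names; the real recorder works on an async receive stream, so BLOCK is represented here only by a "must-wait" result rather than an actual await.

```python
from collections import deque
from enum import Enum, auto


class PolicySketch(Enum):
    """Hypothetical mirror of the three documented policies."""
    BLOCK = auto()
    DROP_NEWEST = auto()
    DROP_OLDEST = auto()


def offer(buffer: deque, batch, capacity: int, policy: PolicySketch) -> str:
    """Try to enqueue one batch; report what happened as a short string."""
    if len(buffer) < capacity:
        buffer.append(batch)
        return "queued"
    if policy is PolicySketch.DROP_NEWEST:
        return "dropped-newest"   # incoming batch discarded, counted late
    if policy is PolicySketch.DROP_OLDEST:
        buffer.popleft()          # evict the oldest, counted late
        buffer.append(batch)
        return "dropped-oldest"
    return "must-wait"            # BLOCK: producer waits for the consumer
```

With a full buffer of `[1, 2]`, DROP_NEWEST leaves the buffer unchanged while DROP_OLDEST turns it into `[2, 3]` when batch `3` arrives.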
PollSource ¶
Bases: Protocol
Minimal shape the recorder needs from its dispatcher.
poll_samples
async
¶
Read this tick's samples across every channel (and managed device).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| names | Sequence[str] \| None | Subset of device names to poll (manager only); | None |
| timeout | float \| None | Per-poll I/O ceiling, or | None |
Returns:
| Type | Description |
|---|---|
| Sequence[Sample] | A flat sequence of :class: |
Source code in src/servomexlib/streaming/poll_source.py
ProtocolKind ¶
Bases: StrEnum
Which communication mode the analyser speaks.
All three modes are mutually exclusive on the wire and selected on the
analyser's front panel. AUTO is only valid at open_device call
time; by the time a session exists it has resolved to one of the others.
Reading
dataclass
¶
One decoded channel reading.
value is None on over-range / invalid; name is None when the
slot is unlabelled (continuous ||||||). name and unit have been
routed through the display charset.
__format__ ¶
Delegate non-empty format specs to :attr:value.
f"{r:.4f}" formats the value; f"{r}" prints the dataclass repr.
An over-range reading (value is None) formats as "None" for
any numeric spec rather than raising.
Source code in src/servomexlib/devices/models.py
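The formatting rule described above can be reproduced with a short sketch. `ReadingSketch` is a hypothetical two-field dataclass used only to show the `__format__` delegation; the real `Reading` carries more fields.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ReadingSketch:
    """Hypothetical two-field reading used only to show __format__."""
    value: Optional[float]
    unit: str = ""

    def __format__(self, spec: str) -> str:
        if not spec:
            return repr(self)      # f"{r}" prints the dataclass repr
        if self.value is None:
            return "None"          # over-range: never raise on a numeric spec
        return format(self.value, spec)
```

With this in place, `f"{ReadingSketch(20.9, '%'):.4f}"` yields `"20.9000"`, and an over-range reading formats as `"None"` instead of raising a `TypeError`.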
as_dict ¶
Flatten the reading into a row-shaped dict for tabular sinks.
Content-only — timing provenance lives on the surrounding
:class:Sample. Booleans render as 0 / 1 so SQLite picks
INTEGER affinity and CSV / JSONL round-trip cleanly.
Source code in src/servomexlib/devices/models.py
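The boolean-to-integer flattening can be sketched as follows. `to_row` and `to_csv` are hypothetical helpers, and the column names (`fault`, `alarm`) are made up for the example; only the 0 / 1 rendering rule comes from the description above.

```python
import csv
import io


def to_row(fields: dict) -> dict:
    """Render booleans as 0/1 and leave every other value untouched."""
    return {k: int(v) if isinstance(v, bool) else v for k, v in fields.items()}


def to_csv(rows: list) -> str:
    """Write row dicts to CSV text using the first row's keys as the header."""
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=list(rows[0]), lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return out.getvalue()
```

Writing `0` / `1` rather than `False` / `True` keeps the CSV text free of language-specific literals, which is one plausible reason for the convention.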
Recording
dataclass
¶
Container yielded by :func:record — stream + live summary + rate.
Shares the cross-family shape (stream / summary / rate_hz) so
downstream consumers are vendor-agnostic. For servomexlib the payload is
Recording[Sequence[Sample]] — per-tick batches.
Attributes:
| Name | Type | Description |
|---|---|---|
| stream | AsyncIterator[T] | Async iterator of per-tick :class: |
| summary | AcquisitionSummary | Live :class: |
| rate_hz | float | The cadence captured at |
SafetyTier ¶
Bases: StrEnum
How disruptive an operation is.
STATEFUL operations (autocalibration) require confirm=True and are
gated before any byte is sent.
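A gate of this kind could be sketched as below. `TierSketch`, `ConfirmationRequired`, and `gate` are hypothetical stand-ins; only the STATEFUL tier and the confirm=True requirement are taken from the text, and the `READ_ONLY` member is an assumption added for contrast.

```python
from enum import Enum


class TierSketch(Enum):
    """Hypothetical tiers; only STATEFUL is named in the text above."""
    READ_ONLY = "read_only"
    STATEFUL = "stateful"


class ConfirmationRequired(Exception):
    """Stand-in for the library's confirmation error."""


def gate(tier: TierSketch, confirm: bool, send):
    """Refuse a stateful op before any byte is sent unless confirmed."""
    if tier is TierSketch.STATEFUL and not confirm:
        raise ConfirmationRequired("pass confirm=True to proceed")
    return send()
```

The check runs before `send` is called, so an unconfirmed request never reaches the transport.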
Sample
dataclass
¶
Sample(
device,
channel,
reading,
protocol,
monotonic_ns,
received_at,
requested_at=None,
latency_s=None,
metadata=_empty_metadata(),
error=None,
)
Long-format row — one channel reading with streaming provenance.
requested_at / latency_s are None in passive continuous mode (we
did not ask). error is set when a frame was dropped/corrupt; reading
is then None (the two are mutually exclusive) and channel is None
because a dropped frame is not tied to one channel.
SerialSettings
dataclass
¶
SerialSettings(
port,
baudrate=MAX_BAUD,
bytesize=ByteSize.EIGHT,
parity=Parity.NONE,
stopbits=StopBits.ONE,
rtscts=False,
xonxoff=False,
exclusive=True,
)
Frozen serial framing descriptor. Default 19200 / 8-N-1.
ServomexCapabilityError ¶
Bases: ServomexError
An operation is not available on this device / mode.
Source code in src/servomexlib/errors.py
ServomexChecksumError ¶
Bases: ServomexProtocolError
A continuous-frame checksum did not match the recomputed value.
Source code in src/servomexlib/errors.py
ServomexConfigurationError ¶
Bases: ServomexError
Configuration-level error (bad args, wrong confirm flag, etc.).
Source code in src/servomexlib/errors.py
ServomexConfirmationRequiredError ¶
Bases: ServomexConfigurationError
A SafetyTier.STATEFUL op was attempted without confirm=True.
Source code in src/servomexlib/errors.py
ServomexConnectionError ¶
Bases: ServomexTransportError
Could not open / lost the connection (or no recognised protocol on AUTO).
Source code in src/servomexlib/errors.py
ServomexError ¶
Bases: Exception
Base class for every :mod:servomexlib exception.
Carries a typed :class:ErrorContext. The message is the human-readable
summary; the context is the machine-readable detail.
Source code in src/servomexlib/errors.py
with_context ¶
Return a copy of this error with its context updated.
Useful when an inner layer raises and an outer layer wants to enrich
the context (for instance adding port or elapsed_s).
Source code in src/servomexlib/errors.py
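The enrichment pattern can be sketched with a plain exception class. `ErrorSketch` and `TimeoutSketch` are hypothetical, and the context is a plain dict here rather than a typed `ErrorContext`; the register value is illustrative only.

```python
import copy


class ErrorSketch(Exception):
    """Hypothetical base error carrying a dict-shaped context."""

    def __init__(self, message: str, context=None) -> None:
        super().__init__(message)
        self.context = dict(context or {})

    def with_context(self, **updates) -> "ErrorSketch":
        """Return a copy with updates overlaid; the original is unchanged."""
        clone = copy.copy(self)   # keeps the concrete subclass and message
        clone.context = {**self.context, **updates}
        return clone


class TimeoutSketch(ErrorSketch):
    """Hypothetical subclass, to show the type survives the copy."""
```

An outer layer might catch the inner error and re-raise `exc.with_context(port="COM11", elapsed_s=1.0) from exc`, so the port and timing travel with the exception.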
ServomexFrameError ¶
Bases: ServomexProtocolError
Structural frame error (wrong field count, truncated block).
Source code in src/servomexlib/errors.py
ServomexManager ¶
Coordinator for many analysers across one or more serial ports.
Usage::
    async with ServomexManager() as mgr:
        await mgr.add("a1", "COM11", address=1)
        await mgr.add("a2", "COM11", address=2)
        samples = await mgr.poll_samples()
Source code in src/servomexlib/manager.py
add
async
¶
add(
name,
source,
*,
protocol=ProtocolKind.MODBUS_RTU,
address=1,
serial_settings=None,
timeout=1.0,
)
Register an analyser under name and return it.
source discriminates lifecycle ownership: a pre-built :class:Analyzer
(caller-owned, tracked only), a str port path (the manager opens and
shares a transport across the bus), or a :class:Transport (bound, not
owned). Continuous-ASCII / AUTO are refused — the manager is a Modbus
multidrop coordinator.
Raises:
| Type | Description |
|---|---|
| ServomexValidationError | duplicate |
| ServomexConnectionError | the manager is closed. |
Source code in src/servomexlib/manager.py
close
async
¶
execute_each
async
¶
Run op(analyzer) on every (or named) analyser concurrently across ports.
Source code in src/servomexlib/manager.py
get ¶
Return the analyser registered under name.
poll
async
¶
Read one :class:Frame per (or named) analyser, keyed by name.
Cross-port concurrent, same-port serialised. Always returns a complete
mapping; per-device failures land in :attr:DeviceResult.error.
Source code in src/servomexlib/manager.py
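The "cross-port concurrent, same-port serialised" behaviour can be approximated with one lock per port. This is a sketch using stdlib `asyncio` rather than anyio, and `poll_all`, its `read` callable, and the `locks` mapping are all hypothetical; a failed device stores its exception so the returned mapping stays complete.

```python
import asyncio
from collections import defaultdict


async def poll_all(devices: dict, read, locks: dict) -> dict:
    """devices maps name -> port; locks maps port -> shared asyncio.Lock."""
    by_port = defaultdict(list)
    for name, port in devices.items():
        by_port[port].append(name)
    results = {}

    async def poll_port(port: str) -> None:
        lock = locks.setdefault(port, asyncio.Lock())
        # Take the port lock once for the whole group so reads do not interleave.
        async with lock:
            for name in by_port[port]:
                try:
                    results[name] = await read(name)
                except Exception as exc:
                    results[name] = exc   # keep the mapping complete

    # Ports run concurrently; devices sharing a port run one after another.
    await asyncio.gather(*(poll_port(port) for port in by_port))
    return results
```

Passing the same `locks` dict to every call is what makes the serialisation hold across overlapping polls in this sketch.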
poll_samples
async
¶
Poll every (or named) analyser concurrently across ports → flat samples.
One :class:Sample per channel read. Failed devices are dropped and logged
at WARN (the recorder never sees them). Same-port devices serialise on the
shared port lock, acquired once per port-group so a coherent snapshot is
not interleaved. Satisfies :class:PollSource.
Source code in src/servomexlib/manager.py
remove
async
¶
Unregister name, closing the shared transport on the last ref.
Source code in src/servomexlib/manager.py
ServomexModbusError ¶
Bases: ServomexProtocolError
A Modbus exception response or engine-level Modbus failure.
Rooted under :class:ServomexProtocolError (single MRO path) so the
inherited __init__ and :meth:~ServomexError.with_context resolve
unambiguously; the per-exception-code subclasses below add no competing
__init__ / __slots__.
Source code in src/servomexlib/errors.py
ServomexParseError ¶
Bases: ServomexProtocolError
A frame could not be parsed (bad header, unparsable field).
Source code in src/servomexlib/errors.py
ServomexProtocolError ¶
Bases: ServomexError
Protocol-level error (framing, parsing, checksum, mode mismatch).
Source code in src/servomexlib/errors.py
ServomexProtocolUnsupportedError ¶
Bases: ServomexProtocolError
The active protocol cannot perform this operation (e.g. autocal in continuous).
Source code in src/servomexlib/errors.py
ServomexTimeoutError ¶
Bases: ServomexTransportError
A transport read or write timed out (or a Modbus request got no reply).
Source code in src/servomexlib/errors.py
ServomexTransportError ¶
Bases: ServomexError
I/O-layer error from the serial transport.
Source code in src/servomexlib/errors.py
ServomexValidationError ¶
Bases: ServomexConfigurationError
Request validation failed before I/O (bad channel, group out of range).
Source code in src/servomexlib/errors.py
StreamMode ¶
Bases: StrEnum
How a :meth:Analyzer.stream session sources samples.
AUTOPRINT is the inherited family member (sartorius SBI vocabulary),
reused verbatim for boundary harmony; for the 4100 it denotes a passive
unsolicited-broadcast subscribe.
StreamingSession ¶
Async-iterable context manager over one subscriber's :class:Sample stream.
Two backing shapes share this one interface:
- Passive AUTOPRINT (continuous): receiver is fed by the client's already-running background loop; on_close unsubscribes.
- Active POLL (Modbus): a producer coroutine is supplied. It is run in a task group this session owns, started on __aenter__ and cancelled on close, so the recorder's lifetime is strictly nested in the session.
Source code in src/servomexlib/streaming/stream_session.py
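The active (POLL) shape, where the session owns its producer, can be sketched with stdlib `asyncio`. `SessionSketch` is hypothetical and uses a single task plus an `asyncio.Queue` in place of an anyio task group and memory stream; it shows only the start-on-enter, cancel-on-close nesting.

```python
import asyncio


class SessionSketch:
    """Hypothetical session that owns a producer task for its lifetime."""

    def __init__(self, producer) -> None:
        self._producer = producer   # async callable taking a queue
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task = None

    async def __aenter__(self) -> "SessionSketch":
        # The producer starts only when the session is entered.
        self._task = asyncio.create_task(self._producer(self._queue))
        return self

    async def __aexit__(self, *exc_info) -> None:
        await self.aclose()

    async def aclose(self) -> None:
        # Cancel the owned producer and wait for it to unwind.
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    def __aiter__(self) -> "SessionSketch":
        return self

    async def __anext__(self):
        return await self._queue.get()
```

Leaving the `async with` block, even via `break` or an exception, cancels the producer, so no poll loop outlives the session.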
aclose
async
¶
Stop the owned producer (if any), unsubscribe, and close the stream.
Source code in src/servomexlib/streaming/stream_session.py
Transport ¶
Bases: Protocol
Structural interface the rest of the library depends on.
Every concrete transport is also an anyio.abc.ByteStream (it carries the
receive / send / send_eof / aclose face), so it can be handed
straight to anymodbus.Bus while still exposing the framing helpers the
continuous path uses.
Unit ¶
Bases: StrEnum
A measurement unit. The value is the canonical display string.
UnitKind ¶
Bases: StrEnum
Coarse classification of a :class:Unit.
open_continuous
async
¶
Build a continuous-mode :class:Analyzer (explicit, no sniff).
A thin convenience over :func:open_device with protocol=CONTINUOUS_ASCII
and identify=False — kept for callers and tests that want the continuous
path without the AUTO ladder.
Source code in src/servomexlib/devices/factory.py
open_device
async
¶
open_device(
port,
*,
protocol=ProtocolKind.AUTO,
address=1,
serial_settings=None,
timeout=1.0,
identify=True,
)
Open an analyser and return a (not-yet-entered) :class:Analyzer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| port | str \| Transport | Serial path ( | required |
| protocol | ProtocolKind | Wire protocol, or | AUTO |
| address | int | Modbus slave address (RS485 multidrop). | 1 |
| serial_settings | SerialSettings \| None | Overrides the default | None |
| timeout | float | Default per-call/first-frame timeout. | 1.0 |
| identify | bool | Cache :class: | True |
Returns:
| Name | Type | Description |
|---|---|---|
| An | Analyzer | class: |
Source code in src/servomexlib/devices/factory.py
record
async
¶
record(
source,
*,
rate_hz,
duration=None,
names=None,
timeout=None,
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
auto_reconnect=False,
reconnect_factory=None,
)
Record polled samples into a receive stream at an absolute cadence.
Usage::
    async with record(analyzer, rate_hz=2, duration=10) as rec:
        async for batch in rec.stream:
            for sample in batch:
                print(sample.channel, sample.value)
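The idea of an absolute cadence can be sketched as follows: each tick's deadline is computed from the start time, so a slow poll does not push later ticks back. `tick_targets` and `run_ticks` are hypothetical helpers built on stdlib `asyncio`; they are not the recorder's implementation and ignore overflow, reconnects, and duration.

```python
import asyncio
import time


def tick_targets(start: float, rate_hz: float, count: int) -> list:
    """Absolute deadlines: start + k / rate_hz, never 'now + period'."""
    period = 1.0 / rate_hz
    return [start + k * period for k in range(count)]


async def run_ticks(poll, rate_hz: float, count: int) -> list:
    """Call poll() at each absolute deadline; return per-tick drift in ms."""
    drifts = []
    start = time.monotonic()
    for target in tick_targets(start, rate_hz, count):
        delay = target - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)   # sleep until the slot, not for a period
        drifts.append((time.monotonic() - target) * 1000.0)
        await poll()
    return drifts
```

Sleeping for a fixed period after each poll would instead accumulate the poll's own duration as drift on every tick.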
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | PollSource | Any :class: | required |
| rate_hz | float | Target cadence; | required |
| duration | float \| None | Total acquisition seconds, or | None |
| names | Sequence[str] \| None | Subset of device names to poll (manager only); | None |
| timeout | float \| None | Per-poll I/O ceiling passed to | None |
| overflow | OverflowPolicy | Backpressure policy when the buffer is full. | BLOCK |
| buffer_size | int | Receive-stream capacity, in per-tick batches. | 64 |
| auto_reconnect | bool | Treat :class: | False |
| reconnect_factory | Callable[[], Awaitable[PollSource]] \| None | Rebuilds the source after a disconnect when supplied. | None |
|
Yields:
| Name | Type | Description |
|---|---|---|
| A | AsyncGenerator[Recording[Sequence[Sample]]] | class: |
Raises:
| Type | Description |
|---|---|
| ValueError | |
Source code in src/servomexlib/streaming/recorder.py