Engine#
Warning
This document is under construction.
The Engine implements a high-performance data processing pipeline composed of acquisition, processing, and formatting stages.
See Concepts for more detail regarding this pipeline and its capabilities.
- class vortex.engine.Engine#
A real-time engine that moves data through an acquisition, processing, and formatting pipeline.
Once the pipeline is configured, the engine can operate the pipeline in multiple sessions, bounded by start() and stop() calls. The session is active until the scan_queue becomes empty, EngineConfig.blocks_to_acquire blocks have been acquired, stop() is called, or an error occurs.
- __init__(logger=None)#
Create a new object with optional logging.
- Parameters
logger (vortex.Logger) – Logger to receive status messages. Logging is disabled if not provided.
- initialize(config)#
Initialize the engine using the supplied configuration.
- Parameters
config (EngineConfig) – New configuration to apply.
- prepare()#
Prepare to start the engine. The engine allocates all blocks, assigns and allocates transfer buffers, and notifies endpoints to allocate their internal buffers, if needed.
- start()#
Start a new session with the engine. Acquisition and IO components are preloaded, if applicable, and started. Every call to this method must be paired with a later call to stop().
- wait()#
Wait for the active session to complete. This method will block indefinitely.
- Raises
RuntimeError – If an error caused the session to shut down prematurely. If a cascade of errors occurs, this is guaranteed to be the first of them.
- wait_for(timeout)#
Wait the specified duration for the session to complete.
- Parameters
timeout (float) – Maximum duration to wait in seconds.
- Returns
bool – True if the engine has finished and False otherwise.
- Raises
RuntimeError – If an error caused the session to shut down prematurely. If a cascade of errors occurs, this is guaranteed to be the first of them.
- stop()#
Request that the engine shut down and wait for it to do so. This method must be called for every call to start().
- Raises
RuntimeError – If an error caused the engine to shut down prematurely. If a cascade of errors occurs, this is guaranteed to be the first of them.
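The pairing of start() and stop() described above can be sketched with a try/finally block. The StubEngine class below is a hypothetical stand-in that mimics only the start(), wait_for(), and stop() methods documented here, so that the example runs without hardware. It is an illustration of the calling pattern under that assumption, not the engine's implementation.

```python
import time


class StubEngine:
    """Hypothetical stand-in that mimics only the session methods described above."""

    def __init__(self, run_time):
        self._run_time = run_time  # seconds until the pretend session finishes
        self._deadline = None
        self.stop_calls = 0

    def start(self):
        self._deadline = time.monotonic() + self._run_time

    def wait_for(self, timeout):
        # Return True if the pretend session finished within `timeout` seconds.
        remaining = self._deadline - time.monotonic()
        time.sleep(max(0.0, min(timeout, remaining)))
        return time.monotonic() >= self._deadline

    def stop(self):
        self.stop_calls += 1


def run_session(engine, poll_interval=0.01):
    """Run one session, polling so that the caller stays responsive."""
    polls = 0
    engine.start()
    try:
        while not engine.wait_for(poll_interval):
            polls += 1  # a real application might refresh a display here
    finally:
        engine.stop()  # always paired with start(), even if wait_for() raises
    return polls


engine = StubEngine(run_time=0.05)
polls = run_session(engine)
```

Polling with wait_for() rather than blocking in wait() leaves the calling thread free to do other work between polls, and the finally clause keeps the stop() call paired with start() if an error is raised.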
- status()#
Query the engine status.
- Returns
EngineStatus – The current status of the engine.
- property config: EngineConfig#
Copy of the active configuration.
- property done: bool#
Return True if the engine is stopped and False otherwise.
Warning
This property will return True until stop() is called. Do not rely on this property to determine if the engine has exited.
- property event_callback: Callable[[Engine.Event, Exception], None]#
Callback to receive status events from the engine.
- property job_callback: Callable[[int, EngineStatus, JobTiming], None]#
Callback to receive status and timing information for each block when it exits the pipeline.
Caution
Avoid computationally expensive tasks in this callback or the session may shut down prematurely due to delayed block recycling.
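One way to follow the caution above is to hand each notification to a worker thread through a queue, so that the callback itself returns almost immediately. The sketch below uses only the Python standard library and simulates the engine's calls in a loop. The status and timing arguments are passed as None placeholders, and assigning the function to engine.job_callback in a real application is an assumption based on the property signature above.

```python
import queue
import threading

jobs = queue.Queue()  # unbounded, so put_nowait() neither blocks nor raises
results = []


def job_callback(index, status, timing):
    # Keep the engine-side callback cheap: enqueue and return immediately.
    jobs.put_nowait((index, status, timing))


def worker():
    # Runs in its own thread, so slow work here does not delay the callback.
    while True:
        item = jobs.get()
        if item is None:  # sentinel: no more jobs
            break
        index, status, timing = item
        results.append(index)  # placeholder for expensive logging or analysis


thread = threading.Thread(target=worker, daemon=True)
thread.start()

# Simulate the engine invoking the callback for three blocks.
for i in range(3):
    job_callback(i, None, None)

jobs.put(None)  # ask the worker to exit
thread.join()
```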
- class vortex.engine.EngineConfig#
Configuration object for Engine.
- property records_per_block: int#
Number of records (spectra or A-scans) in a block. Each block represents a slice of time, as determined by the number of records acquired per second. Default is 1000.
- property blocks_to_allocate: int#
Number of blocks to pre-allocate prior to starting the session. This determines the maximum session duration or size that can be buffered in memory. The maximum buffered duration in records is the product of blocks_to_allocate and records_per_block. Default is 4.
- property preload_count: int#
Number of blocks to commit to the hardware drivers prior to starting the session. This determines how far in advance the engine is generating signals and scheduling buffers for acquisition. Once the engine starts, the engine will never have more than this number of blocks pending for acquisition. The product of preload_count and records_per_block determines the number of records required for new inputs (e.g., scan pattern changes) to propagate through the pipeline. Default is 2.
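The two products described above can be worked through with the default values. The sketch below is plain arithmetic. The acquisition rate of 100,000 records per second is a hypothetical figure chosen only to convert records into seconds, and the helper function names are illustrative.

```python
def buffered_records(blocks_to_allocate=4, records_per_block=1000):
    """Maximum number of records that can be buffered in memory."""
    return blocks_to_allocate * records_per_block


def propagation_records(preload_count=2, records_per_block=1000):
    """Records required for a new input (e.g., a scan change) to propagate."""
    return preload_count * records_per_block


def records_to_seconds(records, records_per_second):
    return records / records_per_second


rate = 100_000  # hypothetical acquisition rate in records per second
buffer_s = records_to_seconds(buffered_records(), rate)      # 0.04 s
latency_s = records_to_seconds(propagation_records(), rate)  # 0.02 s
```

With the defaults, 4000 records can be buffered and a scan pattern change needs 2000 records to propagate. A smaller preload_count shortens that latency at the cost of less scheduling margin.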
- property blocks_to_acquire#
The total acquisition duration as measured in blocks. Set to 0 for an indefinite acquisition, which ends only when the last scan reports that it is complete. Default is 0.
- property post_scan_records: int#
The number of records to acquire after the last scan reports that it is complete. This is useful if hardware latencies cause the final samples of the scan to physically occur after the acquisition would have otherwise ended. Default is 0.
- property scanner_warp: [NoWarp | AngularWarp | TelecentricWarp]#
Scan warp to generate the sample waveforms from the galvo waveforms. Default is NoWarp.
- property galvo_output_channels: int#
Number of output channels to allocate for galvo waveforms. Default is 2.
- property galvo_input_channels: int#
Number of input channels to allocate for galvo waveforms. Default is 2.
- property strobes: List[SampleStrobe | SegmentStrobe | VolumeStrobe | ScanStrobe | EventStrobe]#
Scan pattern-derived strobes to generate for output. Default is [SampleStrobe(0, 2), SampleStrobe(1, 1000), SampleStrobe(2, 1000, Polarity.Low), SegmentStrobe(3), VolumeStrobe(4)].
- add_acquisition(acquisition, processors, preload=True, master=True)#
Register the acquisition with the engine and route its output data to the listed processors. divide() and cycle() may be used to build arbitrarily nested schemes for distributing data between processors. For example, cycle() can be used to realize “ping-pong” GPU processing of data, such as below.
engine.add_acquisition(acquisition, [cycle([processorA, processorB])])
There is a single acquisition dispatch thread for the whole engine. Processing is dispatched sequentially to divided processors in the listed order but may complete out of order.
- Parameters
acquisition (Acquisition) – The acquisition component to register with the engine. See the list of acquisition components for supported object types.
processors (List[Processor | Cycle | Divide]) – The graph of processing components to receive the data from this acquisition component. See the list of processing components for supported object types.
preload (bool) – Enable or disable preloading with this acquisition. If True, preload_count blocks will be queued before the engine starts. If False, the first block will be queued immediately after the engine starts.
master (bool) – Control when this acquisition is started in relation to other acquisition or IO components. All components with master=True are started after all components with master=False.
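The start ordering implied by the master flag can be sketched as a stable sort. The Component class and the component names below are hypothetical. The text does not specify the order within each group, so keeping registration order there is an assumption of this sketch.

```python
from dataclasses import dataclass


@dataclass
class Component:
    name: str     # hypothetical label for illustration
    master: bool  # mirrors the master argument described above


def start_order(components):
    """Order components so every master=False entry precedes every master=True entry."""
    # sorted() is stable, so registration order is kept within each group
    # (an assumption; the documentation does not state the in-group order).
    return sorted(components, key=lambda c: c.master)


components = [
    Component("acquisition", master=True),
    Component("galvo_output", master=False),
    Component("strobe_output", master=False),
]
order = [c.name for c in start_order(components)]
```

Here the two master=False components start first and the master=True acquisition starts last.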
- add_processor(processor, formatters)#
Register the processor with the engine and route its output to all listed formatters. The engine ensures that data arrives on the correct GPU device, if any, and deinterleaves multiple channels, if present, in preparation for processing. There is a single processing dispatch thread for the whole engine. Formatters are scheduled to receive data in the order listed, although out-of-order completion may cause formatters to execute in different orders.
- Parameters
processor (Processor) – The processor component to register with the engine. See the list of processors for supported object types.
formatters (List[Formatter]) – The list of formatting components to receive the data from this processor component and its parent acquisition component.
- add_formatter(formatter, endpoints)#
Register the formatter component with the engine and apply its format plans to acquired and/or processed data via the listed endpoints. Each formatter is allocated a dedicated thread for its endpoints. Endpoints receive data sequentially in the order listed.
- Parameters
formatter (Formatter) – The formatter component to register with the engine. See the list of processors for supported object types.
endpoints (List[Endpoints]) – The list of endpoints to receive this formatter’s plan and the data associated with the parent acquisition and processor components.
- add_io(io, preload=True, master=False, lead_samples=0)#
Register the IO component with the engine.
- Parameters
io (IO) – The IO component to register with the engine.
preload (bool) – Enable or disable preloading for this IO component. See add_acquisition() for full explanation.
master (bool) – Enable or disable master status for this IO component. See add_acquisition() for full explanation.
lead_samples (int) – The number of samples in advance to generate output waveforms in order to cancel out IO delay. This is done by looking ahead into the scan pattern-derived waveforms by the specified number of samples. Default is 0.
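The look-ahead described for lead_samples can be illustrated on a plain list of samples. The apply_lead function below is a hypothetical helper, not part of the library. Holding the final value once the look-ahead runs past the end of the waveform is an assumption made so the example is self-contained; a continuous stream would read further ahead into the scan pattern instead.

```python
def apply_lead(waveform, lead_samples=0):
    """Shift a waveform earlier by `lead_samples`, holding the final value at the end."""
    if lead_samples < 0:
        raise ValueError("lead_samples must be non-negative")
    if not waveform:
        return []
    last = len(waveform) - 1
    # Output sample i is taken from input sample i + lead_samples, clamped
    # to the last index (an assumption for this finite example).
    return [waveform[min(i + lead_samples, last)] for i in range(len(waveform))]


ramp = [0, 1, 2, 3, 4, 5]
led = apply_lead(ramp, lead_samples=2)  # [2, 3, 4, 5, 5, 5]
```

If the IO hardware delays its output by two samples, emitting the led waveform places each value at the time originally intended for it.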
- validate()#
Check the configuration for errors.
- Raises
RuntimeError – If the configuration is invalid.
Warning
This method is not fully implemented yet.
- vortex.engine.divide(nodes)#
Return an object that informs the engine to divide the acquired data between multiple processors that execute in parallel. In Python, this is equivalent to creating a list of nodes so this function effectively returns nodes.
- Parameters
nodes (List[Processor | Cycle | Divide]) – The processor components to divide.
- vortex.engine.cycle(nodes)#
Return an object that informs the engine to rotate between the processors in order for each block of acquired data.
- Parameters
nodes (List[Processor | Cycle | Divide]) – The processor components to cycle.
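The difference between dividing and cycling can be illustrated with a toy routing model. Everything below is a hypothetical stand-in: processors are represented by their names, a plain list plays the role of a divide, and the Cycle class only tracks whose turn is next. The sketch assumes that a divide delivers each block to every listed node and that each cycle keeps its own rotation counter; it does not reproduce the engine's dispatch logic.

```python
class Cycle:
    """Toy model: forwards each block to one child, rotating in order."""

    def __init__(self, nodes):
        self.nodes = nodes
        self._next = 0

    def route(self, block):
        node = self.nodes[self._next]
        self._next = (self._next + 1) % len(self.nodes)
        return route(node, block)


def route(node, block):
    """Return the names of the processors that receive `block`."""
    if isinstance(node, Cycle):
        return node.route(block)
    if isinstance(node, list):  # a divide: every listed node receives the block
        return [name for child in node for name in route(child, block)]
    return [node]  # a leaf processor, represented here by its name


# Ping-pong between A and B, while C sees every block.
graph = [Cycle(["processorA", "processorB"]), "processorC"]
routes = [route(graph, block) for block in range(4)]
```

In this model, even-numbered blocks reach processorA and odd-numbered blocks reach processorB, which mirrors the “ping-pong” arrangement, while processorC receives all four blocks.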
- class vortex.engine.ScanQueue#
A queue of scans to execute with the engine. When the scan queue is empty, the engine initiates a graceful shutdown.
- append(scan, callback=None, marker=ScanBoundary())#
Append the specified scan to the queue.
- Parameters
scan (Scan) – The scan to append to the queue. See the list of scans for supported object types.
callback (Callable[[int, ScanQueue.Event], None]) – Callback to receive notifications regarding this scan.
marker (ScanBoundary) – Boundary marker for the start of this scan.
Caution
This callback is executed in the acquisition dispatch thread of the engine, which is highly sensitive to timing delays. Avoid computationally expensive tasks in this callback or the session may shut down prematurely due to buffer underflows or overflows.
- interrupt(scan, callback=None, marker=ScanBoundary())#
Clear the scan queue and immediately switch to the specified scan. The latency with which the transition propagates through the engine is determined by the engine’s preload_count.
- Parameters
scan (Scan) – The scan to switch to once the queue is cleared. See the list of scans for supported object types.
callback (Callable[[int, ScanQueue.Event], None]) – Callback to receive notifications regarding this scan.
marker (ScanBoundary) – Boundary marker for the start of this scan.
Caution
This callback is executed in the acquisition dispatch thread of the engine, which is highly sensitive to timing delays. Avoid computationally expensive tasks in this callback or the engine may shut down prematurely due to buffer underflows or overflows.
- reset()#
Clears the scan queue and internal scan state (i.e., last position and velocity).
- clear()#
Clears the scan queue but maintains internal scan state (e.g., last position and velocity).
- property empty_callback: Callable[[OnlineScanQueue], None]#
Callback to execute when the scan queue is empty but before the engine initiates a graceful shutdown. This represents the last opportunity to prolong the session. The callback receives a single argument of OnlineScanQueue which exposes a single OnlineScanQueue.append() method to provide a thread-safe mechanism to append another scan. This method is identical in operation to ScanQueue.append(). If no scan is appended, the engine will initiate a graceful shutdown immediately after the callback returns.
Attention
Any attempt to call a method of the ScanQueue during this callback will lead to deadlock. Only call methods of the OnlineScanQueue provided as the callback’s argument.
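The behavior of the empty callback can be modeled with a small loop. ToyOnlineQueue and run() below are hypothetical stand-ins that imitate only what the text describes: the callback receives an object with a single append() method, and the session ends if nothing is appended. Scans are represented by strings, and no locking is modeled.

```python
class ToyOnlineQueue:
    """Hypothetical stand-in exposing only append(), like the callback argument."""

    def __init__(self, pending):
        self._pending = pending

    def append(self, scan):
        self._pending.append(scan)


def run(pending, empty_callback):
    """Consume scans; when none remain, give the callback one chance to add more."""
    executed = []
    while True:
        if not pending:
            empty_callback(ToyOnlineQueue(pending))
            if not pending:  # nothing appended: shut down gracefully
                break
        executed.append(pending.pop(0))
    return executed


remaining = {"repeats": 2}  # hypothetical budget of extra scans


def on_empty(online_queue):
    # Only touch the object passed in, never the outer scan queue.
    if remaining["repeats"] > 0:
        remaining["repeats"] -= 1
        online_queue.append("raster")


executed = run(["raster"], on_empty)
```

The callback prolongs the session twice, so the scan executes three times in total before the loop ends.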