Process#
Processing components perform transformations of data in vortex, such as computing the FFT. Each component provides a similar API.
Overview#
A processing component is first initialized with a call to initialize(), supplying a matching configuration object.
Initialization brings the component into a state where processing can begin and frequently involves the allocation of memory buffers.
An unlimited amount of data can then be transformed through the processor.
Processors allow changes to their configuration using the change() method, which accepts the new configuration.
Not all aspects of the configuration can be changed, however.
Take care when changes of configuration require reallocation of internal buffers.
Data is transformed synchronously with next() or asynchronously with next_async().
Both methods accept an id argument, which is provided only for user bookkeeping and logging.
In the documentation below, the input is referred to as \(x[i,j,k]\) (records) and the output is referred to as \(y[i,j,k]\) (A-scans).
The indices \(i\), \(j\), and \(k\) correspond to the record/A-scan, sample, and channel dimensions, respectively.
All buffers must have a single channel per sample (i.e., \(k \in \{ 1 \}\)).
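The difference between the synchronous and asynchronous call patterns can be sketched with a stand-in class. `StandInProcessor` is hypothetical and is not part of vortex; it only copies data. It mimics the pattern described above, in which `next()` returns once the output buffer is filled, while `next_async()` returns immediately and later passes the callback either `None` or the exception that occurred.

```python
import threading
import numpy as np

class StandInProcessor:
    """Hypothetical stand-in that mimics the call pattern; not a vortex class."""

    def initialize(self, shape):
        # a real processor would allocate internal buffers here
        self._shape = tuple(shape)

    def next(self, input_buffer, output_buffer, id=0):
        # synchronous: returns only after the output buffer is filled
        if input_buffer.shape != self._shape or output_buffer.shape != self._shape:
            raise RuntimeError(f"block {id}: buffer shape mismatch")
        output_buffer[...] = input_buffer

    def next_async(self, input_buffer, output_buffer, callback, id=0):
        # asynchronous: returns immediately and reports completion via the callback
        def worker():
            try:
                self.next(input_buffer, output_buffer, id=id)
            except Exception as exc:
                callback(exc)
            else:
                callback(None)
        threading.Thread(target=worker, daemon=True).start()

proc = StandInProcessor()
proc.initialize((4, 16, 1))
x = np.arange(64, dtype=np.uint16).reshape(4, 16, 1)
y = np.zeros_like(x)

done = threading.Event()
errors = []

def on_complete(exc):
    # record the outcome, then wake the waiting thread
    errors.append(exc)
    done.set()

proc.next_async(x, y, on_complete, id=7)
done.wait(timeout=5)
```

Both buffers must stay alive and untouched until the callback fires, because the background thread writes into `y` after `next_async()` has already returned.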
Components#
Null#
- class vortex.process.NullProcessor#
Perform no processing.
This class is provided as an engine placeholder for testing or mocking. No operation is performed on the input or output buffers.
- property config: NullProcessorConfig#
Copy of the active configuration.
- class vortex.process.NullProcessorConfig#
Configuration object for NullProcessor.
- property input_shape: List[int[3]]#
Required shape of input buffers. Returns list of [records_per_block, samples_per_record, 1]. Read-only.
- property records_per_block: int#
Number of records in each input buffer or block. Identical to ascans_per_block.
- property output_shape: List[int[3]]#
Required shape of output buffers. Returns list of [ascans_per_block, samples_per_ascan, 1]. Read-only.
- property samples_per_ascan: int#
Number of samples per A-scan. Returns either samples_per_record or the length of resampling_samples, if the latter is available. Read-only.
- property ascans_per_block: int#
Number of A-scans in each acquired buffer or block. Identical to records_per_block.
- validate()#
Check the configuration for errors.
- Raises
RuntimeError – If the configuration is invalid.
- copy()#
Create a copy of this configuration.
- Returns
NullProcessorConfig – The copy.
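The relationship between the shape properties above can be illustrated in plain Python. `block_shapes` is a hypothetical helper, not a vortex function. It restates the documented rules: A-scans map one-to-one onto records, and the A-scan length follows the resampling vector when one is supplied.

```python
import numpy as np

def block_shapes(records_per_block, samples_per_record, resampling_samples=()):
    """Hypothetical helper that mirrors the documented shape rules."""
    # A-scans and records correspond one-to-one
    ascans_per_block = records_per_block
    # a non-empty resampling vector sets the A-scan length
    n = len(resampling_samples)
    samples_per_ascan = n if n > 0 else samples_per_record
    input_shape = [records_per_block, samples_per_record, 1]
    output_shape = [ascans_per_block, samples_per_ascan, 1]
    return input_shape, output_shape

# allocate matching buffers, using the dtypes listed for next() on the CPU processors
in_shape, out_shape = block_shapes(500, 1376, np.linspace(0, 1375, 1024))
input_buffer = np.zeros(in_shape, dtype=np.uint16)
output_buffer = np.zeros(out_shape, dtype=np.int8)
```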
Copy#
- class vortex.process.CopyProcessor#
Copy data from input to output buffers with optional slicing (\(s[n]\)) and linear transformation (\(a\) and \(b\)).
\[y[i,j,k] = a + b x[i,s[j],k]\]
OCT processing is not performed. This processor is intended primarily for acquisitions that provide processed OCT data, such as AlazarFFTAcquisition. Computation is performed on the CPU.
- __init__(logger=None)#
Create a new object with optional logging.
- Parameters
logger (vortex.Logger) – Logger to receive status messages. Logging is disabled if not provided.
- initialize(config)#
Initialize the processor using the supplied configuration. All necessary internal buffers are allocated when this method returns.
- Parameters
config (CopyProcessorConfig) – New configuration to apply.
- change(config)#
Change the processor configuration. All configuration options may be changed, but changes to CopyProcessorConfig.slots will not have an effect.
Danger
It is not safe to call this method while a block is currently processing.
- Parameters
config (CopyProcessorConfig) – New configuration to apply.
- next(input_buffer, output_buffer, id=0, append_history=True)#
Process the next buffer.
- Parameters
input_buffer (numpy.ndarray[numpy.uint16]) – The input buffer, with shape that matches self.config.input_shape.
output_buffer (numpy.ndarray[numpy.int8]) – The output buffer, with shape that matches self.config.output_shape.
id (int) – Number to associate with the buffer for logging purposes.
- next_async(input_buffer, output_buffer, callback, id=0, append_history=True)#
Process the next buffer asynchronously and execute the callback when complete.
Caution
The callback may be executed in the calling thread before this method returns if an error occurs while queueing the background processing.
- Parameters
input_buffer (numpy.ndarray[numpy.uint16]) – The input buffer, with shape that matches self.config.input_shape.
output_buffer (numpy.ndarray[numpy.int8]) – The output buffer, with shape that matches self.config.output_shape.
callback (Callable[[Exception], None]) – Callback to execute when buffer is processed. The callback receives as an argument any exception which occurred during processing.
id (int) – Number to associate with the buffer for logging purposes.
- property config: CopyProcessorConfig#
Copy of the active configuration.
- class vortex.process.CopyProcessorConfig#
Base: NullProcessorConfig
Configuration object for CopyProcessor.
- property sample_slice: [NullSlice | SimpleSlice]#
Optional slicing operation \(s[n]\) to apply along each record/A-scan. Defaults to NullSlice().
- property sample_transform: [NullTransform | LinearTransform]#
Optional linear transformation \(a\) and \(b\) applied to each sample during copy. Defaults to NullTransform().
- property slots: int#
Number of parallel processing pipelines. Adjust to achieve the desired CPU utilization for machines with high hardware concurrency. Recommended minimum is 2 slots to facilitate pipelining of successive blocks. The copy itself is parallelized across all CPU cores; this field only affects pipeline-level parallelism. Defaults to 2.
- copy()#
Create a copy of this configuration.
- Returns
CopyProcessorConfig – The copy.
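The copy operation \(y[i,j,k] = a + b x[i,s[j],k]\) can be approximated in NumPy as below. This is a sketch of the arithmetic only, not the vortex implementation. The function name `copy_block`, its parameters, and the clipping before the final cast are all assumptions made for illustration.

```python
import numpy as np

def copy_block(x, a=0.0, b=1.0, sample_slice=slice(None), out_dtype=np.int8):
    """Hypothetical NumPy equivalent of slicing plus a linear transformation."""
    # s[j]: select samples along the sample axis (axis 1)
    sliced = x[:, sample_slice, :]
    # y = a + b * x, computed in floating point
    y = a + b * sliced.astype(np.float32)
    # assumption: clip to the output range before casting to avoid wrap-around
    info = np.iinfo(out_dtype)
    return np.clip(y, info.min, info.max).astype(out_dtype)

# two records of eight samples each, single channel
x = np.arange(16, dtype=np.uint16).reshape(2, 8, 1)
y = copy_block(x, a=-4.0, b=2.0, sample_slice=slice(2, 6))
```

Here each record keeps samples 2 through 5, so the output has four samples per A-scan.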
OCT#
These OCT processors perform averaging, resampling, filtering, and FFT operations to transform raw spectra into A-scans. These operations are applied in the order shown below.
Rolling average with window length \(M\).
\[\hat{x}[i,j,k] = x[i,j,k] - \frac{1}{M} \sum_{l=0}^{M-1}{ x[i - l,j,k] } ,\]
Resampling with linear interpolation (\(r[j]\)).
\[z[i,j,k] = \big(\lceil r[j] \rceil - r[j] \big) \hat{x}\big[i, \lfloor r[j] \rfloor, k \big] + \big(r[j] - \lfloor r[j] \rfloor\big) \hat{x}\big[i, \lceil r[j] \rceil, k \big] ,\]
Frequency-domain filtering (\(h[i,j,k]\)) and inverse FFT with normalization.
\[y[i,j,k] = \log_{10} \left| \frac{1}{N} \mathcal{F}^{-1} \big\{ h[:,j,:] z[i,j,k] \big\} \right|^2\]
All computation is performed in floating-point until the cast to the output datatype. Each operation can be enabled, disabled, and/or customized via the processor configuration options.
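The three steps above can be sketched in NumPy for a single block. This is an illustration under stated assumptions, not the vortex implementation. The function `oct_process` and its parameters are hypothetical, and the rolling average uses a shorter window until \(M\) records are available. The upper interpolation neighbor is taken as \(\lfloor r[j] \rfloor + 1\), so that integer positions return the sample itself. \(\mathcal{F}^{-1}\) is treated as the unnormalized inverse DFT, so \(\frac{1}{N}\mathcal{F}^{-1}\) corresponds to `np.fft.ifft`.

```python
import numpy as np

def oct_process(x, M=0, r=None, h=None, enable_log10=True):
    """NumPy sketch of the three steps for x with shape (records, samples, 1)."""
    x = np.asarray(x, dtype=np.float32)

    # Step 1: subtract the mean of the current record and up to M-1 previous records.
    # Assumption: a shorter window is used until M records are available.
    if M > 0:
        xh = np.empty_like(x)
        for i in range(x.shape[0]):
            start = max(0, i - M + 1)
            xh[i] = x[i] - x[start:i + 1].mean(axis=0)
    else:
        xh = x

    # Step 2: linear interpolation at positions r[j] along the sample axis.
    if r is not None and len(r) > 0:
        r = np.asarray(r, dtype=np.float64)
        last = x.shape[1] - 1
        lower = np.clip(np.floor(r).astype(int), 0, last)
        upper = np.minimum(lower + 1, last)
        w = (r - lower)[None, :, None]
        z = (1 - w) * xh[:, lower, :] + w * xh[:, upper, :]
    else:
        z = xh

    # Step 3: spectral filter, inverse FFT, squared magnitude, optional log10.
    if h is not None and len(h) > 0:
        z = z * np.asarray(h)[None, :, None]
    Y = np.fft.ifft(z, axis=1)  # np.fft.ifft already applies the 1/N factor
    power = np.abs(Y) ** 2
    return np.log10(power) if enable_log10 else power

# synthetic spectrum: a DC offset plus one fringe at frequency f
n, f = 64, 5
j = np.arange(n)
record = np.round(1000 + 500 * np.cos(2 * np.pi * f * j / n)).astype(np.uint16)
x = np.tile(record[None, :, None], (4, 1, 1))

y = oct_process(x)
y_resampled = oct_process(x, r=np.linspace(0, n - 1, 32), h=np.hanning(32))
```

With resampling enabled, the output length follows the resampling vector (32 samples here) and the spectral filter must have that same length.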
CPU#
- class vortex.process.CPUProcessor#
Perform OCT processing with averaging, resampling by linear interpolation, spectral filtering, and FFT on the CPU.
- __init__(logger=None)#
Create a new object with optional logging.
- Parameters
logger (vortex.Logger) – Logger to receive status messages. Logging is disabled if not provided.
- initialize(config)#
Initialize the processor using the supplied configuration. All necessary internal buffers are allocated when this method returns.
- Parameters
config (CPUProcessorConfig) – New configuration to apply.
- change(config)#
Change the processor configuration. If the change requires buffer reallocation, the pipeline is stalled and record history for the rolling average may be lost. May be called while a block is currently processing.
- Parameters
config (CPUProcessorConfig) – New configuration to apply.
- next(input_buffer, output_buffer, id=0, append_history=True)#
Process the next buffer.
- Parameters
input_buffer (numpy.ndarray[numpy.uint16]) – The input buffer, with shape that matches self.config.input_shape.
output_buffer (numpy.ndarray[numpy.int8]) – The output buffer, with shape that matches self.config.output_shape.
id (int) – Number to associate with the buffer for logging purposes.
append_history (bool) – Include the raw spectra from this input buffer in the rolling average.
- next_async(input_buffer, output_buffer, callback, id=0, append_history=True)#
Process the next buffer asynchronously and execute the callback when complete.
Caution
The callback may be executed in the calling thread before this method returns if an error occurs while queueing the background processing.
- Parameters
input_buffer (numpy.ndarray[numpy.uint16]) – The input buffer, with shape that matches self.config.input_shape.
output_buffer (numpy.ndarray[numpy.int8]) – The output buffer, with shape that matches self.config.output_shape.
callback (Callable[[Exception], None]) – Callback to execute when buffer is processed. The callback receives as an argument any exception which occurred during processing.
id (int) – Number to associate with the buffer for logging purposes.
append_history (bool) – Include the raw spectra from this input buffer in the rolling average.
- property config: CPUProcessorConfig#
Copy of the active configuration.
- class vortex.process.CPUProcessorConfig#
Base: NullProcessorConfig
Configuration object for CPUProcessor.
- property average_window: int#
Length \(M\) of rolling average window in records/A-scans for background subtraction. Disable background subtraction by setting to 0. Disabled by default.
Note
Record/A-scan history includes both active and inactive records.
Warning
Record/A-scan history is stored internally within the processor to allow average_window to exceed records_per_block. Create a new processor to discard this history.
- property resampling_samples: numpy.ndarray[numpy.float32]#
Optional positions at which to resample the raw spectra prior to the FFT. The number of resampled positions determines samples_per_ascan. Valid positions are in the range [0, samples_per_record]. Set to an empty array ([]) to disable resampling. Disabled by default.
- property spectral_filter: numpy.ndarray[numpy.float32 | numpy.complex64]#
Optional spectral filter to multiply with the spectra after resampling but before the FFT. Must have shape compatible with [samples_per_ascan]; that is, the spectral filter should match the length of the output A-scan, not the input record. Set to an empty array ([]) to disable spectral filtering. Disabled by default.
- property enable_ifft: bool#
Enable the FFT for OCT processing with length \(N\) determined by samples_per_ascan. When enabled, the FFT is multiplied by the normalization factor of \(1 / N\). When disabled, the squared complex magnitude is computed instead and no normalization factor is applied. Enabled by default.
Note
If FFT normalization is not desired, scale the spectral filter by \(N\) to cancel out the normalization factor.
- property enable_log10#
Enable the application of \(\log_{10}(\ldots)\) after the FFT. Enabled by default.
- property channel: int#
Index of channel to select for processing. Used only by engine.
Note
This property is likely to be moved from the processor configuration to the engine configuration in the future.
- property slots: int#
Number of parallel processing pipelines. Adjust to achieve the desired CPU utilization for machines with high hardware concurrency. Recommended minimum is 2-4 slots to facilitate pipelining of successive blocks, but higher numbers will likely yield better throughput. Each processing step is individually parallelized across all CPU cores; this field only affects pipeline-level parallelism. Defaults to 2.
- copy()#
Create a copy of this configuration.
- Returns
CPUProcessorConfig – The copy.
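The note under enable_ifft about cancelling the normalization can be checked numerically. The sketch below uses NumPy alone rather than vortex. It assumes the processor's normalized transform behaves like `np.fft.ifft`, which already includes the \(1 / N\) factor. The filter `h` and the spectrum `z` are arbitrary example data.

```python
import numpy as np

N = 8
rng = np.random.default_rng(0)
z = rng.standard_normal(N)   # one resampled spectrum of length N (example data)
h = np.hanning(N)            # example spectral filter of matching length

# normalized result: (1/N) times the unnormalized inverse DFT
normalized = np.fft.ifft(h * z)

# scaling the filter by N cancels the 1/N factor
unnormalized = np.fft.ifft((N * h) * z)
```

Because the filter multiplies the spectrum before the transform and the transform is linear, scaling the filter by \(N\) scales the output by \(N\).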
GPU#
- class vortex.process.CUDAProcessor#
Perform OCT processing with averaging, resampling by linear interpolation, spectral filtering, and FFT on a CUDA-capable GPU.
- __init__(logger=None)#
Create a new object with optional logging.
- Parameters
logger (vortex.Logger) – Logger to receive status messages. Logging is disabled if not provided.
- initialize(config)#
Initialize the processor using the supplied configuration. All necessary internal buffers are allocated when this method returns.
- Parameters
config (CUDAProcessorConfig) – New configuration to apply.
- change(config)#
Change the processor configuration. May be called while a block is currently processing. If the change requires buffer reallocation, the pipeline is stalled and record history for the rolling average may be lost. Changes that require no buffer reallocation and take effect immediately are
disabling any processing step (e.g., resampling or FFT),
reducing the window of the rolling average, and
altering a non-empty spectral filter or resampling vector without increasing its length.
All other changes will likely require a buffer reallocation.
- Parameters
config (CUDAProcessorConfig) – New configuration to apply.
- next(input_buffer, output_buffer, id=0, append_history=True)#
Process the next buffer.
- Parameters
input_buffer (cupy.ndarray[cupy.uint16]) – The input buffer, with shape that matches self.config.input_shape.
output_buffer (cupy.ndarray[cupy.int8]) – The output buffer, with shape that matches self.config.output_shape.
id (int) – Number to associate with the buffer for logging purposes.
append_history (bool) – Include the raw spectra from this input buffer in the rolling average.
- next_async(input_buffer, output_buffer, callback, id=0, append_history=True)#
Process the next buffer asynchronously and execute the callback when complete.
Caution
The callback may be executed in the calling thread before this method returns if an error occurs while queueing the background processing.
- Parameters
input_buffer (cupy.ndarray[cupy.uint16]) – The input buffer, with shape that matches self.config.input_shape.
output_buffer (cupy.ndarray[cupy.int8]) – The output buffer, with shape that matches self.config.output_shape.
callback (Callable[[Exception], None]) – Callback to execute when buffer is processed. The callback receives as an argument any exception which occurred during processing.
id (int) – Number to associate with the buffer for logging purposes.
append_history (bool) – Include the raw spectra from this input buffer in the rolling average.
- property config: CUDAProcessorConfig#
Copy of the current configuration.
- class vortex.process.CUDAProcessorConfig#
Base: CPUProcessorConfig
Configuration object for CUDAProcessor.
- property slots: int#
Number of parallel CUDA streams to use for processing. Recommended minimum is 2 slots to facilitate pipelining of successive blocks. For GPUs with sufficient compute resources, increasing the number of slots could enable parallel computation. Defaults to 2.
- copy()#
Create a copy of this configuration.
- Returns
CUDAProcessorConfig – The copy.