Stack Utilities
arraybridge provides utilities for stacking and unstacking 2D slices into 3D volumes, with support for all frameworks and automatic memory type conversion.
Overview
Stack utilities are useful for:
Processing image stacks (e.g., microscopy, medical imaging)
Batch processing of 2D data
Creating 3D volumes from 2D slices
Efficient slice-by-slice processing
Memory-efficient volume operations
Core Functions
stack_slices
Stack a list of 2D arrays into a 3D volume.
from arraybridge import stack_slices
import numpy as np
# Create list of 2D slices
slices = [np.random.rand(512, 512) for _ in range(100)]
# Stack into 3D volume (100, 512, 512)
volume = stack_slices(slices, memory_type='numpy', gpu_id=0)
Parameters:
slices: List of 2D arraysmemory_type: Target memory type (‘numpy’, ‘cupy’, ‘torch’, ‘tensorflow’, ‘jax’, ‘pyclesperanto’)gpu_id: GPU device ID (required, validated for GPU memory types)
Returns:
3D array/tensor with shape (num_slices, height, width)
Raises:
ValueError: If slices have inconsistent shapes or are not 2D
unstack_slices
Unstack a 3D volume into a list of 2D slices.
from arraybridge import unstack_slices
# Create 3D volume
volume = np.random.rand(100, 512, 512)
# Unstack into list of 2D slices
slices = unstack_slices(volume, memory_type='numpy', gpu_id=0)
print(len(slices)) # 100
print(slices[0].shape) # (512, 512)
Parameters:
array: 3D array/tensor with shape (depth, height, width)memory_type: Target memory type for output slicesgpu_id: GPU device ID (required, validated for GPU memory types)validate_slices: If True, validates that each extracted slice is 2D (default: True)
Returns:
List of 2D arrays/tensors
Raises:
ValueError: If input is not 3D
Basic Usage
NumPy Stacking
from arraybridge import stack_slices, unstack_slices
import numpy as np
# Create slices
slices = [np.random.rand(256, 256) for _ in range(50)]
# Stack
volume = stack_slices(slices, memory_type='numpy', gpu_id=0)
print(volume.shape) # (50, 256, 256)
# Unstack
recovered_slices = unstack_slices(volume, memory_type='numpy', gpu_id=0)
print(len(recovered_slices)) # 50
GPU Stacking
from arraybridge import stack_slices
import numpy as np
# Create CPU slices
cpu_slices = [np.random.rand(512, 512) for _ in range(100)]
# Stack directly to GPU
gpu_volume = stack_slices(
cpu_slices,
memory_type='cupy',
gpu_id=0
)
# Process entire volume on GPU
result = gpu_volume.sum(axis=0)
Cross-Framework Stacking
from arraybridge import stack_slices
import torch
# PyTorch slices
torch_slices = [torch.rand(128, 128) for _ in range(20)]
# Stack to NumPy
np_volume = stack_slices(torch_slices, memory_type='numpy', gpu_id=0)
# Or stack to CuPy
cupy_volume = stack_slices(torch_slices, memory_type='cupy', gpu_id=0)
Image Processing
Slice-by-Slice Processing
Process each slice individually:
from arraybridge import unstack_slices, stack_slices
import numpy as np
def process_slice(slice_2d):
"""Apply processing to single slice."""
return slice_2d * 2 + 1
# Load volume
volume = np.random.rand(100, 512, 512)
# Unstack, process, restack
slices = unstack_slices(volume, memory_type='numpy', gpu_id=0)
processed_slices = [process_slice(s) for s in slices]
processed_volume = stack_slices(processed_slices, memory_type='numpy', gpu_id=0)
GPU-Accelerated Processing
from arraybridge import unstack_slices, stack_slices, convert_memory
import cupyx.scipy.ndimage as ndi
def gpu_filter_volume(volume):
"""Apply GPU filtering to each slice."""
# Unstack to list
slices = unstack_slices(volume, memory_type='numpy', gpu_id=0)
# Process each slice on GPU
filtered_slices = []
for slice_2d in slices:
# Move to GPU
gpu_slice = convert_memory(
slice_2d,
source_type='numpy',
target_type='cupy',
gpu_id=0
)
# Apply filter
filtered = ndi.gaussian_filter(gpu_slice, sigma=2.0)
# Move back to CPU
cpu_filtered = convert_memory(
filtered,
source_type='cupy',
target_type='numpy',
gpu_id=0
)
filtered_slices.append(cpu_filtered)
# Restack
return stack_slices(filtered_slices, memory_type='numpy', gpu_id=0)
Batch Processing
Process slices in batches for efficiency:
from arraybridge import unstack_slices, stack_slices
def batch_process_volume(volume, batch_size=10):
"""Process volume in batches."""
slices = unstack_slices(volume, memory_type='numpy', gpu_id=0)
processed_slices = []
for i in range(0, len(slices), batch_size):
batch = slices[i:i+batch_size]
# Stack batch
batch_volume = stack_slices(batch, memory_type='torch', gpu_id=0)
# Process batch on GPU
processed_batch = process_on_gpu(batch_volume)
# Unstack batch
batch_slices = unstack_slices(processed_batch, memory_type='numpy', gpu_id=0)
processed_slices.extend(batch_slices)
return stack_slices(processed_slices, memory_type='numpy', gpu_id=0)
Medical Imaging Applications
CT/MRI Volume Processing
from arraybridge import stack_slices, unstack_slices
import numpy as np
def load_dicom_slices(dicom_dir):
"""Load DICOM slices (simplified example)."""
# Load DICOM files and extract pixel arrays
slices = []
for filename in sorted(os.listdir(dicom_dir)):
# ... load DICOM slice ...
slices.append(pixel_array)
return slices
def process_medical_volume(dicom_dir):
"""Process medical imaging volume."""
# Load slices
slices = load_dicom_slices(dicom_dir)
# Stack into volume
volume = stack_slices(slices, memory_type='numpy', gpu_id=0)
# Apply processing (e.g., segmentation, registration)
processed = medical_processing(volume)
# Unstack for saving
output_slices = unstack_slices(processed, memory_type='numpy', gpu_id=0)
return output_slices
Microscopy Image Stacks
from arraybridge import stack_slices
from scipy import ndimage
def process_microscopy_stack(image_files):
"""Process microscopy Z-stack."""
# Load images
slices = [load_image(f) for f in image_files]
# Stack
stack = stack_slices(slices, memory_type='numpy', gpu_id=0)
# Maximum intensity projection
mip = stack.max(axis=0)
# Average intensity projection
avg = stack.mean(axis=0)
return mip, avg
Performance Optimization
Memory-Efficient Stacking
For large volumes, consider processing in chunks:
def memory_efficient_stack(slices, chunk_size=10):
"""Stack large volumes in chunks."""
chunks = []
for i in range(0, len(slices), chunk_size):
chunk_slices = slices[i:i+chunk_size]
chunk = stack_slices(chunk_slices, memory_type='numpy', gpu_id=0)
chunks.append(chunk)
# Concatenate chunks
return np.concatenate(chunks, axis=0)
Lazy Loading
Load and process slices on-demand:
class LazyVolumeProcessor:
"""Lazy slice loading and processing."""
def __init__(self, slice_files):
self.slice_files = slice_files
def __len__(self):
return len(self.slice_files)
def __getitem__(self, idx):
"""Load slice on-demand."""
slice_data = load_image(self.slice_files[idx])
return process_slice(slice_data)
def to_volume(self):
"""Stack all slices."""
slices = [self[i] for i in range(len(self))]
return stack_slices(slices, memory_type='numpy', gpu_id=0)
Parallel Processing
Process slices in parallel:
from concurrent.futures import ThreadPoolExecutor
from arraybridge import stack_slices
def parallel_process_slices(slices, num_workers=4):
"""Process slices in parallel."""
def process_one(slice_2d):
return expensive_operation(slice_2d)
with ThreadPoolExecutor(max_workers=num_workers) as executor:
processed_slices = list(executor.map(process_one, slices))
return stack_slices(processed_slices, memory_type='numpy', gpu_id=0)
Multi-GPU Stacking
Distribute slices across GPUs:
from arraybridge import convert_memory, stack_slices
def multi_gpu_stack_process(slices, num_gpus=4):
"""Process slices across multiple GPUs."""
results = []
for i, slice_2d in enumerate(slices):
gpu_id = i % num_gpus
# Move to GPU
gpu_slice = convert_memory(
slice_2d,
source_type='numpy',
target_type='torch',
gpu_id=gpu_id
)
# Process
processed = process_on_gpu(gpu_slice)
# Move back
cpu_slice = convert_memory(
processed,
source_type='torch',
target_type='numpy',
gpu_id=gpu_id
)
results.append(cpu_slice)
return stack_slices(results, memory_type='numpy', gpu_id=0)
Advanced Usage
Custom Stack Dimensions
Stack along different axes:
from arraybridge import stack_slices
import numpy as np
slices = [np.random.rand(10, 20) for _ in range(5)]
# Default: stack along axis 0 → (5, 10, 20)
volume1 = stack_slices(slices, memory_type='numpy', gpu_id=0)
# For other axes, use NumPy directly after stacking
volume2 = np.moveaxis(volume1, 0, 2) # (10, 20, 5)
Mixed Precision Stacking
from arraybridge import stack_slices
import numpy as np
# Create float32 slices
slices_f32 = [np.random.rand(256, 256).astype(np.float32)
for _ in range(50)]
# Stack maintains dtype
volume = stack_slices(slices_f32, memory_type='numpy', gpu_id=0)
print(volume.dtype) # float32
Weighted Stacking
Apply weights during stacking:
from arraybridge import stack_slices
import numpy as np
def weighted_stack(slices, weights):
"""Stack with per-slice weights."""
# Stack normally
volume = stack_slices(slices, memory_type='numpy', gpu_id=0)
# Apply weights
weights = np.array(weights).reshape(-1, 1, 1)
weighted_volume = volume * weights
return weighted_volume
slices = [np.random.rand(100, 100) for _ in range(10)]
weights = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
result = weighted_stack(slices, weights)
Common Patterns
Pattern: Volume Iterator
class VolumeIterator:
"""Iterate over volume slices."""
def __init__(self, volume):
self.volume = volume
self.slices = unstack_slices(volume, memory_type='numpy', gpu_id=0)
def __iter__(self):
return iter(self.slices)
def __len__(self):
return len(self.slices)
def __getitem__(self, idx):
return self.slices[idx]
# Usage
volume = np.random.rand(100, 512, 512)
for slice_2d in VolumeIterator(volume):
process(slice_2d)
Pattern: Slice Cache
from functools import lru_cache
class SliceCache:
"""Cache processed slices."""
def __init__(self, volume):
self.volume = volume
self.slices = unstack_slices(volume, memory_type='numpy', gpu_id=0)
@lru_cache(maxsize=32)
def get_processed_slice(self, idx):
"""Get processed slice with caching."""
return process_expensive(self.slices[idx])
Pattern: Progressive Loading
def progressive_volume_processor(slice_generator):
"""Process volume progressively as slices arrive."""
processed_slices = []
for slice_2d in slice_generator:
# Process each slice as it arrives
processed = process_slice(slice_2d)
processed_slices.append(processed)
# Optionally save intermediate results
if len(processed_slices) % 10 == 0:
save_checkpoint(processed_slices)
return stack_slices(processed_slices, memory_type='numpy', gpu_id=0)
Error Handling
Shape Validation
from arraybridge import stack_slices
def safe_stack(slices):
"""Stack with shape validation."""
if not slices:
raise ValueError("Empty slice list")
# Check all slices have same shape
first_shape = slices[0].shape
for i, s in enumerate(slices[1:], 1):
if s.shape != first_shape:
raise ValueError(
f"Slice {i} shape {s.shape} != first shape {first_shape}"
)
return stack_slices(slices, memory_type='numpy', gpu_id=0)
Memory Error Handling
def robust_stack(slices, memory_type='numpy', gpu_id=0):
"""Stack with memory error handling."""
try:
return stack_slices(slices, memory_type=memory_type, gpu_id=gpu_id)
except MemoryError:
# Try processing in smaller chunks
print("Memory error, processing in chunks...")
return memory_efficient_stack(slices, chunk_size=5)
Testing
Unit Tests
import pytest
from arraybridge import stack_slices, unstack_slices
import numpy as np
def test_stack_unstack_roundtrip():
"""Test stack/unstack preserves data."""
slices = [np.random.rand(10, 10) for _ in range(5)]
volume = stack_slices(slices, memory_type='numpy', gpu_id=0)
recovered = unstack_slices(volume, memory_type='numpy', gpu_id=0)
assert len(recovered) == len(slices)
for orig, rec in zip(slices, recovered):
np.testing.assert_array_almost_equal(orig, rec)
API Reference
See API Reference for complete function signatures.
Next Steps
Learn about Conversion System for type conversion details
Explore GPU Features for GPU-accelerated processing
Check Advanced Topics for optimization strategies
Review User Guide for more examples