Advanced Topics
This document covers advanced features and optimization techniques in arraybridge, including OOM recovery, performance tuning, and advanced patterns.
Out-of-Memory (OOM) Recovery
arraybridge provides automatic OOM recovery for GPU operations, helping prevent crashes from memory exhaustion.
Automatic OOM Recovery
Enable OOM recovery with decorators:
from arraybridge import torch
# `torch` here is arraybridge's decorator (`from arraybridge import torch`),
# not the PyTorch module.
@torch(gpu_id=0, oom_recovery=True, clear_cuda_cache=True)
def memory_intensive_operation(data):
    """Compute ``data @ data.T @ data`` on GPU 0 with OOM recovery enabled.

    With ``oom_recovery=True`` an out-of-memory error is caught, GPU caches
    are cleared (``clear_cuda_cache=True``) and the operation is retried.
    """
    return data @ data.T @ data


# An OOM triggers a cache clear and a retry; if the retry also runs out of
# memory, the error is raised to the caller.
result = memory_intensive_operation(large_array)
How OOM Recovery Works
When enabled, arraybridge:
Detects OOM: Catches framework-specific OOM errors
Clears Cache: Runs garbage collection and clears GPU caches
Retries: Attempts the operation again
Falls Back: If retry fails, raises the original error
# Simplified sketch of the OOM recovery process
import gc

import torch


def with_oom_recovery(func):
    """Call ``func()`` and retry it once if CUDA runs out of memory.

    Args:
        func: Zero-argument callable performing the GPU work.

    Returns:
        Whatever ``func()`` returns (from the first or the retry attempt).

    Raises:
        torch.cuda.OutOfMemoryError: If the retry also runs out of memory.
        Any other exception raised by ``func`` propagates immediately.
    """
    try:
        return func()
    except torch.cuda.OutOfMemoryError:
        # Collect unreachable Python objects first so tensors they hold go
        # back to PyTorch's caching allocator, then release the allocator's
        # unused cached blocks.
        gc.collect()
        torch.cuda.empty_cache()
    # Retry once, outside the handler, so a second OOM is reported on its own
    # rather than as "during handling of the above exception".
    return func()
Manual OOM Handling
Implement custom OOM recovery:
import torch
import gc
def process_with_fallback(data):
    """Run ``process_on_gpu`` on *data*, splitting it in two after an OOM.

    A ``RuntimeError`` whose message does not contain "out of memory" is
    re-raised unchanged. After an OOM, memory is released, each half of
    *data* is processed separately, and the halves are joined with
    ``torch.cat``.
    """
    try:
        return process_on_gpu(data)
    except RuntimeError as err:
        if "out of memory" not in str(err):
            raise
        # Free what we can before trying again with half the data per call.
        gc.collect()
        torch.cuda.empty_cache()
        mid = len(data) // 2
        halves = [process_on_gpu(data[:mid]), process_on_gpu(data[mid:])]
        return torch.cat(halves)
Batch Size Reduction
Automatically reduce batch size on OOM:
def adaptive_batch_process(data, initial_batch_size=32, process_fn=None):
    """Process *data* in batches, halving the batch size whenever a batch OOMs.

    Args:
        data: Sliceable sequence (list, NumPy array, tensor, ...).
        initial_batch_size: Batch size to start with (clamped to at least 1).
        process_fn: Callable applied to each batch. Defaults to the
            module-level ``process_batch``.

    Returns:
        List of per-batch results in input order. Every element of *data*
        is covered exactly once, even after the batch size shrinks.

    Raises:
        RuntimeError: Re-raised when it is not an out-of-memory error, or
            when a batch of size 1 still runs out of memory (instead of
            retrying forever).
    """
    if process_fn is None:
        process_fn = process_batch
    batch_size = max(1, initial_batch_size)
    results = []
    start = 0
    # Advance an explicit cursor: the step can shrink mid-way, which a
    # range() fixed at the initial batch size cannot express.
    while start < len(data):
        batch = data[start:start + batch_size]
        try:
            result = process_fn(batch)
        except RuntimeError as e:
            if "out of memory" not in str(e) or batch_size == 1:
                raise
            batch_size = max(1, batch_size // 2)
            torch.cuda.empty_cache()
            continue  # retry from the same position with a smaller batch
        results.append(result)
        start += len(batch)
    return results
Memory Management Strategies
Strategy 1: Gradient Checkpointing
import torch.utils.checkpoint as checkpoint
def memory_efficient_forward(x):
    """Run ``expensive_layer(x)`` under activation checkpointing.

    Intermediate activations of ``expensive_layer`` are not kept during the
    forward pass; they are recomputed in backward, trading compute for memory.
    ``use_reentrant=False`` is passed explicitly because the PyTorch docs
    recommend the non-reentrant variant and warn when the argument is omitted.
    """
    return checkpoint.checkpoint(expensive_layer, x, use_reentrant=False)
Strategy 2: Mixed Precision
from torch import autocast


# `torch` below is arraybridge's decorator (`from arraybridge import torch`),
# so autocast is imported by name instead of used as `torch.autocast`.
@torch(gpu_id=0)
def mixed_precision_op(x):
    """Compute ``x @ x.T`` under CUDA autocast to save memory.

    ``autocast(device_type="cuda")`` is the current form of the deprecated
    ``torch.cuda.amp.autocast()``; on CUDA it runs eligible ops such as
    matmul in float16 by default.
    """
    with autocast(device_type="cuda"):
        return x @ x.T
Strategy 3: CPU Offloading
def cpu_offload_process(data):
    """Run three GPU stages, routing the stage-1 output through host memory.

    The stage-1 result is copied to the CPU and then back to the GPU before
    stage 2; stage 2's output feeds stage 3, whose result is returned.
    """
    host_copy = process_gpu_stage1(data).cpu()
    second = process_gpu_stage2(host_copy.cuda())
    return process_gpu_stage3(second)
Performance Optimization
Conversion Performance
Use Zero-Copy When Possible:
import cupy as cp
from arraybridge import convert_memory
# DLPack lets CuPy and PyTorch share one device buffer, so nothing is copied.
cp_matrix = cp.random.rand(1000, 1000)
th_view = convert_memory(cp_matrix, 'cupy', 'torch', gpu_id=0)

# Show the sharing: a write made through CuPy is visible through PyTorch.
cp_matrix[0, 0] = 999
print(th_view[0, 0])  # also 999: both objects view the same memory
Batch Conversions:
# Bad: one conversion call (and transfer) per item
for element in items:
    process(convert_memory(element, 'numpy', 'torch', gpu_id=0))

# Good: stack on the host, convert once, then iterate over the GPU rows
stacked = np.stack(items)
gpu_batch = convert_memory(stacked, 'numpy', 'torch', gpu_id=0)
for row in gpu_batch:
    process(row)
Memory Layout Optimization
Contiguous Arrays:
import numpy as np
# A strided view is not contiguous; compact it before converting.
strided = data[::2, ::2]
packed = np.ascontiguousarray(strided)
gpu_data = convert_memory(packed, 'numpy', 'torch', gpu_id=0)  # faster path
Optimal Data Types:
# float32 uses half the memory of float64; prefer it when precision allows.
data_f32 = np.array(data, dtype="float32")
gpu_data = convert_memory(data_f32, 'numpy', 'torch', gpu_id=0)
Caching and Memoization
from functools import lru_cache
from arraybridge import convert_memory
class GPUCache:
    """Memoize ``convert_memory`` results for NumPy inputs.

    Entries are keyed by ``(data_id, target_type, gpu_id)``. The caller
    chooses ``data_id`` and must keep it unique per distinct array, because a
    cache hit returns the stored result without looking at ``data``.
    Entries stay alive until ``clear()`` is called.
    """

    def __init__(self):
        self.cache = {}

    def get_or_convert(self, data_id, data, target_type, gpu_id=0):
        """Return the cached conversion for this key, converting on a miss."""
        key = (data_id, target_type, gpu_id)
        if key in self.cache:
            return self.cache[key]
        converted = convert_memory(data, 'numpy', target_type, gpu_id=gpu_id)
        self.cache[key] = converted
        return converted

    def clear(self):
        """Drop every cached conversion."""
        self.cache.clear()
Profiling and Debugging
Timing Conversions
import time
from arraybridge import convert_memory
def _synchronize(framework, gpu_id):
    """Wait for all queued work on CUDA device *gpu_id*; no-op unless torch/cupy."""
    if framework == 'torch':
        import torch
        torch.cuda.synchronize(gpu_id)
    elif framework == 'cupy':
        import cupy as cp
        cp.cuda.Device(gpu_id).synchronize()


def time_conversion(data, source, target, gpu_id=0):
    """Time ``convert_memory(data, source, target, gpu_id=gpu_id)``.

    GPU work runs asynchronously, so the target device is synchronized before
    the timer starts (earlier queued kernels are not billed to the conversion)
    and again before it stops (the conversion is fully counted).
    ``time.perf_counter`` is used because it is monotonic and high-resolution.

    Prints the elapsed time in milliseconds and returns the converted array.
    """
    _synchronize(target, gpu_id)
    start = time.perf_counter()
    result = convert_memory(data, source, target, gpu_id=gpu_id)
    _synchronize(target, gpu_id)
    elapsed = time.perf_counter() - start
    print(f"{source} → {target}: {elapsed*1000:.2f} ms")
    return result
Memory Profiling
PyTorch Memory Profiling:
import torch
def profile_memory(func):
    """Call *func* and print the peak PyTorch GPU allocation during the call.

    Peak statistics are reset and unused cached blocks are released before
    the call; the peak is reported in GB (1e9 bytes) and *func*'s return
    value is passed through.
    """
    torch.cuda.reset_peak_memory_stats()
    torch.cuda.empty_cache()
    outcome = func()
    peak_gb = torch.cuda.max_memory_allocated() / 1e9
    print(f"Peak GPU memory: {peak_gb:.2f} GB")
    return outcome
CuPy Memory Profiling:
import cupy as cp
def profile_cupy_memory(func):
    """Call *func* and print CuPy default-pool usage afterwards (in GB).

    Unused blocks in the pool are freed before the call; *func*'s return
    value is passed through.
    """
    pool = cp.get_default_memory_pool()
    pool.free_all_blocks()
    outcome = func()
    used_gb, total_gb = pool.used_bytes() / 1e9, pool.total_bytes() / 1e9
    print(f"CuPy memory: {used_gb:.2f} GB used, {total_gb:.2f} GB total")
    return outcome
Debugging Conversions
from arraybridge import detect_memory_type, convert_memory
import logging
logging.basicConfig(level=logging.DEBUG)


def debug_convert(data, target_type, gpu_id=0):
    """Convert *data* to *target_type*, logging type/shape/dtype at DEBUG.

    The source memory type is detected with ``detect_memory_type``. Both the
    input and the converted result must expose ``.shape`` and ``.dtype``.
    Returns the converted array.
    """
    detected = detect_memory_type(data)
    logging.debug("Source type: %s", detected)
    logging.debug("Source shape: %s", data.shape)
    logging.debug("Source dtype: %s", data.dtype)

    converted = convert_memory(data, detected, target_type, gpu_id=gpu_id)

    logging.debug("Target type: %s", target_type)
    logging.debug("Target shape: %s", converted.shape)
    logging.debug("Target dtype: %s", converted.dtype)
    return converted
Advanced Patterns
Pattern: Lazy Conversion
class LazyArray:
    """Wrap an array and convert it on demand, memoizing each conversion.

    Results are cached per ``(target_type, gpu_id)`` pair, so repeated
    requests for the same target return the same converted object.
    """

    def __init__(self, data):
        self.data = data
        self.cached_conversions = {}

    def as_type(self, target_type, gpu_id=0):
        """Return ``self.data`` as *target_type* on *gpu_id*, converting once."""
        cache_key = (target_type, gpu_id)
        if cache_key in self.cached_conversions:
            return self.cached_conversions[cache_key]
        src_type = detect_memory_type(self.data)
        converted = convert_memory(self.data, src_type, target_type, gpu_id=gpu_id)
        self.cached_conversions[cache_key] = converted
        return converted
# Usage: the first request converts, later identical requests hit the cache
lazy = LazyArray(np_data)
torch_data = lazy.as_type('torch', gpu_id=0)
torch_data2 = lazy.as_type(target_type='torch', gpu_id=0)  # served from cache
Pattern: Conversion Pipeline
from arraybridge import convert_memory
class ConversionPipeline:
    """Record conversion and processing steps, then run them in order.

    ``convert_to`` and ``apply`` only queue steps and return ``self`` so
    calls can be chained; nothing runs until ``execute()``.
    """

    def __init__(self, data):
        self.data = data
        self.operations = []

    def convert_to(self, target_type, gpu_id=0):
        """Queue a conversion of the running value to *target_type*."""
        def convert_step(value):
            detected = detect_memory_type(value)
            return convert_memory(value, detected, target_type, gpu_id)
        return self.apply(convert_step)

    def apply(self, func):
        """Queue *func* to be called on the running value."""
        self.operations.append(func)
        return self

    def execute(self):
        """Feed ``self.data`` through every queued step and return the result."""
        value = self.data
        for step in self.operations:
            value = step(value)
        return value
# Usage: NumPy -> GPU tensor, compute 2*x + 1 there, then back to NumPy
pipeline = ConversionPipeline(np_data)
pipeline.convert_to('torch', gpu_id=0)
pipeline.apply(lambda x: x * 2).apply(lambda x: x + 1)
pipeline.convert_to('numpy')
result = pipeline.execute()
Pattern: Framework Fallback Chain
def try_frameworks(data, operation, frameworks=('cupy', 'torch', 'numpy'), gpu_id=0):
    """Run *operation* on *data* with the first framework that succeeds.

    For each framework in order, *data* is converted to it, *operation* is
    applied, and the result is converted back to *data*'s original memory
    type. Any exception during an attempt moves on to the next framework.

    Args:
        data: Input array of a type ``detect_memory_type`` recognizes.
        operation: Callable taking and returning an array of that framework.
        frameworks: Framework names to try, in order. A tuple default is used
            so no mutable object is shared between calls.
        gpu_id: Device for both the forward and the return conversion.

    Returns:
        The operation's result in *data*'s original memory type.

    Raises:
        RuntimeError: If every framework fails. The message lists each
            framework with its error, and the last error is chained as cause.
    """
    source_type = detect_memory_type(data)  # loop-invariant: detect once
    errors = []
    last_error = None
    for framework in frameworks:
        try:
            converted = convert_memory(data, source_type, framework, gpu_id=gpu_id)
            result = operation(converted)
            return convert_memory(result, framework, source_type, gpu_id=gpu_id)
        except Exception as e:  # deliberate: any failure means "try the next one"
            errors.append((framework, str(e)))
            last_error = e
    raise RuntimeError(f"All frameworks failed: {errors}") from last_error
# Usage: compute x @ x.T with whichever framework works first
result = try_frameworks(data, operation=lambda x: x @ x.T)
Pattern: Multi-Backend Abstraction
class MultiBackendArray:
    """Array wrapper that lazily converts its data to a preferred backend.

    The converted data is cached in ``_cached`` after the first use.
    """

    def __init__(self, data, preferred_backend='torch'):
        self.data = data
        self.backend = preferred_backend
        self._cached = None

    def _ensure_backend(self):
        """Return ``self.data`` converted to ``self.backend`` (cached)."""
        if self._cached is None:
            source_type = detect_memory_type(self.data)
            self._cached = convert_memory(
                self.data, source_type, self.backend, gpu_id=0
            )
        return self._cached

    def __matmul__(self, other):
        """Matrix-multiply in ``self``'s backend, converting *other* if needed.

        Returns ``NotImplemented`` for operands that are not
        ``MultiBackendArray`` so Python can try the reflected operation.
        """
        if not isinstance(other, MultiBackendArray):
            return NotImplemented
        left = self._ensure_backend()
        if other.backend == self.backend:
            right = other._ensure_backend()
        else:
            # other's cache is in its own backend; convert its raw data into
            # ours so both operands live in the same framework.
            right = convert_memory(
                other.data, detect_memory_type(other.data), self.backend, gpu_id=0
            )
        return MultiBackendArray(left @ right, self.backend)

    def to_numpy(self):
        """Return the data as a NumPy array."""
        backend_data = self._ensure_backend()
        return convert_memory(backend_data, self.backend, 'numpy')
Thread and Process Safety
Thread-Local GPU Contexts
import threading
from arraybridge import convert_memory
class ThreadLocalGPU:
    """Give each thread a fixed GPU id, assigned round-robin on first use.

    Args:
        num_gpus: Number of devices to spread threads over (default 4).

    Raises:
        ValueError: If ``num_gpus`` is less than 1.
    """

    def __init__(self, num_gpus=4):
        if num_gpus < 1:
            raise ValueError("num_gpus must be >= 1")
        self.local = threading.local()
        self.num_gpus = num_gpus
        self._lock = threading.Lock()
        self._next_slot = 0

    def get_gpu_id(self):
        """Return this thread's GPU id, assigning the next one on first call.

        ``threading.get_ident()`` is an opaque cookie, often an aligned
        address, so ``ident % n`` can send many threads to the same device.
        A lock-protected counter distributes threads evenly instead.
        """
        if not hasattr(self.local, 'gpu_id'):
            with self._lock:
                slot = self._next_slot
                self._next_slot += 1
            self.local.gpu_id = slot % self.num_gpus
        return self.local.gpu_id

    def convert(self, data, target_type):
        """Convert *data* to *target_type* on the calling thread's GPU."""
        gpu_id = self.get_gpu_id()
        source_type = detect_memory_type(data)
        return convert_memory(data, source_type, target_type, gpu_id=gpu_id)
# Usage
gpu_manager = ThreadLocalGPU()


def worker(data):
    """Convert *data* to torch on the GPU assigned to this thread, then process it."""
    return process(gpu_manager.convert(data, 'torch'))
Multiprocessing with GPUs
from multiprocessing import Pool
from arraybridge import convert_memory
def init_worker(gpu_queue):
    """Pool initializer: claim one GPU id from *gpu_queue* for this worker.

    ``Pool`` passes the same ``initargs`` to every worker, so one value
    cannot give each worker a different GPU. Instead, each worker pops its
    own id from a shared queue pre-filled with one id per worker.
    """
    global worker_gpu_id
    worker_gpu_id = gpu_queue.get()


def process_with_gpu(data):
    """Process a NumPy batch on this worker's GPU and return a NumPy result."""
    gpu_data = convert_memory(data, 'numpy', 'torch', gpu_id=worker_gpu_id)
    result = process(gpu_data)
    return convert_memory(result, 'torch', 'numpy')


if __name__ == "__main__":
    from multiprocessing import get_context

    num_gpus = 4
    # CUDA cannot be used in forked children, so use the "spawn" start method.
    ctx = get_context("spawn")
    gpu_queue = ctx.Queue()
    for gpu_id in range(num_gpus):
        gpu_queue.put(gpu_id)
    # One worker per GPU; each initializer takes a distinct id from the queue.
    with ctx.Pool(num_gpus, initializer=init_worker, initargs=(gpu_queue,)) as pool:
        results = pool.map(process_with_gpu, batches)
Custom Memory Types
While arraybridge doesn’t support custom memory types out-of-the-box, you can wrap conversions:
class CustomArrayWrapper:
    """Adapter that routes a custom array type through NumPy into arraybridge."""

    def __init__(self, custom_array):
        self.custom_array = custom_array

    def to_numpy(self):
        """Return ``custom_array.data`` as a new NumPy array.

        Replace this with the conversion logic for your own array type.
        """
        payload = self.custom_array.data
        return np.array(payload)

    def convert_to(self, target_type):
        """Convert to an arraybridge-supported type by going through NumPy."""
        return convert_memory(self.to_numpy(), 'numpy', target_type)
Integration with Other Libraries
scikit-learn Integration
from sklearn.decomposition import PCA
from arraybridge import convert_memory
def gpu_accelerated_pca(data, n_components=10):
    """Fit and return a scikit-learn ``PCA`` with *n_components* on *data*.

    scikit-learn's ``PCA`` runs on the CPU and takes raw samples, not a
    precomputed covariance matrix, so a GPU-side covariance cannot be fed to
    it. To do the eigendecomposition on the GPU instead, convert *data* with
    ``convert_memory(data, 'numpy', 'torch', gpu_id=0)`` and call
    ``torch.linalg.eigh`` on the centered covariance directly.

    Args:
        data: NumPy array of shape (n_samples, n_features).
        n_components: Number of principal components to keep.

    Returns:
        The fitted ``PCA`` instance.
    """
    pca = PCA(n_components=n_components)
    pca.fit(data)
    return pca
Dask Integration
import dask.array as da
from arraybridge import convert_memory
def dask_gpu_process(dask_array):
    """Apply ``gpu_process`` block-by-block on GPU 0 via ``map_blocks``.

    Each NumPy block is converted to a torch tensor on GPU 0, processed, and
    converted back to NumPy.
    """

    def _on_gpu(block):
        as_tensor = convert_memory(block, 'numpy', 'torch', gpu_id=0)
        return convert_memory(gpu_process(as_tensor), 'torch', 'numpy')

    return dask_array.map_blocks(_on_gpu)
Troubleshooting
Common Issues and Solutions
Issue: Slow Conversions
Solution: Use zero-copy when possible, ensure contiguous arrays
Issue: OOM Errors
Solution: Enable OOM recovery, reduce batch sizes, use mixed precision
Issue: Incorrect Results
Solution: Check dtype preservation, verify device placement
Issue: Memory Leaks
Solution: Clear caches regularly, delete unused variables
See Also
GPU Features for GPU-specific features
Conversion System for conversion details
Decorator System for decorator usage
API Reference for complete API