Parallel Processing with SafeMapper

Overview

SafeMapper provides fault-tolerant parallel processing through the s_future_* family of functions. Each one is a drop-in replacement for the corresponding furrr function (s_future_map for furrr::future_map, and so on) and adds automatic checkpointing.

library(SafeMapper)

Why Parallel + Fault Tolerance?

┌─────────────────────────────────────────────────────────────────────────────┐
│                    The Challenge of Parallel Processing                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Traditional Parallel Processing:                                           │
│   ┌───────────────────────────────────────────────────────────────────┐     │
│   │  Worker 1: ████████████████████                                   │     │
│   │  Worker 2: █████████████████████████                              │     │
│   │  Worker 3: ██████████████████████████████                         │     │
│   │  Worker 4: ████████████████████████████ ❌ CRASH!                 │     │
│   └───────────────────────────────────────────────────────────────────┘     │
│                                                                              │
│   Result: ALL workers' progress lost, must restart everything               │
│                                                                              │
│   SafeMapper Parallel Processing:                                            │
│   ┌───────────────────────────────────────────────────────────────────┐     │
│   │  Worker 1: ████████████████████ 💾                                │     │
│   │  Worker 2: █████████████████████████ 💾                           │     │
│   │  Worker 3: ██████████████████████████████ 💾                      │     │
│   │  Worker 4: ████████████████████████████ ❌ CRASH!                 │     │
│   └───────────────────────────────────────────────────────────────────┘     │
│                                                                              │
│   Result: Resume from last checkpoint, only redo partial work               │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Prerequisites

SafeMapper’s parallel functions require the furrr and future packages:

install.packages(c("furrr", "future"))

Setting Up Parallel Processing

Step 1: Load Required Packages

library(SafeMapper)
library(future)

Step 2: Configure Workers

# Use multiple R sessions (works on all platforms)
plan(multisession, workers = 4)

# Or use forked processes (faster, but Linux/macOS only; not supported on Windows or in RStudio)
# plan(multicore, workers = 4)

Step 3: Use s_future_* Functions

# Instead of furrr::future_map()
# (expensive_function is a placeholder for your own function)
result <- s_future_map(1:1000, expensive_function)

Available Parallel Functions

┌─────────────────────────────────────────────────────────────────────────────┐
│                    s_future_* Function Family                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Single-Input                                                               │
│   ┌────────────────────────────────────────────────────────────────────┐    │
│   │  s_future_map      ──► Parallel map, returns list                  │    │
│   │  s_future_map_chr  ──► Returns character vector                    │    │
│   │  s_future_map_dbl  ──► Returns numeric vector                      │    │
│   │  s_future_map_int  ──► Returns integer vector                      │    │
│   │  s_future_map_lgl  ──► Returns logical vector                      │    │
│   │  s_future_map_dfr  ──► Returns row-bound data frame                │    │
│   │  s_future_map_dfc  ──► Returns column-bound data frame             │    │
│   └────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
│   Dual-Input                                                                 │
│   ┌────────────────────────────────────────────────────────────────────┐    │
│   │  s_future_map2      ──► Parallel map with two inputs               │    │
│   │  s_future_map2_chr  ──► Returns character vector                   │    │
│   │  s_future_map2_dbl  ──► Returns numeric vector                     │    │
│   │  s_future_map2_int  ──► Returns integer vector                     │    │
│   │  s_future_map2_lgl  ──► Returns logical vector                     │    │
│   │  s_future_map2_dfr  ──► Returns row-bound data frame               │    │
│   │  s_future_map2_dfc  ──► Returns column-bound data frame            │    │
│   └────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
│   Multi-Input & Side Effects                                                 │
│   ┌────────────────────────────────────────────────────────────────────┐    │
│   │  s_future_pmap     ──► Parallel map with multiple inputs           │    │
│   │  s_future_imap     ──► Parallel indexed map                        │    │
│   │  s_future_walk     ──► Parallel side effects                       │    │
│   │  s_future_walk2    ──► Parallel dual-input side effects            │    │
│   └────────────────────────────────────────────────────────────────────┘    │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
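
The suffixes follow the purrr/furrr naming convention: the unsuffixed function returns a list, and a typed variant returns an atomic vector of the stated type. As a rough analogy in base R (lapply and vapply stand in for the parallel functions here; this is not SafeMapper code), the difference looks like this:

```r
# Base-R analogy only: lapply() ~ s_future_map(), vapply() ~ s_future_map_dbl()
squares_list <- lapply(1:3, function(x) x^2)              # list of length 3
squares_dbl  <- vapply(1:3, function(x) x^2, numeric(1))  # numeric vector

# vapply() raises an error when a result does not match the declared type
bad <- tryCatch(
  vapply(1:3, function(x) as.character(x), numeric(1)),
  error = function(e) "type error"
)
```

Choosing a typed variant is mostly a way to catch unexpected return values early instead of discovering them in later code.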

Basic Usage Examples

s_future_map

library(future)
plan(multisession, workers = 2)

# CPU-intensive computation
result <- s_future_map(1:100, function(x) {
  Sys.sleep(0.1)  # Simulate work
  x^2
})

# Reset to sequential
plan(sequential)

s_future_map2

plan(multisession, workers = 2)

x <- 1:50
y <- 51:100

# Process pairs in parallel
results <- s_future_map2(x, y, function(a, b) {
  Sys.sleep(0.1)
  a * b
})

plan(sequential)

s_future_pmap

plan(multisession, workers = 2)

params <- list(
  a = 1:30,
  b = 31:60,
  c = 61:90
)

# Process multiple inputs in parallel
results <- s_future_pmap(params, function(a, b, c) {
  Sys.sleep(0.1)
  a + b + c
})

plan(sequential)

Execution Flow

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Parallel Processing Flow                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  s_future_map(data, func)                                            │   │
│   └────────────────────────────────┬────────────────────────────────────┘   │
│                                    │                                         │
│                                    ▼                                         │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  1. Check for existing checkpoint                                    │   │
│   │     ├── Found: Resume from checkpoint                               │   │
│   │     └── Not found: Start fresh                                      │   │
│   └────────────────────────────────┬────────────────────────────────────┘   │
│                                    │                                         │
│                                    ▼                                         │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  2. Split data into batches                                          │   │
│   │     Data: [1, 2, 3, ..., 1000]                                      │   │
│   │     Batch 1: [1-100], Batch 2: [101-200], ...                       │   │
│   └────────────────────────────────┬────────────────────────────────────┘   │
│                                    │                                         │
│                                    ▼                                         │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  3. Process each batch with furrr::future_map                        │   │
│   │                                                                      │   │
│   │     ┌─────────────────────────────────────────────────────────┐     │   │
│   │     │  Batch items distributed to workers                     │     │   │
│   │     │                                                         │     │   │
│   │     │  Worker 1: [1-25]    ────►  Results [1-25]             │     │   │
│   │     │  Worker 2: [26-50]   ────►  Results [26-50]            │     │   │
│   │     │  Worker 3: [51-75]   ────►  Results [51-75]            │     │   │
│   │     │  Worker 4: [76-100]  ────►  Results [76-100]           │     │   │
│   │     └─────────────────────────────────────────────────────────┘     │   │
│   │                                                                      │   │
│   └────────────────────────────────┬────────────────────────────────────┘   │
│                                    │                                         │
│                                    ▼                                         │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  4. Save checkpoint after each batch                                 │   │
│   │     💾 Checkpoint: "Batch 1 complete, 100/1000 items"               │   │
│   └────────────────────────────────┬────────────────────────────────────┘   │
│                                    │                                         │
│                                    ▼                                         │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  5. Repeat until all batches complete                                │   │
│   │     Then: Delete checkpoint, return full results                    │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
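
The five steps in the diagram can be sketched in base R. This is an illustration of the idea under stated assumptions, not SafeMapper's implementation: lapply stands in for furrr::future_map, the checkpoint is a plain RDS file, and checkpointed_map is a hypothetical name.

```r
# Illustrative sketch only, not SafeMapper's implementation.
# lapply() stands in for furrr::future_map(); the checkpoint is a plain RDS file.
checkpointed_map <- function(x, f, batch_size = 100,
                             checkpoint = tempfile(fileext = ".rds")) {
  # 1. Check for an existing checkpoint
  results <- if (file.exists(checkpoint)) readRDS(checkpoint) else list()

  # 2. Split the remaining indices into batches
  todo <- seq_along(x)[seq_along(x) > length(results)]
  batches <- split(todo, ceiling(seq_along(todo) / batch_size))

  for (idx in batches) {
    # 3. Process one batch (a parallel map in the real workflow)
    results[idx] <- lapply(x[idx], f)
    # 4. Save a checkpoint after each batch
    saveRDS(results, checkpoint)
  }

  # 5. All batches done: delete the checkpoint and return everything
  unlink(checkpoint)
  results
}

out <- checkpointed_map(1:10, function(v) v^2, batch_size = 4)
```

If the loop is interrupted after a batch, calling the function again with the same checkpoint path skips the items that were already saved.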

Configuration Options

Batch Size for Parallel Processing

For parallel processing, batch size affects both checkpoint frequency and parallel efficiency:

# Larger batches = more efficient parallel execution
# But less frequent checkpoints
s_configure(batch_size = 200)

# Smaller batches = more frequent checkpoints
# But more overhead from parallelization
s_configure(batch_size = 50)
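
To make the trade-off concrete, here is a small back-of-the-envelope calculation. It assumes 1000 items, 4 workers, one checkpoint per batch, and an even split of each batch across workers; all of these are illustrative assumptions.

```r
n_items <- 1000
workers <- 4

batch_sizes <- c(50, 200)
n_checkpoints <- ceiling(n_items / batch_sizes)   # checkpoints written per run
items_per_worker <- batch_sizes / workers         # rough share of a batch per worker

data.frame(batch_size = batch_sizes, n_checkpoints, items_per_worker)
```

With a batch size of 50 each worker receives only about a dozen items between checkpoints, so startup and transfer costs are paid more often. With 200, those costs are amortized over more items, but a failure can lose up to 200 items of work.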

furrr Options

Pass furrr options through the .options parameter:

# Custom furrr options
opts <- furrr::furrr_options(
  seed = 123,           # Reproducible random numbers
  globals = TRUE,       # Auto-detect and export global variables
  packages = "dplyr"    # Load packages in workers
)

result <- s_future_map(
  1:100,
  my_function,
  .options = opts
)

When to Use Parallel Processing

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Decision Tree: Sequential vs Parallel                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Is each operation CPU-intensive (> 100ms)?                                │
│   │                                                                          │
│   ├── YES ──► Is total data size > 100 items?                               │
│   │           │                                                              │
│   │           ├── YES ──► ✅ Use s_future_map (parallel)                    │
│   │           │                                                              │
│   │           └── NO ───► ⚠️  Overhead may outweigh benefit                 │
│   │                        Use s_map (sequential)                           │
│   │                                                                          │
│   └── NO ───► Is the operation I/O bound (network, disk)?                   │
│               │                                                              │
│               ├── YES ──► ⚠️  Parallel may help, but consider:              │
│               │              - Rate limits                                  │
│               │              - Connection pools                             │
│               │              - Resource contention                          │
│               │                                                              │
│               └── NO ───► ❌ Use s_map (sequential)                         │
│                            Parallel overhead not worth it                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
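
The decision tree can also be written as a small function. recommend_mapper is a hypothetical helper, not part of SafeMapper, and the 100 ms and 100 item thresholds are the diagram's rules of thumb rather than hard limits.

```r
# Hypothetical helper encoding the decision tree above.
recommend_mapper <- function(ms_per_item, n_items, io_bound = FALSE) {
  if (ms_per_item > 100) {
    # CPU-intensive: parallel pays off only with enough items
    if (n_items > 100) "s_future_map" else "s_map"
  } else if (io_bound) {
    "either (parallel may help; check rate limits, connection pools, contention)"
  } else {
    "s_map"
  }
}

recommend_mapper(ms_per_item = 500, n_items = 1000)
recommend_mapper(ms_per_item = 1, n_items = 1000)
```

In practice, ms_per_item is something you would measure on a few representative inputs, for example with system.time().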

Good Use Cases for Parallel

# 1. Heavy computations
s_future_map(large_datasets, function(data) {
  # Complex statistical model fitting
  fit_complex_model(data)
})

# 2. Image/file processing
s_future_map(image_files, function(file) {
  # CPU-intensive image transformation
  process_image(file)
})

# 3. Simulations
s_future_map(1:1000, function(i) {
  # Monte Carlo simulation
  run_simulation(seed = i)
})

Poor Use Cases for Parallel

# 1. Simple operations (overhead > benefit)
# DON'T:
s_future_map(1:1000, ~ .x + 1)  # Too simple
# DO:
s_map(1:1000, ~ .x + 1)

# 2. Rate-limited API calls
# DON'T:
s_future_map(urls, fetch_api)  # May hit rate limits
# DO:
s_map(urls, fetch_api)  # One request at a time is easier to keep under a rate limit
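
Sequential execution alone does not guarantee that a rate limit is respected; it only makes the request rate easy to control. One way to enforce a minimum spacing between calls is a small wrapper like the one below. throttle is a hypothetical helper, not part of SafeMapper, and toupper stands in for a real API call.

```r
# Hypothetical helper (not part of SafeMapper): wrap f so that successive
# calls are at least min_interval seconds apart.
throttle <- function(f, min_interval = 0.2) {
  last_call <- -Inf
  function(...) {
    wait <- min_interval - (as.numeric(Sys.time()) - last_call)
    if (wait > 0) Sys.sleep(wait)
    last_call <<- as.numeric(Sys.time())
    f(...)
  }
}

# toupper() stands in for a real API call here
slow_upper <- throttle(toupper, min_interval = 0.05)
upper <- lapply(c("a", "b", "c"), slow_upper)
```

In the example above this would be used as s_map(urls, throttle(fetch_api)), assuming fetch_api is your own function. The wrapper keeps its state in a closure, so it only works as intended when all calls run in the same R process.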

Handling Progress

# Enable progress bar
result <- s_future_map(
  1:100,
  slow_function,
  .progress = TRUE
)

Error Handling in Parallel

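When a worker raises an error, SafeMapper retries the entire batch. If every retry fails, the error is raised, but the checkpoints from completed batches are kept, so a re-run resumes from the failed batch: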

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Error Handling in Parallel                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Batch Processing with Error:                                               │
│   ┌───────────────────────────────────────────────────────────────────┐     │
│   │  Batch 1: [1-100]   ✅ Success  ──► 💾 Save checkpoint            │     │
│   │  Batch 2: [101-200] ✅ Success  ──► 💾 Save checkpoint            │     │
│   │  Batch 3: [201-300] ❌ Error in worker                            │     │
│   │                     │                                              │     │
│   │                     ▼                                              │     │
│   │              Retry entire batch (up to N attempts)                │     │
│   │                     │                                              │     │
│   │                     ├── Success ──► Continue                      │     │
│   │                     └── Fail ────► Error (200 items saved)        │     │
│   └───────────────────────────────────────────────────────────────────┘     │
│                                                                              │
│   On re-run: Resume from item 201                                           │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
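
The retry step in the diagram can be sketched with tryCatch. This is an illustration only; SafeMapper's actual retry logic may differ. run_batch_with_retry and flaky are hypothetical names, and lapply stands in for the parallel map over one batch.

```r
# Illustrative sketch only; SafeMapper's actual retry logic may differ.
run_batch_with_retry <- function(batch, f, max_attempts = 3) {
  for (attempt in seq_len(max_attempts)) {
    result <- tryCatch(
      lapply(batch, f),            # stand-in for the parallel map over one batch
      error = function(e) e        # capture the error instead of stopping
    )
    if (!inherits(result, "error")) return(result)
    message("Attempt ", attempt, " failed: ", conditionMessage(result))
  }
  stop("Batch failed after ", max_attempts, " attempts: ",
       conditionMessage(result))
}

# A function that fails on its first call, then succeeds
n_calls <- 0
flaky <- function(x) {
  n_calls <<- n_calls + 1
  if (n_calls == 1) stop("transient failure")
  x * 2
}
doubled <- run_batch_with_retry(1:3, flaky)
```

Because the whole batch is retried, items that succeeded before the error are processed again. Functions with side effects (writing files, calling APIs) should therefore be safe to repeat.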

Best Practices

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Parallel Processing Best Practices                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   1. Choose the Right Number of Workers                                      │
│      ├── CPU-bound: workers = parallel::detectCores() - 1                   │
│      └── Memory-intensive: fewer workers to avoid OOM                       │
│                                                                              │
│   2. Mind Memory Usage                                                       │
│      ├── Each worker gets a copy of global data                            │
│      ├── Use .options$globals to minimize data transfer                     │
│      └── Consider chunking very large datasets                             │
│                                                                              │
│   3. Set Appropriate Batch Sizes                                             │
│      ├── Too small: High checkpoint I/O overhead                           │
│      ├── Too large: More work lost on failure                              │
│      └── Rule of thumb: 1-5 minutes of work per batch                      │
│                                                                              │
│   4. Handle Random Seeds                                                     │
│      ├── Use .options$seed for reproducibility                             │
│      └── Each worker gets an independent but reproducible stream           │
│                                                                              │
│   5. Clean Up Resources                                                      │
│      ├── Call plan(sequential) when done                                   │
│      └── Or let R clean up on exit                                         │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
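
Points 1 and 3 lend themselves to a quick calculation. The sketch below assumes near-linear speedup across workers and a per-item time that you have measured yourself; estimate_batch_size is a hypothetical helper, not a SafeMapper function.

```r
# Worker count: leave one core free; detectCores() can return NA, so guard it.
n_cores <- parallel::detectCores()
workers <- if (is.na(n_cores)) 1L else max(1L, n_cores - 1L)

# Batch size: aim for a target amount of wall-clock time per batch.
# seconds_per_item is something you measure, e.g. with system.time().
estimate_batch_size <- function(seconds_per_item, workers, target_seconds = 120) {
  max(1, floor(target_seconds * workers / seconds_per_item))
}

estimate_batch_size(seconds_per_item = 0.5, workers = 4)
```

With 0.5 seconds per item and 4 workers, a two-minute target works out to 960 items per batch. Treat the result as a starting point, since real speedup is usually less than linear.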

Complete Example

library(SafeMapper)
library(future)

# Configure parallel backend
plan(multisession, workers = 4)

# Configure SafeMapper
s_configure(
  batch_size = 100,     # Checkpoint every 100 items
  retry_attempts = 3    # Retry failed batches 3 times
)

# Define your processing function
process_item <- function(x) {
  Sys.sleep(0.5)  # Simulate work
  # rnorm() makes the output random; the seed option below keeps it reproducible
  x^2 + rnorm(1)
}

# Run with fault tolerance
results <- s_future_map(
  1:500,
  process_item,
  .progress = TRUE,
  .options = furrr::furrr_options(seed = 42)
)

# Clean up
plan(sequential)

# If interrupted, just re-run the same code!
# It will resume from the last checkpoint.

Next Steps