
Best Practices for Production Use

Overview

This guide presents best practices for using SafeMapper in production environments. Follow these guidelines to build robust, maintainable, and efficient data processing pipelines.

library(SafeMapper)
#> SafeMapper: Fault-tolerant functional programming

Architecture Patterns

Pattern 1: The Robust Pipeline

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Production Pipeline Architecture                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         Configuration Layer                          │   │
│   │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                  │   │
│   │  │   Config    │  │   Logging   │  │  Monitoring │                  │   │
│   │  │   Setup     │  │   Setup     │  │   Setup     │                  │   │
│   │  └─────────────┘  └─────────────┘  └─────────────┘                  │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         Input Validation                             │   │
│   │  ├── Check data types                                               │   │
│   │  ├── Validate required fields                                       │   │
│   │  └── Filter invalid records                                         │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         SafeMapper Processing                        │   │
│   │  ├── s_map/s_future_map with checkpointing                         │   │
│   │  ├── Error wrappers (s_safely/s_possibly)                          │   │
│   │  └── Progress tracking                                              │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         Post-Processing                              │   │
│   │  ├── Result validation                                              │   │
│   │  ├── Error aggregation                                              │   │
│   │  ├── Cleanup                                                        │   │
│   │  └── Reporting                                                      │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Implementation

# Production pipeline template
run_production_pipeline <- function(data, process_fn, config = list()) {
  # ===== Configuration Layer =====
  default_config <- list(
    batch_size = 100,
    retry_attempts = 3,
    session_id = paste0("pipeline_", format(Sys.time(), "%Y%m%d_%H%M%S")),
    fail_threshold = 0.1  # Max 10% failure rate
  )
  config <- modifyList(default_config, config)
  
  s_configure(
    batch_size = config$batch_size,
    retry_attempts = config$retry_attempts
  )
  
  # ===== Input Validation =====
  if (length(data) == 0) {
    stop("No data provided")
  }
  
  message(sprintf("[%s] Starting pipeline: %d items", Sys.time(), length(data)))
  
  # ===== SafeMapper Processing =====
  safe_fn <- s_safely(process_fn)
  
  results <- s_map(
    data,
    safe_fn,
    .session_id = config$session_id
  )
  
  # ===== Post-Processing =====
  successes <- sum(sapply(results, function(x) is.null(x$error)))
  failures <- sum(sapply(results, function(x) !is.null(x$error)))
  success_rate <- successes / length(results)
  
  message(sprintf("[%s] Complete: %d/%d (%.1f%% success)", 
                  Sys.time(), successes, length(results), success_rate * 100))
  
  # Check failure threshold
  if ((1 - success_rate) > config$fail_threshold) {
    warning(sprintf("Failure rate (%.1f%%) exceeds threshold (%.1f%%)",
                    (1 - success_rate) * 100, config$fail_threshold * 100))
  }
  
  # Return structured result
  list(
    results = results,
    summary = list(
      total = length(results),
      successes = successes,
      failures = failures,
      success_rate = success_rate
    ),
    config = config
  )
}

# Usage example
sample_data <- 1:50
output <- run_production_pipeline(
  sample_data,
  function(x) x^2,
  config = list(batch_size = 10, session_id = "demo_pipeline")
)
#> [2026-01-27 18:25:27.794915] Starting pipeline: 50 items
#> [2%] Processing items 1-10 of 50
#> [22%] Processing items 11-20 of 50
#> [42%] Processing items 21-30 of 50
#> [62%] Processing items 31-40 of 50
#> [82%] Processing items 41-50 of 50
#> Completed 50 items
#> [2026-01-27 18:25:27.820189] Complete: 50/50 (100.0% success)
print(output$summary)
#> $total
#> [1] 50
#> 
#> $successes
#> [1] 50
#> 
#> $failures
#> [1] 0
#> 
#> $success_rate
#> [1] 1

Configuration Best Practices

Environment-Based Configuration

# config.R - Environment-specific settings

get_config <- function(env = Sys.getenv("R_ENV", "development")) {
  configs <- list(
    development = list(
      batch_size = 10,
      retry_attempts = 1,
      auto_recover = FALSE,
      session_prefix = "dev"
    ),
    staging = list(
      batch_size = 50,
      retry_attempts = 3,
      auto_recover = TRUE,
      session_prefix = "staging"
    ),
    production = list(
      batch_size = 100,
      retry_attempts = 5,
      auto_recover = TRUE,
      session_prefix = "prod"
    )
  )
  
  config <- configs[[env]]
  if (is.null(config)) {
    warning("Unknown environment, using development settings")
    config <- configs$development
  }
  
  config
}

# Usage
config <- get_config("production")
s_configure(
  batch_size = config$batch_size,
  retry_attempts = config$retry_attempts,
  auto_recover = config$auto_recover
)

Configuration Decision Tree

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Choosing the Right Configuration                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   What type of operation?                                                    │
│   │                                                                          │
│   ├── Fast (< 10ms/item)                                                    │
│   │   └── batch_size: 500-1000                                              │
│   │       retry_attempts: 1                                                 │
│   │                                                                          │
│   ├── Medium (10-100ms/item)                                                │
│   │   └── batch_size: 100 (default)                                         │
│   │       retry_attempts: 3                                                 │
│   │                                                                          │
│   ├── Slow (100ms-1s/item)                                                  │
│   │   └── batch_size: 20-50                                                 │
│   │       retry_attempts: 3-5                                               │
│   │                                                                          │
│   └── Very Slow (> 1s/item)                                                 │
│       └── batch_size: 5-20                                                  │
│           retry_attempts: 2-3                                               │
│                                                                              │
│   Network involved?                                                          │
│   │                                                                          │
│   ├── YES ──► Increase retry_attempts (3-5)                                 │
│   │           Consider exponential backoff in function                      │
│   │                                                                          │
│   └── NO ───► Lower retry_attempts (1-2)                                    │
│               Errors usually persistent                                      │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
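
The decision tree recommends exponential backoff for network-bound functions. A minimal sketch of such a wrapper follows; the helper with_backoff and its parameters are illustrative, not part of the SafeMapper API:

with_backoff <- function(fn, max_tries = 4, base_wait = 0.5) {
  function(...) {
    for (attempt in seq_len(max_tries)) {
      result <- tryCatch(fn(...), error = function(e) e)
      if (!inherits(result, "error")) return(result)
      if (attempt == max_tries) stop(result)
      Sys.sleep(base_wait * 2^(attempt - 1))  # wait 0.5s, 1s, 2s, ...
    }
  }
}

# With backoff inside the function, SafeMapper's own retry_attempts can
# stay low:
# results <- s_map(urls, with_backoff(fetch_one), .session_id = "api_sync")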

Error Handling Strategies

The Three-Layer Defense

# Layer 1: Function-level error handling
process_with_validation <- function(item) {
  # Input validation
  if (is.null(item) || length(item) == 0) {
    return(list(status = "skipped", reason = "empty input"))
  }
  
  # Try the operation
  tryCatch({
    result <- item^2  # Your actual operation
    list(status = "success", value = result)
  }, error = function(e) {
    list(status = "error", reason = e$message)
  })
}

# Layer 2: s_possibly for unexpected errors
safe_process <- s_possibly(process_with_validation, 
                           otherwise = list(status = "failed", reason = "unknown"))

# Layer 3: SafeMapper checkpointing
results <- s_map(
  list(1, NULL, 3, "invalid", 5),
  safe_process,
  .session_id = "three_layer_demo"
)
#> [20%] Processing items 1-5 of 5
#> Completed 5 items

# Review results
statuses <- sapply(results, function(x) x$status)
table(statuses)
#> statuses
#>   error skipped success 
#>       1       1       3

Error Recovery Workflow

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Error Recovery Workflow                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Step 1: Detect Failure                                                     │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  Pipeline fails or returns high error rate                          │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   Step 2: Analyze Errors                                                     │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  # Extract error patterns                                           │   │
│   │  errors <- results[has_errors]                                      │   │
│   │  error_types <- table(error_messages)                               │   │
│   │  # Identify: Transient? Systematic? Data issue?                     │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                       │
│                                      ▼                                       │
│   Step 3: Take Action                                                        │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │  Transient errors ──► Re-run (checkpoint handles this)              │   │
│   │  Data errors ────────► Fix data, re-run with new session_id         │   │
│   │  Code errors ────────► Fix code, re-run with new session_id         │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
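
In practice, Step 2 looks something like the sketch below, assuming results came from a function wrapped with s_safely (so each element has result and error components, and the error slot holds a condition object):

has_errors <- sapply(results, function(x) !is.null(x$error))
errors <- results[has_errors]

# Tabulate messages to distinguish systematic failures from one-offs
error_messages <- sapply(errors, function(x) conditionMessage(x$error))
sort(table(error_messages), decreasing = TRUE)

# Map failures back to their inputs to diagnose data issues
# (assuming `data` is the input list you mapped over)
failed_inputs <- data[has_errors]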

Session Management

Naming Convention

# Recommended session ID formats

# Time-based (for scheduled jobs)
session_daily <- paste0("daily_report_", format(Sys.Date(), "%Y%m%d"))

# Version-based (for algorithm changes)
session_versioned <- "model_training_v2.1"

# Task-based (for specific operations)
session_task <- "customer_data_migration_batch1"

# Combined (recommended for production)
session_production <- paste0(
  "prod_",                                    # Environment
  "data_sync_",                               # Task name
  format(Sys.time(), "%Y%m%d_%H%M%S"),       # Timestamp
  "_v1"                                       # Version
)

print(session_production)
#> [1] "prod_data_sync_20260127_182527_v1"

Cleanup Strategy

# cleanup_jobs.R - Run periodically

perform_cleanup <- function() {
  message("Starting checkpoint cleanup...")
  
  # Remove old sessions (> 7 days)
  removed <- s_clean_sessions(older_than_days = 7)
  message(sprintf("Removed %d old session files", removed))
  
  # Note: In a real scenario, you might want to:
  # - Archive before deletion
  # - Send notifications for failed sessions
  # - Log cleanup activities
  
  invisible(removed)
}

# Run cleanup
perform_cleanup()
#> Starting checkpoint cleanup...
#> No session files found
#> Removed 0 old session files
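
If you do want to archive before deleting, a sketch along these lines can run ahead of perform_cleanup(). The checkpoint directory path here is an assumption; point it at wherever your installation stores its session files:

archive_old_checkpoints <- function(checkpoint_dir = "safemapper_checkpoints",
                                    archive_dir = "checkpoint_archive",
                                    days = 7) {
  files <- list.files(checkpoint_dir, full.names = TRUE)
  age <- difftime(Sys.time(), file.mtime(files), units = "days")
  old <- files[age > days]
  if (length(old) > 0) {
    dir.create(archive_dir, showWarnings = FALSE)
    file.copy(old, archive_dir)
    message(sprintf("Archived %d checkpoint files", length(old)))
  }
  invisible(old)
}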

Performance Optimization

Batch Size Tuning

# Performance comparison helper
benchmark_batch_sizes <- function(data, func, sizes = c(10, 50, 100, 200)) {
  results <- list()
  
  for (size in sizes) {
    s_configure(batch_size = size)
    
    start_time <- Sys.time()
    s_map(data, func, .session_id = paste0("bench_", size))
    end_time <- Sys.time()
    
    results[[as.character(size)]] <- as.numeric(end_time - start_time, units = "secs")
    
    # Clean up benchmark sessions
    s_clean_sessions(session_ids = paste0("bench_", size))
  }
  
  data.frame(
    batch_size = sizes,
    time_seconds = unlist(results)
  )
}

# Example (with small data for demo)
# In production, use larger datasets
test_data <- 1:100
results <- benchmark_batch_sizes(
  test_data, 
  function(x) { Sys.sleep(0.001); x^2 },
  sizes = c(10, 25, 50)
)
#> [1%] Processing items 1-10 of 100
#> [11%] Processing items 11-20 of 100
#> [21%] Processing items 21-30 of 100
#> [31%] Processing items 31-40 of 100
#> [41%] Processing items 41-50 of 100
#> [51%] Processing items 51-60 of 100
#> [61%] Processing items 61-70 of 100
#> [71%] Processing items 71-80 of 100
#> [81%] Processing items 81-90 of 100
#> [91%] Processing items 91-100 of 100
#> Completed 100 items
#> No files to remove
#> [1%] Processing items 1-25 of 100
#> [26%] Processing items 26-50 of 100
#> [51%] Processing items 51-75 of 100
#> [76%] Processing items 76-100 of 100
#> Completed 100 items
#> No files to remove
#> [1%] Processing items 1-50 of 100
#> [51%] Processing items 51-100 of 100
#> Completed 100 items
#> No files to remove
print(results)
#>    batch_size time_seconds
#> 10         10    0.1324372
#> 25         25    0.1363878
#> 50         50    0.1389260

Memory Management

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Memory Management Guidelines                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   1. Process Large Data in Chunks                                            │
│      ├── Don't load entire dataset at once                                  │
│      ├── Use generators or iterators where possible                         │
│      └── Clear intermediate results: rm(temp_data); gc()                    │
│                                                                              │
│   2. Mind Checkpoint Size                                                    │
│      ├── Each checkpoint stores all completed results                       │
│      ├── Large result objects = large checkpoint files                      │
│      └── Consider storing references instead of full objects               │
│                                                                              │
│   3. Parallel Processing Memory                                              │
│      ├── Each worker copies global data                                     │
│      ├── Reduce workers if memory-constrained                               │
│      └── Use .options$globals to minimize copies                            │
│                                                                              │
│   4. Monitor Memory Usage                                                    │
│      ├── Check with: pryr::mem_used() or lobstr::obj_size()                │
│      ├── Profile with: profmem package                                      │
│      └── Set limits: options(future.globals.maxSize = 1e9)                 │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
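
Point 2's advice to store references instead of full objects can look like the following sketch: each call writes its (potentially large) result to disk and returns only the file path, so checkpoints stay small. The output directory and the stand-in result are illustrative:

process_to_disk <- function(item, out_dir = "results") {
  dir.create(out_dir, showWarnings = FALSE)
  big_result <- data.frame(id = item, value = rnorm(1e4))  # stand-in for a heavy object
  path <- file.path(out_dir, sprintf("result_%04d.rds", item))
  saveRDS(big_result, path)
  path  # the checkpoint stores a short string, not the data frame
}

# paths <- s_map(1:500, process_to_disk, .session_id = "disk_backed_run")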

Monitoring and Logging

Progress Tracking

# Custom progress wrapper
process_with_logging <- function(items, func, session_id) {
  total <- length(items)
  start_time <- Sys.time()
  
  message(sprintf("[START] %s - Processing %d items", session_id, total))
  
  results <- s_map(items, func, .session_id = session_id)
  
  end_time <- Sys.time()
  duration <- as.numeric(end_time - start_time, units = "secs")
  
  message(sprintf("[END] %s - Completed in %.2f seconds (%.2f items/sec)", 
                  session_id, duration, total / duration))
  
  results
}

# Usage
results <- process_with_logging(
  1:50,
  function(x) { Sys.sleep(0.01); x^2 },
  "logged_task"
)
#> [START] logged_task - Processing 50 items
#> [2%] Processing items 1-50 of 50
#> Completed 50 items
#> [END] logged_task - Completed in 0.61 seconds (82.15 items/sec)

Production Checklist

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Production Readiness Checklist                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Before Deployment:                                                         │
│   □ Configuration                                                            │
│     □ batch_size tuned for workload                                         │
│     □ retry_attempts appropriate for error types                            │
│     □ Environment-specific configs set up                                   │
│                                                                              │
│   □ Error Handling                                                           │
│     □ s_safely/s_possibly wrapper in place                                  │
│     □ Input validation implemented                                          │
│     □ Error aggregation and reporting                                       │
│                                                                              │
│   □ Session Management                                                       │
│     □ Meaningful session_id naming convention                               │
│     □ Cleanup job scheduled                                                 │
│     □ Checkpoint directory has sufficient space                             │
│                                                                              │
│   □ Monitoring                                                               │
│     □ Logging implemented                                                   │
│     □ Progress tracking visible                                             │
│     □ Failure alerts configured                                             │
│                                                                              │
│   □ Testing                                                                  │
│     □ Tested with representative data                                       │
│     □ Tested recovery from interruption                                     │
│     □ Tested error scenarios                                                │
│     □ Performance benchmarked                                               │
│                                                                              │
│   During Operation:                                                          │
│   □ Monitor checkpoint directory size                                       │
│   □ Review error rates regularly                                            │
│   □ Update session_id when code changes                                     │
│   □ Run cleanup jobs on schedule                                            │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
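
The "tested recovery from interruption" item deserves a concrete test. One way to simulate it, assuming the checkpoint resume behaviour described above (the flag file and the flaky function are test scaffolding, not SafeMapper features):

# Fails at item 15 until the transient condition is "fixed"
flaky <- function(x) {
  if (x == 15 && !file.exists("resume_ok.flag")) stop("simulated crash")
  x^2
}

try(s_map(1:30, flaky, .session_id = "recovery_test"))        # first run dies
file.create("resume_ok.flag")                                 # fix the condition
results <- s_map(1:30, flaky, .session_id = "recovery_test")  # resumes from checkpoint
stopifnot(length(results) == 30)
file.remove("resume_ok.flag")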

Anti-Patterns to Avoid

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Common Mistakes and Solutions                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   ❌ Using same session_id for different tasks                              │
│   ✅ Use unique, descriptive session IDs                                    │
│                                                                              │
│   ❌ Very small batch_size for fast operations                              │
│   ✅ Match batch_size to operation speed                                    │
│                                                                              │
│   ❌ Not handling function errors (letting them propagate)                  │
│   ✅ Use s_safely/s_possibly for graceful error handling                    │
│                                                                              │
│   ❌ Never cleaning up old checkpoints                                      │
│   ✅ Schedule regular cleanup with s_clean_sessions()                       │
│                                                                              │
│   ❌ Ignoring checkpoint after code changes                                 │
│   ✅ Update session_id when algorithm changes                               │
│                                                                              │
│   ❌ Using parallel for simple/fast operations                              │
│   ✅ Reserve s_future_map for CPU-intensive tasks                           │
│                                                                              │
│   ❌ Storing very large objects in results                                  │
│   ✅ Store references or summaries, write large objects to disk             │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Summary

# Quick reference for production use

# 1. Configure appropriately
s_configure(
  batch_size = 100,     # Tune based on operation speed
  retry_attempts = 3    # Higher for network operations
)

# 2. Use descriptive session IDs
session_id <- paste0("prod_task_", format(Sys.time(), "%Y%m%d"))

# 3. Wrap functions for safety (using s_safely for error capture)
my_function <- function(x) x^2
safe_fn <- s_safely(my_function)

# 4. Process with fault tolerance
data <- 1:20
results <- s_map(data, safe_fn, .session_id = session_id)
#> [5%] Processing items 1-20 of 20
#> Completed 20 items

# 5. Handle results properly (s_safely returns list with result/error)
successes <- sum(sapply(results, function(x) is.null(x$error)))
cat("Success rate:", successes / length(results) * 100, "%\n")
#> Success rate: 100 %

# 6. Clean up periodically
s_clean_sessions(older_than_days = 7)
#> No session files found
