This guide presents best practices for using SafeMapper in production environments. Follow these guidelines to build robust, maintainable, and efficient data processing pipelines.
┌─────────────────────────────────────────────────────────────────────────────┐
│ Production Pipeline Architecture │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Configuration Layer │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Config │ │ Logging │ │ Monitoring │ │ │
│ │ │ Setup │ │ Setup │ │ Setup │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Input Validation │ │
│ │ ├── Check data types │ │
│ │ ├── Validate required fields │ │
│ │ └── Filter invalid records │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ SafeMapper Processing │ │
│ │ ├── s_map/s_future_map with checkpointing │ │
│ │ ├── Error wrappers (s_safely/s_possibly) │ │
│ │ └── Progress tracking │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Post-Processing │ │
│ │ ├── Result validation │ │
│ │ ├── Error aggregation │ │
│ │ ├── Cleanup │ │
│ │ └── Reporting │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
# Production pipeline template
run_production_pipeline <- function(data, process_fn, config = list()) {
# ===== Configuration Layer =====
default_config <- list(
batch_size = 100,
retry_attempts = 3,
session_id = paste0("pipeline_", format(Sys.time(), "%Y%m%d_%H%M%S")),
fail_threshold = 0.1 # Max 10% failure rate
)
config <- modifyList(default_config, config)
s_configure(
batch_size = config$batch_size,
retry_attempts = config$retry_attempts
)
# ===== Input Validation =====
if (length(data) == 0) {
stop("No data provided")
}
message(sprintf("[%s] Starting pipeline: %d items", Sys.time(), length(data)))
# ===== SafeMapper Processing =====
safe_fn <- s_safely(process_fn)
results <- s_map(
data,
safe_fn,
.session_id = config$session_id
)
# ===== Post-Processing =====
successes <- sum(sapply(results, function(x) is.null(x$error)))
failures <- sum(sapply(results, function(x) !is.null(x$error)))
success_rate <- successes / length(results)
message(sprintf("[%s] Complete: %d/%d (%.1f%% success)",
Sys.time(), successes, length(results), success_rate * 100))
# Check failure threshold
if ((1 - success_rate) > config$fail_threshold) {
warning(sprintf("Failure rate (%.1f%%) exceeds threshold (%.1f%%)",
(1 - success_rate) * 100, config$fail_threshold * 100))
}
# Return structured result
list(
results = results,
summary = list(
total = length(results),
successes = successes,
failures = failures,
success_rate = success_rate
),
config = config
)
}
# Usage example
sample_data <- 1:50
output <- run_production_pipeline(
sample_data,
function(x) x^2,
config = list(batch_size = 10, session_id = "demo_pipeline")
)
#> [2026-01-27 18:25:27.794915] Starting pipeline: 50 items
#> [2%] Processing items 1-10 of 50
#> [22%] Processing items 11-20 of 50
#> [42%] Processing items 21-30 of 50
#> [62%] Processing items 31-40 of 50
#> [82%] Processing items 41-50 of 50
#> Completed 50 items
#> [2026-01-27 18:25:27.820189] Complete: 50/50 (100.0% success)
print(output$summary)
#> $total
#> [1] 50
#>
#> $successes
#> [1] 50
#>
#> $failures
#> [1] 0
#>
#> $success_rate
#> [1] 1
# config.R - Environment-specific settings
get_config <- function(env = Sys.getenv("R_ENV", "development")) {
configs <- list(
development = list(
batch_size = 10,
retry_attempts = 1,
auto_recover = FALSE,
session_prefix = "dev"
),
staging = list(
batch_size = 50,
retry_attempts = 3,
auto_recover = TRUE,
session_prefix = "staging"
),
production = list(
batch_size = 100,
retry_attempts = 5,
auto_recover = TRUE,
session_prefix = "prod"
)
)
config <- configs[[env]]
if (is.null(config)) {
warning("Unknown environment, using development settings")
config <- configs$development
}
config
}
# Usage
config <- get_config("production")
s_configure(
batch_size = config$batch_size,
retry_attempts = config$retry_attempts,
auto_recover = config$auto_recover
)
┌─────────────────────────────────────────────────────────────────────────────┐
│ Choosing the Right Configuration │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ What type of operation? │
│ │ │
│ ├── Fast (< 10ms/item) │
│ │ └── batch_size: 500-1000 │
│ │ retry_attempts: 1 │
│ │ │
│ ├── Medium (10-100ms/item) │
│ │ └── batch_size: 100 (default) │
│ │ retry_attempts: 3 │
│ │ │
│ ├── Slow (100ms-1s/item) │
│ │ └── batch_size: 20-50 │
│ │ retry_attempts: 3-5 │
│ │ │
│ └── Very Slow (> 1s/item) │
│ └── batch_size: 5-20 │
│ retry_attempts: 2-3 │
│ │
│ Network involved? │
│ │ │
│ ├── YES ──► Increase retry_attempts (3-5) │
│ │ Consider exponential backoff in function │
│ │ │
│ └── NO ───► Lower retry_attempts (1-2) │
│ Errors usually persistent │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
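The decision tree above can also be applied programmatically. The sketch below is not part of SafeMapper: the helper name suggest_config and its exact thresholds are illustrative choices. It times a small representative sample and maps the measured per-item cost onto the batch_size and retry_attempts ranges shown above.
# Sketch only: derive batch_size / retry_attempts from measured per-item cost,
# following the ranges in the decision tree above. suggest_config is a
# hypothetical helper, not a SafeMapper function.
suggest_config <- function(sample_items, func, network = FALSE) {
  # Estimate per-item cost from a small representative sample
  per_item <- system.time(lapply(sample_items, func))[["elapsed"]] / length(sample_items)
  batch_size <- if (per_item < 0.01) {
    750   # fast: 500-1000
  } else if (per_item < 0.1) {
    100   # medium: default
  } else if (per_item < 1) {
    35    # slow: 20-50
  } else {
    10    # very slow: 5-20
  }
  retry_attempts <- if (network) 4 else if (per_item < 0.01) 1 else 3
  list(batch_size = batch_size,
       retry_attempts = retry_attempts,
       per_item_seconds = per_item)
}
# Usage: estimate from ~20 representative items, then apply the suggestion
cfg <- suggest_config(as.list(1:20), function(x) { Sys.sleep(0.005); x^2 })
s_configure(batch_size = cfg$batch_size, retry_attempts = cfg$retry_attempts)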
# Layer 1: Function-level error handling
process_with_validation <- function(item) {
# Input validation
if (is.null(item) || length(item) == 0) {
return(list(status = "skipped", reason = "empty input"))
}
# Try the operation
tryCatch({
result <- item^2 # Your actual operation
list(status = "success", value = result)
}, error = function(e) {
list(status = "error", reason = e$message)
})
}
# Layer 2: s_possibly for unexpected errors
safe_process <- s_possibly(process_with_validation,
otherwise = list(status = "failed", reason = "unknown"))
# Layer 3: SafeMapper checkpointing
results <- s_map(
list(1, NULL, 3, "invalid", 5),
safe_process,
.session_id = "three_layer_demo"
)
#> [20%] Processing items 1-5 of 5
#> Completed 5 items
# Review results
statuses <- sapply(results, function(x) x$status)
table(statuses)
#> statuses
#> error skipped success
#> 1 1 3
┌─────────────────────────────────────────────────────────────────────────────┐
│ Error Recovery Workflow │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Step 1: Detect Failure │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Pipeline fails or returns high error rate │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 2: Analyze Errors │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ # Extract error patterns │ │
│ │ errors <- results[has_errors] │ │
│ │ error_types <- table(error_messages) │ │
│ │ # Identify: Transient? Systematic? Data issue? │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 3: Take Action │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Transient errors ──► Re-run (checkpoint handles this) │ │
│ │ Data errors ────────► Fix data, re-run with new session_id │ │
│ │ Code errors ────────► Fix code, re-run with new session_id │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
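A minimal sketch of Step 2, assuming the pipeline used an s_safely-wrapped function (so each element of `results` has `result` and `error` components) and that the stored error is a standard condition object:
# Assumes `data` is the original input and `results` came from an
# s_safely()-wrapped function, i.e. each element is
# list(result = ..., error = NULL or a condition object).
has_errors <- sapply(results, function(x) !is.null(x$error))
errors <- results[has_errors]
failed_items <- data[has_errors]
# Tabulate error messages to separate transient from systematic failures
error_messages <- sapply(errors, function(x) conditionMessage(x$error))
table(error_messages)
# Transient errors: re-run the pipeline (the checkpoint restores completed work).
# Data or code errors: fix the cause, then re-run with a new session_id.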
# Recommended session ID formats
# Time-based (for scheduled jobs)
session_daily <- paste0("daily_report_", format(Sys.Date(), "%Y%m%d"))
# Version-based (for algorithm changes)
session_versioned <- "model_training_v2.1"
# Task-based (for specific operations)
session_task <- "customer_data_migration_batch1"
# Combined (recommended for production)
session_production <- paste0(
"prod_", # Environment
"data_sync_", # Task name
format(Sys.time(), "%Y%m%d_%H%M%S"), # Timestamp
"_v1" # Version
)
print(session_production)
#> [1] "prod_data_sync_20260127_182527_v1"# cleanup_jobs.R - Run periodically
perform_cleanup <- function() {
message("Starting checkpoint cleanup...")
# Remove old sessions (> 7 days)
removed <- s_clean_sessions(older_than_days = 7)
message(sprintf("Removed %d old session files", removed))
# Note: In a real scenario, you might want to:
# - Archive before deletion
# - Send notifications for failed sessions
# - Log cleanup activities
invisible(removed)
}
# Run cleanup
perform_cleanup()
#> Starting checkpoint cleanup...
#> No session files found
#> Removed 0 old session files
# Performance comparison helper
benchmark_batch_sizes <- function(data, func, sizes = c(10, 50, 100, 200)) {
results <- list()
for (size in sizes) {
s_configure(batch_size = size)
start_time <- Sys.time()
s_map(data, func, .session_id = paste0("bench_", size))
end_time <- Sys.time()
results[[as.character(size)]] <- as.numeric(end_time - start_time, units = "secs")
# Clean up benchmark sessions
s_clean_sessions(session_ids = paste0("bench_", size))
}
data.frame(
batch_size = sizes,
time_seconds = unlist(results)
)
}
# Example (with small data for demo)
# In production, use larger datasets
test_data <- 1:100
results <- benchmark_batch_sizes(
test_data,
function(x) { Sys.sleep(0.001); x^2 },
sizes = c(10, 25, 50)
)
#> [1%] Processing items 1-10 of 100
#> [11%] Processing items 11-20 of 100
#> [21%] Processing items 21-30 of 100
#> [31%] Processing items 31-40 of 100
#> [41%] Processing items 41-50 of 100
#> [51%] Processing items 51-60 of 100
#> [61%] Processing items 61-70 of 100
#> [71%] Processing items 71-80 of 100
#> [81%] Processing items 81-90 of 100
#> [91%] Processing items 91-100 of 100
#> Completed 100 items
#> No files to remove
#> [1%] Processing items 1-25 of 100
#> [26%] Processing items 26-50 of 100
#> [51%] Processing items 51-75 of 100
#> [76%] Processing items 76-100 of 100
#> Completed 100 items
#> No files to remove
#> [1%] Processing items 1-50 of 100
#> [51%] Processing items 51-100 of 100
#> Completed 100 items
#> No files to remove
print(results)
#> batch_size time_seconds
#> 10 10 0.1324372
#> 25 25 0.1363878
#> 50 50 0.1389260
┌─────────────────────────────────────────────────────────────────────────────┐
│ Memory Management Guidelines │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Process Large Data in Chunks │
│ ├── Don't load entire dataset at once │
│ ├── Use generators or iterators where possible │
│ └── Clear intermediate results: rm(temp_data); gc() │
│ │
│ 2. Mind Checkpoint Size │
│ ├── Each checkpoint stores all completed results │
│ ├── Large result objects = large checkpoint files │
│ └── Consider storing references instead of full objects │
│ │
│ 3. Parallel Processing Memory │
│ ├── Each worker copies global data │
│ ├── Reduce workers if memory-constrained │
│ └── Use .options$globals to minimize copies │
│ │
│ 4. Monitor Memory Usage │
│ ├── Check with: pryr::mem_used() or lobstr::obj_size() │
│ ├── Profile with: profmem package │
│ └── Set limits: options(future.globals.maxSize = 1e9) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
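Guideline 2 above ("store references instead of full objects") can be sketched as follows. The output directory and the process_and_store helper are illustrative, not part of SafeMapper.
# Sketch: write large results to disk inside the mapped function and return
# only a lightweight reference, so checkpoints stay small.
out_dir <- file.path(tempdir(), "pipeline_outputs")
dir.create(out_dir, showWarnings = FALSE)
process_and_store <- function(item) {
  big_result <- matrix(rnorm(1e4), nrow = 100)  # stand-in for a large object
  path <- file.path(out_dir, sprintf("result_%03d.rds", item))
  saveRDS(big_result, path)
  # Only the path and a small summary end up in the result / checkpoint
  list(path = path, n = length(big_result), mean = mean(big_result))
}
refs <- s_map(1:5, process_and_store, .session_id = "reference_demo")
# Reload the full object only when it is actually needed
first_result <- readRDS(refs[[1]]$path)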
# Custom progress wrapper
process_with_logging <- function(items, func, session_id) {
total <- length(items)
start_time <- Sys.time()
message(sprintf("[START] %s - Processing %d items", session_id, total))
results <- s_map(items, function(item) {
result <- func(item)
result
}, .session_id = session_id)
end_time <- Sys.time()
duration <- as.numeric(end_time - start_time, units = "secs")
message(sprintf("[END] %s - Completed in %.2f seconds (%.2f items/sec)",
session_id, duration, total / duration))
results
}
# Usage
results <- process_with_logging(
1:50,
function(x) { Sys.sleep(0.01); x^2 },
"logged_task"
)
#> [START] logged_task - Processing 50 items
#> [2%] Processing items 1-50 of 50
#> Completed 50 items
#> [END] logged_task - Completed in 0.61 seconds (82.15 items/sec)
┌─────────────────────────────────────────────────────────────────────────────┐
│ Production Readiness Checklist │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Before Deployment: │
│ □ Configuration │
│ □ batch_size tuned for workload │
│ □ retry_attempts appropriate for error types │
│ □ Environment-specific configs set up │
│ │
│ □ Error Handling │
│ □ s_safely/s_possibly wrapper in place │
│ □ Input validation implemented │
│ □ Error aggregation and reporting │
│ │
│ □ Session Management │
│ □ Meaningful session_id naming convention │
│ □ Cleanup job scheduled │
│ □ Checkpoint directory has sufficient space │
│ │
│ □ Monitoring │
│ □ Logging implemented │
│ □ Progress tracking visible │
│ □ Failure alerts configured │
│ │
│ □ Testing │
│ □ Tested with representative data │
│ □ Tested recovery from interruption │
│ □ Tested error scenarios │
│ □ Performance benchmarked │
│ │
│ During Operation: │
│ □ Monitor checkpoint directory size │
│ □ Review error rates regularly │
│ □ Update session_id when code changes │
│ □ Run cleanup jobs on schedule │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
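The "tested recovery from interruption" item can be exercised with a rough sketch like the one below. It assumes, as described in the checkpointing material above, that completed batches are checkpointed before a propagated error stops the run and that re-running with the same session_id resumes from that checkpoint; the flaky function and the call counter are illustrative only.
# Rough recovery test: fail partway through, then resume with the same session_id.
s_configure(batch_size = 5)
calls <- 0L
flaky <- function(x) {
  calls <<- calls + 1L
  if (x == 13) stop("simulated interruption")  # deterministic mid-run failure
  x^2
}
# First run: stops at the simulated failure; earlier batches should be checkpointed
try(s_map(1:20, flaky, .session_id = "recovery_test"), silent = TRUE)
message("Calls before interruption: ", calls)
# Second run with the failure removed and the *same* session_id
calls <- 0L
results <- s_map(1:20, function(x) { calls <<- calls + 1L; x^2 },
                 .session_id = "recovery_test")
message("Calls after resume: ", calls)  # expected to be well below 20
stopifnot(length(results) == 20)
s_clean_sessions(session_ids = "recovery_test")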
┌─────────────────────────────────────────────────────────────────────────────┐
│ Common Mistakes and Solutions │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ❌ Using same session_id for different tasks │
│ ✅ Use unique, descriptive session IDs │
│ │
│ ❌ Very small batch_size for fast operations │
│ ✅ Match batch_size to operation speed │
│ │
│ ❌ Not handling function errors (letting them propagate) │
│ ✅ Use s_safely/s_possibly for graceful error handling │
│ │
│ ❌ Never cleaning up old checkpoints │
│ ✅ Schedule regular cleanup with s_clean_sessions() │
│ │
│ ❌ Ignoring checkpoint after code changes │
│ ✅ Update session_id when algorithm changes │
│ │
│ ❌ Using parallel for simple/fast operations │
│ ✅ Reserve s_future_map for CPU-intensive tasks │
│ │
│ ❌ Storing very large objects in results │
│ ✅ Store references or summaries, write large objects to disk │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
# Quick reference for production use
# 1. Configure appropriately
s_configure(
batch_size = 100, # Tune based on operation speed
retry_attempts = 3 # Higher for network operations
)
# 2. Use descriptive session IDs
session_id <- paste0("prod_task_", format(Sys.time(), "%Y%m%d"))
# 3. Wrap functions for safety (using s_safely for error capture)
my_function <- function(x) x^2
safe_fn <- s_safely(my_function)
# 4. Process with fault tolerance
data <- 1:20
results <- s_map(data, safe_fn, .session_id = session_id)
#> [5%] Processing items 1-20 of 20
#> Completed 20 items
# 5. Handle results properly (s_safely returns list with result/error)
successes <- sum(sapply(results, function(x) is.null(x$error)))
cat("Success rate:", successes / length(results) * 100, "%\n")
#> Success rate: 100 %
# 6. Clean up periodically
s_clean_sessions(older_than_days = 7)
#> No session files found