The hardware and bandwidth for this mirror are donated by METANET, the Webhosting and Full Service-Cloud Provider.
If you wish to report a bug, or if you are interested in having us mirror your free-software or open-source project, please feel free to contact us at mirror[@]metanet.ch.
nanonext provides bindings to NNG (Nanomsg Next Gen), a high-performance messaging library for building distributed systems.
This is a cheatsheet. Refer to the other vignettes for detailed introductions.
- Transports: inproc:// (in-process), ipc:// (inter-process), tcp://, ws://, wss://, tls+tcp://
- Async I/O: send_aio() / recv_aio() return immediately; access results via $data or $result
- Data modes: "serial" (R objects), "raw" (bytes), "double", "integer", "character", etc.
- Condition variables: cv() for zero-latency event synchronisation

library(nanonext)
# Functional interface
s <- socket("pair")
listen(s, "tcp://127.0.0.1:5555")
dial(s, "tcp://127.0.0.1:5555")
# Object-oriented interface
n <- nano("pair", listen = "tcp://127.0.0.1:5555")
n$dial("tcp://127.0.0.1:5556")
# Close when done
close(s)
n$close()
| Protocol | Description | Socket Types |
|---|---|---|
| Pair | 1-to-1 bidirectional | "pair" |
| Poly | Polyamorous pair | "poly" |
| Pipeline | One-way data flow | "push", "pull" |
| Req/Rep | RPC pattern | "req", "rep" |
| Pub/Sub | Broadcast/subscribe | "pub", "sub" |
| Survey | Query all peers | "surveyor", "respondent" |
| Bus | Many-to-many mesh | "bus" |
| URL Scheme | Description |
|---|---|
| inproc://name | In-process (fastest, same process) |
| ipc:///path | Inter-process (Unix socket / named pipe) |
| tcp://host:port | TCP/IP network |
| ws://host:port/path | WebSocket |
| wss://host:port/path | WebSocket over TLS |
| tls+tcp://host:port | TLS encrypted TCP |
# Send R object (serialized)
send(s, data.frame(a = 1, b = 2))
# Receive R object
recv(s)
# Send raw bytes (for cross-language exchange)
send(s, c(1.1, 2.2, 3.3), mode = "raw")
# Receive as specific type
recv(s, mode = "double")
recv(s, mode = "character")
recv(s, mode = "raw")
| Mode | Description |
|---|---|
| "serial" / 1 | R serialization (default) |
| "character" / 2 | Coerce to character |
| "complex" / 3 | Coerce to complex |
| "double" / 4 | Coerce to double |
| "integer" / 5 | Coerce to integer |
| "logical" / 6 | Coerce to logical |
| "numeric" / 7 | Coerce to numeric |
| "raw" / 8 | Raw bytes |
| "string" / 9 | Fast option for length-1 character |
# Async send - returns immediately
res <- send_aio(s, data)
res$result # 0 = success, error code otherwise
# Async receive - returns immediately
msg <- recv_aio(s)
msg$data # Value when resolved, 'unresolved' NA otherwise
# Check if resolved
unresolved(msg) # TRUE while pending
# Wait for resolution
call_aio(msg) # Blocks, returns Aio object
collect_aio(msg) # Blocks, returns value directly
msg[] # Blocks (user-interruptible), returns value
# Poll while doing other work
while (unresolved(msg)) {
# do other tasks
}
result <- msg$data
# Multiple async operations
msg1 <- recv_aio(s1)
msg2 <- recv_aio(s2)
# Both run concurrently
# Create condition variable
cv <- cv()
# Check/signal
cv_value(cv) # Get counter value
cv_signal(cv) # Increment counter
cv_reset(cv) # Reset to zero
# Wait (blocks until counter > 0, then decrements)
wait(cv)
# Wait with timeout (ms), returns FALSE on timeout
until(cv, 1000)
# Signal on connection/disconnection
pipe_notify(socket, cv = cv, add = TRUE, remove = TRUE)
# Distinguish message vs disconnect with flag
pipe_notify(socket, cv = cv, remove = TRUE, flag = TRUE)
r <- recv_aio(socket, cv = cv)
wait(cv) || stop("disconnected") # FALSE = pipe event
cv <- cv()
msg <- recv_aio(s, cv = cv)
wait(cv) # Wake on receive completion
msg$data
rep <- socket("rep", listen = "tcp://127.0.0.1:5555")
ctx <- context(rep)
# reply() blocks, waiting for request
reply(ctx, execute = my_function, send_mode = "raw")
close(rep)
req <- socket("req", dial = "tcp://127.0.0.1:5555")
ctx <- context(req)
# request() returns immediately
aio <- request(ctx, data = args, recv_mode = "double")
# Do other work while server processes...
# Get result when needed
result <- aio[]
close(req)
pub <- socket("pub", listen = "inproc://pubsub")
sub <- socket("sub", dial = "inproc://pubsub")
# Subscribe to topic (prefix matching)
subscribe(sub, topic = "news")
subscribe(sub, topic = NULL) # All topics
# Unsubscribe
unsubscribe(sub, topic = "news")
# Publish (topic is message prefix)
send(pub, c("news", "headline"), mode = "raw")
# Receive (includes topic)
recv(sub, mode = "character")
close(pub)
close(sub)
sur <- socket("surveyor", listen = "inproc://survey")
res1 <- socket("respondent", dial = "inproc://survey")
res2 <- socket("respondent", dial = "inproc://survey")
# Set survey timeout (ms)
survey_time(sur, 500)
# Broadcast survey
send(sur, "ping")
# Collect responses (async)
aio1 <- recv_aio(sur)
aio2 <- recv_aio(sur)
# Respondents reply
recv(res1)
send(res1, "pong1")
# Late/missing responses time out (errorValue 5)
msleep(500)
aio2$data # errorValue if no response
close(sur)
close(res1)
close(res2)
# Generate certificate (cn must match URL host exactly)
cert <- write_cert(cn = "127.0.0.1")
# Create TLS configs
server_tls <- tls_config(server = cert$server)
client_tls <- tls_config(client = cert$client)
# Use with tls+tcp:// or wss://
s1 <- socket(listen = "tls+tcp://127.0.0.1:5555", tls = server_tls)
s2 <- socket(dial = "tls+tcp://127.0.0.1:5555", tls = client_tls)
# Client with CA cert file
client_tls <- tls_config(client = "/path/to/ca-cert.pem")
# Server with cert + key
server_tls <- tls_config(server = c("/path/to/cert.pem", "/path/to/key.pem"))
# Delayed start for configuration
s <- socket(listen = "tcp://127.0.0.1:5555", autostart = FALSE)
# Get option
opt(s$listener[[1]], "recv-size-max")
# Set option
opt(s$listener[[1]], "recv-size-max") <- 8192L
# Start after configuration
start(s$listener[[1]])
| Option | Description |
|---|---|
| "recv-size-max" | Max message size (0 = unlimited) |
| "send-timeout" | Send timeout (ms) |
| "recv-timeout" | Receive timeout (ms) |
| "reconnect-time-min" | Min reconnect interval (ms) |
| "reconnect-time-max" | Max reconnect interval (ms) |
| "req:resend-time" | Request retry interval |
| "sub:prefnew" | Prefer newer messages |
# Register custom serializer for a class
serial <- serial_config(
"class_name",
function(x) serialize(x, NULL), # serialize
unserialize # unserialize
)
opt(socket, "serial") <- serial
stat(socket, "pipes") # Active connections
stat(listener, "accept") # Connection attempts
stat(dialer, "reject") # Rejected connections
Contexts enable concurrent operations on a single socket (for req/rep, surveyor/respondent).
s <- socket("req", dial = "tcp://127.0.0.1:5555")
# Create independent contexts
ctx1 <- context(s)
ctx2 <- context(s)
# Concurrent requests
aio1 <- request(ctx1, data1)
aio2 <- request(ctx2, data2)
# Close contexts (or they close with socket)
close(ctx1)
close(ctx2)
close(s)
# R: send raw doubles
n <- nano("pair", dial = "ipc:///tmp/nanonext")
n$send(c(1.1, 2.2, 3.3), mode = "raw")
result <- n$recv(mode = "double")
# Python: receive as NumPy array
import numpy as np
import pynng
socket = pynng.Pair0(listen="ipc:///tmp/nanonext")
array = np.frombuffer(socket.recv())
socket.send(array.tobytes())
# Errors return as 'errorValue' class
result <- recv(s, block = FALSE)
# Check for errors
is_error_value(result)
# Error codes
# 5 = Timed out
# 6 = Connection refused
# 8 = Try again (non-blocking, no message)
# Get error message
nng_error(5) # "Timed out"
# Sleep (uninterruptible, ms)
msleep(100)
# Random bytes
random(8) # 8 random bytes as hex string
random(8, convert = FALSE) # As raw vector
# Parse URL
parse_url("tcp://127.0.0.1:5555")
These binaries (installable software) and packages are in development.
They may not be fully stable and should be used with caution. We make no claims about them.