NAME EV::Nats - High-performance asynchronous NATS client using EV SYNOPSIS use EV; use EV::Nats; my $nats = EV::Nats->new( host => '127.0.0.1', port => 4222, reconnect => 1, on_error => sub { warn "nats: $_[0]\n" }, on_connect => sub { warn "connected\n" }, ); # Subscribe (plain or queue group) my $sid = $nats->subscribe('foo.>', sub { my ($subject, $payload, $reply, $headers) = @_; print "[$subject] $payload\n"; }); $nats->subscribe('work.>', sub { ... }, 'workers'); # Publish (fire-and-forget) and headered publish $nats->publish('foo.bar', 'hello world'); $nats->hpublish('foo.bar', "NATS/1.0\r\nX-Trace: 42\r\n\r\n", 'body'); # Request / reply $nats->request('service.echo', 'ping', sub { my ($response, $err) = @_; die $err if $err; print "reply: $response\n"; }, 5000); # 5s timeout $nats->unsubscribe($sid); EV::run; DESCRIPTION EV::Nats is an async NATS client that implements the protocol directly in XS on top of EV. There is no external C library dependency. Protocol Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG, PING/PONG), including headered publish/receive, wildcard subjects ("*", ">"), queue groups, and request/reply with an automatic shared inbox subscription. Connectivity TCP and Unix-domain sockets; TCP keepalive; connect timeout; auto reconnect with exponential backoff and jitter; subscription and auto-unsub state restored on reconnect; cluster failover from INFO "connect_urls"; lame-duck-mode (leaf node graceful shutdown) callback; graceful "drain". Auth Token, user/pass, NKey/JWT (Ed25519 via OpenSSL). TLS Optional, auto-detected at build time. STARTTLS-style upgrade after INFO; full hostname verification (DNS or IP literal) by default; opt-out "tls_skip_verify"; custom CA via "tls_ca_file". Performance Write coalescing via "ev_prepare" (one write() per loop iteration); O(1) subscription lookup; per-publish allocation-free fast path; explicit "batch" mode for tight loops; per-connection stats counters. Higher-level APIs EV::Nats::JetStream, EV::Nats::KV, EV::Nats::ObjectStore. Note: DNS resolution via "getaddrinfo" is blocking. Use numeric IP addresses for latency-sensitive applications. METHODS new(%options) Create an EV::Nats instance. If "host" or "path" is supplied, connection is initiated immediately and the "on_connect" callback fires once the CONNECT/PONG handshake completes. my $nats = EV::Nats->new( host => '127.0.0.1', port => 4222, reconnect => 1, on_error => sub { warn "nats: $_[0]\n" }, on_connect => sub { warn "ready\n" }, ); Connection options host => Str Server hostname (numeric IP recommended; see "CAVEATS"). When set, connection starts immediately. port => Int (default 4222) Server port. path => Str Unix-domain socket path. Mutually exclusive with "host". connect_timeout => Int (ms; 0 = none) How long to wait for the TCP/TLS handshake before giving up. keepalive => Int (seconds) If set, enables "SO_KEEPALIVE" with this idle interval. priority => Int (-2 .. +2) EV watcher priority for the I/O watchers on this connection. loop => EV::Loop (default "EV::default_loop") The EV loop to attach watchers to. name => Str Client name advertised in CONNECT. Auth options user => Str / pass => Str Username/password authentication. JSON-escaped in CONNECT. token => Str Token authentication. nkey_seed => Str NATS NKey seed (the "SU..." form). Requires the build to have OpenSSL ("EV::Nats::HAS_NKEY"). jwt => Str User JWT, paired with "nkey_seed" for decentralized auth. See also "creds_file". tls => Bool / tls_ca_file => Str / tls_skip_verify => Bool See "tls" for details. Protocol options verbose => Bool (default 0) Request "+OK" acknowledgments after each command. pedantic => Bool (default 0) Server-side strict subject checking. echo => Bool (default 1) Receive messages this client itself publishes. no_responders => Bool (default 0) Ask the server to send a 503 status reply when a request has no responders, surfaced as the "no responders" error in "request". ping_interval => Int (ms, default 120000; 0 = disabled) Client-initiated PING interval for keep-alive. max_pings_outstanding => Int (default 2) Maximum unacked PINGs before the connection is declared stale. Reconnect options reconnect => Bool (default 0) Enable automatic reconnection. reconnect_delay => Int (ms, default 2000) Initial delay between reconnect attempts; subsequent attempts use exponential backoff with jitter, capped by "max_reconnect_delay". max_reconnect_delay => Int (ms, default 30000) Upper bound on the backoff delay. max_reconnect_attempts => Int (default 60; 0 = unlimited) Give up after this many consecutive failures. Callback options All callbacks fire on the EV loop, never inline. on_connect => sub { } Called after the CONNECT/PONG handshake completes. on_disconnect => sub { } Called when the connection drops, before any auto-reconnect attempt. on_error => sub { my ($err) = @_ } Receives a string. If unset, errors "croak". on_lame_duck => sub { } Called once when the server signals lame-duck-mode shutdown via INFO "ldm:true". on_slow_consumer => sub { my ($pending_bytes) = @_ } See "slow_consumer". connect($host, [$port]) Initiate a TCP connection. Port defaults to 4222. Croaks if already connected or in the middle of connecting; otherwise returns immediately and signals completion via "on_connect". connect_unix($path) Initiate a Unix-domain-socket connection. Same async semantics as "connect". disconnect Cancel any pending reconnect, drop queued writes, close the socket, and fire "on_disconnect". "intentional_disconnect" is set so no auto-reconnect is scheduled. For a clean shutdown that flushes pending writes first, see "drain". is_connected True if the CONNECT/PONG handshake has completed and no disconnect or reconnect is in progress. publish($subject, [$payload], [$reply_to]) Publish a message. Alias: "pub". $nats->publish('foo', 'hello'); $nats->publish('foo', 'hello', 'reply.subject'); hpublish($subject, $headers, [$payload], [$reply_to]) Publish with headers. Alias: "hpub". $nats->hpublish('foo', "NATS/1.0\r\nX-Key: val\r\n\r\n", 'body'); subscribe($subject, $cb, [$queue_group]) Subscribe to a subject. Returns subscription ID. Alias: "sub". my $sid = $nats->subscribe('foo.*', sub { my ($subject, $payload, $reply, $headers) = @_; }); Queue groups are preserved across reconnects. Callback receives: $subject - actual subject the message was published to $payload - message body $reply - reply-to subject (undef if none) $headers - raw headers string (only for HMSG) subscribe_max($subject, $cb, $max_msgs, [$queue_group]) Convenience: "subscribe" followed by an auto-unsubscribe after $max_msgs messages have been delivered. unsubscribe($sid, [$max_msgs]) Unsubscribe. With $max_msgs, the server is told to deliver that many more messages and then drop the subscription. The auto-unsub state is restored on reconnect (so the partial count survives a disconnect). Alias: "unsub". request($subject, $payload, $cb, [$timeout_ms]) Request/reply. Uses automatic inbox subscription. Alias: "req". $nats->request('service', 'data', sub { my ($response, $err) = @_; die $err if $err; print "got: $response\n"; }, 5000); Callback receives "($response, $error)". For replies that include NATS message headers (HMSG), a third argument $headers with the raw header block is also passed. Error is set on timeout ("request timeout") or no responders ("no responders"). drain([$cb]) Graceful shutdown: sends UNSUB for all subscriptions, flushes pending writes with a PING fence, fires $cb when the server confirms with PONG, then disconnects. No new messages will be received after drain is initiated. $cb receives a single argument: "undef" on clean drain, or an error string (e.g. "disconnected") if the connection dropped before the PONG arrived. $nats->drain(sub { my ($err) = @_; die "drain failed: $err" if $err; print "drained, safe to exit\n"; }); ping Send PING to server. flush([$cb]) Send PING as a write fence; the subsequent PONG guarantees all prior messages were processed by the server. If $cb is given, it is invoked when the PONG arrives. The callback receives a single argument: "undef" on success, or an error string (e.g. "disconnected") if the connection dropped before the PONG arrived. creds_file($path) Read a NATS ".creds" file and apply the embedded JWT and NKey seed via "jwt" and "nkey_seed". Apply this BEFORE "connect" so the credentials are available during the CONNECT handshake. Dies if the file is unreadable or missing either the "USER JWT" or "USER NKEY SEED" block. new_inbox Returns a fresh subject suitable for use as a private reply target ("_INBOX.."). Each call burns a slot from the same counter that "request" uses, so manual subscribers must treat the returned subject as opaque. subscription_count Returns the number of currently-registered subscriptions, including the implicit "_INBOX.>" subscription used by "request". server_info Returns the raw JSON string of the most recent INFO frame received from the server (or "undef" before the first INFO). Useful for inspecting "server_id", "version", "cluster", "connect_urls", etc. max_payload([$limit]) Server-advertised maximum payload size in bytes. Returns the current value; with an argument, overrides it (publishes above this croak locally before reaching the wire). waiting_count Number of writes queued locally during connect or reconnect (i.e. "publish"/"request" calls made while the connection is not yet ready). They flush when the handshake completes. skip_waiting Drop all queued writes without sending them. Useful before "disconnect" if reconnect is enabled and you don't want stale publishes replayed. reconnect($enable, [$delay_ms], [$max_attempts]) Configure reconnection. $delay_ms and $max_attempts are only written when supplied; omitted args leave the existing value unchanged. reconnect_enabled Returns true if reconnect is enabled. connect_timeout([$ms]) Get/set connect timeout. ping_interval([$ms]) Get/set PING interval. max_pings_outstanding([$num]) Get/set max outstanding PINGs. priority([$num]) Get/set EV watcher priority. keepalive([$seconds]) Get/set TCP keepalive. batch($coderef) Batch multiple publishes into a single write. Suppresses per-publish write scheduling; all buffered data is flushed after the coderef returns. $nats->batch(sub { $nats->publish("foo.$_", "msg-$_") for 1..1000; }); slow_consumer($bytes_threshold, [$cb]) Enable slow consumer detection. When the write buffer exceeds $bytes_threshold bytes, $cb is called with the current buffer size. $nats->slow_consumer(1024*1024, sub { my ($pending_bytes) = @_; warn "slow consumer: ${pending_bytes}B pending\n"; }); on_lame_duck([$cb]) Get/set the lame-duck callback. Fires once when the server signals shutdown (leaf node, rolling restart) via INFO "ldm:true". Use this to migrate work to another server before the grace period elapses. nkey_seed($seed) Set the NKey seed (the "SU..." base32-encoded form) for Ed25519 authentication. Requires the build to have OpenSSL (see "HAS_NKEY" in EV::Nats). The server nonce from INFO is automatically signed during CONNECT. May also be passed to "new" as "nkey_seed => ...". jwt($token) Set the user JWT. Combine with "nkey_seed" for NATS decentralized auth. May also be passed to "new". See "creds_file" for the common case of loading both from a ".creds" file. EV::Nats->nkey_generate_user_seed Class method. Returns a fresh, valid NATS User NKey seed (the "SU..." form). Useful for tests and provisioning scripts that don't have the "nk" CLI available. Requires "HAS_NKEY"; croaks otherwise. EV::Nats->nkey_public_from_seed($seed) Class method. Derives the matching public key (the "U..." form) from a User NKey seed. Croaks on an invalid seed. Pair with "nkey_generate_user_seed" to provision the server with the public key while the client keeps the seed. tls($enable, [$ca_file], [$skip_verify]) Configure TLS. Requires OpenSSL at build time (see "HAS_TLS" in EV::Nats). $nats->tls(1); # system CA $nats->tls(1, '/path/to/ca.pem'); # custom CA $nats->tls(1, undef, 1); # skip verification When verification is enabled (the default), the server certificate's SAN must match either the resolved IP literal or the DNS hostname passed to "connect". May also be passed to "new" as "tls => 1, tls_ca_file => $path". stats Returns a hash of connection counters: my %s = $nats->stats; # ( msgs_in, msgs_out, bytes_in, bytes_out ) reset_stats Zero all counters returned by "stats". on_error([$cb]) on_connect([$cb]) on_disconnect([$cb]) Get/set the corresponding callback at runtime. With no argument, returns the current value (or "undef"). With an argument, replaces it; pass "undef" to clear. BUILD-TIME FEATURES EV::Nats::HAS_TLS True if compiled with OpenSSL (TLS supported). EV::Nats::HAS_NKEY True if NKey/JWT signing is available (also requires OpenSSL). BENCHMARKS Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12, 100-byte payloads ("bench/benchmark.pl"): 100K msgs 200K msgs PUB fire-and-forget 4.7M 5.0M msgs/sec PUB + SUB (loopback) 1.8M 1.6M msgs/sec PUB + SUB (8B payload) 2.2M 1.9M msgs/sec REQ/REP (pipelined, 64) 334K msgs/sec Connected-path publish appends directly to the write buffer with no per-message allocation. Write coalescing via "ev_prepare" batches all publishes per event-loop iteration into a single write() syscall. Run "perl bench/benchmark.pl" for full results. Set "BENCH_MESSAGES", "BENCH_PAYLOAD", "BENCH_HOST", "BENCH_PORT" to customize. NATS PROTOCOL This module implements the NATS client protocol directly in XS. The protocol is text-based with CRLF-delimited control lines and binary payloads. Connection flow: server sends INFO, client sends CONNECT + PING, server responds with PONG to confirm. All subscriptions (including queue groups and auto-unsub state) are automatically restored on reconnect. Request/reply uses a single wildcard inbox subscription ("_INBOX..*") for all requests, with unique suffixes per request. CAVEATS * DNS resolution via "getaddrinfo" is blocking. Use numeric IP addresses for latency-sensitive applications. * TLS requires OpenSSL headers at build time (auto-detected). * NKey auth requires OpenSSL with Ed25519 support (1.1.1+). * The module handles all data as bytes. Encode UTF-8 strings before passing them. * Do not let the "EV::Nats" instance go out of scope (or be explicitly "undef"-ed) from inside a callback while that callback is still executing. The callback closure normally references $nats (via "$nats->publish(...)" etc.), which keeps it alive; if you write a callback that does not capture $nats and you "undef" the last outer reference inside that callback, Perl will run "DESTROY" mid-callback and free the underlying state. Any subsequent operation on $nats in that callback is undefined behavior. * Cluster URL discovery (the "connect_urls" field of INFO) is trusted by default. On failover the client connects to whatever hostnames the previous server advertised, and TLS hostname verification is performed against those names. Use a private CA ("tls_ca_file") to restrict which certificates are acceptable, or do not enable "tls" on public-CA topologies where any holder of a valid cert could redirect clients. ENVIRONMENT TEST_NATS_HOST, TEST_NATS_PORT Set these to run the test suite against a NATS server (default: 127.0.0.1:4222). SEE ALSO EV::Nats::JetStream, EV::Nats::KV, EV::Nats::ObjectStore, EV, NATS protocol , nats-server . AUTHOR vividsnow LICENSE This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.