Class: TIMEx::Strategies::IO
Overview
Deadline-aware helpers for non-blocking socket I/O and DNS resolution.
The nested singleton methods implement reusable primitives; #run simply yields the Deadline to the caller for custom protocols.
Constant Summary
- MIN_SOCKET_TIMEOUT = 0.001
Minimum seconds passed to SO_RCVTIMEO / SO_SNDTIMEO. A packed timeval of zero often means "disable timeout" on POSIX, which would leave blocking reads unbounded when the remaining budget rounds to 0.
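To make the rounding hazard concrete, the sketch below splits a remaining budget into the seconds and microseconds fields of a timeval, with and without the floor. The helper name `split_timeval` is an illustrative assumption and not part of the library; only the constant's value and the clamp expression are taken from this page.

```ruby
# Value taken from the constant documented above.
MIN_SOCKET_TIMEOUT = 0.001

# Hypothetical helper (not part of TIMEx): split a duration in seconds
# into [tv_sec, tv_usec], rounding to whole microseconds.
def split_timeval(seconds)
  (seconds * 1_000_000).round.divmod(1_000_000)
end

tiny_budget = 0.0000004 # 0.4 microseconds left on the deadline

# Without the floor, both fields round to zero, which POSIX commonly
# treats as "no timeout at all".
unclamped = split_timeval(tiny_budget)

# With the floor, the socket still gets a 1 ms timeout.
clamped = split_timeval([tiny_budget, MIN_SOCKET_TIMEOUT].max)

puts unclamped.inspect # [0, 0]
puts clamped.inspect   # [0, 1000]
```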
Class Method Summary
- .apply_socket_timeouts(sock, deadline:) ⇒ void
  Best-effort SO_RCVTIMEO / SO_SNDTIMEO setter.
- .connect(host, port, deadline:, apply_timeouts: true) ⇒ ::Socket
  Resolves host (respecting deadline) and connects via the first working address family.
- .read(io, len, deadline:) ⇒ String
  Reads up to len bytes using non-blocking reads bounded by deadline.
- .write(io, buffer, deadline:) ⇒ Integer
  Writes the full buffer, retrying on IO::WaitWritable until done or expired.
Methods inherited from Base
Class Method Details
.apply_socket_timeouts(sock, deadline:) ⇒ void
This method returns an undefined value.
Best-effort SO_RCVTIMEO / SO_SNDTIMEO setter. The native pack format for
struct timeval differs by platform (64-bit POSIX uses two long
fields; Windows uses DWORD milliseconds). When SO_RCVTIMEO_FLOAT
is exposed (Darwin, some BSDs) we prefer it because the option's
value is a raw Float, avoiding the packed-timeval mismatch.
Failures are swallowed so callers can rely on wait_for as the
primary deadline guard.
    # File 'lib/timex/strategies/io.rb', line 115

    def apply_socket_timeouts(sock, deadline:)
      deadline = Deadline.coerce(deadline)
      return if deadline.infinite?

      remaining = deadline.remaining
      return if remaining <= 0

      remaining = [remaining, MIN_SOCKET_TIMEOUT].max
      if ::Socket.const_defined?(:SO_RCVTIMEO_FLOAT) && ::Socket.const_defined?(:SO_SNDTIMEO_FLOAT)
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVTIMEO_FLOAT, remaining)
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDTIMEO_FLOAT, remaining)
      else
        tv = pack_timeval(remaining)
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVTIMEO, tv)
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDTIMEO, tv)
      end
    rescue StandardError
      nil
    end
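The private pack_timeval helper is not shown on this page. As a rough illustration of the packed-timeval branch, the sketch below uses the common Ruby idiom of packing two native longs, which matches struct timeval on typical 64-bit POSIX systems. Both `pack_timeval_sketch` and `apply_timeouts_sketch` are hypothetical names, and the real helper may handle more platforms than this.

```ruby
require "socket"

# Hypothetical stand-in for the private pack_timeval helper. Assumes a
# struct timeval laid out as two native longs (typical 64-bit POSIX).
def pack_timeval_sketch(seconds)
  secs, usecs = (seconds * 1_000_000).round.divmod(1_000_000)
  [secs, usecs].pack("l_2")
end

# Best-effort setter in the spirit of apply_socket_timeouts: returns true
# when both options were set, false if the platform rejected them.
def apply_timeouts_sketch(sock, seconds)
  tv = pack_timeval_sketch(seconds)
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, tv)
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, tv)
  true
rescue StandardError
  false
end

left, right = UNIXSocket.pair
applied = apply_timeouts_sketch(left, 1.5)
puts "timeouts applied: #{applied}"
left.close
right.close
```

Because the setter swallows failures, a false result here only means the kernel-level timeout is absent; a select-based wait remains the primary guard, as the description above notes.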
.connect(host, port, deadline:, apply_timeouts: true) ⇒ ::Socket
Resolves host (respecting deadline) and connects via the first
working address family. Avoids getaddrinfo blocking past the
deadline by delegating to Resolv with the remaining time.
The returned socket also has SO_RCVTIMEO / SO_SNDTIMEO set to the
remaining budget so that subsequent blocking reads and writes don't
outlive the deadline if the caller forgets to use read / write. Pass
apply_timeouts: false to opt out.
    # File 'lib/timex/strategies/io.rb', line 85

    def connect(host, port, deadline:, apply_timeouts: true)
      deadline = Deadline.coerce(deadline)
      addresses = resolve_host(host, deadline)
      raise ::SocketError, "could not resolve #{host}" if addresses.empty?

      last_error = nil
      addresses.each do |addr|
        sock = open_socket(addr, port, deadline)
        apply_socket_timeouts(sock, deadline:) if apply_timeouts
        return sock
      rescue Expired
        raise
      rescue StandardError => e
        last_error = e
        next
      end
      raise last_error || Errno::ECONNREFUSED.new("could not connect to #{host}:#{port}")
    end
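The open_socket helper called in the loop is private and not listed here. One plausible way to bound a single connection attempt is a non-blocking connect followed by a select with the remaining time, sketched below against an already-resolved IP address. The name `open_socket_sketch`, the use of Timeout::Error in place of the library's Expired, and the float timeout in place of a Deadline are all assumptions for illustration; DNS resolution is out of scope for this sketch.

```ruby
require "socket"
require "timeout"

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical stand-in for open_socket: connect to a resolved IP within
# `timeout` seconds, raising Timeout::Error if the budget runs out.
def open_socket_sketch(ip, port, timeout)
  deadline_at = monotonic_now + timeout
  addrinfo = Addrinfo.tcp(ip, port)
  sock = Socket.new(addrinfo.afamily, Socket::SOCK_STREAM, 0)
  begin
    sock.connect_nonblock(addrinfo.to_sockaddr)
  rescue IO::WaitWritable
    # Connection in progress: wait for writability, bounded by the budget.
    remaining = deadline_at - monotonic_now
    unless remaining.positive? && IO.select(nil, [sock], nil, remaining)
      raise Timeout::Error, "connect to #{ip}:#{port} timed out"
    end
    begin
      # A second call reports the outcome: success, EISCONN, or the error.
      sock.connect_nonblock(addrinfo.to_sockaddr)
    rescue Errno::EISCONN
      # Already connected, nothing more to do.
    end
  end
  sock
rescue StandardError
  sock&.close
  raise
end

# Usage against a throwaway local listener on an ephemeral port.
server = TCPServer.new("127.0.0.1", 0)
port = server.addr[1]
sock = open_socket_sketch("127.0.0.1", port, 1.0)
connected_port = sock.remote_address.ip_port
sock.close
server.close
```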
.read(io, len, deadline:) ⇒ String
Reads up to len bytes using non-blocking reads bounded by deadline.
    # File 'lib/timex/strategies/io.rb', line 31

    def read(io, len, deadline:)
      deadline = Deadline.coerce(deadline)
      loop do
        return io.read_nonblock(len)
      rescue ::IO::WaitReadable
        wait_for(io, :read, deadline)
      end
    end
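The wait_for helper that bounds each retry is not shown on this page. A minimal sketch of the same read loop, assuming the wait is an IO.select limited to the time left, could look like the following. `wait_until_ready` and `read_with_deadline` are hypothetical names, and Timeout::Error stands in for the library's Expired error.

```ruby
require "timeout"

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical stand-in for wait_for: block until `io` is ready for `mode`
# (:read or :write) or the absolute monotonic deadline passes.
def wait_until_ready(io, mode, deadline_at)
  remaining = deadline_at - monotonic_now
  raise Timeout::Error, "deadline expired" unless remaining.positive?

  readers = mode == :read ? [io] : nil
  writers = mode == :write ? [io] : nil
  ready = IO.select(readers, writers, nil, remaining)
  raise Timeout::Error, "deadline expired" unless ready
end

# Same shape as .read above, with a float timeout instead of a Deadline.
def read_with_deadline(io, len, timeout)
  deadline_at = monotonic_now + timeout
  loop do
    return io.read_nonblock(len)
  rescue IO::WaitReadable
    wait_until_ready(io, :read, deadline_at)
  end
end

reader, writer = IO.pipe

# Nothing has been written yet, so the first read runs out of budget.
timed_out = false
begin
  read_with_deadline(reader, 16, 0.05)
rescue Timeout::Error
  timed_out = true
end

# Once data is available, the read returns up to `len` bytes immediately.
writer.write("hello")
data = read_with_deadline(reader, 16, 0.05)

reader.close
writer.close
```

Note that, like .read, this returns after the first successful read_nonblock, so the result may be shorter than len.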
.write(io, buffer, deadline:) ⇒ Integer
Writes the full buffer, retrying on IO::WaitWritable until done or expired.
    # File 'lib/timex/strategies/io.rb', line 48

    def write(io, buffer, deadline:)
      deadline = Deadline.coerce(deadline)
      total = buffer.bytesize
      offset = 0
      while offset < total
        begin
          # Avoid the per-iteration `byteslice` alloc on the common path
          # where we write the whole buffer in one go; only slice once we
          # know the kernel took a partial write.
          chunk = offset.zero? ? buffer : buffer.byteslice(offset, total - offset)
          n = io.write_nonblock(chunk)
          raise ::IOError, "write_nonblock returned 0 bytes (no progress)" if n.zero?

          offset += n
        rescue ::IO::WaitWritable
          wait_for(io, :write, deadline)
        end
      end
      total
    end
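The partial-write path is easiest to observe with a pipe, whose kernel buffer is much smaller than a large payload. The sketch below mirrors the loop above with a float timeout and an inline IO.select in place of the library's Deadline and wait_for; `write_all_sketch` is a hypothetical name, and Timeout::Error again stands in for Expired.

```ruby
require "timeout"

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical stand-in for .write: push the whole buffer, waiting for
# writability (bounded by the budget) whenever the kernel buffer is full.
def write_all_sketch(io, buffer, timeout)
  deadline_at = monotonic_now + timeout
  total = buffer.bytesize
  offset = 0
  while offset < total
    begin
      n = io.write_nonblock(buffer.byteslice(offset, total - offset))
      raise IOError, "write_nonblock returned 0 bytes (no progress)" if n.zero?

      offset += n
    rescue IO::WaitWritable
      remaining = deadline_at - monotonic_now
      unless remaining.positive? && IO.select(nil, [io], nil, remaining)
        raise Timeout::Error, "write deadline expired"
      end
    end
  end
  total
end

reader, writer = IO.pipe
payload = "x" * (1024 * 1024) # larger than a typical pipe buffer

# Drain the pipe in the background so the writer can make progress.
drain = Thread.new { reader.read }

written = write_all_sketch(writer, payload, 5.0)
writer.close # signals EOF to the draining thread
received = drain.value
reader.close
```

Without the draining thread, the same call would fill the pipe, wait for writability, and raise once the budget ran out, leaving the payload only partly written.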