/* Crafter®.Network Copyright (C) 2026 Catcrafts® Catcrafts.net This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ module; #include #include module Crafter.Network:ClientQUIC_impl; import :ClientQUIC; import Crafter.Thread; import std; using namespace Crafter; namespace { // Process-wide msquic API table + registration. Initialised lazily on // first ClientQUIC/ListenerQUIC construction; tear-down happens at // process exit via the destructor of the static object. struct MsQuicRuntime { const QUIC_API_TABLE* api = nullptr; HQUIC registration = nullptr; std::mutex initMutex; bool initialised = false; void Ensure() { std::lock_guard lock(initMutex); if (initialised) return; QUIC_STATUS s = MsQuicOpen2(&api); if (QUIC_FAILED(s)) { throw QUICException(std::format("MsQuicOpen2 failed: 0x{:x}", static_cast(s))); } QUIC_REGISTRATION_CONFIG regConfig{ "crafter.network", QUIC_EXECUTION_PROFILE_LOW_LATENCY }; s = api->RegistrationOpen(®Config, ®istration); if (QUIC_FAILED(s)) { MsQuicClose(api); api = nullptr; throw QUICException(std::format("RegistrationOpen failed: 0x{:x}", static_cast(s))); } initialised = true; } ~MsQuicRuntime() { if (registration) api->RegistrationClose(registration); if (api) MsQuicClose(api); } }; MsQuicRuntime& Runtime() { static MsQuicRuntime r; r.Ensure(); return r; } // Encode an ALPN string into the wire format msquic expects: a length // byte followed by the ASCII characters. Lifetime of the returned buffer // matches the caller's storage in `out`. QUIC_BUFFER MakeAlpn(const std::string& alpn, std::vector& out) { if (alpn.size() > 255) throw QUICException("ALPN string too long (max 255)"); out.assign(alpn.begin(), alpn.end()); QUIC_BUFFER b{}; b.Length = static_cast(out.size()); b.Buffer = out.data(); return b; } } // ---------------- QUICStream::Impl ---------------- struct QUICStream::Impl { HQUIC handle = nullptr; ClientQUIC* connection = nullptr; std::mutex mtx; std::condition_variable cv; std::deque> pending; bool peerSendClosed = false; bool shutdownComplete = false; bool sendInFlight = false; static QUIC_STATUS QUIC_API Callback(HQUIC stream, void* ctx, QUIC_STREAM_EVENT* ev) { auto* self = static_cast(ctx); switch (ev->Type) { case QUIC_STREAM_EVENT_RECEIVE: { std::vector chunk; std::uint64_t total = 0; for (std::uint32_t i = 0; i < ev->RECEIVE.BufferCount; ++i) { total += ev->RECEIVE.Buffers[i].Length; } chunk.reserve(static_cast(total)); for (std::uint32_t i = 0; i < ev->RECEIVE.BufferCount; ++i) { const QUIC_BUFFER& b = ev->RECEIVE.Buffers[i]; chunk.insert(chunk.end(), b.Buffer, b.Buffer + b.Length); } { std::lock_guard lk(self->mtx); if (!chunk.empty()) self->pending.push_back(std::move(chunk)); } self->cv.notify_all(); return QUIC_STATUS_SUCCESS; } case QUIC_STREAM_EVENT_SEND_COMPLETE: { { std::lock_guard lk(self->mtx); self->sendInFlight = false; } if (ev->SEND_COMPLETE.ClientContext) { delete[] static_cast(ev->SEND_COMPLETE.ClientContext); } self->cv.notify_all(); return QUIC_STATUS_SUCCESS; } case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: { { std::lock_guard lk(self->mtx); self->peerSendClosed = true; } self->cv.notify_all(); return QUIC_STATUS_SUCCESS; } case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: { { std::lock_guard lk(self->mtx); self->peerSendClosed = true; self->shutdownComplete = true; } self->cv.notify_all(); Runtime().api->SetCallbackHandler(stream, nullptr, nullptr); Runtime().api->StreamClose(stream); return QUIC_STATUS_SUCCESS; } default: return QUIC_STATUS_SUCCESS; } } }; QUICStream::QUICStream() = default; QUICStream::QUICStream(HQUIC handle, ClientQUIC* connection) : handle(handle), connection(connection), impl(std::make_unique()) { impl->handle = handle; impl->connection = connection; Runtime().api->SetCallbackHandler(handle, reinterpret_cast(&Impl::Callback), impl.get()); } QUICStream::QUICStream(QUICStream&& other) noexcept : handle(other.handle), connection(other.connection), canSend(other.canSend), canReceive(other.canReceive), impl(std::move(other.impl)) { other.handle = nullptr; other.connection = nullptr; } QUICStream& QUICStream::operator=(QUICStream&& other) noexcept { if (this != &other) { Stop(); handle = other.handle; connection = other.connection; canSend = other.canSend; canReceive = other.canReceive; impl = std::move(other.impl); other.handle = nullptr; other.connection = nullptr; } return *this; } QUICStream::~QUICStream() { Stop(); } void QUICStream::Stop() { if (!handle) return; // If the stream's SHUTDOWN_COMPLETE event has already fired, msquic has // internally called StreamClose for us (see Impl::Callback) and the // handle is no longer valid — calling StreamShutdown on it trips a // quic_bugcheck inside msquic. Skip in that case. This is the common // path for short-lived request/response streams where both peers FIN // before the wrapper is destroyed. bool alreadyClosed = false; if (impl) { std::lock_guard lk(impl->mtx); alreadyClosed = impl->shutdownComplete; } if (!alreadyClosed) { Runtime().api->StreamShutdown(handle, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0); } handle = nullptr; if (impl) impl->handle = nullptr; } void QUICStream::SendSync(const void* buffer, std::uint32_t size, bool finish) { if (!handle || !canSend) throw QUICClosedException(); auto* copy = new char[size]; std::memcpy(copy, buffer, size); QUIC_BUFFER quicBuf{}; quicBuf.Buffer = reinterpret_cast(copy); quicBuf.Length = size; { std::lock_guard lk(impl->mtx); impl->sendInFlight = true; } QUIC_SEND_FLAGS flags = finish ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE; QUIC_STATUS s = Runtime().api->StreamSend(handle, &quicBuf, 1, flags, copy); if (QUIC_FAILED(s)) { delete[] copy; throw QUICException(std::format("StreamSend failed: 0x{:x}", static_cast(s))); } std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->sendInFlight || impl->shutdownComplete; }); if (impl->shutdownComplete) throw QUICClosedException(); } std::vector QUICStream::RecieveSync() { if (!handle || !canReceive) throw QUICClosedException(); std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->pending.empty() || impl->peerSendClosed || impl->shutdownComplete; }); if (!impl->pending.empty()) { auto out = std::move(impl->pending.front()); impl->pending.pop_front(); return out; } throw QUICClosedException(); } std::vector QUICStream::RecieveUntilCloseSync() { if (!handle || !canReceive) throw QUICClosedException(); std::vector out; while (true) { std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->pending.empty() || impl->peerSendClosed || impl->shutdownComplete; }); while (!impl->pending.empty()) { auto& chunk = impl->pending.front(); out.insert(out.end(), chunk.begin(), chunk.end()); impl->pending.pop_front(); } if (impl->peerSendClosed || impl->shutdownComplete) return out; } } std::vector QUICStream::RecieveUntilFullSync(std::uint32_t bufferSize) { if (!handle || !canReceive) throw QUICClosedException(); std::vector out; out.reserve(bufferSize); while (out.size() < bufferSize) { std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->pending.empty() || impl->peerSendClosed || impl->shutdownComplete; }); while (!impl->pending.empty() && out.size() < bufferSize) { auto& chunk = impl->pending.front(); std::size_t want = std::min(chunk.size(), bufferSize - out.size()); out.insert(out.end(), chunk.begin(), chunk.begin() + want); if (want == chunk.size()) { impl->pending.pop_front(); } else { chunk.erase(chunk.begin(), chunk.begin() + want); } } if (out.size() < bufferSize && (impl->peerSendClosed || impl->shutdownComplete)) { throw QUICClosedException(); } } return out; } void QUICStream::SendAsync(const void* buffer, std::uint32_t size, bool finish, std::function onSent) { // Copy now: the caller's buffer may not outlive the enqueued task. std::vector copy(static_cast(buffer), static_cast(buffer) + size); ThreadPool::Enqueue([this, copy = std::move(copy), finish, onSent = std::move(onSent)]() mutable { try { this->SendSync(copy.data(), static_cast(copy.size()), finish); } catch (...) { /* swallowed — callback still fires so the caller can move on */ } if (onSent) onSent(); }); } void QUICStream::RecieveAsync(std::function)> cb) { ThreadPool::Enqueue([this, cb]{ cb(this->RecieveSync()); }); } void QUICStream::RecieveUntilCloseAsync(std::function)> cb) { ThreadPool::Enqueue([this, cb]{ cb(this->RecieveUntilCloseSync()); }); } void QUICStream::RecieveUntilFullAsync(std::uint32_t bufferSize, std::function)> cb) { ThreadPool::Enqueue([this, bufferSize, cb]{ cb(this->RecieveUntilFullSync(bufferSize)); }); } void QUICStream::PrependReceived(std::vector bytes) { if (bytes.empty() || !impl) return; { std::lock_guard lk(impl->mtx); impl->pending.push_front(std::move(bytes)); } impl->cv.notify_all(); } std::uint64_t QUICStream::GetStreamId() const { if (!handle) throw QUICException("GetStreamId: stream is not open"); QUIC_UINT62 id = 0; std::uint32_t size = sizeof(id); QUIC_STATUS s = Runtime().api->GetParam(handle, QUIC_PARAM_STREAM_ID, &size, &id); if (QUIC_FAILED(s)) { throw QUICException(std::format("GetParam(QUIC_PARAM_STREAM_ID) failed: 0x{:x}", static_cast(s))); } return static_cast(id); } // ---------------- ClientQUIC::Impl ---------------- struct ClientQUIC::Impl { HQUIC connection = nullptr; HQUIC configuration = nullptr; bool ownsConfiguration = true; std::mutex mtx; std::condition_variable cv; bool connected = false; bool closed = false; QUIC_STATUS shutdownStatus = QUIC_STATUS_SUCCESS; std::function onStream; std::function)> onDatagram; std::deque> datagramQueue; // Streams the peer started before the user installed an OnStream // handler. Without this backlog the early streams (e.g. an h3 server's // control stream right after handshake) would be aborted in the // PEER_STREAM_STARTED branch and the connection would die with // H3_MISSING_SETTINGS on the peer side. std::deque pendingStreams; ClientQUIC* outer = nullptr; static QUIC_STATUS QUIC_API Callback(HQUIC conn, void* ctx, QUIC_CONNECTION_EVENT* ev) { auto* self = static_cast(ctx); switch (ev->Type) { case QUIC_CONNECTION_EVENT_CONNECTED: { { std::lock_guard lk(self->mtx); self->connected = true; } self->cv.notify_all(); return QUIC_STATUS_SUCCESS; } case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT: case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER: { { std::lock_guard lk(self->mtx); self->closed = true; if (ev->Type == QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT) { self->shutdownStatus = ev->SHUTDOWN_INITIATED_BY_TRANSPORT.Status; } } self->cv.notify_all(); return QUIC_STATUS_SUCCESS; } case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: { { std::lock_guard lk(self->mtx); self->closed = true; } self->cv.notify_all(); if (ev->SHUTDOWN_COMPLETE.AppCloseInProgress == 0) { Runtime().api->ConnectionClose(conn); self->connection = nullptr; } return QUIC_STATUS_SUCCESS; } case QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED: { HQUIC streamHandle = ev->PEER_STREAM_STARTED.Stream; bool unidirectional = (ev->PEER_STREAM_STARTED.Flags & QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL) != 0; QUICStream stream(streamHandle, self->outer); if (unidirectional) { // Peer-initiated unidi: peer sends, we read; we cannot send. stream.canSend = false; stream.canReceive = true; } std::function cb; { std::lock_guard lk(self->mtx); cb = self->onStream; if (!cb) { // Buffer until OnStream is installed; OnStream's // setter drains this queue. self->pendingStreams.push_back(std::move(stream)); return QUIC_STATUS_SUCCESS; } } auto* shared = new QUICStream(std::move(stream)); ThreadPool::Enqueue([cb, shared]{ cb(std::move(*shared)); delete shared; }); return QUIC_STATUS_SUCCESS; } case QUIC_CONNECTION_EVENT_DATAGRAM_RECEIVED: { std::vector chunk(ev->DATAGRAM_RECEIVED.Buffer->Buffer, ev->DATAGRAM_RECEIVED.Buffer->Buffer + ev->DATAGRAM_RECEIVED.Buffer->Length); if (self->onDatagram) { auto cb = self->onDatagram; ThreadPool::Enqueue([cb, chunk = std::move(chunk)]() mutable { cb(std::move(chunk)); }); } else { std::lock_guard lk(self->mtx); self->datagramQueue.push_back(std::move(chunk)); self->cv.notify_all(); } return QUIC_STATUS_SUCCESS; } case QUIC_CONNECTION_EVENT_DATAGRAM_SEND_STATE_CHANGED: { // msquic fires this event multiple times per datagram (e.g. // SENT -> ACKNOWLEDGED). Free the combined QUIC_BUFFER+payload // allocation only on a terminal state — SENT and LOST_SUSPECT // are intermediate and may be followed by another transition. auto state = ev->DATAGRAM_SEND_STATE_CHANGED.State; if (ev->DATAGRAM_SEND_STATE_CHANGED.ClientContext && (state == QUIC_DATAGRAM_SEND_LOST_DISCARDED || state == QUIC_DATAGRAM_SEND_ACKNOWLEDGED || state == QUIC_DATAGRAM_SEND_ACKNOWLEDGED_SPURIOUS || state == QUIC_DATAGRAM_SEND_CANCELED)) { ::operator delete(ev->DATAGRAM_SEND_STATE_CHANGED.ClientContext); } return QUIC_STATUS_SUCCESS; } default: return QUIC_STATUS_SUCCESS; } } }; static HQUIC OpenClientConfiguration(const std::string& alpn, const QUICClientCredentials& creds) { std::vector alpnBuf; QUIC_BUFFER alpnBuffer = MakeAlpn(alpn, alpnBuf); QUIC_SETTINGS settings{}; settings.IsSet.IdleTimeoutMs = 1; settings.IdleTimeoutMs = 30'000; settings.IsSet.DatagramReceiveEnabled = 1; settings.DatagramReceiveEnabled = 1; // Allow the server to open unidi/bidi streams to us. msquic defaults // both peer-stream-count limits to 0; with that, the server's HTTP/3 // control stream + QPACK encoder/decoder streams can't be created and // most h3 servers will close the connection after handshake. We don't // currently use server push (h3 pushes ride on unidi 0x01 streams) but // the bidi cap is harmless to grant. settings.IsSet.PeerUnidiStreamCount = 1; settings.PeerUnidiStreamCount = 16; settings.IsSet.PeerBidiStreamCount = 1; settings.PeerBidiStreamCount = 16; HQUIC cfg = nullptr; QUIC_STATUS s = Runtime().api->ConfigurationOpen(Runtime().registration, &alpnBuffer, 1, &settings, sizeof(settings), nullptr, &cfg); if (QUIC_FAILED(s)) throw QUICException(std::format("ConfigurationOpen failed: 0x{:x}", static_cast(s))); QUIC_CREDENTIAL_CONFIG cc{}; cc.Type = QUIC_CREDENTIAL_TYPE_NONE; cc.Flags = QUIC_CREDENTIAL_FLAG_CLIENT; if (creds.insecureNoServerValidation) { cc.Flags |= QUIC_CREDENTIAL_FLAG_NO_CERTIFICATE_VALIDATION; } s = Runtime().api->ConfigurationLoadCredential(cfg, &cc); if (QUIC_FAILED(s)) { Runtime().api->ConfigurationClose(cfg); throw QUICException(std::format("ConfigurationLoadCredential failed: 0x{:x}", static_cast(s))); } return cfg; } ClientQUIC::ClientQUIC(const char* host, std::uint16_t port, std::string alpnIn, QUICClientCredentials creds) : alpn(std::move(alpnIn)), impl(std::make_unique()) { impl->outer = this; impl->configuration = OpenClientConfiguration(alpn, creds); QUIC_STATUS s = Runtime().api->ConnectionOpen(Runtime().registration, reinterpret_cast(&Impl::Callback), impl.get(), &impl->connection); if (QUIC_FAILED(s)) { Runtime().api->ConfigurationClose(impl->configuration); throw QUICException(std::format("ConnectionOpen failed: 0x{:x}", static_cast(s))); } s = Runtime().api->ConnectionStart(impl->connection, impl->configuration, QUIC_ADDRESS_FAMILY_UNSPEC, host, port); if (QUIC_FAILED(s)) { Runtime().api->ConnectionClose(impl->connection); Runtime().api->ConfigurationClose(impl->configuration); throw QUICException(std::format("ConnectionStart failed: 0x{:x}", static_cast(s))); } std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return impl->connected || impl->closed; }); if (!impl->connected) { throw QUICException(std::format("QUIC handshake failed: 0x{:x}", static_cast(impl->shutdownStatus))); } } ClientQUIC::ClientQUIC(std::string host, std::uint16_t port, std::string alpnIn, QUICClientCredentials creds) : ClientQUIC(host.c_str(), port, std::move(alpnIn), std::move(creds)) {} ClientQUIC::ClientQUIC(HQUIC connectionHandle, HQUIC serverConfiguration, std::string alpnIn) : alpn(std::move(alpnIn)), impl(std::make_unique()) { impl->outer = this; impl->connection = connectionHandle; impl->configuration = serverConfiguration; impl->ownsConfiguration = false; impl->connected = true; Runtime().api->SetCallbackHandler(connectionHandle, reinterpret_cast(&Impl::Callback), impl.get()); } ClientQUIC::ClientQUIC(ClientQUIC&& other) noexcept : alpn(std::move(other.alpn)), impl(std::move(other.impl)) { if (impl) impl->outer = this; } ClientQUIC::~ClientQUIC() { if (!impl) return; if (impl->connection) { Runtime().api->ConnectionShutdown(impl->connection, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0); Runtime().api->ConnectionClose(impl->connection); impl->connection = nullptr; } if (impl->configuration && impl->ownsConfiguration) { Runtime().api->ConfigurationClose(impl->configuration); impl->configuration = nullptr; } } void ClientQUIC::Stop() { if (!impl || !impl->connection) return; Runtime().api->ConnectionShutdown(impl->connection, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0); } QUICStream ClientQUIC::OpenStream(bool unidirectional) { HQUIC streamHandle = nullptr; QUICStream stream; stream.impl = std::make_unique(); stream.impl->connection = this; QUIC_STREAM_OPEN_FLAGS openFlags = unidirectional ? QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL : QUIC_STREAM_OPEN_FLAG_NONE; QUIC_STATUS s = Runtime().api->StreamOpen(impl->connection, openFlags, reinterpret_cast(&QUICStream::Impl::Callback), stream.impl.get(), &streamHandle); if (QUIC_FAILED(s)) throw QUICException(std::format("StreamOpen failed: 0x{:x}", static_cast(s))); stream.handle = streamHandle; stream.connection = this; stream.impl->handle = streamHandle; if (unidirectional) { // We initiated the unidi stream: we send, peer reads. stream.canSend = true; stream.canReceive = false; } s = Runtime().api->StreamStart(streamHandle, QUIC_STREAM_START_FLAG_NONE); if (QUIC_FAILED(s)) { Runtime().api->StreamClose(streamHandle); throw QUICException(std::format("StreamStart failed: 0x{:x}", static_cast(s))); } return stream; } void ClientQUIC::SendDatagram(const void* buffer, std::uint32_t size) { // msquic stores the QUIC_BUFFER pointer (not a copy) on the send queue // and serialises async on a worker thread. Both the QUIC_BUFFER and the // payload it points at must outlive the call until DATAGRAM_SEND_STATE // reports a terminal state. Pack them together in a single allocation. auto* mem = static_cast(::operator new(sizeof(QUIC_BUFFER) + size)); auto* hdr = reinterpret_cast(mem); auto* payload = mem + sizeof(QUIC_BUFFER); std::memcpy(payload, buffer, size); hdr->Buffer = payload; hdr->Length = size; QUIC_STATUS s = Runtime().api->DatagramSend(impl->connection, hdr, 1, QUIC_SEND_FLAG_NONE, mem); if (QUIC_FAILED(s)) { ::operator delete(mem); throw QUICException(std::format("DatagramSend failed: 0x{:x}", static_cast(s))); } } void ClientQUIC::OnStream(std::function cb) { std::deque backlog; { std::lock_guard lk(impl->mtx); impl->onStream = cb; std::swap(backlog, impl->pendingStreams); } while (!backlog.empty()) { auto* shared = new QUICStream(std::move(backlog.front())); backlog.pop_front(); auto handler = cb; ThreadPool::Enqueue([handler, shared]{ handler(std::move(*shared)); delete shared; }); } } void ClientQUIC::OnDatagram(std::function)> cb) { impl->onDatagram = std::move(cb); } std::vector ClientQUIC::RecieveDatagramSync() { std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->datagramQueue.empty() || impl->closed; }); if (!impl->datagramQueue.empty()) { auto out = std::move(impl->datagramQueue.front()); impl->datagramQueue.pop_front(); return out; } throw QUICClosedException(); } HQUIC ClientQUIC::GetHandle() const { return impl ? impl->connection : nullptr; }