/* Crafter®.Network Copyright (C) 2026 Catcrafts® Catcrafts.net This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ module; #include #include module Crafter.Network:ClientQUIC_impl; import :ClientQUIC; import Crafter.Thread; import std; using namespace Crafter; namespace { // Process-wide msquic API table + registration. Initialised lazily on // first ClientQUIC/ListenerQUIC construction; tear-down happens at // process exit via the destructor of the static object. struct MsQuicRuntime { const QUIC_API_TABLE* api = nullptr; HQUIC registration = nullptr; std::mutex initMutex; bool initialised = false; void Ensure() { std::lock_guard lock(initMutex); if (initialised) return; QUIC_STATUS s = MsQuicOpen2(&api); if (QUIC_FAILED(s)) { throw QUICException(std::format("MsQuicOpen2 failed: 0x{:x}", static_cast(s))); } QUIC_REGISTRATION_CONFIG regConfig{ "crafter.network", QUIC_EXECUTION_PROFILE_LOW_LATENCY }; s = api->RegistrationOpen(®Config, ®istration); if (QUIC_FAILED(s)) { MsQuicClose(api); api = nullptr; throw QUICException(std::format("RegistrationOpen failed: 0x{:x}", static_cast(s))); } initialised = true; } ~MsQuicRuntime() { if (registration) api->RegistrationClose(registration); if (api) MsQuicClose(api); } }; MsQuicRuntime& Runtime() { static MsQuicRuntime r; r.Ensure(); return r; } // Encode an ALPN string into the wire format msquic expects: a length // byte followed by the ASCII characters. Lifetime of the returned buffer // matches the caller's storage in `out`. QUIC_BUFFER MakeAlpn(const std::string& alpn, std::vector& out) { if (alpn.size() > 255) throw QUICException("ALPN string too long (max 255)"); out.assign(alpn.begin(), alpn.end()); QUIC_BUFFER b{}; b.Length = static_cast(out.size()); b.Buffer = out.data(); return b; } } // ---------------- QUICStream::Impl ---------------- struct QUICStream::Impl { HQUIC handle = nullptr; ClientQUIC* connection = nullptr; std::mutex mtx; std::condition_variable cv; std::deque> pending; bool peerSendClosed = false; bool shutdownComplete = false; bool sendInFlight = false; static QUIC_STATUS QUIC_API Callback(HQUIC stream, void* ctx, QUIC_STREAM_EVENT* ev) { auto* self = static_cast(ctx); switch (ev->Type) { case QUIC_STREAM_EVENT_RECEIVE: { std::vector chunk; std::uint64_t total = 0; for (std::uint32_t i = 0; i < ev->RECEIVE.BufferCount; ++i) { total += ev->RECEIVE.Buffers[i].Length; } chunk.reserve(static_cast(total)); for (std::uint32_t i = 0; i < ev->RECEIVE.BufferCount; ++i) { const QUIC_BUFFER& b = ev->RECEIVE.Buffers[i]; chunk.insert(chunk.end(), b.Buffer, b.Buffer + b.Length); } { std::lock_guard lk(self->mtx); if (!chunk.empty()) self->pending.push_back(std::move(chunk)); } self->cv.notify_all(); return QUIC_STATUS_SUCCESS; } case QUIC_STREAM_EVENT_SEND_COMPLETE: { { std::lock_guard lk(self->mtx); self->sendInFlight = false; } if (ev->SEND_COMPLETE.ClientContext) { delete[] static_cast(ev->SEND_COMPLETE.ClientContext); } self->cv.notify_all(); return QUIC_STATUS_SUCCESS; } case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: { { std::lock_guard lk(self->mtx); self->peerSendClosed = true; } self->cv.notify_all(); return QUIC_STATUS_SUCCESS; } case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: { { std::lock_guard lk(self->mtx); self->peerSendClosed = true; self->shutdownComplete = true; } self->cv.notify_all(); Runtime().api->SetCallbackHandler(stream, nullptr, nullptr); Runtime().api->StreamClose(stream); return QUIC_STATUS_SUCCESS; } default: return QUIC_STATUS_SUCCESS; } } }; QUICStream::QUICStream(HQUIC handle, ClientQUIC* connection) : handle(handle), connection(connection), impl(std::make_unique()) { impl->handle = handle; impl->connection = connection; Runtime().api->SetCallbackHandler(handle, reinterpret_cast(&Impl::Callback), impl.get()); } QUICStream::QUICStream(QUICStream&& other) noexcept : handle(other.handle), connection(other.connection), impl(std::move(other.impl)) { other.handle = nullptr; other.connection = nullptr; } QUICStream& QUICStream::operator=(QUICStream&& other) noexcept { if (this != &other) { Stop(); handle = other.handle; connection = other.connection; impl = std::move(other.impl); other.handle = nullptr; other.connection = nullptr; } return *this; } QUICStream::~QUICStream() { Stop(); } void QUICStream::Stop() { if (!handle) return; Runtime().api->StreamShutdown(handle, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0); handle = nullptr; } void QUICStream::SendSync(const void* buffer, std::uint32_t size, bool finish) { if (!handle) throw QUICClosedException(); auto* copy = new char[size]; std::memcpy(copy, buffer, size); QUIC_BUFFER quicBuf{}; quicBuf.Buffer = reinterpret_cast(copy); quicBuf.Length = size; { std::lock_guard lk(impl->mtx); impl->sendInFlight = true; } QUIC_SEND_FLAGS flags = finish ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE; QUIC_STATUS s = Runtime().api->StreamSend(handle, &quicBuf, 1, flags, copy); if (QUIC_FAILED(s)) { delete[] copy; throw QUICException(std::format("StreamSend failed: 0x{:x}", static_cast(s))); } std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->sendInFlight || impl->shutdownComplete; }); if (impl->shutdownComplete) throw QUICClosedException(); } std::vector QUICStream::RecieveSync() { if (!handle) throw QUICClosedException(); std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->pending.empty() || impl->peerSendClosed || impl->shutdownComplete; }); if (!impl->pending.empty()) { auto out = std::move(impl->pending.front()); impl->pending.pop_front(); return out; } throw QUICClosedException(); } std::vector QUICStream::RecieveUntilCloseSync() { if (!handle) throw QUICClosedException(); std::vector out; while (true) { std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->pending.empty() || impl->peerSendClosed || impl->shutdownComplete; }); while (!impl->pending.empty()) { auto& chunk = impl->pending.front(); out.insert(out.end(), chunk.begin(), chunk.end()); impl->pending.pop_front(); } if (impl->peerSendClosed || impl->shutdownComplete) return out; } } std::vector QUICStream::RecieveUntilFullSync(std::uint32_t bufferSize) { if (!handle) throw QUICClosedException(); std::vector out; out.reserve(bufferSize); while (out.size() < bufferSize) { std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->pending.empty() || impl->peerSendClosed || impl->shutdownComplete; }); while (!impl->pending.empty() && out.size() < bufferSize) { auto& chunk = impl->pending.front(); std::size_t want = std::min(chunk.size(), bufferSize - out.size()); out.insert(out.end(), chunk.begin(), chunk.begin() + want); if (want == chunk.size()) { impl->pending.pop_front(); } else { chunk.erase(chunk.begin(), chunk.begin() + want); } } if (out.size() < bufferSize && (impl->peerSendClosed || impl->shutdownComplete)) { throw QUICClosedException(); } } return out; } void QUICStream::RecieveAsync(std::function)> cb) { ThreadPool::Enqueue([this, cb]{ cb(this->RecieveSync()); }); } void QUICStream::RecieveUntilCloseAsync(std::function)> cb) { ThreadPool::Enqueue([this, cb]{ cb(this->RecieveUntilCloseSync()); }); } void QUICStream::RecieveUntilFullAsync(std::uint32_t bufferSize, std::function)> cb) { ThreadPool::Enqueue([this, bufferSize, cb]{ cb(this->RecieveUntilFullSync(bufferSize)); }); } // ---------------- ClientQUIC::Impl ---------------- struct ClientQUIC::Impl { HQUIC connection = nullptr; HQUIC configuration = nullptr; bool ownsConfiguration = true; std::mutex mtx; std::condition_variable cv; bool connected = false; bool closed = false; QUIC_STATUS shutdownStatus = QUIC_STATUS_SUCCESS; std::function onStream; std::function)> onDatagram; std::deque> datagramQueue; ClientQUIC* outer = nullptr; static QUIC_STATUS QUIC_API Callback(HQUIC conn, void* ctx, QUIC_CONNECTION_EVENT* ev) { auto* self = static_cast(ctx); switch (ev->Type) { case QUIC_CONNECTION_EVENT_CONNECTED: { { std::lock_guard lk(self->mtx); self->connected = true; } self->cv.notify_all(); return QUIC_STATUS_SUCCESS; } case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT: case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER: { { std::lock_guard lk(self->mtx); self->closed = true; if (ev->Type == QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT) { self->shutdownStatus = ev->SHUTDOWN_INITIATED_BY_TRANSPORT.Status; } } self->cv.notify_all(); return QUIC_STATUS_SUCCESS; } case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: { { std::lock_guard lk(self->mtx); self->closed = true; } self->cv.notify_all(); if (ev->SHUTDOWN_COMPLETE.AppCloseInProgress == 0) { Runtime().api->ConnectionClose(conn); self->connection = nullptr; } return QUIC_STATUS_SUCCESS; } case QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED: { HQUIC streamHandle = ev->PEER_STREAM_STARTED.Stream; QUICStream stream(streamHandle, self->outer); if (self->onStream) { auto cb = self->onStream; auto* shared = new QUICStream(std::move(stream)); ThreadPool::Enqueue([cb, shared]{ cb(std::move(*shared)); delete shared; }); } else { // No handler: shut down to avoid leaking a stream. Runtime().api->StreamShutdown(streamHandle, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 0); } return QUIC_STATUS_SUCCESS; } case QUIC_CONNECTION_EVENT_DATAGRAM_RECEIVED: { std::vector chunk(ev->DATAGRAM_RECEIVED.Buffer->Buffer, ev->DATAGRAM_RECEIVED.Buffer->Buffer + ev->DATAGRAM_RECEIVED.Buffer->Length); if (self->onDatagram) { auto cb = self->onDatagram; ThreadPool::Enqueue([cb, chunk = std::move(chunk)]() mutable { cb(std::move(chunk)); }); } else { std::lock_guard lk(self->mtx); self->datagramQueue.push_back(std::move(chunk)); self->cv.notify_all(); } return QUIC_STATUS_SUCCESS; } case QUIC_CONNECTION_EVENT_DATAGRAM_SEND_STATE_CHANGED: { // msquic fires this event multiple times per datagram (e.g. // SENT -> ACKNOWLEDGED). Free the combined QUIC_BUFFER+payload // allocation only on a terminal state — SENT and LOST_SUSPECT // are intermediate and may be followed by another transition. auto state = ev->DATAGRAM_SEND_STATE_CHANGED.State; if (ev->DATAGRAM_SEND_STATE_CHANGED.ClientContext && (state == QUIC_DATAGRAM_SEND_LOST_DISCARDED || state == QUIC_DATAGRAM_SEND_ACKNOWLEDGED || state == QUIC_DATAGRAM_SEND_ACKNOWLEDGED_SPURIOUS || state == QUIC_DATAGRAM_SEND_CANCELED)) { ::operator delete(ev->DATAGRAM_SEND_STATE_CHANGED.ClientContext); } return QUIC_STATUS_SUCCESS; } default: return QUIC_STATUS_SUCCESS; } } }; static HQUIC OpenClientConfiguration(const std::string& alpn, const QUICClientCredentials& creds) { std::vector alpnBuf; QUIC_BUFFER alpnBuffer = MakeAlpn(alpn, alpnBuf); QUIC_SETTINGS settings{}; settings.IsSet.IdleTimeoutMs = 1; settings.IdleTimeoutMs = 30'000; settings.IsSet.DatagramReceiveEnabled = 1; settings.DatagramReceiveEnabled = 1; HQUIC cfg = nullptr; QUIC_STATUS s = Runtime().api->ConfigurationOpen(Runtime().registration, &alpnBuffer, 1, &settings, sizeof(settings), nullptr, &cfg); if (QUIC_FAILED(s)) throw QUICException(std::format("ConfigurationOpen failed: 0x{:x}", static_cast(s))); QUIC_CREDENTIAL_CONFIG cc{}; cc.Type = QUIC_CREDENTIAL_TYPE_NONE; cc.Flags = QUIC_CREDENTIAL_FLAG_CLIENT; if (creds.insecureNoServerValidation) { cc.Flags |= QUIC_CREDENTIAL_FLAG_NO_CERTIFICATE_VALIDATION; } s = Runtime().api->ConfigurationLoadCredential(cfg, &cc); if (QUIC_FAILED(s)) { Runtime().api->ConfigurationClose(cfg); throw QUICException(std::format("ConfigurationLoadCredential failed: 0x{:x}", static_cast(s))); } return cfg; } ClientQUIC::ClientQUIC(const char* host, std::uint16_t port, std::string alpnIn, QUICClientCredentials creds) : alpn(std::move(alpnIn)), impl(std::make_unique()) { impl->outer = this; impl->configuration = OpenClientConfiguration(alpn, creds); QUIC_STATUS s = Runtime().api->ConnectionOpen(Runtime().registration, reinterpret_cast(&Impl::Callback), impl.get(), &impl->connection); if (QUIC_FAILED(s)) { Runtime().api->ConfigurationClose(impl->configuration); throw QUICException(std::format("ConnectionOpen failed: 0x{:x}", static_cast(s))); } s = Runtime().api->ConnectionStart(impl->connection, impl->configuration, QUIC_ADDRESS_FAMILY_UNSPEC, host, port); if (QUIC_FAILED(s)) { Runtime().api->ConnectionClose(impl->connection); Runtime().api->ConfigurationClose(impl->configuration); throw QUICException(std::format("ConnectionStart failed: 0x{:x}", static_cast(s))); } std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return impl->connected || impl->closed; }); if (!impl->connected) { throw QUICException(std::format("QUIC handshake failed: 0x{:x}", static_cast(impl->shutdownStatus))); } } ClientQUIC::ClientQUIC(std::string host, std::uint16_t port, std::string alpnIn, QUICClientCredentials creds) : ClientQUIC(host.c_str(), port, std::move(alpnIn), std::move(creds)) {} ClientQUIC::ClientQUIC(HQUIC connectionHandle, HQUIC serverConfiguration, std::string alpnIn) : alpn(std::move(alpnIn)), impl(std::make_unique()) { impl->outer = this; impl->connection = connectionHandle; impl->configuration = serverConfiguration; impl->ownsConfiguration = false; impl->connected = true; Runtime().api->SetCallbackHandler(connectionHandle, reinterpret_cast(&Impl::Callback), impl.get()); } ClientQUIC::ClientQUIC(ClientQUIC&& other) noexcept : alpn(std::move(other.alpn)), impl(std::move(other.impl)) { if (impl) impl->outer = this; } ClientQUIC::~ClientQUIC() { if (!impl) return; if (impl->connection) { Runtime().api->ConnectionShutdown(impl->connection, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0); Runtime().api->ConnectionClose(impl->connection); impl->connection = nullptr; } if (impl->configuration && impl->ownsConfiguration) { Runtime().api->ConfigurationClose(impl->configuration); impl->configuration = nullptr; } } void ClientQUIC::Stop() { if (!impl || !impl->connection) return; Runtime().api->ConnectionShutdown(impl->connection, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0); } QUICStream ClientQUIC::OpenStream() { HQUIC streamHandle = nullptr; QUICStream stream; stream.impl = std::make_unique(); stream.impl->connection = this; QUIC_STATUS s = Runtime().api->StreamOpen(impl->connection, QUIC_STREAM_OPEN_FLAG_NONE, reinterpret_cast(&QUICStream::Impl::Callback), stream.impl.get(), &streamHandle); if (QUIC_FAILED(s)) throw QUICException(std::format("StreamOpen failed: 0x{:x}", static_cast(s))); stream.handle = streamHandle; stream.connection = this; stream.impl->handle = streamHandle; s = Runtime().api->StreamStart(streamHandle, QUIC_STREAM_START_FLAG_NONE); if (QUIC_FAILED(s)) { Runtime().api->StreamClose(streamHandle); throw QUICException(std::format("StreamStart failed: 0x{:x}", static_cast(s))); } return stream; } void ClientQUIC::SendDatagram(const void* buffer, std::uint32_t size) { // msquic stores the QUIC_BUFFER pointer (not a copy) on the send queue // and serialises async on a worker thread. Both the QUIC_BUFFER and the // payload it points at must outlive the call until DATAGRAM_SEND_STATE // reports a terminal state. Pack them together in a single allocation. auto* mem = static_cast(::operator new(sizeof(QUIC_BUFFER) + size)); auto* hdr = reinterpret_cast(mem); auto* payload = mem + sizeof(QUIC_BUFFER); std::memcpy(payload, buffer, size); hdr->Buffer = payload; hdr->Length = size; QUIC_STATUS s = Runtime().api->DatagramSend(impl->connection, hdr, 1, QUIC_SEND_FLAG_NONE, mem); if (QUIC_FAILED(s)) { ::operator delete(mem); throw QUICException(std::format("DatagramSend failed: 0x{:x}", static_cast(s))); } } void ClientQUIC::OnStream(std::function cb) { impl->onStream = std::move(cb); } void ClientQUIC::OnDatagram(std::function)> cb) { impl->onDatagram = std::move(cb); } std::vector ClientQUIC::RecieveDatagramSync() { std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->datagramQueue.empty() || impl->closed; }); if (!impl->datagramQueue.empty()) { auto out = std::move(impl->datagramQueue.front()); impl->datagramQueue.pop_front(); return out; } throw QUICClosedException(); } HQUIC ClientQUIC::GetHandle() const { return impl ? impl->connection : nullptr; }