diff --git a/implementations/Crafter.Network-ListenerHTTP.cpp b/implementations/Crafter.Network-ListenerHTTP.cpp index 6514659..95eb5b2 100644 --- a/implementations/Crafter.Network-ListenerHTTP.cpp +++ b/implementations/Crafter.Network-ListenerHTTP.cpp @@ -156,6 +156,13 @@ struct PeerState { // Session id == CONNECT stream's QUIC stream id, supplied by the peer // as the first varint of every WT data stream. std::unordered_map> wtSessions; + // WT data streams that arrived before their CONNECT session was registered. + // The stream demux can race the CONNECT handler (both run on the pool), so + // a data stream's leading session-id varint may name a session that doesn't + // exist *yet*. Park such streams here keyed by session id; the CONNECT + // handler drains them the moment it registers the session. Both paths + // synchronise on wtMtx, so there's no lost-wakeup window. + std::unordered_map> pendingWtStreams; }; struct ListenerHTTP::Impl { @@ -189,17 +196,25 @@ namespace { // ── WT bidi data stream — second varint is session id. std::uint64_t sessionId = ReadVarintFromStream(stream, peeked, cursor); std::vector remaining(peeked.begin() + cursor, peeked.end()); + // Put the bytes we read past the session id back so the + // stream is ready to hand off as-is, whether we deliver it + // now or park it for the CONNECT handler to drain later. + if (!remaining.empty()) { + stream.PrependReceived(std::move(remaining)); + } WebTransportSession* session = nullptr; { std::lock_guard lk(peerState->wtMtx); auto it = peerState->wtSessions.find(sessionId); - if (it == peerState->wtSessions.end()) return; + if (it == peerState->wtSessions.end()) { + // CONNECT for this session hasn't registered yet — + // park rather than drop. Drained on registration. + peerState->pendingWtStreams[sessionId].push_back(std::move(stream)); + return; + } session = it->second->session.get(); } - if (!remaining.empty()) { - stream.PrependReceived(std::move(remaining)); - } WebTransportDeliverStream(*session, std::move(stream)); return; } @@ -267,9 +282,21 @@ namespace { std::move(stream), sessionId, request.path); + std::vector parked; { std::lock_guard lk(peerState->wtMtx); peerState->wtSessions.emplace(sessionId, std::move(entry)); + // Collect any data streams that arrived before us. + auto pit = peerState->pendingWtStreams.find(sessionId); + if (pit != peerState->pendingWtStreams.end()) { + parked = std::move(pit->second); + peerState->pendingWtStreams.erase(pit); + } + } + // Deliver parked streams (in arrival order) now that the + // session exists — outside the lock, as delivery may block. + for (auto& parkedStream : parked) { + WebTransportDeliverStream(*sessionPtr, std::move(parkedStream)); } auto handler = wtIt->second; ThreadPool::Enqueue([handler, sessionPtr]{ diff --git a/implementations/Crafter.Network-ListenerQUIC.cpp b/implementations/Crafter.Network-ListenerQUIC.cpp index c9e0c9f..c6a6081 100644 --- a/implementations/Crafter.Network-ListenerQUIC.cpp +++ b/implementations/Crafter.Network-ListenerQUIC.cpp @@ -171,10 +171,12 @@ struct ListenerQUIC::Impl { ListenerQUIC* outer = nullptr; // Backlog used by ListenSync* methods to convert msquic's callback model - // to a blocking accept(2)-style loop. + // to a blocking accept(2)-style loop. Entries are fully-constructed + // ClientQUIC objects (see NEW_CONNECTION below) — ownership passes to the + // user's connectCallback when the accept loop dispatches them. std::mutex mtx; std::condition_variable cv; - std::deque pendingAccepted; + std::deque pendingAccepted; bool stopRequested = false; // Holds the std::thread spawned by ListenAsync* so the destructor can @@ -182,28 +184,42 @@ struct ListenerQUIC::Impl { // by raw pointer; running it past the destructor is a use-after-free). std::thread acceptLoop; - static QUIC_STATUS QUIC_API ConnectionCallbackBootstrap(HQUIC, void*, QUIC_CONNECTION_EVENT*) { - // Real callbacks are installed by ClientQUIC's constructor for the - // server-side branch. This stub exists only for the brief window - // between NEW_CONNECTION and ConnectionSetConfiguration. - return QUIC_STATUS_SUCCESS; - } - static QUIC_STATUS QUIC_API Callback(HQUIC, void* ctx, QUIC_LISTENER_EVENT* ev) { auto* self = static_cast(ctx); switch (ev->Type) { case QUIC_LISTENER_EVENT_NEW_CONNECTION: { HQUIC conn = ev->NEW_CONNECTION.Connection; - Runtime().api->SetCallbackHandler(conn, - reinterpret_cast(&ConnectionCallbackBootstrap), nullptr); + + // Construct the ClientQUIC *here*, on the listener-callback + // thread, so its real connection-callback handler is installed + // before this handler returns — i.e. before msquic delivers any + // further events for the connection (handshake completion and, + // crucially, the peer's first PEER_STREAM_STARTED events: an + // HTTP/3 peer opens its control + QPACK streams the instant the + // handshake finishes). + // + // Previously the real handler was deferred to the accept loop / + // ThreadPool while a no-op bootstrap stub held the connection. + // On a fast (e.g. loopback) path the handshake and the peer's + // early streams could arrive before the deferred construction + // ran, so those PEER_STREAM_STARTED events were silently + // dropped — intermittently breaking WebTransport/HTTP3 session + // establishment in a way that "cleared" on retry. + // + // msquic's documented order is SetCallbackHandler (done by the + // ClientQUIC ctor) then ConnectionSetConfiguration; no events + // are delivered until this listener callback returns, so doing + // both synchronously here closes the window entirely. + ClientQUIC* peer = new ClientQUIC(conn, self->configuration, + self->outer->alpn); QUIC_STATUS s = Runtime().api->ConnectionSetConfiguration(conn, self->configuration); if (QUIC_FAILED(s)) { - Runtime().api->ConnectionClose(conn); + delete peer; // ~ClientQUIC shuts down + closes the conn return s; } { std::lock_guard lk(self->mtx); - self->pendingAccepted.push_back(conn); + self->pendingAccepted.push_back(peer); } self->cv.notify_all(); return QUIC_STATUS_SUCCESS; @@ -318,6 +334,14 @@ ListenerQUIC::~ListenerQUIC() { if (!impl) return; Stop(); if (impl->acceptLoop.joinable()) impl->acceptLoop.join(); + // Drop any connections accepted by the NEW_CONNECTION handler but never + // dispatched to connectCallback (we stopped before the accept loop drained + // them). ~ClientQUIC shuts down + closes each one. + { + std::lock_guard lk(impl->mtx); + for (ClientQUIC* peer : impl->pendingAccepted) delete peer; + impl->pendingAccepted.clear(); + } if (impl->listener) { Runtime().api->ListenerClose(impl->listener); impl->listener = nullptr; @@ -343,10 +367,12 @@ void ListenerQUIC::ListenSyncSync() { std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->pendingAccepted.empty() || impl->stopRequested; }); if (impl->stopRequested) return; - HQUIC conn = impl->pendingAccepted.front(); + // The ClientQUIC is already constructed (with its real callback + // handler installed) by the NEW_CONNECTION handler; just hand it off. + ClientQUIC* peer = impl->pendingAccepted.front(); impl->pendingAccepted.pop_front(); lk.unlock(); - connectCallback(new ClientQUIC(conn, impl->configuration, alpn)); + connectCallback(peer); ++totalClientCounter; } } @@ -356,13 +382,11 @@ void ListenerQUIC::ListenSyncAsync() { std::unique_lock lk(impl->mtx); impl->cv.wait(lk, [&]{ return !impl->pendingAccepted.empty() || impl->stopRequested; }); if (impl->stopRequested) return; - HQUIC conn = impl->pendingAccepted.front(); + ClientQUIC* peer = impl->pendingAccepted.front(); impl->pendingAccepted.pop_front(); lk.unlock(); - std::string a = alpn; - HQUIC cfg = impl->configuration; auto cb = connectCallback; - ThreadPool::Enqueue([conn, cfg, a, cb]{ cb(new ClientQUIC(conn, cfg, a)); }); + ThreadPool::Enqueue([peer, cb]{ cb(peer); }); ++totalClientCounter; } } diff --git a/project.cpp b/project.cpp index 8181ea3..6188b3e 100644 --- a/project.cpp +++ b/project.cpp @@ -111,6 +111,7 @@ extern "C" Configuration CrafterBuildProject(std::span a // crafter-network static lib via .Dependencies({ &cfg }). if (cfg.target == "x86_64-pc-linux-gnu") { cfg.AddTest("ShouldEchoWebTransport").Dependencies({ &cfg }); + cfg.AddTest("ShouldNotDropEarlyStreams").Dependencies({ &cfg }); cfg.AddTest("ShouldSend").Dependencies({ &cfg }); cfg.AddTest("ShouldSendRecieveHTTP").Dependencies({ &cfg }); cfg.AddTest("ShouldSendRecieveKeepaliveHTTP").Dependencies({ &cfg }); diff --git a/tests/ShouldEchoWebTransport/main.cpp b/tests/ShouldEchoWebTransport/main.cpp index d1daa22..3b38b4c 100644 --- a/tests/ShouldEchoWebTransport/main.cpp +++ b/tests/ShouldEchoWebTransport/main.cpp @@ -77,7 +77,7 @@ int main() { ThreadPool::Start(); constexpr std::string_view kPayload = "hello-webtransport"; - constexpr std::uint16_t kPort = 8083; + constexpr std::uint16_t kPort = 8085; // ── Server ──────────────────────────────────────────────────────── QUICServerCredentials serverCreds; diff --git a/tests/ShouldNotDropEarlyStreams/main.cpp b/tests/ShouldNotDropEarlyStreams/main.cpp new file mode 100644 index 0000000..be80c80 --- /dev/null +++ b/tests/ShouldNotDropEarlyStreams/main.cpp @@ -0,0 +1,183 @@ +/* +Crafter® Build +Copyright (C) 2026 Catcrafts® +Catcrafts.net + +This library is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License version 3.0 as published by the Free Software Foundation; + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public +License along with this library; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +// Regression test for the dropped-early-stream race in ListenerHTTP/ListenerQUIC. +// +// The server's connection callback (which receives PEER_STREAM_STARTED) is +// installed in the listener's NEW_CONNECTION handler, but the per-connection +// onConnect setup (which installs the OnStream handler + sends the server's +// SETTINGS) is dispatched to the Crafter ThreadPool. An HTTP/3 peer opens its +// control + request streams the instant the QUIC handshake completes — i.e. +// potentially BEFORE onConnect has run. +// +// The bug: the original code installed only a no-op bootstrap connection +// callback in NEW_CONNECTION and deferred the *real* callback to the same +// ThreadPool task as onConnect. Any PEER_STREAM_STARTED that arrived before +// that task ran was delivered to the bootstrap and silently dropped — the +// stream was never adopted, so the request was never answered. With Chromium +// (which opens streams aggressively right after the handshake) this surfaced +// as an intermittent "WebTransport connection rejected" that cleared on retry. +// +// This test forces that window deterministically: it SATURATES the ThreadPool +// before connecting, so onConnect cannot run until we release it. Meanwhile the +// client opens its request stream immediately. The connection callback runs on +// the msquic thread (not the ThreadPool), so with the fix it still buffers the +// early stream; on the buggy build the bootstrap drops it. After we release the +// pool, onConnect drains the buffer and the request is answered — or, on the +// buggy build, the request stream is gone and the read below hangs until the +// watchdog fails the test. + +import Crafter.Network; +import Crafter.Thread; +import std; +using namespace Crafter; + +namespace { + // Read one HTTP/3 frame off `stream` → (frameType, payload). Unconsumed + // tail bytes are PrependReceived'd back. (Same helper shape as the + // ShouldEchoWebTransport test.) + std::pair> ReadFrame(QUICStream& stream) { + std::vector buf; + std::uint64_t type = 0; std::size_t cn = 0; + while (true) { + const auto* p = reinterpret_cast(buf.data()); + if (HTTP3::DecodeVarint(p, buf.size(), type, cn)) break; + auto chunk = stream.RecieveSync(); + buf.insert(buf.end(), chunk.begin(), chunk.end()); + } + buf = std::vector(buf.begin() + cn, buf.end()); + + std::uint64_t len = 0; std::size_t lc = 0; + while (true) { + const auto* p = reinterpret_cast(buf.data()); + if (HTTP3::DecodeVarint(p, buf.size(), len, lc)) break; + auto chunk = stream.RecieveSync(); + buf.insert(buf.end(), chunk.begin(), chunk.end()); + } + buf = std::vector(buf.begin() + lc, buf.end()); + + while (buf.size() < len) { + auto chunk = stream.RecieveSync(); + buf.insert(buf.end(), chunk.begin(), chunk.end()); + } + std::vector payload(buf.begin(), buf.begin() + len); + std::vector tail(buf.begin() + len, buf.end()); + if (!tail.empty()) stream.PrependReceived(std::move(tail)); + return {type, std::move(payload)}; + } +} + +int main() { + ThreadPool::Start(); + + constexpr std::uint16_t kPort = 8086; + + // Watchdog: on the buggy build the request stream is dropped and the + // response read below blocks forever. Fail the test instead of hanging + // the whole suite. + std::thread watchdog([] { + std::this_thread::sleep_for(std::chrono::seconds(8)); + std::println("timed out waiting for response — early request stream was dropped"); + std::cout.flush(); + std::_Exit(1); + }); + watchdog.detach(); + + // ── Server: a single HTTP route. ───────────────────────────────────── + QUICServerCredentials serverCreds; + serverCreds.selfSigned = true; + std::unordered_map> routes = { + {"/", [](const HTTPRequest&) { return CreateResponseHTTP("200", "ok"); }}, + }; + ListenerAsyncHTTP listener(kPort, serverCreds, std::move(routes)); + + // ── Saturate the ThreadPool so the server's onConnect can't run yet. ── + // The connection callback (PEER_STREAM_STARTED) runs on the msquic thread + // and is unaffected; only onConnect (OnStream install + SETTINGS) is gated. + std::atomic release{false}; + std::atomic parked{0}; + for (int i = 0; i < 256; ++i) { + ThreadPool::Enqueue([&] { + parked.fetch_add(1); + while (!release.load()) std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }); + } + // Give the pool a moment to actually pick up and block on those tasks. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + try { + // ── Client: open the request stream immediately after the handshake, + // i.e. while onConnect is still gated behind the saturated pool. ──── + QUICClientCredentials clientCreds; + clientCreds.insecureNoServerValidation = true; + ClientQUIC quic("localhost", kPort, std::string(HTTP3::kAlpn), clientCreds); + + // Drain peer-initiated (server) unidi streams once they appear. + quic.OnStream([](QUICStream stream) { + try { while (true) (void)stream.RecieveSync(); } catch (...) {} + }); + + // Client control stream + SETTINGS. + QUICStream control = quic.OpenStream(/*unidirectional=*/true); + auto prelude = HTTP3::BuildControlStreamPrelude(); + control.SendSync(prelude.data(), static_cast(prelude.size()), + /*finish=*/false); + + // GET / on a fresh bidi request stream, FIN immediately. + QUICStream req = quic.OpenStream(/*unidirectional=*/false); + std::vector> fields = { + {":method", "GET"}, + {":scheme", "https"}, + {":authority", "localhost"}, + {":path", "/"}, + }; + auto headerPayload = HTTP3::EncodeFieldSection(fields); + std::vector wire; + HTTP3::WriteFrame(wire, HTTP3::kFrameHeaders, headerPayload.data(), headerPayload.size()); + req.SendSync(wire.data(), static_cast(wire.size()), /*finish=*/true); + + // Let the early streams reach the server and be buffered (fix) or + // dropped (bug) while onConnect is still gated. + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + // Now release the pool — onConnect runs, and with the fix it drains the + // buffered request stream and answers it. + release.store(true); + + // Read the response HEADERS frame and check the status. + auto [respType, respPayload] = ReadFrame(req); + if (respType != HTTP3::kFrameHeaders) { + std::println("expected HEADERS response frame, got type {}", respType); + return 1; + } + auto respFields = HTTP3::DecodeFieldSection( + reinterpret_cast(respPayload.data()), respPayload.size()); + std::string status; + for (auto& [k, v] : respFields) if (k == ":status") status = v; + if (status != "200") { + std::println("expected status 200, got '{}'", status); + return 1; + } + std::_Exit(0); + } catch (std::exception& e) { + release.store(true); + std::println("client failed: {}", e.what()); + return 1; + } +}