fix(listener): stop dropping a peer's streams that arrive during connection setup
ListenerQUIC installed only a no-op bootstrap connection callback in the NEW_CONNECTION handler and deferred the real ClientQUIC callback to the ThreadPool, alongside per-connection onConnect setup. An HTTP/3 peer (notably Chromium) opens its control + QPACK + request streams the instant the QUIC handshake completes — potentially before that deferred task ran. Those early PEER_STREAM_STARTED events were delivered to the bootstrap and silently dropped, so the session never completed. Over the network this surfaced as an intermittent "WebTransport connection rejected" that cleared on retry. Construct the ClientQUIC (and thus install its real connection callback) synchronously inside NEW_CONNECTION, before the handler returns and before msquic delivers any further events. pendingAccepted now holds the constructed ClientQUIC*; the accept loops just dispatch it, and the destructor cleans up any peer accepted but never dispatched. Also park WT data streams that arrive before their CONNECT session is registered (the stream demux races the CONNECT handler) and drain them on registration, instead of dropping them. Tests: - New ShouldNotDropEarlyStreams reproduces the race deterministically by saturating the ThreadPool so onConnect is gated while the client opens its request stream; fails on the pre-fix build, passes after. - Give ShouldEchoWebTransport its own port (8085) so it no longer collides with ShouldSendRecieveKeepaliveHTTP (8083) under the parallel test runner. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
f5f142f993
commit
43fdd7fb53
5 changed files with 259 additions and 24 deletions
|
|
@ -156,6 +156,13 @@ struct PeerState {
|
|||
// Session id == CONNECT stream's QUIC stream id, supplied by the peer
|
||||
// as the first varint of every WT data stream.
|
||||
std::unordered_map<std::uint64_t, std::unique_ptr<WTSessionEntry>> wtSessions;
|
||||
// WT data streams that arrived before their CONNECT session was registered.
|
||||
// The stream demux can race the CONNECT handler (both run on the pool), so
|
||||
// a data stream's leading session-id varint may name a session that doesn't
|
||||
// exist *yet*. Park such streams here keyed by session id; the CONNECT
|
||||
// handler drains them the moment it registers the session. Both paths
|
||||
// synchronise on wtMtx, so there's no lost-wakeup window.
|
||||
std::unordered_map<std::uint64_t, std::vector<QUICStream>> pendingWtStreams;
|
||||
};
|
||||
|
||||
struct ListenerHTTP::Impl {
|
||||
|
|
@ -189,17 +196,25 @@ namespace {
|
|||
// ── WT bidi data stream — second varint is session id.
|
||||
std::uint64_t sessionId = ReadVarintFromStream(stream, peeked, cursor);
|
||||
std::vector<char> remaining(peeked.begin() + cursor, peeked.end());
|
||||
// Put the bytes we read past the session id back so the
|
||||
// stream is ready to hand off as-is, whether we deliver it
|
||||
// now or park it for the CONNECT handler to drain later.
|
||||
if (!remaining.empty()) {
|
||||
stream.PrependReceived(std::move(remaining));
|
||||
}
|
||||
|
||||
WebTransportSession* session = nullptr;
|
||||
{
|
||||
std::lock_guard lk(peerState->wtMtx);
|
||||
auto it = peerState->wtSessions.find(sessionId);
|
||||
if (it == peerState->wtSessions.end()) return;
|
||||
if (it == peerState->wtSessions.end()) {
|
||||
// CONNECT for this session hasn't registered yet —
|
||||
// park rather than drop. Drained on registration.
|
||||
peerState->pendingWtStreams[sessionId].push_back(std::move(stream));
|
||||
return;
|
||||
}
|
||||
session = it->second->session.get();
|
||||
}
|
||||
if (!remaining.empty()) {
|
||||
stream.PrependReceived(std::move(remaining));
|
||||
}
|
||||
WebTransportDeliverStream(*session, std::move(stream));
|
||||
return;
|
||||
}
|
||||
|
|
@ -267,9 +282,21 @@ namespace {
|
|||
std::move(stream),
|
||||
sessionId,
|
||||
request.path);
|
||||
std::vector<QUICStream> parked;
|
||||
{
|
||||
std::lock_guard lk(peerState->wtMtx);
|
||||
peerState->wtSessions.emplace(sessionId, std::move(entry));
|
||||
// Collect any data streams that arrived before us.
|
||||
auto pit = peerState->pendingWtStreams.find(sessionId);
|
||||
if (pit != peerState->pendingWtStreams.end()) {
|
||||
parked = std::move(pit->second);
|
||||
peerState->pendingWtStreams.erase(pit);
|
||||
}
|
||||
}
|
||||
// Deliver parked streams (in arrival order) now that the
|
||||
// session exists — outside the lock, as delivery may block.
|
||||
for (auto& parkedStream : parked) {
|
||||
WebTransportDeliverStream(*sessionPtr, std::move(parkedStream));
|
||||
}
|
||||
auto handler = wtIt->second;
|
||||
ThreadPool::Enqueue([handler, sessionPtr]{
|
||||
|
|
|
|||
|
|
@ -171,10 +171,12 @@ struct ListenerQUIC::Impl {
|
|||
ListenerQUIC* outer = nullptr;
|
||||
|
||||
// Backlog used by ListenSync* methods to convert msquic's callback model
|
||||
// to a blocking accept(2)-style loop.
|
||||
// to a blocking accept(2)-style loop. Entries are fully-constructed
|
||||
// ClientQUIC objects (see NEW_CONNECTION below) — ownership passes to the
|
||||
// user's connectCallback when the accept loop dispatches them.
|
||||
std::mutex mtx;
|
||||
std::condition_variable cv;
|
||||
std::deque<HQUIC> pendingAccepted;
|
||||
std::deque<ClientQUIC*> pendingAccepted;
|
||||
bool stopRequested = false;
|
||||
|
||||
// Holds the std::thread spawned by ListenAsync* so the destructor can
|
||||
|
|
@ -182,28 +184,42 @@ struct ListenerQUIC::Impl {
|
|||
// by raw pointer; running it past the destructor is a use-after-free).
|
||||
std::thread acceptLoop;
|
||||
|
||||
static QUIC_STATUS QUIC_API ConnectionCallbackBootstrap(HQUIC, void*, QUIC_CONNECTION_EVENT*) {
|
||||
// Real callbacks are installed by ClientQUIC's constructor for the
|
||||
// server-side branch. This stub exists only for the brief window
|
||||
// between NEW_CONNECTION and ConnectionSetConfiguration.
|
||||
return QUIC_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
static QUIC_STATUS QUIC_API Callback(HQUIC, void* ctx, QUIC_LISTENER_EVENT* ev) {
|
||||
auto* self = static_cast<Impl*>(ctx);
|
||||
switch (ev->Type) {
|
||||
case QUIC_LISTENER_EVENT_NEW_CONNECTION: {
|
||||
HQUIC conn = ev->NEW_CONNECTION.Connection;
|
||||
Runtime().api->SetCallbackHandler(conn,
|
||||
reinterpret_cast<void*>(&ConnectionCallbackBootstrap), nullptr);
|
||||
|
||||
// Construct the ClientQUIC *here*, on the listener-callback
|
||||
// thread, so its real connection-callback handler is installed
|
||||
// before this handler returns — i.e. before msquic delivers any
|
||||
// further events for the connection (handshake completion and,
|
||||
// crucially, the peer's first PEER_STREAM_STARTED events: an
|
||||
// HTTP/3 peer opens its control + QPACK streams the instant the
|
||||
// handshake finishes).
|
||||
//
|
||||
// Previously the real handler was deferred to the accept loop /
|
||||
// ThreadPool while a no-op bootstrap stub held the connection.
|
||||
// On a fast (e.g. loopback) path the handshake and the peer's
|
||||
// early streams could arrive before the deferred construction
|
||||
// ran, so those PEER_STREAM_STARTED events were silently
|
||||
// dropped — intermittently breaking WebTransport/HTTP3 session
|
||||
// establishment in a way that "cleared" on retry.
|
||||
//
|
||||
// msquic's documented order is SetCallbackHandler (done by the
|
||||
// ClientQUIC ctor) then ConnectionSetConfiguration; no events
|
||||
// are delivered until this listener callback returns, so doing
|
||||
// both synchronously here closes the window entirely.
|
||||
ClientQUIC* peer = new ClientQUIC(conn, self->configuration,
|
||||
self->outer->alpn);
|
||||
QUIC_STATUS s = Runtime().api->ConnectionSetConfiguration(conn, self->configuration);
|
||||
if (QUIC_FAILED(s)) {
|
||||
Runtime().api->ConnectionClose(conn);
|
||||
delete peer; // ~ClientQUIC shuts down + closes the conn
|
||||
return s;
|
||||
}
|
||||
{
|
||||
std::lock_guard lk(self->mtx);
|
||||
self->pendingAccepted.push_back(conn);
|
||||
self->pendingAccepted.push_back(peer);
|
||||
}
|
||||
self->cv.notify_all();
|
||||
return QUIC_STATUS_SUCCESS;
|
||||
|
|
@ -318,6 +334,14 @@ ListenerQUIC::~ListenerQUIC() {
|
|||
if (!impl) return;
|
||||
Stop();
|
||||
if (impl->acceptLoop.joinable()) impl->acceptLoop.join();
|
||||
// Drop any connections accepted by the NEW_CONNECTION handler but never
|
||||
// dispatched to connectCallback (we stopped before the accept loop drained
|
||||
// them). ~ClientQUIC shuts down + closes each one.
|
||||
{
|
||||
std::lock_guard lk(impl->mtx);
|
||||
for (ClientQUIC* peer : impl->pendingAccepted) delete peer;
|
||||
impl->pendingAccepted.clear();
|
||||
}
|
||||
if (impl->listener) {
|
||||
Runtime().api->ListenerClose(impl->listener);
|
||||
impl->listener = nullptr;
|
||||
|
|
@ -343,10 +367,12 @@ void ListenerQUIC::ListenSyncSync() {
|
|||
std::unique_lock lk(impl->mtx);
|
||||
impl->cv.wait(lk, [&]{ return !impl->pendingAccepted.empty() || impl->stopRequested; });
|
||||
if (impl->stopRequested) return;
|
||||
HQUIC conn = impl->pendingAccepted.front();
|
||||
// The ClientQUIC is already constructed (with its real callback
|
||||
// handler installed) by the NEW_CONNECTION handler; just hand it off.
|
||||
ClientQUIC* peer = impl->pendingAccepted.front();
|
||||
impl->pendingAccepted.pop_front();
|
||||
lk.unlock();
|
||||
connectCallback(new ClientQUIC(conn, impl->configuration, alpn));
|
||||
connectCallback(peer);
|
||||
++totalClientCounter;
|
||||
}
|
||||
}
|
||||
|
|
@ -356,13 +382,11 @@ void ListenerQUIC::ListenSyncAsync() {
|
|||
std::unique_lock lk(impl->mtx);
|
||||
impl->cv.wait(lk, [&]{ return !impl->pendingAccepted.empty() || impl->stopRequested; });
|
||||
if (impl->stopRequested) return;
|
||||
HQUIC conn = impl->pendingAccepted.front();
|
||||
ClientQUIC* peer = impl->pendingAccepted.front();
|
||||
impl->pendingAccepted.pop_front();
|
||||
lk.unlock();
|
||||
std::string a = alpn;
|
||||
HQUIC cfg = impl->configuration;
|
||||
auto cb = connectCallback;
|
||||
ThreadPool::Enqueue([conn, cfg, a, cb]{ cb(new ClientQUIC(conn, cfg, a)); });
|
||||
ThreadPool::Enqueue([peer, cb]{ cb(peer); });
|
||||
++totalClientCounter;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -111,6 +111,7 @@ extern "C" Configuration CrafterBuildProject(std::span<const std::string_view> a
|
|||
// crafter-network static lib via .Dependencies({ &cfg }).
|
||||
if (cfg.target == "x86_64-pc-linux-gnu") {
|
||||
cfg.AddTest("ShouldEchoWebTransport").Dependencies({ &cfg });
|
||||
cfg.AddTest("ShouldNotDropEarlyStreams").Dependencies({ &cfg });
|
||||
cfg.AddTest("ShouldSend").Dependencies({ &cfg });
|
||||
cfg.AddTest("ShouldSendRecieveHTTP").Dependencies({ &cfg });
|
||||
cfg.AddTest("ShouldSendRecieveKeepaliveHTTP").Dependencies({ &cfg });
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ int main() {
|
|||
ThreadPool::Start();
|
||||
|
||||
constexpr std::string_view kPayload = "hello-webtransport";
|
||||
constexpr std::uint16_t kPort = 8083;
|
||||
constexpr std::uint16_t kPort = 8085;
|
||||
|
||||
// ── Server ────────────────────────────────────────────────────────
|
||||
QUICServerCredentials serverCreds;
|
||||
|
|
|
|||
183
tests/ShouldNotDropEarlyStreams/main.cpp
Normal file
183
tests/ShouldNotDropEarlyStreams/main.cpp
Normal file
|
|
@ -0,0 +1,183 @@
|
|||
/*
|
||||
Crafter® Build
|
||||
Copyright (C) 2026 Catcrafts®
|
||||
Catcrafts.net
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License version 3.0 as published by the Free Software Foundation;
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
*/
|
||||
|
||||
// Regression test for the dropped-early-stream race in ListenerHTTP/ListenerQUIC.
|
||||
//
|
||||
// The server's connection callback (which receives PEER_STREAM_STARTED) is
|
||||
// installed in the listener's NEW_CONNECTION handler, but the per-connection
|
||||
// onConnect setup (which installs the OnStream handler + sends the server's
|
||||
// SETTINGS) is dispatched to the Crafter ThreadPool. An HTTP/3 peer opens its
|
||||
// control + request streams the instant the QUIC handshake completes — i.e.
|
||||
// potentially BEFORE onConnect has run.
|
||||
//
|
||||
// The bug: the original code installed only a no-op bootstrap connection
|
||||
// callback in NEW_CONNECTION and deferred the *real* callback to the same
|
||||
// ThreadPool task as onConnect. Any PEER_STREAM_STARTED that arrived before
|
||||
// that task ran was delivered to the bootstrap and silently dropped — the
|
||||
// stream was never adopted, so the request was never answered. With Chromium
|
||||
// (which opens streams aggressively right after the handshake) this surfaced
|
||||
// as an intermittent "WebTransport connection rejected" that cleared on retry.
|
||||
//
|
||||
// This test forces that window deterministically: it SATURATES the ThreadPool
|
||||
// before connecting, so onConnect cannot run until we release it. Meanwhile the
|
||||
// client opens its request stream immediately. The connection callback runs on
|
||||
// the msquic thread (not the ThreadPool), so with the fix it still buffers the
|
||||
// early stream; on the buggy build the bootstrap drops it. After we release the
|
||||
// pool, onConnect drains the buffer and the request is answered — or, on the
|
||||
// buggy build, the request stream is gone and the read below hangs until the
|
||||
// watchdog fails the test.
|
||||
|
||||
import Crafter.Network;
|
||||
import Crafter.Thread;
|
||||
import std;
|
||||
using namespace Crafter;
|
||||
|
||||
namespace {
|
||||
// Read one HTTP/3 frame off `stream` → (frameType, payload). Unconsumed
|
||||
// tail bytes are PrependReceived'd back. (Same helper shape as the
|
||||
// ShouldEchoWebTransport test.)
|
||||
std::pair<std::uint64_t, std::vector<char>> ReadFrame(QUICStream& stream) {
|
||||
std::vector<char> buf;
|
||||
std::uint64_t type = 0; std::size_t cn = 0;
|
||||
while (true) {
|
||||
const auto* p = reinterpret_cast<const std::uint8_t*>(buf.data());
|
||||
if (HTTP3::DecodeVarint(p, buf.size(), type, cn)) break;
|
||||
auto chunk = stream.RecieveSync();
|
||||
buf.insert(buf.end(), chunk.begin(), chunk.end());
|
||||
}
|
||||
buf = std::vector<char>(buf.begin() + cn, buf.end());
|
||||
|
||||
std::uint64_t len = 0; std::size_t lc = 0;
|
||||
while (true) {
|
||||
const auto* p = reinterpret_cast<const std::uint8_t*>(buf.data());
|
||||
if (HTTP3::DecodeVarint(p, buf.size(), len, lc)) break;
|
||||
auto chunk = stream.RecieveSync();
|
||||
buf.insert(buf.end(), chunk.begin(), chunk.end());
|
||||
}
|
||||
buf = std::vector<char>(buf.begin() + lc, buf.end());
|
||||
|
||||
while (buf.size() < len) {
|
||||
auto chunk = stream.RecieveSync();
|
||||
buf.insert(buf.end(), chunk.begin(), chunk.end());
|
||||
}
|
||||
std::vector<char> payload(buf.begin(), buf.begin() + len);
|
||||
std::vector<char> tail(buf.begin() + len, buf.end());
|
||||
if (!tail.empty()) stream.PrependReceived(std::move(tail));
|
||||
return {type, std::move(payload)};
|
||||
}
|
||||
}
|
||||
|
||||
int main() {
|
||||
ThreadPool::Start();
|
||||
|
||||
constexpr std::uint16_t kPort = 8086;
|
||||
|
||||
// Watchdog: on the buggy build the request stream is dropped and the
|
||||
// response read below blocks forever. Fail the test instead of hanging
|
||||
// the whole suite.
|
||||
std::thread watchdog([] {
|
||||
std::this_thread::sleep_for(std::chrono::seconds(8));
|
||||
std::println("timed out waiting for response — early request stream was dropped");
|
||||
std::cout.flush();
|
||||
std::_Exit(1);
|
||||
});
|
||||
watchdog.detach();
|
||||
|
||||
// ── Server: a single HTTP route. ─────────────────────────────────────
|
||||
QUICServerCredentials serverCreds;
|
||||
serverCreds.selfSigned = true;
|
||||
std::unordered_map<std::string, std::function<HTTPResponse(const HTTPRequest&)>> routes = {
|
||||
{"/", [](const HTTPRequest&) { return CreateResponseHTTP("200", "ok"); }},
|
||||
};
|
||||
ListenerAsyncHTTP listener(kPort, serverCreds, std::move(routes));
|
||||
|
||||
// ── Saturate the ThreadPool so the server's onConnect can't run yet. ──
|
||||
// The connection callback (PEER_STREAM_STARTED) runs on the msquic thread
|
||||
// and is unaffected; only onConnect (OnStream install + SETTINGS) is gated.
|
||||
std::atomic<bool> release{false};
|
||||
std::atomic<int> parked{0};
|
||||
for (int i = 0; i < 256; ++i) {
|
||||
ThreadPool::Enqueue([&] {
|
||||
parked.fetch_add(1);
|
||||
while (!release.load()) std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
});
|
||||
}
|
||||
// Give the pool a moment to actually pick up and block on those tasks.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
try {
|
||||
// ── Client: open the request stream immediately after the handshake,
|
||||
// i.e. while onConnect is still gated behind the saturated pool. ────
|
||||
QUICClientCredentials clientCreds;
|
||||
clientCreds.insecureNoServerValidation = true;
|
||||
ClientQUIC quic("localhost", kPort, std::string(HTTP3::kAlpn), clientCreds);
|
||||
|
||||
// Drain peer-initiated (server) unidi streams once they appear.
|
||||
quic.OnStream([](QUICStream stream) {
|
||||
try { while (true) (void)stream.RecieveSync(); } catch (...) {}
|
||||
});
|
||||
|
||||
// Client control stream + SETTINGS.
|
||||
QUICStream control = quic.OpenStream(/*unidirectional=*/true);
|
||||
auto prelude = HTTP3::BuildControlStreamPrelude();
|
||||
control.SendSync(prelude.data(), static_cast<std::uint32_t>(prelude.size()),
|
||||
/*finish=*/false);
|
||||
|
||||
// GET / on a fresh bidi request stream, FIN immediately.
|
||||
QUICStream req = quic.OpenStream(/*unidirectional=*/false);
|
||||
std::vector<std::pair<std::string, std::string>> fields = {
|
||||
{":method", "GET"},
|
||||
{":scheme", "https"},
|
||||
{":authority", "localhost"},
|
||||
{":path", "/"},
|
||||
};
|
||||
auto headerPayload = HTTP3::EncodeFieldSection(fields);
|
||||
std::vector<std::uint8_t> wire;
|
||||
HTTP3::WriteFrame(wire, HTTP3::kFrameHeaders, headerPayload.data(), headerPayload.size());
|
||||
req.SendSync(wire.data(), static_cast<std::uint32_t>(wire.size()), /*finish=*/true);
|
||||
|
||||
// Let the early streams reach the server and be buffered (fix) or
|
||||
// dropped (bug) while onConnect is still gated.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
|
||||
// Now release the pool — onConnect runs, and with the fix it drains the
|
||||
// buffered request stream and answers it.
|
||||
release.store(true);
|
||||
|
||||
// Read the response HEADERS frame and check the status.
|
||||
auto [respType, respPayload] = ReadFrame(req);
|
||||
if (respType != HTTP3::kFrameHeaders) {
|
||||
std::println("expected HEADERS response frame, got type {}", respType);
|
||||
return 1;
|
||||
}
|
||||
auto respFields = HTTP3::DecodeFieldSection(
|
||||
reinterpret_cast<const std::uint8_t*>(respPayload.data()), respPayload.size());
|
||||
std::string status;
|
||||
for (auto& [k, v] : respFields) if (k == ":status") status = v;
|
||||
if (status != "200") {
|
||||
std::println("expected status 200, got '{}'", status);
|
||||
return 1;
|
||||
}
|
||||
std::_Exit(0);
|
||||
} catch (std::exception& e) {
|
||||
release.store(true);
|
||||
std::println("client failed: {}", e.what());
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue