ListenerQUIC installed only a no-op bootstrap connection callback in the NEW_CONNECTION handler and deferred the real ClientQUIC callback to the ThreadPool, alongside per-connection onConnect setup. An HTTP/3 peer (notably Chromium) opens its control + QPACK + request streams the instant the QUIC handshake completes — potentially before that deferred task ran. Those early PEER_STREAM_STARTED events were delivered to the bootstrap and silently dropped, so the session never completed. Over the network this surfaced as an intermittent "WebTransport connection rejected" that cleared on retry. Construct the ClientQUIC (and thus install its real connection callback) synchronously inside NEW_CONNECTION, before the handler returns and before msquic delivers any further events. pendingAccepted now holds the constructed ClientQUIC*; the accept loops just dispatch it, and the destructor cleans up any peer accepted but never dispatched. Also park WT data streams that arrive before their CONNECT session is registered (the stream demux races the CONNECT handler) and drain them on registration, instead of dropping them. Tests: - New ShouldNotDropEarlyStreams reproduces the race deterministically by saturating the ThreadPool so onConnect is gated while the client opens its request stream; fails on the pre-fix build, passes after. - Give ShouldEchoWebTransport its own port (8085) so it no longer collides with ShouldSendRecieveKeepaliveHTTP (8083) under the parallel test runner. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
182 lines
7.9 KiB
C++
182 lines
7.9 KiB
C++
/*
|
|
Crafter® Build
|
|
Copyright (C) 2026 Catcrafts®
|
|
Catcrafts.net
|
|
|
|
This library is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU Lesser General Public
|
|
License version 3.0 as published by the Free Software Foundation;
|
|
|
|
This library is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Lesser General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Lesser General Public
|
|
License along with this library; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
*/
|
|
|
|
// End-to-end WebTransport echo. Spins up a ListenerHTTP with a wtRoutes
|
|
// handler that echoes back whatever the peer sent on each new bidi
|
|
// stream, then drives a hand-rolled client side using raw ClientQUIC +
|
|
// HTTP3 framing (there's no ClientWebTransport class yet; that's Phase 4).
|
|
// Verifies:
|
|
// - extended CONNECT is accepted, 200 OK delivered without FIN
|
|
// - WT_STREAM bidi framing parses correctly on both sides
|
|
// - echoed payload round-trips byte-for-byte
|
|
|
|
import Crafter.Network;
|
|
import Crafter.Thread;
|
|
import std;
|
|
using namespace Crafter;
|
|
|
|
namespace {
|
|
// Helper: read one HTTP/3 frame off `stream` into a freshly-allocated
|
|
// buffer. Returns (frameType, payload). The peeked-but-unconsumed
|
|
// tail bytes (e.g. start of the next frame) are PrependReceived'd
|
|
// back onto the stream.
|
|
std::pair<std::uint64_t, std::vector<char>> ReadFrame(QUICStream& stream) {
|
|
std::vector<char> buf;
|
|
// First varint = frame type.
|
|
std::uint64_t type = 0; std::size_t cn = 0;
|
|
while (true) {
|
|
const auto* p = reinterpret_cast<const std::uint8_t*>(buf.data());
|
|
if (HTTP3::DecodeVarint(p, buf.size(), type, cn)) break;
|
|
auto chunk = stream.RecieveSync();
|
|
buf.insert(buf.end(), chunk.begin(), chunk.end());
|
|
}
|
|
std::vector<char> afterType(buf.begin() + cn, buf.end());
|
|
buf = std::move(afterType);
|
|
|
|
// Second varint = frame length.
|
|
std::uint64_t len = 0; std::size_t lc = 0;
|
|
while (true) {
|
|
const auto* p = reinterpret_cast<const std::uint8_t*>(buf.data());
|
|
if (HTTP3::DecodeVarint(p, buf.size(), len, lc)) break;
|
|
auto chunk = stream.RecieveSync();
|
|
buf.insert(buf.end(), chunk.begin(), chunk.end());
|
|
}
|
|
std::vector<char> afterLen(buf.begin() + lc, buf.end());
|
|
buf = std::move(afterLen);
|
|
|
|
// Read enough for the full payload.
|
|
while (buf.size() < len) {
|
|
auto chunk = stream.RecieveSync();
|
|
buf.insert(buf.end(), chunk.begin(), chunk.end());
|
|
}
|
|
|
|
std::vector<char> payload(buf.begin(), buf.begin() + len);
|
|
std::vector<char> tail(buf.begin() + len, buf.end());
|
|
if (!tail.empty()) stream.PrependReceived(std::move(tail));
|
|
return {type, std::move(payload)};
|
|
}
|
|
}
|
|
|
|
// Test entry point: starts an echo WebTransport server on kPort, drives a
// hand-rolled HTTP/3 + WebTransport client against it, and checks that the
// payload sent on a WT bidi stream comes back unchanged.
//
// Exit status: 0 (via std::_Exit) when the echo matches; 1 on an unexpected
// response frame type, a non-200 CONNECT status, a payload mismatch, or any
// std::exception raised by the client side.
int main() {
    ThreadPool::Start();

    constexpr std::uint16_t kPort = 8085;
    constexpr std::string_view kPayload = "hello-webtransport";

    // ── Server ────────────────────────────────────────────────────────
    QUICServerCredentials serverCreds;
    serverCreds.selfSigned = true;

    using HttpHandler = std::function<HTTPResponse(const HTTPRequest&)>;
    using SessionHandler = std::function<void(WebTransportSession&)>;

    // No plain HTTP routes; the only WebTransport route is "/echo".
    std::unordered_map<std::string, HttpHandler> plainRoutes = {};
    std::unordered_map<std::string, SessionHandler> sessionRoutes = {
        {"/echo", [](WebTransportSession& session) {
            // For every stream the peer opens: read until the peer closes
            // its side, then send the same bytes back and finish.
            session.OnStream([](QUICStream incoming) {
                try {
                    auto received = incoming.RecieveUntilCloseSync();
                    incoming.SendSync(received.data(),
                                      static_cast<std::uint32_t>(received.size()),
                                      /*finish=*/true);
                } catch (...) {
                    // Best effort: failures on the echo side are ignored here;
                    // the client-side checks below decide the test result.
                }
            });
        }},
    };
    ListenerAsyncHTTP listener(kPort, serverCreds, std::move(plainRoutes), std::move(sessionRoutes));

    try {
        // ── Client (hand-rolled WT bring-up over raw ClientQUIC) ──────
        QUICClientCredentials clientCreds;
        clientCreds.insecureNoServerValidation = true;
        ClientQUIC quic("localhost", kPort, std::string(HTTP3::kAlpn), clientCreds);

        // Swallow everything on peer-initiated streams (the server's control
        // and QPACK streams) so they never back up; the loop ends when
        // RecieveSync throws.
        quic.OnStream([](QUICStream peerStream) {
            try {
                while (true) {
                    (void)peerStream.RecieveSync();
                }
            } catch (...) {}
        });

        // Outgoing control stream carrying the WT-aware SETTINGS prelude.
        // Left open (no FIN).
        QUICStream controlStream = quic.OpenStream(/*unidirectional=*/true);
        auto settingsPrelude = HTTP3::BuildWebTransportControlStreamPrelude(/*maxSessions=*/1);
        controlStream.SendSync(settingsPrelude.data(),
                               static_cast<std::uint32_t>(settingsPrelude.size()),
                               /*finish=*/false);

        // Extended CONNECT on a bidi stream. Only HEADERS is sent and the
        // stream is NOT finished: it is the session-control stream and stays
        // open for the session's lifetime.
        QUICStream connectStream = quic.OpenStream(/*unidirectional=*/false);
        std::vector<std::pair<std::string, std::string>> requestFields = {
            {":method", "CONNECT"},
            {":scheme", "https"},
            {":authority", "localhost"},
            {":path", "/echo"},
            {":protocol", "webtransport"},
        };
        auto encodedFields = HTTP3::EncodeFieldSection(requestFields);
        std::vector<std::uint8_t> requestWire;
        HTTP3::WriteFrame(requestWire, HTTP3::kFrameHeaders,
                          encodedFields.data(), encodedFields.size());
        connectStream.SendSync(requestWire.data(),
                               static_cast<std::uint32_t>(requestWire.size()),
                               /*finish=*/false);

        // The first frame back on the CONNECT stream must be HEADERS.
        auto [replyType, replyPayload] = ReadFrame(connectStream);
        if (replyType != HTTP3::kFrameHeaders) {
            std::println("bad response frame type: {}", replyType);
            return 1;
        }

        // Pick out :status (the last occurrence wins if it repeats).
        auto replyFields = HTTP3::DecodeFieldSection(
            reinterpret_cast<const std::uint8_t*>(replyPayload.data()),
            replyPayload.size());
        std::string status;
        for (const auto& [fieldName, fieldValue] : replyFields) {
            if (fieldName == ":status") {
                status = fieldValue;
            }
        }
        if (status != "200") {
            std::println("CONNECT rejected with status {}", status);
            return 1;
        }

        // Session established. The session id is the QUIC stream id of the
        // CONNECT stream — the same number on both ends of the wire.
        std::uint64_t sessionId = connectStream.GetStreamId();

        // WT data bidi stream: prefix (varint(0x41) varint(sessionId))
        // followed directly by the payload, sent in one write with FIN.
        QUICStream dataStream = quic.OpenStream(/*unidirectional=*/false);
        auto bidiPrefix = HTTP3::BuildWtBidiPrefix(sessionId);

        std::vector<std::uint8_t> outgoing;
        outgoing.insert(outgoing.end(), bidiPrefix.begin(), bidiPrefix.end());
        outgoing.insert(outgoing.end(), kPayload.begin(), kPayload.end());
        dataStream.SendSync(outgoing.data(),
                            static_cast<std::uint32_t>(outgoing.size()),
                            /*finish=*/true);

        // The server strips the prefix before echoing, so what comes back
        // should be the bare payload.
        auto echoedBytes = dataStream.RecieveUntilCloseSync();
        std::string got(echoedBytes.begin(), echoedBytes.end());
        if (got != kPayload) {
            std::println("payload mismatch: expected '{}', got '{}'", kPayload, got);
            return 1;
        }

        // Success. std::_Exit ends the process immediately, without running
        // destructors for the live listener/connection objects.
        // NOTE(review): presumably chosen to sidestep teardown — confirm.
        std::_Exit(0);
    } catch (const std::exception& e) {
        std::println("client failed: {}", e.what());
        return 1;
    }
}
|