full QUIC support
This commit is contained in:
parent
45479a46ff
commit
28fab2509b
18 changed files with 1334 additions and 645 deletions
|
|
@ -19,190 +19,150 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|||
*/
|
||||
|
||||
module;
|
||||
|
||||
#include <stdio.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/uio.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/wait.h>
|
||||
|
||||
#include <msquic.h>
|
||||
module Crafter.Network:ClientHTTP_impl;
|
||||
import :ClientHTTP;
|
||||
import :ClientQUIC;
|
||||
import :HTTP;
|
||||
import :HTTP3;
|
||||
import Crafter.Thread;
|
||||
import std;
|
||||
|
||||
using namespace Crafter;
|
||||
|
||||
ClientHTTP::ClientHTTP(const char* host, std::uint16_t port): host(host), port(port), client(host, port) {
|
||||
struct ClientHTTP::Impl {
|
||||
ClientQUIC quic;
|
||||
// Outgoing control stream — RFC 9114 §6.2.1: each peer MUST open a
|
||||
// unidirectional control stream and send a SETTINGS frame as its first
|
||||
// frame. Most real h3 servers (cloudflare, nghttp3, lsquic, …) close
|
||||
// the connection with H3_MISSING_SETTINGS if we don't. The stream
|
||||
// stays open for the lifetime of the connection; we never FIN it.
|
||||
QUICStream controlStream;
|
||||
|
||||
}
|
||||
|
||||
ClientHTTP::ClientHTTP(std::string host, std::uint16_t port): ClientHTTP(host.c_str(), port) {
|
||||
|
||||
}
|
||||
|
||||
HTTPResponse ClientHTTP::Send(const char* request, std::uint32_t length) {
|
||||
std::cout << "Send started" << std::endl;
|
||||
client.Send(request, length);
|
||||
std::cout << "Send Complete" << std::endl;
|
||||
std::vector<char> buffer;
|
||||
HTTPResponse response;
|
||||
std::uint32_t i = 0;
|
||||
std::uint32_t statusStart = 0;
|
||||
while(true) {
|
||||
try {
|
||||
buffer = client.RecieveSync();
|
||||
std::cout << "Recieved: " << buffer.size() << std::endl;
|
||||
} catch(const SocketClosedException& e) {
|
||||
std::cout << "Retry" << std::endl;
|
||||
client.Stop();
|
||||
client.Connect();
|
||||
client.Send(request, length);
|
||||
buffer = client.RecieveSync();
|
||||
std::cout << "Recieved: " << buffer.size() << std::endl;
|
||||
}
|
||||
|
||||
for(; i < buffer.size(); i++) {
|
||||
if(buffer[i] == ' ') {
|
||||
statusStart = i;
|
||||
break;
|
||||
Impl(const char* host, std::uint16_t port, QUICClientCredentials creds)
|
||||
: quic(host, port, std::string(HTTP3::kAlpn), creds) {
|
||||
// Drain any unidi streams the server opens to us (its control
|
||||
// stream + optional QPACK encoder/decoder streams). We don't act
|
||||
// on the contents — SETTINGS we accept by defaults, dynamic-table
|
||||
// mutations we discard since we operate with no dynamic table.
|
||||
// Any bidi stream from the server would be a server push, which
|
||||
// we don't support — best-effort drain it as well.
|
||||
quic.OnStream([](QUICStream stream) {
|
||||
try {
|
||||
while (true) (void)stream.RecieveSync();
|
||||
} catch (...) {
|
||||
// Stream / connection closed. Done.
|
||||
}
|
||||
}
|
||||
for(; i < buffer.size(); i++) {
|
||||
if(buffer[i] == '\r') {
|
||||
response.status.assign(buffer.data()+statusStart+1, i-statusStart-1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
i+=2;
|
||||
while(i < buffer.size()) {
|
||||
std::uint32_t headerStart = i;
|
||||
std::string headerName;
|
||||
for(; i < buffer.size(); i++) {
|
||||
if(buffer[i] == ':') {
|
||||
headerName.assign(buffer.data()+headerStart, i-headerStart);
|
||||
std::transform(headerName.begin(), headerName.end(), headerName.begin(), [](unsigned char c){ return std::tolower(c); });
|
||||
i+=2;
|
||||
break;
|
||||
}
|
||||
}
|
||||
headerStart = i;
|
||||
std::string headerValue;
|
||||
for(; i < buffer.size(); i++) {
|
||||
if(buffer[i] == '\r' && buffer[i+1] == '\n') {
|
||||
headerValue.assign(buffer.data()+headerStart, i-headerStart);
|
||||
response.headers.insert({headerName, headerValue});
|
||||
if(buffer[i+2] == '\r'){
|
||||
goto headersComplete;
|
||||
} else{
|
||||
i+=2;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
i = 0;
|
||||
controlStream = quic.OpenStream(/*unidirectional=*/true);
|
||||
auto prelude = HTTP3::BuildControlStreamPrelude();
|
||||
controlStream.SendSync(prelude.data(),
|
||||
static_cast<std::uint32_t>(prelude.size()),
|
||||
/*finish=*/false);
|
||||
}
|
||||
headersComplete:;
|
||||
std::cout << "Header complete" << std::endl;
|
||||
i+=4;
|
||||
std::unordered_map<std::string, std::string>::iterator it = response.headers.find("content-length");
|
||||
if(it != response.headers.end())
|
||||
{
|
||||
const int lenght = std::stoi(it->second);
|
||||
std::cout << "Content lenght: " << lenght << std::endl;
|
||||
response.body.resize(lenght, 0);
|
||||
if(i < buffer.size()){
|
||||
std::memcpy(&response.body[0], buffer.data()+i, buffer.size()-i);
|
||||
}
|
||||
const int remaining = lenght-(buffer.size()-i);
|
||||
std::cout << "Remain: " << remaining << std::endl;
|
||||
if(remaining > 0){
|
||||
std::vector<char> bodyBuffer = client.RecieveUntilFullSync(remaining);
|
||||
std::memcpy(&response.body[ buffer.size()-i], bodyBuffer.data(), bodyBuffer.size());
|
||||
std::cout << "Recieved: " << bodyBuffer.size() << std::endl;
|
||||
}
|
||||
} else {
|
||||
std::cout << "No Content Lenght" << std::endl;
|
||||
std::unordered_map<std::string, std::string>::iterator it = response.headers.find("transfer-encoding");
|
||||
if(it != response.headers.end() && it->second == "chunked") {
|
||||
std::cout << "Chunked" << std::endl;
|
||||
while(i < buffer.size()){
|
||||
std::string lenght;
|
||||
int lenghtStart = i;
|
||||
for(; i < buffer.size(); i++) {
|
||||
if(buffer[i] == '\r') {
|
||||
lenght.assign(buffer.data()+lenghtStart, i-lenghtStart);
|
||||
break;
|
||||
}
|
||||
}
|
||||
i+=2;
|
||||
int lenghtInt = stoi(lenght, 0, 8);
|
||||
if(lenghtInt != 0){
|
||||
int oldSize = response.body.size();
|
||||
response.body.resize(oldSize+lenghtInt, 0);
|
||||
if(buffer.size() < lenghtInt) {
|
||||
std::memcpy(&response.body[oldSize], buffer.data()+i, buffer.size()-i);
|
||||
std::vector<char> bodyBuffer2 = client.RecieveUntilFullSync(lenghtInt-buffer.size());
|
||||
std::memcpy(&response.body[oldSize+(buffer.size()-i)], buffer.data(), buffer.size());
|
||||
} else {
|
||||
std::memcpy(&response.body[oldSize], buffer.data()+i, lenghtInt);
|
||||
i+=lenghtInt;
|
||||
}
|
||||
} else{
|
||||
goto bodyFinished;
|
||||
}
|
||||
};
|
||||
|
||||
ClientHTTP::ClientHTTP(const char* host, std::uint16_t port, QUICClientCredentials creds)
|
||||
: host(host), port(port), impl(std::make_unique<Impl>(host, port, std::move(creds))) {}
|
||||
|
||||
ClientHTTP::ClientHTTP(std::string host, std::uint16_t port, QUICClientCredentials creds)
|
||||
: ClientHTTP(host.c_str(), port, std::move(creds)) {}
|
||||
|
||||
ClientHTTP::ClientHTTP(ClientHTTP&&) noexcept = default;
|
||||
ClientHTTP::~ClientHTTP() = default;
|
||||
|
||||
namespace {
|
||||
// Parse a sequence of HTTP/3 frames from `bytes`. Populates response from
|
||||
// the first HEADERS frame and concatenates all DATA payloads. Trailing
|
||||
// HEADERS frames (trailers) are decoded but discarded. Throws on
|
||||
// malformed input.
|
||||
HTTPResponse ParseResponseFrames(const std::vector<char>& bytes) {
|
||||
HTTPResponse response;
|
||||
bool sawHeaders = false;
|
||||
std::size_t pos = 0;
|
||||
const auto* p = reinterpret_cast<const std::uint8_t*>(bytes.data());
|
||||
std::size_t avail = bytes.size();
|
||||
|
||||
while (pos < avail) {
|
||||
std::uint64_t frameType = 0, frameLen = 0;
|
||||
std::size_t cn = 0;
|
||||
if (!HTTP3::DecodeVarint(p + pos, avail - pos, frameType, cn)) {
|
||||
throw HTTP3::HTTP3ProtocolError("truncated frame type");
|
||||
}
|
||||
while(true) {
|
||||
std::vector<char> bodyBuffer = client.RecieveSync();
|
||||
int i2 = 0;
|
||||
while(i2 < bodyBuffer.size()){
|
||||
std::string lenght;
|
||||
int lenghtStart = i2;
|
||||
for(; i2 < bodyBuffer.size(); i2++) {
|
||||
if(buffer[i2] == '\r') {
|
||||
lenght.assign(bodyBuffer.data()+lenghtStart, i2-lenghtStart);
|
||||
break;
|
||||
}
|
||||
}
|
||||
i2+=2;
|
||||
int lenghtInt = stoi(lenght, 0, 8);
|
||||
if(lenghtInt != 0){
|
||||
int oldSize = response.body.size();
|
||||
response.body.resize(oldSize+lenghtInt, 0);
|
||||
if(bodyBuffer.size() < lenghtInt) {
|
||||
std::memcpy(&response.body[oldSize], bodyBuffer.data()+i2, bodyBuffer.size()-i2);
|
||||
std::vector<char> bodyBuffer2 = client.RecieveUntilFullSync(lenghtInt-bodyBuffer.size());
|
||||
std::memcpy(&response.body[oldSize+(bodyBuffer.size()-i2)], bodyBuffer2.data(), bodyBuffer2.size());
|
||||
pos += cn;
|
||||
if (!HTTP3::DecodeVarint(p + pos, avail - pos, frameLen, cn)) {
|
||||
throw HTTP3::HTTP3ProtocolError("truncated frame length");
|
||||
}
|
||||
pos += cn;
|
||||
if (pos + frameLen > avail) {
|
||||
throw HTTP3::HTTP3ProtocolError("frame length runs past buffer");
|
||||
}
|
||||
if (frameType == HTTP3::kFrameHeaders) {
|
||||
auto fields = HTTP3::DecodeFieldSection(p + pos, static_cast<std::size_t>(frameLen));
|
||||
if (!sawHeaders) {
|
||||
for (auto& [name, value] : fields) {
|
||||
if (name == ":status") {
|
||||
response.status = std::move(value);
|
||||
} else if (!name.empty() && name[0] == ':') {
|
||||
// Unknown response pseudo-header — ignore.
|
||||
} else {
|
||||
std::memcpy(&response.body[oldSize], bodyBuffer.data()+i2, lenghtInt);
|
||||
i2+=lenghtInt;
|
||||
response.headers.emplace(std::move(name), std::move(value));
|
||||
}
|
||||
} else{
|
||||
goto bodyFinished;
|
||||
}
|
||||
sawHeaders = true;
|
||||
}
|
||||
// Trailer HEADERS frames are skipped; the field section was
|
||||
// already decoded above and the contents discarded.
|
||||
} else if (frameType == HTTP3::kFrameData) {
|
||||
response.body.append(reinterpret_cast<const char*>(p + pos),
|
||||
static_cast<std::size_t>(frameLen));
|
||||
} else {
|
||||
// Unknown frame types are reserved/extensions — RFC 9114 §9
|
||||
// says skip them.
|
||||
}
|
||||
bodyFinished:;
|
||||
} else {
|
||||
std::cout << "Recv until close" << std::endl;
|
||||
std::vector<char> bodyBuffer = client.RecieveUntilCloseSync();
|
||||
response.body.resize((buffer.size()-i)+(bodyBuffer.size()), 0);
|
||||
if(i < buffer.size()){
|
||||
std::memcpy(&response.body[0], buffer.data()+i, buffer.size()-i);
|
||||
}
|
||||
std::memcpy(&response.body[buffer.size()-i], bodyBuffer.data(), bodyBuffer.size());
|
||||
std::cout << "Closed" << std::endl;
|
||||
pos += static_cast<std::size_t>(frameLen);
|
||||
}
|
||||
if (!sawHeaders) {
|
||||
throw HTTP3::HTTP3ProtocolError("response stream had no HEADERS frame");
|
||||
}
|
||||
return response;
|
||||
}
|
||||
std::cout << "Response recieved" << std::endl;
|
||||
return response;
|
||||
}
|
||||
HTTPResponse ClientHTTP::Send(std::string request) {
|
||||
return Send(request.c_str(), request.size());
|
||||
|
||||
HTTPResponse ClientHTTP::Send(const HTTPRequest& request) {
|
||||
QUICStream stream = impl->quic.OpenStream();
|
||||
|
||||
// Pseudo-headers MUST appear before regular fields (RFC 9114 §4.3).
|
||||
std::vector<std::pair<std::string, std::string>> fields;
|
||||
fields.reserve(4 + request.headers.size());
|
||||
fields.emplace_back(":method", request.method.empty() ? std::string("GET") : request.method);
|
||||
fields.emplace_back(":scheme", request.scheme.empty() ? std::string("https") : request.scheme);
|
||||
fields.emplace_back(":authority",
|
||||
request.authority.empty() ? (host + ":" + std::to_string(port)) : request.authority);
|
||||
fields.emplace_back(":path", request.path.empty() ? std::string("/") : request.path);
|
||||
for (const auto& [name, value] : request.headers) {
|
||||
// HTTP/3 forbids uppercase in field names — lowercase defensively.
|
||||
std::string lower = name;
|
||||
std::ranges::transform(lower, lower.begin(),
|
||||
[](unsigned char c){ return static_cast<char>(std::tolower(c)); });
|
||||
fields.emplace_back(std::move(lower), value);
|
||||
}
|
||||
|
||||
auto encoded = HTTP3::EncodeFieldSection(fields);
|
||||
|
||||
std::vector<std::uint8_t> wire;
|
||||
HTTP3::WriteFrame(wire, HTTP3::kFrameHeaders, encoded.data(), encoded.size());
|
||||
if (!request.body.empty()) {
|
||||
HTTP3::WriteFrame(wire, HTTP3::kFrameData,
|
||||
reinterpret_cast<const std::uint8_t*>(request.body.data()),
|
||||
request.body.size());
|
||||
}
|
||||
|
||||
// Send the entire request and FIN our send-side. HTTP/3 servers need FIN
|
||||
// to know the request is complete — there's no Content-Length signal.
|
||||
stream.SendSync(wire.data(), static_cast<std::uint32_t>(wire.size()), /*finish=*/true);
|
||||
|
||||
auto raw = stream.RecieveUntilCloseSync();
|
||||
return ParseResponseFrames(raw);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -150,6 +150,8 @@ struct QUICStream::Impl {
|
|||
}
|
||||
};
|
||||
|
||||
QUICStream::QUICStream() = default;
|
||||
|
||||
QUICStream::QUICStream(HQUIC handle, ClientQUIC* connection)
|
||||
: handle(handle), connection(connection), impl(std::make_unique<Impl>())
|
||||
{
|
||||
|
|
@ -159,7 +161,9 @@ QUICStream::QUICStream(HQUIC handle, ClientQUIC* connection)
|
|||
}
|
||||
|
||||
QUICStream::QUICStream(QUICStream&& other) noexcept
|
||||
: handle(other.handle), connection(other.connection), impl(std::move(other.impl))
|
||||
: handle(other.handle), connection(other.connection),
|
||||
canSend(other.canSend), canReceive(other.canReceive),
|
||||
impl(std::move(other.impl))
|
||||
{
|
||||
other.handle = nullptr;
|
||||
other.connection = nullptr;
|
||||
|
|
@ -170,6 +174,8 @@ QUICStream& QUICStream::operator=(QUICStream&& other) noexcept {
|
|||
Stop();
|
||||
handle = other.handle;
|
||||
connection = other.connection;
|
||||
canSend = other.canSend;
|
||||
canReceive = other.canReceive;
|
||||
impl = std::move(other.impl);
|
||||
other.handle = nullptr;
|
||||
other.connection = nullptr;
|
||||
|
|
@ -183,12 +189,26 @@ QUICStream::~QUICStream() {
|
|||
|
||||
void QUICStream::Stop() {
|
||||
if (!handle) return;
|
||||
Runtime().api->StreamShutdown(handle, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0);
|
||||
// If the stream's SHUTDOWN_COMPLETE event has already fired, msquic has
|
||||
// internally called StreamClose for us (see Impl::Callback) and the
|
||||
// handle is no longer valid — calling StreamShutdown on it trips a
|
||||
// quic_bugcheck inside msquic. Skip in that case. This is the common
|
||||
// path for short-lived request/response streams where both peers FIN
|
||||
// before the wrapper is destroyed.
|
||||
bool alreadyClosed = false;
|
||||
if (impl) {
|
||||
std::lock_guard lk(impl->mtx);
|
||||
alreadyClosed = impl->shutdownComplete;
|
||||
}
|
||||
if (!alreadyClosed) {
|
||||
Runtime().api->StreamShutdown(handle, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0);
|
||||
}
|
||||
handle = nullptr;
|
||||
if (impl) impl->handle = nullptr;
|
||||
}
|
||||
|
||||
void QUICStream::SendSync(const void* buffer, std::uint32_t size, bool finish) {
|
||||
if (!handle) throw QUICClosedException();
|
||||
if (!handle || !canSend) throw QUICClosedException();
|
||||
auto* copy = new char[size];
|
||||
std::memcpy(copy, buffer, size);
|
||||
QUIC_BUFFER quicBuf{};
|
||||
|
|
@ -210,7 +230,7 @@ void QUICStream::SendSync(const void* buffer, std::uint32_t size, bool finish) {
|
|||
}
|
||||
|
||||
std::vector<char> QUICStream::RecieveSync() {
|
||||
if (!handle) throw QUICClosedException();
|
||||
if (!handle || !canReceive) throw QUICClosedException();
|
||||
std::unique_lock lk(impl->mtx);
|
||||
impl->cv.wait(lk, [&]{ return !impl->pending.empty() || impl->peerSendClosed || impl->shutdownComplete; });
|
||||
if (!impl->pending.empty()) {
|
||||
|
|
@ -222,7 +242,7 @@ std::vector<char> QUICStream::RecieveSync() {
|
|||
}
|
||||
|
||||
std::vector<char> QUICStream::RecieveUntilCloseSync() {
|
||||
if (!handle) throw QUICClosedException();
|
||||
if (!handle || !canReceive) throw QUICClosedException();
|
||||
std::vector<char> out;
|
||||
while (true) {
|
||||
std::unique_lock lk(impl->mtx);
|
||||
|
|
@ -237,7 +257,7 @@ std::vector<char> QUICStream::RecieveUntilCloseSync() {
|
|||
}
|
||||
|
||||
std::vector<char> QUICStream::RecieveUntilFullSync(std::uint32_t bufferSize) {
|
||||
if (!handle) throw QUICClosedException();
|
||||
if (!handle || !canReceive) throw QUICClosedException();
|
||||
std::vector<char> out;
|
||||
out.reserve(bufferSize);
|
||||
while (out.size() < bufferSize) {
|
||||
|
|
@ -285,6 +305,12 @@ struct ClientQUIC::Impl {
|
|||
std::function<void(QUICStream)> onStream;
|
||||
std::function<void(std::vector<char>)> onDatagram;
|
||||
std::deque<std::vector<char>> datagramQueue;
|
||||
// Streams the peer started before the user installed an OnStream
|
||||
// handler. Without this backlog the early streams (e.g. an h3 server's
|
||||
// control stream right after handshake) would be aborted in the
|
||||
// PEER_STREAM_STARTED branch and the connection would die with
|
||||
// H3_MISSING_SETTINGS on the peer side.
|
||||
std::deque<QUICStream> pendingStreams;
|
||||
|
||||
ClientQUIC* outer = nullptr;
|
||||
|
||||
|
|
@ -325,18 +351,30 @@ struct ClientQUIC::Impl {
|
|||
}
|
||||
case QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED: {
|
||||
HQUIC streamHandle = ev->PEER_STREAM_STARTED.Stream;
|
||||
bool unidirectional = (ev->PEER_STREAM_STARTED.Flags
|
||||
& QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL) != 0;
|
||||
QUICStream stream(streamHandle, self->outer);
|
||||
if (self->onStream) {
|
||||
auto cb = self->onStream;
|
||||
auto* shared = new QUICStream(std::move(stream));
|
||||
ThreadPool::Enqueue([cb, shared]{
|
||||
cb(std::move(*shared));
|
||||
delete shared;
|
||||
});
|
||||
} else {
|
||||
// No handler: shut down to avoid leaking a stream.
|
||||
Runtime().api->StreamShutdown(streamHandle, QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 0);
|
||||
if (unidirectional) {
|
||||
// Peer-initiated unidi: peer sends, we read; we cannot send.
|
||||
stream.canSend = false;
|
||||
stream.canReceive = true;
|
||||
}
|
||||
std::function<void(QUICStream)> cb;
|
||||
{
|
||||
std::lock_guard lk(self->mtx);
|
||||
cb = self->onStream;
|
||||
if (!cb) {
|
||||
// Buffer until OnStream is installed; OnStream's
|
||||
// setter drains this queue.
|
||||
self->pendingStreams.push_back(std::move(stream));
|
||||
return QUIC_STATUS_SUCCESS;
|
||||
}
|
||||
}
|
||||
auto* shared = new QUICStream(std::move(stream));
|
||||
ThreadPool::Enqueue([cb, shared]{
|
||||
cb(std::move(*shared));
|
||||
delete shared;
|
||||
});
|
||||
return QUIC_STATUS_SUCCESS;
|
||||
}
|
||||
case QUIC_CONNECTION_EVENT_DATAGRAM_RECEIVED: {
|
||||
|
|
@ -382,6 +420,16 @@ static HQUIC OpenClientConfiguration(const std::string& alpn, const QUICClientCr
|
|||
settings.IdleTimeoutMs = 30'000;
|
||||
settings.IsSet.DatagramReceiveEnabled = 1;
|
||||
settings.DatagramReceiveEnabled = 1;
|
||||
// Allow the server to open unidi/bidi streams to us. msquic defaults
|
||||
// both peer-stream-count limits to 0; with that, the server's HTTP/3
|
||||
// control stream + QPACK encoder/decoder streams can't be created and
|
||||
// most h3 servers will close the connection after handshake. We don't
|
||||
// currently use server push (h3 pushes ride on unidi 0x01 streams) but
|
||||
// the bidi cap is harmless to grant.
|
||||
settings.IsSet.PeerUnidiStreamCount = 1;
|
||||
settings.PeerUnidiStreamCount = 16;
|
||||
settings.IsSet.PeerBidiStreamCount = 1;
|
||||
settings.PeerBidiStreamCount = 16;
|
||||
|
||||
HQUIC cfg = nullptr;
|
||||
QUIC_STATUS s = Runtime().api->ConfigurationOpen(Runtime().registration, &alpnBuffer, 1,
|
||||
|
|
@ -470,18 +518,26 @@ void ClientQUIC::Stop() {
|
|||
Runtime().api->ConnectionShutdown(impl->connection, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0);
|
||||
}
|
||||
|
||||
QUICStream ClientQUIC::OpenStream() {
|
||||
QUICStream ClientQUIC::OpenStream(bool unidirectional) {
|
||||
HQUIC streamHandle = nullptr;
|
||||
QUICStream stream;
|
||||
stream.impl = std::make_unique<QUICStream::Impl>();
|
||||
stream.impl->connection = this;
|
||||
QUIC_STATUS s = Runtime().api->StreamOpen(impl->connection, QUIC_STREAM_OPEN_FLAG_NONE,
|
||||
QUIC_STREAM_OPEN_FLAGS openFlags = unidirectional
|
||||
? QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL
|
||||
: QUIC_STREAM_OPEN_FLAG_NONE;
|
||||
QUIC_STATUS s = Runtime().api->StreamOpen(impl->connection, openFlags,
|
||||
reinterpret_cast<QUIC_STREAM_CALLBACK_HANDLER>(&QUICStream::Impl::Callback),
|
||||
stream.impl.get(), &streamHandle);
|
||||
if (QUIC_FAILED(s)) throw QUICException(std::format("StreamOpen failed: 0x{:x}", static_cast<unsigned>(s)));
|
||||
stream.handle = streamHandle;
|
||||
stream.connection = this;
|
||||
stream.impl->handle = streamHandle;
|
||||
if (unidirectional) {
|
||||
// We initiated the unidi stream: we send, peer reads.
|
||||
stream.canSend = true;
|
||||
stream.canReceive = false;
|
||||
}
|
||||
s = Runtime().api->StreamStart(streamHandle, QUIC_STREAM_START_FLAG_NONE);
|
||||
if (QUIC_FAILED(s)) {
|
||||
Runtime().api->StreamClose(streamHandle);
|
||||
|
|
@ -510,7 +566,21 @@ void ClientQUIC::SendDatagram(const void* buffer, std::uint32_t size) {
|
|||
}
|
||||
|
||||
void ClientQUIC::OnStream(std::function<void(QUICStream)> cb) {
|
||||
impl->onStream = std::move(cb);
|
||||
std::deque<QUICStream> backlog;
|
||||
{
|
||||
std::lock_guard lk(impl->mtx);
|
||||
impl->onStream = cb;
|
||||
std::swap(backlog, impl->pendingStreams);
|
||||
}
|
||||
while (!backlog.empty()) {
|
||||
auto* shared = new QUICStream(std::move(backlog.front()));
|
||||
backlog.pop_front();
|
||||
auto handler = cb;
|
||||
ThreadPool::Enqueue([handler, shared]{
|
||||
handler(std::move(*shared));
|
||||
delete shared;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void ClientQUIC::OnDatagram(std::function<void(std::vector<char>)> cb) {
|
||||
|
|
|
|||
|
|
@ -19,224 +19,228 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|||
*/
|
||||
|
||||
module;
|
||||
|
||||
#include <stdio.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/uio.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/wait.h>
|
||||
#include <strings.h>
|
||||
#include <cerrno>
|
||||
#include <cstring>
|
||||
|
||||
#include <msquic.h>
|
||||
module Crafter.Network:ListenerHTTP_impl;
|
||||
import :ListenerHTTP;
|
||||
import :ClientTCP;
|
||||
import std;
|
||||
import :ListenerQUIC;
|
||||
import :ClientQUIC;
|
||||
import :HTTP;
|
||||
import :HTTP3;
|
||||
import Crafter.Thread;
|
||||
import std;
|
||||
|
||||
using namespace Crafter;
|
||||
|
||||
ListenerHTTP::ListenerHTTP(std::uint16_t port, std::unordered_map<std::string, std::function<std::string(const HTTPRequest&)>> routes): routes(routes) {
|
||||
sockaddr_in servAddr;
|
||||
bzero((char*)&servAddr, sizeof(servAddr));
|
||||
servAddr.sin_family = AF_INET;
|
||||
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
servAddr.sin_port = htons(port);
|
||||
namespace {
|
||||
// Parse a complete request stream's bytes into an HTTPRequest. The stream
|
||||
// is closed by the peer with FIN, so we read until close and then
|
||||
// frame-walk the bytes (HEADERS [+ DATA]*).
|
||||
HTTPRequest ParseRequestFrames(const std::vector<char>& bytes) {
|
||||
HTTPRequest request;
|
||||
bool sawHeaders = false;
|
||||
std::size_t pos = 0;
|
||||
const auto* p = reinterpret_cast<const std::uint8_t*>(bytes.data());
|
||||
std::size_t avail = bytes.size();
|
||||
|
||||
s = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (s < 0) {
|
||||
throw std::runtime_error("Error establishing the server socket");
|
||||
while (pos < avail) {
|
||||
std::uint64_t frameType = 0, frameLen = 0;
|
||||
std::size_t cn = 0;
|
||||
if (!HTTP3::DecodeVarint(p + pos, avail - pos, frameType, cn)) {
|
||||
throw HTTP3::HTTP3ProtocolError("truncated frame type");
|
||||
}
|
||||
pos += cn;
|
||||
if (!HTTP3::DecodeVarint(p + pos, avail - pos, frameLen, cn)) {
|
||||
throw HTTP3::HTTP3ProtocolError("truncated frame length");
|
||||
}
|
||||
pos += cn;
|
||||
if (pos + frameLen > avail) {
|
||||
throw HTTP3::HTTP3ProtocolError("frame length runs past buffer");
|
||||
}
|
||||
if (frameType == HTTP3::kFrameHeaders) {
|
||||
auto fields = HTTP3::DecodeFieldSection(p + pos, static_cast<std::size_t>(frameLen));
|
||||
if (!sawHeaders) {
|
||||
for (auto& [name, value] : fields) {
|
||||
if (name == ":method") request.method = std::move(value);
|
||||
else if (name == ":scheme") request.scheme = std::move(value);
|
||||
else if (name == ":authority") request.authority = std::move(value);
|
||||
else if (name == ":path") request.path = std::move(value);
|
||||
else if (!name.empty() && name[0] == ':') {
|
||||
// Unknown request pseudo-header — ignore.
|
||||
} else {
|
||||
request.headers.emplace(std::move(name), std::move(value));
|
||||
}
|
||||
}
|
||||
sawHeaders = true;
|
||||
}
|
||||
} else if (frameType == HTTP3::kFrameData) {
|
||||
request.body.append(reinterpret_cast<const char*>(p + pos),
|
||||
static_cast<std::size_t>(frameLen));
|
||||
} else {
|
||||
// Skip unknown frames (RFC 9114 §9 — reserved/extension frame
|
||||
// types are silently ignored).
|
||||
}
|
||||
pos += static_cast<std::size_t>(frameLen);
|
||||
}
|
||||
if (!sawHeaders) {
|
||||
throw HTTP3::HTTP3ProtocolError("request stream had no HEADERS frame");
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
int opt = 1;
|
||||
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
|
||||
throw std::runtime_error("Error setting SO_REUSEADDR");
|
||||
}
|
||||
// Serialise a response into HEADERS [+ DATA] frames.
|
||||
std::vector<std::uint8_t> SerializeResponse(const HTTPResponse& response) {
|
||||
std::vector<std::pair<std::string, std::string>> fields;
|
||||
fields.reserve(1 + response.headers.size());
|
||||
fields.emplace_back(":status", response.status.empty() ? std::string("200") : response.status);
|
||||
for (const auto& [name, value] : response.headers) {
|
||||
std::string lower = name;
|
||||
std::ranges::transform(lower, lower.begin(),
|
||||
[](unsigned char c){ return static_cast<char>(std::tolower(c)); });
|
||||
fields.emplace_back(std::move(lower), value);
|
||||
}
|
||||
auto encoded = HTTP3::EncodeFieldSection(fields);
|
||||
|
||||
int bindStatus = bind(s, (struct sockaddr*)&servAddr, sizeof(servAddr));
|
||||
if (bindStatus < 0) {
|
||||
throw std::runtime_error(std::format("Error binding the server socket: {}", std::strerror(errno)));
|
||||
}
|
||||
|
||||
if (listen(s, 5) < 0) {
|
||||
throw std::runtime_error("Error starting to listen on the server socket");
|
||||
std::vector<std::uint8_t> wire;
|
||||
HTTP3::WriteFrame(wire, HTTP3::kFrameHeaders, encoded.data(), encoded.size());
|
||||
if (!response.body.empty()) {
|
||||
HTTP3::WriteFrame(wire, HTTP3::kFrameData,
|
||||
reinterpret_cast<const std::uint8_t*>(response.body.data()),
|
||||
response.body.size());
|
||||
}
|
||||
return wire;
|
||||
}
|
||||
}
|
||||
|
||||
// Per-peer state for an accepted connection. Holds the connection wrapper
|
||||
// and the server-side control stream alive for the lifetime of the peer.
|
||||
struct PeerState {
|
||||
std::unique_ptr<ClientQUIC> quic;
|
||||
QUICStream controlStream;
|
||||
};
|
||||
|
||||
struct ListenerHTTP::Impl {
|
||||
std::unique_ptr<ListenerQUIC> listener;
|
||||
std::mutex peersMtx;
|
||||
std::vector<std::unique_ptr<PeerState>> peers;
|
||||
bool running = true;
|
||||
};
|
||||
|
||||
ListenerHTTP::ListenerHTTP(std::uint16_t port,
|
||||
QUICServerCredentials creds,
|
||||
std::unordered_map<std::string, std::function<HTTPResponse(const HTTPRequest&)>> r)
|
||||
: routes(std::move(r))
|
||||
, alpn(HTTP3::kAlpn)
|
||||
, impl(std::make_unique<Impl>())
|
||||
{
|
||||
// The connect callback wires up an OnStream handler that splits unidi
|
||||
// streams (control / QPACK) from bidi streams (request streams) and
|
||||
// sends our own SETTINGS frame on a freshly-opened control stream.
|
||||
auto onConnect = [this](ClientQUIC* peer) {
|
||||
auto state = std::make_unique<PeerState>();
|
||||
state->quic.reset(peer);
|
||||
|
||||
peer->OnStream([this](QUICStream stream) {
|
||||
if (!stream.canSend) {
|
||||
// Peer-initiated unidi: client's control stream + optional
|
||||
// QPACK encoder/decoder streams. Drain — we honour SETTINGS
|
||||
// by accepting defaults, and we don't track QPACK dynamic-
|
||||
// table mutations because we don't use the dynamic table.
|
||||
try {
|
||||
while (true) (void)stream.RecieveSync();
|
||||
} catch (...) {}
|
||||
return;
|
||||
}
|
||||
|
||||
// Bidi stream: a request. Drive a single request/response cycle.
|
||||
try {
|
||||
auto raw = stream.RecieveUntilCloseSync();
|
||||
HTTPRequest request = ParseRequestFrames(raw);
|
||||
|
||||
HTTPResponse response;
|
||||
auto it = routes.find(request.path);
|
||||
if (it != routes.end()) {
|
||||
response = it->second(request);
|
||||
} else {
|
||||
response.status = "404";
|
||||
response.body = "Not Found";
|
||||
}
|
||||
|
||||
auto wire = SerializeResponse(response);
|
||||
stream.SendSync(wire.data(),
|
||||
static_cast<std::uint32_t>(wire.size()),
|
||||
/*finish=*/true);
|
||||
} catch (const std::exception& e) {
|
||||
// Best-effort 500 if we can still send. Stream may already
|
||||
// be closed; swallow further errors silently.
|
||||
try {
|
||||
HTTPResponse err;
|
||||
err.status = "500";
|
||||
err.body = e.what();
|
||||
auto wire = SerializeResponse(err);
|
||||
stream.SendSync(wire.data(),
|
||||
static_cast<std::uint32_t>(wire.size()),
|
||||
/*finish=*/true);
|
||||
} catch (...) {}
|
||||
}
|
||||
});
|
||||
|
||||
// Open our outgoing control stream and write the SETTINGS prelude.
|
||||
// Do this AFTER OnStream is registered so any client-initiated
|
||||
// unidi stream that races in is handled. The control stream must
|
||||
// remain open for the connection's lifetime — we never FIN it.
|
||||
try {
|
||||
state->controlStream = peer->OpenStream(/*unidirectional=*/true);
|
||||
auto prelude = HTTP3::BuildControlStreamPrelude();
|
||||
state->controlStream.SendSync(prelude.data(),
|
||||
static_cast<std::uint32_t>(prelude.size()),
|
||||
/*finish=*/false);
|
||||
} catch (...) {
|
||||
// If the connection died mid-handshake we land here; the peer
|
||||
// gets dropped via destruction below.
|
||||
}
|
||||
|
||||
std::lock_guard lk(impl->peersMtx);
|
||||
impl->peers.push_back(std::move(state));
|
||||
};
|
||||
|
||||
impl->listener = std::make_unique<ListenerQUIC>(port,
|
||||
std::string(HTTP3::kAlpn),
|
||||
std::move(creds),
|
||||
onConnect);
|
||||
}
|
||||
|
||||
ListenerHTTP::ListenerHTTP(ListenerHTTP&&) noexcept = default;
|
||||
|
||||
ListenerHTTP::~ListenerHTTP() {
|
||||
if(s != -1) {
|
||||
Stop();
|
||||
}
|
||||
if (impl) Stop();
|
||||
}
|
||||
|
||||
void ListenerHTTP::Stop() {
|
||||
running = false;
|
||||
shutdown(s, SHUT_RDWR);
|
||||
close(s);
|
||||
s = -1;
|
||||
for(ListenerHTTPClient* client : clients) {
|
||||
client->client.Stop();
|
||||
client->thread.join();
|
||||
delete client;
|
||||
}
|
||||
if (!impl) return;
|
||||
impl->running = false;
|
||||
if (impl->listener) impl->listener->Stop();
|
||||
}
|
||||
|
||||
void ListenerHTTP::Listen() {
|
||||
while(running) {
|
||||
sockaddr_in newSockAddr;
|
||||
socklen_t newSockAddrSize = sizeof(newSockAddr);
|
||||
int client = accept(s, (sockaddr*)&newSockAddr, &newSockAddrSize);
|
||||
if(!running) {
|
||||
return;
|
||||
}
|
||||
if (client > 0) {
|
||||
clients.push_back(new ListenerHTTPClient(this, client));
|
||||
} else {
|
||||
std::cerr << "Error accepting request from client!" << std::endl;
|
||||
}
|
||||
std::erase_if(clients, [](ListenerHTTPClient* client) {
|
||||
if (client->disconnected.load()) {
|
||||
client->thread.join();
|
||||
delete client;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
if (!impl || !impl->listener) return;
|
||||
// ListenSyncAsync runs the accept loop on this thread and dispatches the
|
||||
// per-connection callback (control-stream open + OnStream wiring) on the
|
||||
// ThreadPool. That keeps route handlers off the accept thread.
|
||||
impl->listener->ListenSyncAsync();
|
||||
}
|
||||
|
||||
ListenerHTTPClient::ListenerHTTPClient(ListenerHTTP* server, int s) : server(server), client(s), thread(&ListenerHTTPClient::ListenRoutes, this), disconnected(false) {
|
||||
|
||||
}
|
||||
|
||||
void ListenerHTTPClient::ListenRoutes() {
|
||||
try {
|
||||
while(true) {
|
||||
std::vector<char> buffer;
|
||||
HTTPRequest request;
|
||||
std::string route;
|
||||
std::uint32_t i = 0;
|
||||
std::uint32_t routeStart = 0;
|
||||
while(true) {
|
||||
buffer = client.RecieveSync();
|
||||
while(true) {
|
||||
std::string str(buffer.begin(), buffer.end());
|
||||
for(; i < buffer.size(); i++) {
|
||||
if(buffer[i] == ' ') {
|
||||
request.method.assign(buffer.data(), i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
for(; i < buffer.size(); i++) {
|
||||
if(buffer[i] == '/') {
|
||||
routeStart = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
for(; i < buffer.size(); i++) {
|
||||
if(buffer[i] == ' ') {
|
||||
route.assign(buffer.data()+routeStart, i-routeStart);
|
||||
break;
|
||||
}
|
||||
}
|
||||
for(; i < buffer.size(); i++) {
|
||||
if(buffer[i] == '\r' && buffer[i+1] == '\n') {
|
||||
break;
|
||||
}
|
||||
}
|
||||
i+=2;
|
||||
while(i < buffer.size()) {
|
||||
std::uint32_t headerStart = i;
|
||||
std::string headerName;
|
||||
for(; i < buffer.size(); i++) {
|
||||
if(buffer[i] == ':') {
|
||||
headerName.assign(buffer.data()+headerStart, i-headerStart);
|
||||
std::transform(headerName.begin(), headerName.end(), headerName.begin(), [](unsigned char c){ return std::tolower(c); });
|
||||
i++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
headerStart = i;
|
||||
std::string headerValue;
|
||||
for(; i < buffer.size(); i++) {
|
||||
if(buffer[i] == '\r' && buffer[i+1] == '\n') {
|
||||
headerValue.assign(buffer.data()+headerStart, i-headerStart);
|
||||
request.headers.insert({headerName, headerValue});
|
||||
if(buffer[i+2] == '\r'){
|
||||
goto headersComplete;
|
||||
} else{
|
||||
i+=2;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
i = 0;
|
||||
}
|
||||
headersComplete:;
|
||||
i+=4;
|
||||
std::unordered_map<std::string, std::string>::iterator it = request.headers.find("content-length");
|
||||
if(it != request.headers.end()) {
|
||||
const int lenght = std::stoi(it->second);
|
||||
request.body.resize(lenght, 0);
|
||||
if(lenght > 0 ){
|
||||
std::int_fast32_t remaining = lenght-(buffer.size()-i);
|
||||
if(remaining < 0) {
|
||||
std::memcpy(&request.body[0], buffer.data()+i, lenght);
|
||||
std::string response = server->routes.at(route)(request);
|
||||
client.Send(&response[0], response.size());
|
||||
i+=lenght;
|
||||
} else if(remaining == 0){
|
||||
std::memcpy(&request.body[0], buffer.data()+i, lenght);
|
||||
std::string response = server->routes.at(route)(request);
|
||||
client.Send(&response[0], response.size());
|
||||
break;
|
||||
} else {
|
||||
std::memcpy(&request.body[0], buffer.data()+i, buffer.size()-i);
|
||||
std::vector<char> bodyBuffer = client.RecieveUntilFullSync(remaining);
|
||||
std::memcpy(&request.body[buffer.size()-i], bodyBuffer.data(), remaining);
|
||||
std::string response = server->routes.at(route)(request);
|
||||
client.Send(&response[0], response.size());
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
std::string response = server->routes.at(route)(request);
|
||||
client.Send(&response[0], response.size());
|
||||
if(i == buffer.size()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
std::string response = server->routes.at(route)(request);
|
||||
client.Send(&response[0], response.size());
|
||||
if(i == buffer.size()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch(SocketClosedException& e) {
|
||||
disconnected.store(true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ListenerAsyncHTTP::ListenerAsyncHTTP(std::uint16_t port, std::unordered_map<std::string, std::function<std::string(const HTTPRequest&)>> routes): listener(port, routes), thread(&ListenerHTTP::Listen, &listener) {
|
||||
|
||||
}
|
||||
ListenerAsyncHTTP::ListenerAsyncHTTP(std::uint16_t port,
|
||||
QUICServerCredentials creds,
|
||||
std::unordered_map<std::string, std::function<HTTPResponse(const HTTPRequest&)>> routes)
|
||||
: listener(port, std::move(creds), std::move(routes))
|
||||
, thread(&ListenerHTTP::Listen, &listener)
|
||||
{}
|
||||
|
||||
ListenerAsyncHTTP::~ListenerAsyncHTTP() {
|
||||
if(listener.s != -1) {
|
||||
Stop();
|
||||
}
|
||||
Stop();
|
||||
}
|
||||
|
||||
void ListenerAsyncHTTP::Stop() {
|
||||
listener.Stop();
|
||||
thread.join();
|
||||
}
|
||||
listener.Stop();
|
||||
if (thread.joinable()) thread.join();
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue