634 lines
25 KiB
C++
634 lines
25 KiB
C++
/*
|
|
Crafter®.Network
|
|
Copyright (C) 2026 Catcrafts®
|
|
Catcrafts.net
|
|
|
|
This library is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU Lesser General Public
|
|
License as published by the Free Software Foundation; either
|
|
version 3.0 of the License, or (at your option) any later version.
|
|
|
|
This library is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Lesser General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Lesser General Public
|
|
License along with this library; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
*/
|
|
|
|
module;
|
|
#include <msquic.h>
|
|
#include <cstring>
|
|
module Crafter.Network:ClientQUIC_impl;
|
|
import :ClientQUIC;
|
|
import Crafter.Thread;
|
|
import std;
|
|
|
|
using namespace Crafter;
|
|
|
|
namespace {
|
|
// Process-wide msquic API table + registration. Initialised lazily on
// first ClientQUIC/ListenerQUIC construction; tear-down happens at
// process exit via the destructor of the static object.
struct MsQuicRuntime {
    const QUIC_API_TABLE* api = nullptr;  // set by MsQuicOpen2 in Ensure()
    HQUIC registration = nullptr;         // set by RegistrationOpen in Ensure()
    std::mutex initMutex;                 // serialises Ensure()
    bool initialised = false;             // true once both handles are open

    // Opens the API table and the registration the first time it is called;
    // later calls return immediately. Throws QUICException when either
    // msquic call fails, leaving the object uninitialised so that a later
    // call can retry.
    void Ensure() {
        std::lock_guard lock(initMutex);
        if (initialised) return;
        QUIC_STATUS s = MsQuicOpen2(&api);
        if (QUIC_FAILED(s)) {
            api = nullptr;
            throw QUICException(std::format("MsQuicOpen2 failed: 0x{:x}", static_cast<unsigned>(s)));
        }
        QUIC_REGISTRATION_CONFIG regConfig{ "crafter.network", QUIC_EXECUTION_PROFILE_LOW_LATENCY };
        s = api->RegistrationOpen(&regConfig, &registration);
        if (QUIC_FAILED(s)) {
            MsQuicClose(api);
            // Reset both handles so the destructor never dereferences a
            // closed API table.
            api = nullptr;
            registration = nullptr;
            throw QUICException(std::format("RegistrationOpen failed: 0x{:x}", static_cast<unsigned>(s)));
        }
        initialised = true;
    }

    // Closes the registration first, then the API table it was opened from.
    ~MsQuicRuntime() {
        if (api && registration) api->RegistrationClose(registration);
        if (api) MsQuicClose(api);
    }
};
|
|
|
|
// Accessor for the single process-wide MsQuicRuntime. Ensure() runs on
// every call, so a failed initialisation is retried on the next access
// and a successful one costs only a mutex acquisition.
MsQuicRuntime& Runtime() {
    static MsQuicRuntime instance;
    instance.Ensure();
    return instance;
}
|
|
|
|
// Encode an ALPN string into the wire format msquic expects: a length
|
|
// byte followed by the ASCII characters. Lifetime of the returned buffer
|
|
// matches the caller's storage in `out`.
|
|
QUIC_BUFFER MakeAlpn(const std::string& alpn, std::vector<std::uint8_t>& out) {
|
|
if (alpn.size() > 255) throw QUICException("ALPN string too long (max 255)");
|
|
out.assign(alpn.begin(), alpn.end());
|
|
QUIC_BUFFER b{};
|
|
b.Length = static_cast<std::uint32_t>(out.size());
|
|
b.Buffer = out.data();
|
|
return b;
|
|
}
|
|
}
|
|
|
|
// ---------------- QUICStream::Impl ----------------
|
|
// Shared state between a QUICStream wrapper and the msquic stream callback.
// The callback runs on msquic's threads; the wrapper's blocking calls wait
// on `cv` for the flags and the queue below to change.
struct QUICStream::Impl {
    HQUIC handle = nullptr;
    ClientQUIC* connection = nullptr;

    std::mutex mtx;                         // guards the fields below
    std::condition_variable cv;             // signalled after every state change
    std::deque<std::vector<char>> pending;  // received chunks not yet consumed
    bool peerSendClosed = false;            // peer finished or aborted its send side
    bool shutdownComplete = false;          // SHUTDOWN_COMPLETE was delivered
    bool sendInFlight = false;              // a SendSync is waiting for SEND_COMPLETE

    // msquic stream event handler. `ctx` is the Impl registered for the
    // stream. Every event is answered with QUIC_STATUS_SUCCESS.
    static QUIC_STATUS QUIC_API Callback(HQUIC stream, void* ctx, QUIC_STREAM_EVENT* ev) {
        auto* self = static_cast<Impl*>(ctx);
        switch (ev->Type) {
        case QUIC_STREAM_EVENT_RECEIVE: {
            // Flatten all buffers of this event into one chunk and queue it.
            std::vector<char> chunk;
            std::uint64_t total = 0;
            for (std::uint32_t i = 0; i < ev->RECEIVE.BufferCount; ++i) {
                total += ev->RECEIVE.Buffers[i].Length;
            }
            chunk.reserve(static_cast<std::size_t>(total));
            for (std::uint32_t i = 0; i < ev->RECEIVE.BufferCount; ++i) {
                const QUIC_BUFFER& b = ev->RECEIVE.Buffers[i];
                chunk.insert(chunk.end(), b.Buffer, b.Buffer + b.Length);
            }
            {
                std::lock_guard lk(self->mtx);
                if (!chunk.empty()) self->pending.push_back(std::move(chunk));
            }
            self->cv.notify_all();
            return QUIC_STATUS_SUCCESS;
        }
        case QUIC_STREAM_EVENT_SEND_COMPLETE: {
            {
                std::lock_guard lk(self->mtx);
                self->sendInFlight = false;
            }
            // The client context is the char[] copy allocated by SendSync.
            if (ev->SEND_COMPLETE.ClientContext) {
                delete[] static_cast<char*>(ev->SEND_COMPLETE.ClientContext);
            }
            self->cv.notify_all();
            return QUIC_STATUS_SUCCESS;
        }
        case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN:
        case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: {
            {
                std::lock_guard lk(self->mtx);
                self->peerSendClosed = true;
            }
            self->cv.notify_all();
            return QUIC_STATUS_SUCCESS;
        }
        case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: {
            // Read the flag before waking waiters; `self` is not touched
            // again after the notification.
            const bool appIsClosing = ev->SHUTDOWN_COMPLETE.AppCloseInProgress != 0;
            {
                std::lock_guard lk(self->mtx);
                self->peerSendClosed = true;
                self->shutdownComplete = true;
            }
            self->cv.notify_all();
            // When the application is already inside StreamClose for this
            // handle, closing it here as well would release it twice.
            if (!appIsClosing) {
                Runtime().api->SetCallbackHandler(stream, nullptr, nullptr);
                Runtime().api->StreamClose(stream);
            }
            return QUIC_STATUS_SUCCESS;
        }
        default:
            return QUIC_STATUS_SUCCESS;
        }
    }
};
|
|
|
|
// Defaulted constructor. ClientQUIC::OpenStream() starts from such an
// object and fills in `impl` and `handle` afterwards.
QUICStream::QUICStream() = default;
|
|
|
|
// Wraps an existing msquic stream handle: allocates the shared Impl and
// registers Impl::Callback, with the Impl as context, as the handler for
// that handle.
QUICStream::QUICStream(HQUIC handle, ClientQUIC* connection)
    : handle(handle), connection(connection), impl(std::make_unique<Impl>())
{
    Impl& state = *impl;
    state.handle = handle;
    state.connection = connection;

    void* const handler = reinterpret_cast<void*>(&Impl::Callback);
    Runtime().api->SetCallbackHandler(handle, handler, &state);
}
|
|
|
|
// Move constructor: takes over the handle, the direction flags and the
// Impl. The source is left with a null handle and a null connection, so
// its Stop() becomes a no-op.
QUICStream::QUICStream(QUICStream&& other) noexcept
    : handle(std::exchange(other.handle, nullptr)),
      connection(std::exchange(other.connection, nullptr)),
      canSend(other.canSend),
      canReceive(other.canReceive),
      impl(std::move(other.impl))
{
}
|
|
|
|
// Move assignment: stops the stream currently held (if any), then takes
// over the state of `other`, leaving it with a null handle and a null
// connection. Self-assignment is a no-op.
QUICStream& QUICStream::operator=(QUICStream&& other) noexcept {
    if (this == &other) return *this;

    Stop();

    handle = std::exchange(other.handle, nullptr);
    connection = std::exchange(other.connection, nullptr);
    canSend = other.canSend;
    canReceive = other.canReceive;
    impl = std::move(other.impl);
    return *this;
}
|
|
|
|
// Destructor: delegates to Stop(), which does nothing when no handle is
// held (default-constructed or moved-from objects).
// NOTE(review): Stop() does not unregister Impl::Callback, and `impl` is
// destroyed right after this returns — confirm msquic cannot deliver
// further events for the stream with the old context.
QUICStream::~QUICStream() {
    this->Stop();
}
|
|
|
|
void QUICStream::Stop() {
|
|
if (!handle) return;
|
|
// If the stream's SHUTDOWN_COMPLETE event has already fired, msquic has
|
|
// internally called StreamClose for us (see Impl::Callback) and the
|
|
// handle is no longer valid — calling StreamShutdown on it trips a
|
|
// quic_bugcheck inside msquic. Skip in that case. This is the common
|
|
// path for short-lived request/response streams where both peers FIN
|
|
// before the wrapper is destroyed.
|
|
bool alreadyClosed = false;
|
|
if (impl) {
|
|
std::lock_guard lk(impl->mtx);
|
|
alreadyClosed = impl->shutdownComplete;
|
|
}
|
|
if (!alreadyClosed) {
|
|
Runtime().api->StreamShutdown(handle, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0);
|
|
}
|
|
handle = nullptr;
|
|
if (impl) impl->handle = nullptr;
|
|
}
|
|
|
|
// Sends `size` bytes from `buffer` and blocks until msquic reports the
// send as complete or the stream has fully shut down.
//
// buffer  bytes to send; copied before the call returns to msquic
// size    number of bytes
// finish  when true the send carries QUIC_SEND_FLAG_FIN
//
// Throws QUICClosedException when the stream is not open for sending or
// has shut down, and QUICException when StreamSend itself fails.
void QUICStream::SendSync(const void* buffer, std::uint32_t size, bool finish) {
    if (!handle || !canSend || !impl) throw QUICClosedException();

    // Owned copy of the payload. Ownership passes to the SEND_COMPLETE
    // handler (via the client context) only once StreamSend has accepted it.
    std::unique_ptr<char[]> copy(new char[size]);
    if (size != 0) std::memcpy(copy.get(), buffer, size);

    QUIC_BUFFER quicBuf{};
    quicBuf.Buffer = reinterpret_cast<std::uint8_t*>(copy.get());
    quicBuf.Length = size;
    {
        std::lock_guard lk(impl->mtx);
        // After SHUTDOWN_COMPLETE the callback has closed the handle; it
        // must not be passed to msquic again.
        if (impl->shutdownComplete) throw QUICClosedException();
        impl->sendInFlight = true;
    }

    const QUIC_SEND_FLAGS flags = finish ? QUIC_SEND_FLAG_FIN : QUIC_SEND_FLAG_NONE;
    const QUIC_STATUS s = Runtime().api->StreamSend(handle, &quicBuf, 1, flags, copy.get());
    if (QUIC_FAILED(s)) {
        {
            // No SEND_COMPLETE will arrive for a rejected send.
            std::lock_guard lk(impl->mtx);
            impl->sendInFlight = false;
        }
        throw QUICException(std::format("StreamSend failed: 0x{:x}", static_cast<unsigned>(s)));
    }
    copy.release();  // freed with delete[] by the SEND_COMPLETE handler

    // `quicBuf` lives on this stack frame, so stay here until msquic is
    // done with it.
    std::unique_lock lk(impl->mtx);
    impl->cv.wait(lk, [&]{ return !impl->sendInFlight || impl->shutdownComplete; });
    if (impl->shutdownComplete) throw QUICClosedException();
}
|
|
|
|
std::vector<char> QUICStream::RecieveSync() {
|
|
if (!handle || !canReceive) throw QUICClosedException();
|
|
std::unique_lock lk(impl->mtx);
|
|
impl->cv.wait(lk, [&]{ return !impl->pending.empty() || impl->peerSendClosed || impl->shutdownComplete; });
|
|
if (!impl->pending.empty()) {
|
|
auto out = std::move(impl->pending.front());
|
|
impl->pending.pop_front();
|
|
return out;
|
|
}
|
|
throw QUICClosedException();
|
|
}
|
|
|
|
std::vector<char> QUICStream::RecieveUntilCloseSync() {
|
|
if (!handle || !canReceive) throw QUICClosedException();
|
|
std::vector<char> out;
|
|
while (true) {
|
|
std::unique_lock lk(impl->mtx);
|
|
impl->cv.wait(lk, [&]{ return !impl->pending.empty() || impl->peerSendClosed || impl->shutdownComplete; });
|
|
while (!impl->pending.empty()) {
|
|
auto& chunk = impl->pending.front();
|
|
out.insert(out.end(), chunk.begin(), chunk.end());
|
|
impl->pending.pop_front();
|
|
}
|
|
if (impl->peerSendClosed || impl->shutdownComplete) return out;
|
|
}
|
|
}
|
|
|
|
std::vector<char> QUICStream::RecieveUntilFullSync(std::uint32_t bufferSize) {
|
|
if (!handle || !canReceive) throw QUICClosedException();
|
|
std::vector<char> out;
|
|
out.reserve(bufferSize);
|
|
while (out.size() < bufferSize) {
|
|
std::unique_lock lk(impl->mtx);
|
|
impl->cv.wait(lk, [&]{ return !impl->pending.empty() || impl->peerSendClosed || impl->shutdownComplete; });
|
|
while (!impl->pending.empty() && out.size() < bufferSize) {
|
|
auto& chunk = impl->pending.front();
|
|
std::size_t want = std::min<std::size_t>(chunk.size(), bufferSize - out.size());
|
|
out.insert(out.end(), chunk.begin(), chunk.begin() + want);
|
|
if (want == chunk.size()) {
|
|
impl->pending.pop_front();
|
|
} else {
|
|
chunk.erase(chunk.begin(), chunk.begin() + want);
|
|
}
|
|
}
|
|
if (out.size() < bufferSize && (impl->peerSendClosed || impl->shutdownComplete)) {
|
|
throw QUICClosedException();
|
|
}
|
|
}
|
|
return out;
|
|
}
|
|
|
|
void QUICStream::SendAsync(const void* buffer, std::uint32_t size, bool finish,
|
|
std::function<void()> onSent) {
|
|
// Copy now: the caller's buffer may not outlive the enqueued task.
|
|
std::vector<char> copy(static_cast<const char*>(buffer),
|
|
static_cast<const char*>(buffer) + size);
|
|
ThreadPool::Enqueue([this, copy = std::move(copy), finish, onSent = std::move(onSent)]() mutable {
|
|
try { this->SendSync(copy.data(), static_cast<std::uint32_t>(copy.size()), finish); }
|
|
catch (...) { /* swallowed — callback still fires so the caller can move on */ }
|
|
if (onSent) onSent();
|
|
});
|
|
}
|
|
|
|
// Runs RecieveSync on the thread pool and hands its result to `cb`.
// The task captures `this`; an exception from RecieveSync is not caught
// here and propagates out of the task.
void QUICStream::RecieveAsync(std::function<void(std::vector<char>)> cb) {
    ThreadPool::Enqueue([this, cb = std::move(cb)] {
        cb(this->RecieveSync());
    });
}
|
|
// Runs RecieveUntilCloseSync on the thread pool and hands its result to
// `cb`. The task captures `this`; exceptions are not caught here.
void QUICStream::RecieveUntilCloseAsync(std::function<void(std::vector<char>)> cb) {
    ThreadPool::Enqueue([this, cb = std::move(cb)] {
        cb(this->RecieveUntilCloseSync());
    });
}
|
|
// Runs RecieveUntilFullSync(bufferSize) on the thread pool and hands its
// result to `cb`. The task captures `this`; exceptions are not caught here.
void QUICStream::RecieveUntilFullAsync(std::uint32_t bufferSize, std::function<void(std::vector<char>)> cb) {
    ThreadPool::Enqueue([this, bufferSize, cb = std::move(cb)] {
        cb(this->RecieveUntilFullSync(bufferSize));
    });
}
|
|
|
|
void QUICStream::PrependReceived(std::vector<char> bytes) {
|
|
if (bytes.empty() || !impl) return;
|
|
{
|
|
std::lock_guard lk(impl->mtx);
|
|
impl->pending.push_front(std::move(bytes));
|
|
}
|
|
impl->cv.notify_all();
|
|
}
|
|
|
|
// Queries msquic for the stream's ID (QUIC_PARAM_STREAM_ID). Throws
// QUICException when the wrapper holds no handle or the query fails.
std::uint64_t QUICStream::GetStreamId() const {
    if (handle == nullptr) {
        throw QUICException("GetStreamId: stream is not open");
    }

    QUIC_UINT62 streamId = 0;
    std::uint32_t length = sizeof(streamId);
    const QUIC_STATUS status =
        Runtime().api->GetParam(handle, QUIC_PARAM_STREAM_ID, &length, &streamId);
    if (QUIC_FAILED(status)) {
        throw QUICException(std::format("GetParam(QUIC_PARAM_STREAM_ID) failed: 0x{:x}",
                                        static_cast<unsigned>(status)));
    }
    return static_cast<std::uint64_t>(streamId);
}
|
|
|
|
// ---------------- ClientQUIC::Impl ----------------
|
|
// Shared state between a ClientQUIC wrapper and the msquic connection
// callback. The callback runs on msquic's threads; the wrapper waits on
// `cv` for `connected`, `closed` and the datagram queue to change.
struct ClientQUIC::Impl {
    HQUIC connection = nullptr;
    HQUIC configuration = nullptr;
    bool ownsConfiguration = true;  // false when the configuration was supplied by the caller

    std::mutex mtx;                 // guards the mutable state below
    std::condition_variable cv;     // signalled after every state change
    bool connected = false;
    bool closed = false;
    QUIC_STATUS shutdownStatus = QUIC_STATUS_SUCCESS;  // status of a transport-initiated shutdown

    std::function<void(QUICStream)> onStream;
    std::function<void(std::vector<char>)> onDatagram;
    std::deque<std::vector<char>> datagramQueue;  // datagrams received while no handler is set
    // Streams the peer started before the user installed an OnStream
    // handler. Without this backlog the early streams (e.g. an h3 server's
    // control stream right after handshake) would be aborted in the
    // PEER_STREAM_STARTED branch and the connection would die with
    // H3_MISSING_SETTINGS on the peer side.
    std::deque<QUICStream> pendingStreams;

    ClientQUIC* outer = nullptr;    // wrapper that currently owns this Impl

    // msquic connection event handler. `ctx` is the Impl registered for
    // the connection. Every event is answered with QUIC_STATUS_SUCCESS.
    static QUIC_STATUS QUIC_API Callback(HQUIC conn, void* ctx, QUIC_CONNECTION_EVENT* ev) {
        auto* self = static_cast<Impl*>(ctx);
        switch (ev->Type) {
        case QUIC_CONNECTION_EVENT_CONNECTED: {
            {
                std::lock_guard lk(self->mtx);
                self->connected = true;
            }
            self->cv.notify_all();
            return QUIC_STATUS_SUCCESS;
        }
        case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT:
        case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER: {
            {
                std::lock_guard lk(self->mtx);
                self->closed = true;
                if (ev->Type == QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT) {
                    self->shutdownStatus = ev->SHUTDOWN_INITIATED_BY_TRANSPORT.Status;
                }
            }
            self->cv.notify_all();
            return QUIC_STATUS_SUCCESS;
        }
        case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: {
            // Decide under the mutex whether this callback releases the
            // handle, and clear the pointer before the handle is closed so
            // no other thread can observe a pointer to a closed connection.
            // `self` is not touched again after the notification.
            bool closeHere = false;
            {
                std::lock_guard lk(self->mtx);
                self->closed = true;
                if (ev->SHUTDOWN_COMPLETE.AppCloseInProgress == 0 && self->connection != nullptr) {
                    self->connection = nullptr;
                    closeHere = true;
                }
            }
            self->cv.notify_all();
            if (closeHere) {
                Runtime().api->ConnectionClose(conn);
            }
            return QUIC_STATUS_SUCCESS;
        }
        case QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED: {
            HQUIC streamHandle = ev->PEER_STREAM_STARTED.Stream;
            const bool unidirectional = (ev->PEER_STREAM_STARTED.Flags
                                         & QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL) != 0;
            QUICStream stream(streamHandle, self->outer);
            if (unidirectional) {
                // Peer-initiated unidi: peer sends, we read; we cannot send.
                stream.canSend = false;
                stream.canReceive = true;
            }
            std::function<void(QUICStream)> cb;
            {
                std::lock_guard lk(self->mtx);
                cb = self->onStream;
                if (!cb) {
                    // Buffer until OnStream is installed; OnStream's
                    // setter drains this queue.
                    self->pendingStreams.push_back(std::move(stream));
                    return QUIC_STATUS_SUCCESS;
                }
            }
            // Shared ownership keeps the task copyable and releases the
            // stream even if the handler throws or the task never runs.
            auto shared = std::make_shared<QUICStream>(std::move(stream));
            ThreadPool::Enqueue([cb, shared]{
                cb(std::move(*shared));
            });
            return QUIC_STATUS_SUCCESS;
        }
        case QUIC_CONNECTION_EVENT_DATAGRAM_RECEIVED: {
            std::vector<char> chunk(ev->DATAGRAM_RECEIVED.Buffer->Buffer,
                ev->DATAGRAM_RECEIVED.Buffer->Buffer + ev->DATAGRAM_RECEIVED.Buffer->Length);
            // Read the handler under the mutex, like onStream above.
            std::function<void(std::vector<char>)> cb;
            {
                std::lock_guard lk(self->mtx);
                cb = self->onDatagram;
                if (!cb) {
                    self->datagramQueue.push_back(std::move(chunk));
                }
            }
            if (cb) {
                ThreadPool::Enqueue([cb, chunk = std::move(chunk)]() mutable { cb(std::move(chunk)); });
            } else {
                self->cv.notify_all();
            }
            return QUIC_STATUS_SUCCESS;
        }
        case QUIC_CONNECTION_EVENT_DATAGRAM_SEND_STATE_CHANGED: {
            // msquic fires this event multiple times per datagram (e.g.
            // SENT -> ACKNOWLEDGED). Free the combined QUIC_BUFFER+payload
            // allocation only on a terminal state — SENT and LOST_SUSPECT
            // are intermediate and may be followed by another transition.
            const auto state = ev->DATAGRAM_SEND_STATE_CHANGED.State;
            const bool terminal = state == QUIC_DATAGRAM_SEND_LOST_DISCARDED
                || state == QUIC_DATAGRAM_SEND_ACKNOWLEDGED
                || state == QUIC_DATAGRAM_SEND_ACKNOWLEDGED_SPURIOUS
                || state == QUIC_DATAGRAM_SEND_CANCELED;
            if (terminal && ev->DATAGRAM_SEND_STATE_CHANGED.ClientContext) {
                ::operator delete(ev->DATAGRAM_SEND_STATE_CHANGED.ClientContext);
            }
            return QUIC_STATUS_SUCCESS;
        }
        default:
            return QUIC_STATUS_SUCCESS;
        }
    }
};
|
|
|
|
// Opens an msquic configuration for an outgoing connection with the given
// ALPN and loads client credentials into it. Returns the configuration
// handle. Throws QUICException when ConfigurationOpen or
// ConfigurationLoadCredential fails; the configuration is closed again in
// the latter case.
static HQUIC OpenClientConfiguration(const std::string& alpn, const QUICClientCredentials& creds) {
    MsQuicRuntime& rt = Runtime();

    std::vector<std::uint8_t> alpnStorage;
    QUIC_BUFFER alpnView = MakeAlpn(alpn, alpnStorage);

    QUIC_SETTINGS settings{};
    settings.IdleTimeoutMs = 30'000;
    settings.IsSet.IdleTimeoutMs = 1;
    settings.DatagramReceiveEnabled = 1;
    settings.IsSet.DatagramReceiveEnabled = 1;
    // Allow the server to open unidi/bidi streams to us. msquic defaults
    // both peer-stream-count limits to 0; with that, the server's HTTP/3
    // control stream + QPACK encoder/decoder streams can't be created and
    // most h3 servers will close the connection after handshake. We don't
    // currently use server push (h3 pushes ride on unidi 0x01 streams) but
    // the bidi cap is harmless to grant.
    settings.PeerUnidiStreamCount = 16;
    settings.IsSet.PeerUnidiStreamCount = 1;
    settings.PeerBidiStreamCount = 16;
    settings.IsSet.PeerBidiStreamCount = 1;

    HQUIC configuration = nullptr;
    QUIC_STATUS status = rt.api->ConfigurationOpen(rt.registration, &alpnView, 1,
                                                   &settings, sizeof(settings), nullptr,
                                                   &configuration);
    if (QUIC_FAILED(status)) {
        throw QUICException(std::format("ConfigurationOpen failed: 0x{:x}", static_cast<unsigned>(status)));
    }

    QUIC_CREDENTIAL_CONFIG credentials{};
    credentials.Type = QUIC_CREDENTIAL_TYPE_NONE;
    credentials.Flags = QUIC_CREDENTIAL_FLAG_CLIENT;
    if (creds.insecureNoServerValidation) {
        credentials.Flags |= QUIC_CREDENTIAL_FLAG_NO_CERTIFICATE_VALIDATION;
    }

    status = rt.api->ConfigurationLoadCredential(configuration, &credentials);
    if (QUIC_FAILED(status)) {
        rt.api->ConfigurationClose(configuration);
        throw QUICException(std::format("ConfigurationLoadCredential failed: 0x{:x}", static_cast<unsigned>(status)));
    }
    return configuration;
}
|
|
|
|
// Connects to `host`:`port` and blocks until the handshake has either
// completed or failed.
//
// host    server name or address passed to ConnectionStart
// port    UDP port
// alpnIn  ALPN identifier offered to the server
// creds   client credential options (see OpenClientConfiguration)
//
// Throws QUICException when any msquic call or the handshake fails. Every
// handle opened so far is released before throwing: the destructor does
// not run for a constructor that throws, and msquic must not be left with
// a callback context that points at the destroyed Impl.
ClientQUIC::ClientQUIC(const char* host, std::uint16_t port, std::string alpnIn,
                       QUICClientCredentials creds)
    : alpn(std::move(alpnIn)), impl(std::make_unique<Impl>())
{
    impl->outer = this;
    impl->configuration = OpenClientConfiguration(alpn, creds);

    QUIC_STATUS s = Runtime().api->ConnectionOpen(Runtime().registration,
        reinterpret_cast<QUIC_CONNECTION_CALLBACK_HANDLER>(&Impl::Callback),
        impl.get(), &impl->connection);
    if (QUIC_FAILED(s)) {
        Runtime().api->ConfigurationClose(impl->configuration);
        throw QUICException(std::format("ConnectionOpen failed: 0x{:x}", static_cast<unsigned>(s)));
    }
    s = Runtime().api->ConnectionStart(impl->connection, impl->configuration, QUIC_ADDRESS_FAMILY_UNSPEC, host, port);
    if (QUIC_FAILED(s)) {
        Runtime().api->ConnectionClose(impl->connection);
        Runtime().api->ConfigurationClose(impl->configuration);
        throw QUICException(std::format("ConnectionStart failed: 0x{:x}", static_cast<unsigned>(s)));
    }

    HQUIC failedConnection = nullptr;
    QUIC_STATUS failure = QUIC_STATUS_SUCCESS;
    {
        std::unique_lock lk(impl->mtx);
        impl->cv.wait(lk, [&]{ return impl->connected || impl->closed; });
        if (impl->connected) return;
        failure = impl->shutdownStatus;
        // Take the handle over while holding the mutex so it is released
        // below, outside the lock.
        failedConnection = std::exchange(impl->connection, nullptr);
    }

    // Handshake failed: close the connection before `impl` goes away.
    // ConnectionClose must not be called with the mutex held, because the
    // callback takes the same mutex.
    if (failedConnection) {
        Runtime().api->ConnectionClose(failedConnection);
    }
    Runtime().api->ConfigurationClose(impl->configuration);
    impl->configuration = nullptr;
    throw QUICException(std::format("QUIC handshake failed: 0x{:x}", static_cast<unsigned>(failure)));
}
|
|
|
|
// Convenience overload taking the host as std::string; delegates to the
// const char* constructor.
ClientQUIC::ClientQUIC(std::string host,
                       std::uint16_t port,
                       std::string alpnIn,
                       QUICClientCredentials creds)
    : ClientQUIC(host.c_str(), port, std::move(alpnIn), std::move(creds))
{
}
|
|
|
|
// Wraps an already-existing connection handle. The object is marked as
// connected immediately, the supplied configuration is recorded as not
// owned (so the destructor will not close it), and Impl::Callback is
// registered as the handler for the handle.
ClientQUIC::ClientQUIC(HQUIC connectionHandle, HQUIC serverConfiguration, std::string alpnIn)
    : alpn(std::move(alpnIn)), impl(std::make_unique<Impl>())
{
    Impl& state = *impl;
    state.outer = this;
    state.connection = connectionHandle;
    state.configuration = serverConfiguration;
    state.ownsConfiguration = false;
    state.connected = true;

    void* const handler = reinterpret_cast<void*>(&Impl::Callback);
    Runtime().api->SetCallbackHandler(connectionHandle, handler, &state);
}
|
|
|
|
// Move constructor: takes over the ALPN string and the Impl, then points
// the Impl's back-reference at the new owner. The source is left without
// an Impl.
ClientQUIC::ClientQUIC(ClientQUIC&& other) noexcept
    : alpn(std::move(other.alpn)), impl(std::move(other.impl))
{
    if (!impl) return;
    impl->outer = this;
}
|
|
|
|
// Shuts down and closes the connection if one is still held, then closes
// the configuration when this object owns it. A moved-from object (no
// Impl) has nothing to release.
ClientQUIC::~ClientQUIC() {
    if (!impl) return;

    if (HQUIC conn = impl->connection; conn != nullptr) {
        Runtime().api->ConnectionShutdown(conn, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0);
        Runtime().api->ConnectionClose(conn);
        impl->connection = nullptr;
    }

    const bool mustCloseConfiguration = impl->configuration && impl->ownsConfiguration;
    if (mustCloseConfiguration) {
        Runtime().api->ConfigurationClose(impl->configuration);
        impl->configuration = nullptr;
    }
}
|
|
|
|
void ClientQUIC::Stop() {
|
|
if (!impl || !impl->connection) return;
|
|
Runtime().api->ConnectionShutdown(impl->connection, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0);
|
|
}
|
|
|
|
// Opens and starts a locally-initiated stream on this connection.
//
// unidirectional  when true the stream is send-only for us (canSend is
//                 set, canReceive is cleared); otherwise the wrapper's
//                 default direction flags are kept.
//
// Returns the stream wrapper. Throws QUICException when the connection is
// not open or when StreamOpen / StreamStart fails.
QUICStream ClientQUIC::OpenStream(bool unidirectional) {
    if (!impl || !impl->connection) {
        throw QUICException("OpenStream: connection is not open");
    }

    QUICStream stream;
    stream.impl = std::make_unique<QUICStream::Impl>();
    stream.impl->connection = this;

    const QUIC_STREAM_OPEN_FLAGS openFlags = unidirectional
        ? QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL
        : QUIC_STREAM_OPEN_FLAG_NONE;

    HQUIC streamHandle = nullptr;
    QUIC_STATUS s = Runtime().api->StreamOpen(impl->connection, openFlags,
        reinterpret_cast<QUIC_STREAM_CALLBACK_HANDLER>(&QUICStream::Impl::Callback),
        stream.impl.get(), &streamHandle);
    if (QUIC_FAILED(s)) throw QUICException(std::format("StreamOpen failed: 0x{:x}", static_cast<unsigned>(s)));

    stream.handle = streamHandle;
    stream.connection = this;
    stream.impl->handle = streamHandle;
    if (unidirectional) {
        // We initiated the unidi stream: we send, peer reads.
        stream.canSend = true;
        stream.canReceive = false;
    }

    s = Runtime().api->StreamStart(streamHandle, QUIC_STREAM_START_FLAG_NONE);
    if (QUIC_FAILED(s)) {
        Runtime().api->StreamClose(streamHandle);
        // The handle is gone. Forget it so that the wrapper's destructor,
        // which runs while the exception unwinds, does not call
        // StreamShutdown on a closed handle.
        stream.handle = nullptr;
        stream.impl->handle = nullptr;
        throw QUICException(std::format("StreamStart failed: 0x{:x}", static_cast<unsigned>(s)));
    }
    return stream;
}
|
|
|
|
// Sends `size` bytes from `buffer` as one unreliable datagram. Throws
// QUICException when DatagramSend rejects the request.
//
// msquic stores the QUIC_BUFFER pointer (not a copy) on the send queue
// and serialises async on a worker thread. Both the QUIC_BUFFER and the
// payload it points at must outlive the call until DATAGRAM_SEND_STATE
// reports a terminal state, so they are packed into a single allocation:
// header first, payload right behind it. The allocation is passed as the
// client context and freed by the connection callback.
void ClientQUIC::SendDatagram(const void* buffer, std::uint32_t size) {
    void* const block = ::operator new(sizeof(QUIC_BUFFER) + size);
    auto* const bytes = static_cast<std::uint8_t*>(block);
    auto* const header = reinterpret_cast<QUIC_BUFFER*>(bytes);
    std::uint8_t* const body = bytes + sizeof(QUIC_BUFFER);

    std::memcpy(body, buffer, size);
    header->Length = size;
    header->Buffer = body;

    const QUIC_STATUS status = Runtime().api->DatagramSend(impl->connection, header, 1,
                                                           QUIC_SEND_FLAG_NONE, bytes);
    if (QUIC_FAILED(status)) {
        // msquic did not take the allocation; release it here.
        ::operator delete(bytes);
        throw QUICException(std::format("DatagramSend failed: 0x{:x}", static_cast<unsigned>(status)));
    }
}
|
|
|
|
void ClientQUIC::OnStream(std::function<void(QUICStream)> cb) {
|
|
std::deque<QUICStream> backlog;
|
|
{
|
|
std::lock_guard lk(impl->mtx);
|
|
impl->onStream = cb;
|
|
std::swap(backlog, impl->pendingStreams);
|
|
}
|
|
while (!backlog.empty()) {
|
|
auto* shared = new QUICStream(std::move(backlog.front()));
|
|
backlog.pop_front();
|
|
auto handler = cb;
|
|
ThreadPool::Enqueue([handler, shared]{
|
|
handler(std::move(*shared));
|
|
delete shared;
|
|
});
|
|
}
|
|
}
|
|
|
|
// Installs the handler invoked for every received datagram. The handler
// is stored under impl->mtx, the same mutex OnStream uses for its handler
// and that guards the datagram queue. Datagrams queued before the handler
// was installed are not replayed; they remain available to
// RecieveDatagramSync().
void ClientQUIC::OnDatagram(std::function<void(std::vector<char>)> cb) {
    std::lock_guard lk(impl->mtx);
    impl->onDatagram = std::move(cb);
}
|
|
|
|
std::vector<char> ClientQUIC::RecieveDatagramSync() {
|
|
std::unique_lock lk(impl->mtx);
|
|
impl->cv.wait(lk, [&]{ return !impl->datagramQueue.empty() || impl->closed; });
|
|
if (!impl->datagramQueue.empty()) {
|
|
auto out = std::move(impl->datagramQueue.front());
|
|
impl->datagramQueue.pop_front();
|
|
return out;
|
|
}
|
|
throw QUICClosedException();
|
|
}
|
|
|
|
// Returns the raw msquic connection handle, or nullptr when the object
// has no Impl or no connection.
HQUIC ClientQUIC::GetHandle() const {
    if (!impl) return nullptr;
    return impl->connection;
}
|