Browse Source

Consolidate HTTP code, handle redirects and SSL everywhere.

default_compile_flags
vector-of-bool 4 years ago
parent
commit
a364ac6f35
14 changed files with 646 additions and 462 deletions
  1. +6
    -5
      src/dds/cli/cmd/pkg_get.cpp
  2. +0
    -1
      src/dds/cli/cmd/pkg_import.cpp
  3. +21
    -11
      src/dds/cli/cmd/pkg_repo_err_handle.cpp
  4. +0
    -204
      src/dds/http/session.cpp
  5. +0
    -145
      src/dds/http/session.hpp
  6. +0
    -8
      src/dds/http/session.test.cpp
  7. +7
    -72
      src/dds/pkg/get/http.cpp
  8. +16
    -16
      src/dds/pkg/remote.cpp
  9. +341
    -0
      src/dds/util/http/pool.cpp
  10. +145
    -0
      src/dds/util/http/pool.hpp
  11. +27
    -0
      src/dds/util/http/pool.test.cpp
  12. +17
    -0
      src/dds/util/http/request.hpp
  13. +34
    -0
      src/dds/util/http/response.cpp
  14. +32
    -0
      src/dds/util/http/response.hpp

+ 6
- 5
src/dds/cli/cmd/pkg_get.cpp View File

@@ -2,13 +2,14 @@

#include <dds/dym.hpp>
#include <dds/error/errors.hpp>
#include <dds/http/session.hpp>
#include <dds/pkg/db.hpp>
#include <dds/pkg/get/get.hpp>
#include <dds/util/http/pool.hpp>
#include <dds/util/result.hpp>

#include <boost/leaf/handle_exception.hpp>
#include <json5/parse_data.hpp>
#include <neo/url.hpp>

namespace dds::cli::cmd {

@@ -47,10 +48,10 @@ int pkg_get(const options& opts) {
url_err.what());
return 1;
},
[&](const json5::parse_error& e, dds::e_http_url bad_url) {
[&](const json5::parse_error& e, neo::url bad_url) {
dds_log(error,
"Error parsing JSON5 document package downloaded from [{}]: {}",
bad_url.value,
bad_url.to_string(),
e.what());
return 1;
},
@@ -58,10 +59,10 @@ int pkg_get(const options& opts) {
dds_log(error, "Error accessing the package database: {}", e.message);
return 1;
},
[&](dds::e_system_error_exc e, dds::e_http_connect conn) {
[&](dds::e_system_error_exc e, dds::network_origin conn) {
dds_log(error,
"Error opening connection to [{}:{}]: {}",
conn.host,
conn.hostname,
conn.port,
e.message);
return 1;

+ 0
- 1
src/dds/cli/cmd/pkg_import.cpp View File

@@ -1,6 +1,5 @@
#include "../options.hpp"

#include <dds/http/session.hpp>
#include <dds/pkg/cache.hpp>
#include <dds/sdist/dist.hpp>
#include <dds/util/result.hpp>

+ 21
- 11
src/dds/cli/cmd/pkg_repo_err_handle.cpp View File

@@ -1,12 +1,12 @@
#include "./pkg_repo_err_handle.hpp"

#include <dds/http/session.hpp>
#include <dds/util/http/pool.hpp>
#include <dds/util/log.hpp>
#include <dds/util/result.hpp>

#include <boost/leaf/handle_exception.hpp>
#include <json5/parse_data.hpp>
#include <neo/url/parse.hpp>
#include <neo/url.hpp>

int dds::cli::cmd::handle_pkg_repo_remote_errors(std::function<int()> fn) {
return boost::leaf::try_catch(
@@ -17,29 +17,39 @@ int dds::cli::cmd::handle_pkg_repo_remote_errors(std::function<int()> fn) {
dds::capture_exception();
}
},
[&](neo::url_validation_error url_err, dds::e_url_string bad_url) {
dds_log(error, "Invalid URL [{}]: {}", bad_url.value, url_err.what());
[](neo::url_validation_error url_err, neo::url bad_url) {
dds_log(error, "Invalid URL [{}]: {}", bad_url.to_string(), url_err.what());
return 1;
},
[&](const json5::parse_error& e, dds::e_http_url bad_url) {
[](dds::http_status_error err, dds::http_response_info resp, neo::url bad_url) {
dds_log(error,
"An HTTP error occured while requesting [{}]: HTTP Status {} {}",
err.what(),
bad_url.to_string(),
resp.status,
resp.status_message);
return 1;
},
[](const json5::parse_error& e, neo::url bad_url) {
dds_log(error,
"Error parsing JSON downloaded from URL [{}]: {}",
bad_url.value,
bad_url.to_string(),
e.what());
return 1;
},
[](dds::e_sqlite3_error_exc e, dds::e_url_string url) {
dds_log(error, "Error accessing remote database (From {}): {}", url.value, e.message);
[](dds::e_sqlite3_error_exc e, neo::url url) {
dds_log(error, "Error accessing remote database [{}]: {}", url.to_string(), e.message);
return 1;
},
[](dds::e_sqlite3_error_exc e) {
dds_log(error, "Unexpected database error: {}", e.message);
return 1;
},
[&](dds::e_system_error_exc e, dds::e_http_connect conn) {
[](dds::e_system_error_exc e, dds::network_origin conn) {
dds_log(error,
"Error opening connection to [{}:{}]: {}",
conn.host,
"Error communicating with [{}://{}:{}]: {}",
conn.protocol,
conn.hostname,
conn.port,
e.message);
return 1;

+ 0
- 204
src/dds/http/session.cpp View File

@@ -1,204 +0,0 @@
#include "./session.hpp"

#include <dds/error/errors.hpp>
#include <dds/util/fs.hpp>
#include <dds/util/log.hpp>
#include <dds/util/result.hpp>

#include <fmt/format.h>
#include <fmt/ostream.h>
#include <neo/event.hpp>
#include <neo/http/parse/chunked.hpp>
#include <neo/http/request.hpp>
#include <neo/http/response.hpp>
#include <neo/io/openssl/init.hpp>
#include <neo/io/stream/buffers.hpp>
#include <neo/io/stream/file.hpp>
#include <neo/string_io.hpp>

using namespace dds;

namespace {

struct simple_request {
std::string method;

std::vector<std::pair<std::string, std::string>> headers;
};

template <typename Out, typename In>
void download_into(Out&& out, In&& in, http_response_info resp) {
auto resp_te = resp.headers.find(neo::http::standard_headers::transfer_encoding);
if (resp_te) {
if (resp_te->value != "chunked") {
throw std::runtime_error(fmt::format(
"We can't yet handle HTTP responses that set Transfer-Encoding [Transfer "
"encoding is '{}']",
resp_te->value));
}
neo::http::chunked_buffers chunked_in{in};
buffer_copy(out, chunked_in);
} else {
auto clen = resp.headers.find(neo::http::standard_headers::content_length);
neo_assert(invariant, !!clen, "HTTP response has no Content-Length header??");
buffer_copy(out, in, std::stoull(clen->value));
}
}

} // namespace

http_session http_session::connect_for(const neo::url& url) {
if (!url.host) {
throw_user_error<
errc::invalid_remote_url>("URL is invalid for network connection [{}]: No host segment",
url.to_string());
}
auto sub = neo::subscribe(
[&](neo::address::ev_resolve ev) {
dds_log(trace, "Resolving '{}:{}'", ev.host, ev.service);
neo::bubble_event(ev);
},
[&](neo::socket::ev_connect ev) {
dds_log(trace, "Connecting {}", *url.host);
neo::bubble_event(ev);
},
[&](neo::ssl::ev_handshake ev) {
dds_log(trace, "TLS handshake...");
neo::bubble_event(ev);
});
if (url.scheme == "http") {
return connect(*url.host, url.port_or_default_port_or(80));
} else if (url.scheme == "https") {
return connect_ssl(*url.host, url.port_or_default_port_or(443));
} else {
throw_user_error<errc::invalid_remote_url>("URL is invalid [{}]", url.to_string());
}
}

http_session http_session::connect(const std::string& host, int port) {
DDS_E_SCOPE(e_http_connect{host, port});

auto addr = neo::address::resolve(host, std::to_string(port));
auto sock = neo::socket::open_connected(addr, neo::socket::type::stream);

return http_session{std::move(sock), fmt::format("{}:{}", host, port)};
}

http_session http_session::connect_ssl(const std::string& host, int port) {
DDS_E_SCOPE(e_http_connect{host, port});

auto addr = neo::address::resolve(host, std::to_string(port));
auto sock = neo::socket::open_connected(addr, neo::socket::type::stream);

static neo::ssl::openssl_app_init ssl_init;
static neo::ssl::context ssl_ctx{neo::ssl::protocol::tls_any, neo::ssl::role::client};

neo::stream_io_buffers sock_in{sock};
ssl_engine ssl_eng{ssl_ctx, sock_in, neo::stream_io_buffers{sock}};
ssl_eng.connect();

return http_session(std::move(sock), fmt::format("{}:{}", host, port), std::move(ssl_eng));
}

void http_session::send_head(http_request_params params) {
neo_assert_always(invariant,
_state == _state_t::ready,
"Invalid state for HTTP session to send a request head",
_state,
params.method,
params.path,
params.query);
neo::emit(ev_http_request{params});
neo::http::request_line start_line{
.method_view = params.method,
.target = neo::http::origin_form_target{
.path_view = params.path,
.query_view = params.query,
.has_query = !params.query.empty(),
.parse_tail = neo::const_buffer(),
},
.http_version = neo::http::version::v1_1,
.parse_tail = neo::const_buffer(),
};

dds_log(trace, "Send: HTTP {} to {}{}", params.method, host_string(), params.path);

auto cl_str = std::to_string(params.content_length);

// TODO: GZip downloads
std::pair<std::string_view, std::string_view> headers[] = {
{"Host", host_string()},
{"Accept", "*/*"},
{"Content-Length", cl_str},
};

_do_io(
[&](auto&& io) { neo::http::write_request(io, start_line, headers, neo::const_buffer()); });
_state = _state_t::sent_request;
}

http_response_info http_session::recv_head() {
neo_assert_always(invariant,
_state == _state_t::sent_request,
"Invalid state to receive response head",
_state);
auto r
= _do_io([&](auto&& io) { return neo::http::read_response_head<http_response_info>(io); });
dds_log(trace, "Recv: HTTP {} {}", r.status, r.status_message);
_state = _state_t::recvd_head;
neo::emit(ev_http_response_begin{r});
return r;
}

std::string http_session::request(http_request_params params) {
send_head(params);

auto resp_head = recv_head();

neo::string_dynbuf_io resp_body;
_do_io([&](auto&& io) { download_into(resp_body, io, resp_head); });
neo::emit(ev_http_response_end{resp_head});
auto body_size = resp_body.available();
auto str = std::move(resp_body.string());
str.resize(body_size);
_state = _state_t::ready;
return str;
}

void http_session::recv_body_to_file(http_response_info const& resp_head,
const std::filesystem::path& dest) {
neo_assert_always(invariant,
_state == _state_t::recvd_head,
"Invalid state to receive request body",
_state,
dest);
auto ofile = neo::file_stream::open(dest, neo::open_mode::write | neo::open_mode::create);
neo::stream_io_buffers file_out{ofile};
_do_io([&](auto&& io) { download_into(file_out, io, resp_head); });
neo::emit(ev_http_response_end{resp_head});
_state = _state_t::ready;
}

void http_session::download_file(http_request_params params, const std::filesystem::path& dest) {
send_head(params);
auto resp_head = recv_head();
if (resp_head.is_error()) {
throw_external_error<
errc::http_download_failure>("Failed to download file from {}{} to {}: HTTP {} {}",
host_string(),
params.path,
dest,
resp_head.status,
resp_head.status_message);
}

if (resp_head.is_redirect()) {
throw_external_error<errc::http_download_failure>(
"dds does not yet support HTTP redirects when downloading data. An HTTP redirect "
"was encountered when accessing {}{}: It wants to redirect to {}",
host_string(),
params.path,
resp_head.headers["Location"].value);
}
recv_body_to_file(resp_head, dest);
}

+ 0
- 145
src/dds/http/session.hpp View File

@@ -1,145 +0,0 @@
#pragma once

#include <neo/http/headers.hpp>
#include <neo/http/version.hpp>
#include <neo/io/openssl/engine.hpp>
#include <neo/io/stream/buffers.hpp>
#include <neo/io/stream/socket.hpp>
#include <neo/string_io.hpp>
#include <neo/url.hpp>

#include <filesystem>
#include <string>

namespace dds {

struct e_http_url {
std::string value;
};

struct e_http_connect {
std::string host;
int port;
};

struct http_request_params {
std::string_view method;
std::string_view path;
std::string_view query = "";
std::size_t content_length = 0;
};

struct ev_http_request {
const http_request_params& request;
};

struct http_response_info {
int status;
std::string status_message;
neo::http::version version;
neo::http::headers headers;

std::size_t head_byte_size = 0;

void throw_for_status() const;

bool is_client_error() const noexcept { return status >= 400 && status < 500; }
bool is_server_error() const noexcept { return status >= 500 && status < 600; }
bool is_error() const noexcept { return is_client_error() || is_server_error(); }
bool is_redirect() const noexcept { return status >= 300 && status < 400; }
};

struct ev_http_response_begin {
const http_response_info& response;
};

struct ev_http_response_end {
const http_response_info& response;
};

enum class http_kind {
plain,
ssl,
};

class http_session {
neo::socket _conn;

std::string _host_string;

using sock_buffers = neo::stream_io_buffers<neo::socket&>;
sock_buffers _sock_in{_conn};

using ssl_engine = neo::ssl::engine<sock_buffers&, sock_buffers>;
using ssl_buffers = neo::stream_io_buffers<ssl_engine>;
std::optional<ssl_buffers> _ssl_in;

enum _state_t { ready, sent_request, recvd_head } _state = ready;

template <typename F>
decltype(auto) _do_io(F&& fn) {
if (_ssl_in) {
return fn(*_ssl_in);
} else {
return fn(_sock_in);
}
}

void _rebind_refs() {
if (_ssl_in) {
_ssl_in->stream().rebind_input(_sock_in);
_ssl_in->stream().output().rebind_stream(_conn);
}
}

public:
explicit http_session(neo::socket s, std::string host_header)
: _conn(std::move(s))
, _host_string(std::move(host_header)) {}

explicit http_session(neo::socket s, std::string host_header, ssl_engine&& eng)
: _conn(std::move(s))
, _host_string(std::move(host_header))
, _sock_in(_conn, std::move(eng.input().io_buffers()))
, _ssl_in(std::move(eng)) {
_rebind_refs();
}

http_session(http_session&& other) noexcept
: _conn(std::move(other._conn))
, _host_string(std::move(other._host_string))
, _sock_in(_conn, std::move(other._sock_in.io_buffers()))
, _ssl_in(std::move(other._ssl_in)) {
_rebind_refs();
}

http_session& operator=(http_session&& other) noexcept {
_conn = std::move(other._conn);
_host_string = std::move(other._host_string);
_sock_in.io_buffers() = std::move(other._sock_in.io_buffers());
_ssl_in = std::move(other._ssl_in);
_rebind_refs();
return *this;
}

void send_head(http_request_params);
http_response_info recv_head();
void recv_body_to_file(http_response_info const& res_head, const std::filesystem::path& dest);

std::string_view host_string() const noexcept { return _host_string; }

static http_session connect(const std::string& host, int port);
static http_session connect_ssl(const std::string& host, int port);

static http_session connect_for(const neo::url& url);

std::string request(http_request_params);

std::string request_get(std::string_view path) {
return request({.method = "GET", .path = path});
}

void download_file(http_request_params, const std::filesystem::path& dest);
};

} // namespace dds

+ 0
- 8
src/dds/http/session.test.cpp View File

@@ -1,8 +0,0 @@
#include <dds/http/session.hpp>

#include <catch2/catch.hpp>

TEST_CASE("Create an HTTP session") {
auto sess = dds::http_session::connect("google.com", 80);
auto resp = sess.request_get("/");
}

+ 7
- 72
src/dds/pkg/get/http.cpp View File

@@ -1,86 +1,18 @@
#include "./http.hpp"

#include <dds/error/errors.hpp>
#include <dds/http/session.hpp>
#include <dds/temp.hpp>
#include <dds/util/http/pool.hpp>
#include <dds/util/log.hpp>

#include <neo/io/stream/buffers.hpp>
#include <neo/io/stream/file.hpp>
#include <neo/tar/util.hpp>
#include <neo/url.hpp>
#include <neo/url/query.hpp>

using namespace dds;

namespace {

void http_download_with_redir(neo::url url, path_ref dest) {
for (auto redir_count = 0;; ++redir_count) {
auto sess = http_session::connect_for(url);

sess.send_head({.method = "GET", .path = url.path});

auto res_head = sess.recv_head();
if (res_head.is_error()) {
dds_log(error,
"Received an HTTP {} {} for [{}]",
res_head.status,
res_head.status_message,
url.to_string());
throw_external_error<errc::http_download_failure>(
"HTTP error while downloading resource [{}]. Got: HTTP {} '{}'",
url.to_string(),
res_head.status,
res_head.status_message);
}

if (res_head.is_redirect()) {
dds_log(trace,
"Received HTTP redirect for [{}]: {} {}",
url.to_string(),
res_head.status,
res_head.status_message);
if (redir_count == 100) {
throw_external_error<errc::http_download_failure>("Too many redirects on URL");
}
auto loc = res_head.headers.find("Location");
if (!loc) {
throw_external_error<errc::http_download_failure>(
"HTTP endpoint told us to redirect without sending a 'Location' header "
"(Received "
"HTTP {} '{}')",
res_head.status,
res_head.status_message);
}
dds_log(debug,
"Redirect [{}]: {} {} to [{}]",
url.to_string(),
res_head.status,
res_head.status_message,
loc->value);
auto new_url = neo::url::try_parse(loc->value);
auto err = std::get_if<neo::url_parse_error>(&new_url);
if (err) {
throw_external_error<errc::http_download_failure>(
"Server returned an invalid URL for HTTP redirection [{}]", loc->value);
}
url = std::move(std::get<neo::url>(new_url));
continue;
}

// Not a redirect nor an error: Download the body
dds_log(trace,
"HTTP {} {} [{}]: Saving to [{}]",
res_head.status,
res_head.status_message,
url.to_string(),
dest.string());
sess.recv_body_to_file(res_head, dest);
break;
}
}

} // namespace

void http_remote_listing::pull_source(path_ref dest) const {
neo::url url;
try {
@@ -115,7 +47,10 @@ void http_remote_listing::pull_source(path_ref dest) const {
auto dl_path = tdir.path() / fname;
fs::create_directory(dl_path.parent_path());

http_download_with_redir(url, dl_path);
http_pool pool;
auto [client, resp] = pool.request_with_redirects("GET", url);
auto dl_file = neo::file_stream::open(dl_path, neo::open_mode::write);
client.recv_body_into(resp, neo::stream_io_buffers{dl_file});

neo_assert(invariant,
fs::is_regular_file(dl_path),

+ 16
- 16
src/dds/pkg/remote.cpp View File

@@ -1,12 +1,14 @@
#include "./remote.hpp"

#include <dds/error/errors.hpp>
#include <dds/http/session.hpp>
#include <dds/temp.hpp>
#include <dds/util/http/pool.hpp>
#include <dds/util/log.hpp>
#include <dds/util/result.hpp>

#include <neo/event.hpp>
#include <neo/io/stream/buffers.hpp>
#include <neo/io/stream/file.hpp>
#include <neo/scope.hpp>
#include <neo/sqlite3/exec.hpp>
#include <neo/sqlite3/iter_tuples.hpp>
@@ -26,21 +28,14 @@ struct remote_db {
nsql::database db;

static remote_db download_and_open(neo::url const& url) {
neo_assert(expects,
url.host.has_value(),
"URL does not have a hostname??",
url.to_string());
auto sess = http_session::connect_for(url);

auto tempdir = temporary_dir::create();
auto repo_db_dl = tempdir.path() / "repo.db";
http_pool pool;

auto [client, resp] = pool.request_with_redirects("GET", url);
auto tempdir = temporary_dir::create();
auto repo_db_dl = tempdir.path() / "repo.db";
fs::create_directories(tempdir.path());
sess.download_file(
{
.method = "GET",
.path = url.path,
},
repo_db_dl);
auto outfile = neo::file_stream::open(repo_db_dl, neo::open_mode::write);
client.recv_body_into(resp, neo::stream_io_buffers(outfile));

auto db = nsql::open(repo_db_dl.string());
return {tempdir, std::move(db)};
@@ -97,15 +92,18 @@ void pkg_remote::update_pkg_db(nsql::database_ref db) {
auto [remote_id] = nsql::unpack_single<std::int64_t>(rid_st);
rid_st.reset();

dds_log(trace, "Attaching downloaded database");
nsql::exec(db.prepare("ATTACH DATABASE ? AS remote"), db_path.string());
neo_defer { db.exec("DETACH DATABASE remote"); };
nsql::transaction_guard tr{db};
dds_log(trace, "Clearing prior contents");
nsql::exec( //
db.prepare(R"(
DELETE FROM dds_cat_pkgs
WHERE remote_id = ?
)"),
remote_id);
dds_log(trace, "Importing packages");
nsql::exec( //
db.prepare(R"(
INSERT INTO dds_cat_pkgs
@@ -128,6 +126,7 @@ void pkg_remote::update_pkg_db(nsql::database_ref db) {
)"),
remote_id,
base_url_str);
dds_log(trace, "Importing dependencies");
db.exec(R"(
INSERT OR REPLACE INTO dds_cat_pkg_deps (pkg_id, dep_name, low, high)
SELECT
@@ -140,7 +139,8 @@ void pkg_remote::update_pkg_db(nsql::database_ref db) {
dds_cat_pkgs AS local_pkgs USING(name, version)
)");
// Validate our database
auto fk_check = db.prepare("PRAGMA foreign_key_check");
dds_log(trace, "Running integrity check");
auto fk_check = db.prepare("PRAGMA foreign_key_check");
auto rows = nsql::iter_tuples<std::string, std::int64_t, std::string, std::string>(fk_check);
bool any_failed = false;
for (auto [child_table, rowid, parent_table, failed_idx] : rows) {

+ 341
- 0
src/dds/util/http/pool.cpp View File

@@ -0,0 +1,341 @@
#include "./pool.hpp"

#include <dds/error/errors.hpp>
#include <dds/util/result.hpp>

#include <boost/leaf/exception.hpp>
#include <fmt/format.h>
#include <neo/gzip_io.hpp>
#include <neo/http/parse/chunked.hpp>
#include <neo/http/request.hpp>
#include <neo/http/response.hpp>
#include <neo/io/openssl/engine.hpp>
#include <neo/io/stream/buffers.hpp>
#include <neo/io/stream/socket.hpp>

#include <map>

namespace dds::detail {

struct http_client_impl {
network_origin origin;
explicit http_client_impl(network_origin o)
: origin(std::move(o)) {}

enum class _state_t {
ready,
sent_req_head,
sent_req_body,
recvd_resp_head,
};

_state_t _state = _state_t::ready;

neo::socket _conn;

std::string _host_string;

using sock_buffers = neo::stream_io_buffers<neo::socket&>;
sock_buffers _sock_in{_conn};

using ssl_engine = neo::ssl::engine<sock_buffers&, sock_buffers>;
using ssl_buffers = neo::stream_io_buffers<ssl_engine>;
std::optional<ssl_buffers> _ssl_in;

template <typename Fun>
auto _do_io(Fun&& fn) {
if (_ssl_in.has_value()) {
return fn(*_ssl_in);
} else {
return fn(_sock_in);
}
}

void connect() {
DDS_E_SCOPE(origin);
auto addr = neo::address::resolve(origin.hostname, std::to_string(origin.port));
auto sock = neo::socket::open_connected(addr, neo::socket::type::stream);

_conn = std::move(sock);

if (origin.protocol == "https") {
static neo::ssl::openssl_app_init ssl_init;
static neo::ssl::context ssl_ctx{neo::ssl::protocol::tls_any, neo::ssl::role::client};
_ssl_in.emplace(ssl_engine{ssl_ctx, _sock_in, neo::stream_io_buffers{_conn}});
_ssl_in->stream().connect();
} else if (origin.protocol == "http") {
// Plain HTTP, nothing special to do
} else {
throw_user_error<errc::invalid_remote_url>("Unknown protocol: {}", origin.protocol);
}
}

void send_head(const http_request_params& params) {
neo_assert(invariant,
_state == _state_t::ready,
"Invalid state for http_client::send_head()",
int(_state),
params.method,
params.path,
params.query,
origin.hostname,
origin.protocol,
origin.port);

neo::http::request_line start_line{
.method_view = params.method,
.target = neo::http::origin_form_target{
.path_view = params.path,
.query_view = params.query,
.has_query = !params.query.empty(),
.parse_tail = {},
},
.http_version = neo::http::version::v1_1,
.parse_tail = {},
};

auto content_len_str = std::to_string(params.content_length);
auto hostname_port = fmt::format("{}:{}", origin.hostname, origin.port);

std::pair<std::string_view, std::string_view> headers[] = {
{"Host", hostname_port},
{"Accept", "*/*"},
{"Content-Length", content_len_str},
{"TE", "gzip, chunked, plain"},
{"Connection", "keep-alive"},
};

_do_io([&](auto&& sink) {
neo::http::write_request(sink, start_line, headers, neo::const_buffer());
});
_state = _state_t::sent_req_head;
if (params.content_length == 0) {
_state = _state_t::sent_req_body;
}
}

http_response_info recv_head() {
neo_assert(invariant,
_state == _state_t::sent_req_body,
"Invalid state for http_client::recv_head()",
int(_state),
origin.hostname,
origin.protocol,
origin.port);
auto r = _do_io([&](auto&& source) {
return neo::http::read_response_head<http_response_info>(source);
});
_state = _state_t::recvd_resp_head;
auto clen_hdr = r.headers.find(neo::http::standard_headers::content_length);
if (clen_hdr && clen_hdr->value == "0") {
_state = _state_t::ready;
}
return r;
}
};

struct origin_order {
bool operator()(const network_origin& left, const network_origin& right) const noexcept {
return std::tie(left.protocol, left.hostname, left.port)
< std::tie(right.protocol, right.hostname, right.port);
}
};

struct http_pool_impl {
std::multimap<network_origin, std::shared_ptr<http_client_impl>, origin_order> _clients;
};

} // namespace dds::detail

using namespace dds;

http_pool::~http_pool() = default;

http_pool::http_pool()
: _impl(new detail::http_pool_impl) {}

http_client::~http_client() {
// When the http_client is dropped, return its impl back to the connection pool for this origin
auto pool = _pool.lock();
if (pool && _impl) {
pool->_clients.emplace(_impl->origin, _impl);
}
}

network_origin network_origin::for_url(neo::url_view url) noexcept {
auto proto = url.scheme;
auto host = url.host.value_or("");
auto port = url.port.value_or(proto == "https" ? 443 : 80);
return {std::string(proto), std::string(host), port};
}

network_origin network_origin::for_url(neo::url const& url) noexcept {
auto proto = url.scheme;
auto host = url.host.value_or("");
auto port = url.port.value_or(proto == "https" ? 443 : 80);
return {std::string(proto), std::string(host), port};
}

http_client http_pool::client_for_origin(const network_origin& origin) {
auto iter = _impl->_clients.find(origin);
http_client ret;
ret._pool = _impl;
if (iter == _impl->_clients.end()) {
// Nothing for this origin yet
auto ptr = std::make_shared<detail::http_client_impl>(origin);
ptr->connect();
ret._impl = ptr;
} else {
ret._impl = iter->second;
_impl->_clients.erase(iter);
}
return ret;
}

void http_client::send_head(const http_request_params& params) { _impl->send_head(params); }
http_response_info http_client::recv_head() { return _impl->recv_head(); }

void http_client::_send_buf(neo::const_buffer cbuf) {
_impl->_do_io([&](auto&& sink) { buffer_copy(sink, cbuf); });
}

namespace {

struct recv_none_state : erased_message_body {
neo::const_buffer next(std::size_t) override { return {}; }
void consume(std::size_t) override {}
};

template <typename Stream>
struct recv_chunked_state : erased_message_body {
Stream& _strm;
neo::http::chunked_buffers<Stream&> _chunked{_strm};

explicit recv_chunked_state(Stream& s)
: _strm(s) {}

neo::const_buffer next(std::size_t n) override { return _chunked.next(n); }
void consume(std::size_t n) override { _chunked.consume(n); }
};

template <typename Stream>
struct recv_gzip_state : erased_message_body {
Stream& _strm;
neo::gzip_source<Stream&> _gzip{_strm};

explicit recv_gzip_state(Stream& s)
: _strm(s) {}

neo::const_buffer next(std::size_t n) override { return _gzip.next(n); }
void consume(std::size_t n) override { _gzip.consume(n); }
};

template <typename Stream>
struct recv_plain_state : erased_message_body {
Stream& _strm;
std::size_t _size;
client_impl_ptr _client;

explicit recv_plain_state(Stream& s, std::size_t size)
: _strm(s)
, _size(size) {}

neo::const_buffer next(std::size_t n) override { return _strm.next((std::min)(n, _size)); }
void consume(std::size_t n) override {
_size -= n;
return _strm.consume(n);
}
};

} // namespace

std::unique_ptr<erased_message_body> http_client::_make_body_reader(const http_response_info& res) {
neo_assert(
expects,
_impl->_state == detail::http_client_impl::_state_t::recvd_resp_head,
"Invalid state to ready HTTP response body. Have not yet received the response header",
int(_impl->_state),
_impl->origin.protocol,
_impl->origin.hostname,
_impl->origin.port);
if (res.status < 200 || res.status == 204 || res.status == 304) {
return std::make_unique<recv_none_state>();
}
return _impl->_do_io([&](auto&& source) -> std::unique_ptr<erased_message_body> {
using source_type = decltype(source);
if (res.content_length() == 0) {
return std::make_unique<recv_none_state>();
} else if (res.transfer_encoding() == "chunked") {
return std::make_unique<recv_chunked_state<source_type>>(source);
} else if (res.transfer_encoding() == "gzip") {
return std::make_unique<recv_gzip_state<source_type>>(source);
} else if (!res.transfer_encoding().has_value() && res.content_length() > 0) {
return std::make_unique<recv_plain_state<source_type>>(source, *res.content_length());
} else {
neo_assert(invariant,
false,
"Unimplemented",
res.transfer_encoding().value_or("[null]"));
}
});
}

void http_client::discard_body(const http_response_info& resp) {
auto reader_ = _make_body_reader(resp);
auto& reader = *reader_;
while (true) {
auto part = reader.next(1024);
reader.consume(neo::buffer_size(part));
if (neo::buffer_is_empty(part)) {
break;
}
}
_set_ready();
}

void http_client::_set_ready() noexcept {
_impl->_state = detail::http_client_impl::_state_t::ready;
}

std::pair<http_client, http_response_info>
http_pool::request_with_redirects(std::string_view method, const neo::url& url_) {
auto url = url_;
DDS_E_SCOPE(url);
for (auto i = 0; i <= 100; ++i) {
auto origin = network_origin::for_url(url);
auto client = client_for_origin(origin);

http_request_params params{
.method = method,
.path = url.path,
.query = url.query.value_or(""),
};
client.send_head(params);
auto resp = client.recv_head();
DDS_E_SCOPE(resp);

if (resp.is_error()) {
client.discard_body(resp);
throw boost::leaf::exception(http_status_error("Received an error from HTTP"));
}

if (resp.is_redirect()) {
client.discard_body(resp);
if (i == 100) {
throw boost::leaf::exception(
http_server_error("Encountered over 100 HTTP redirects. Request aborted."));
}
auto loc = resp.headers.find("Location");
if (!loc) {
throw boost::leaf::exception(
http_server_error("Server sent an invalid response of a 30x redirect without a "
"'Location' header"));
}
url = neo::url::parse(loc->value);
continue;
}

return {std::move(client), std::move(resp)};
}
neo::unreachable();
}

+ 145
- 0
src/dds/util/http/pool.hpp View File

@@ -0,0 +1,145 @@
#pragma once

#include "./request.hpp"
#include "./response.hpp"

#include <neo/buffer_algorithm/copy.hpp>
#include <neo/buffer_sink.hpp>
#include <neo/buffer_source.hpp>
#include <neo/url.hpp>
#include <neo/url/view.hpp>
#include <neo/utility.hpp>

#include <memory>

namespace dds {

namespace detail {

struct http_pool_access_impl;
struct http_pool_impl;

struct http_client_impl;

} // namespace detail

struct erased_message_body {
virtual ~erased_message_body() = default;
virtual neo::const_buffer next(std::size_t n) = 0;
virtual void consume(std::size_t n) = 0;
};

class http_status_error : public std::runtime_error {
using runtime_error::runtime_error;
};

class http_server_error : public std::runtime_error {
using runtime_error::runtime_error;
};

struct network_origin {
std::string protocol;
std::string hostname;
int port = 0;

static network_origin for_url(neo::url_view url) noexcept;
static network_origin for_url(const neo::url& url) noexcept;
};

class http_client {
friend class http_pool;

std::weak_ptr<detail::http_pool_impl> _pool;
std::shared_ptr<detail::http_client_impl> _impl;

http_client() = default;

void _send_buf(neo::const_buffer);

std::unique_ptr<erased_message_body> _make_body_reader(const http_response_info&);
void _set_ready() noexcept;

public:
http_client(http_client&& o)
: _pool(neo::take(o._pool))
, _impl(neo::take(o._impl)) {}
~http_client();

void send_head(http_request_params const& params);

http_response_info recv_head();

template <neo::buffer_input Body>
void send_body(Body&& body) {
if constexpr (neo::single_buffer<Body>) {
_send_buf(body);
} else if constexpr (neo::buffer_range<Body>) {
neo::buffers_consumer cons{body};
send_body(cons);
} else {
while (true) {
auto part = body.next(1024);
if (neo::buffer_is_empty(part)) {
break;
}
send_body(part);
body.consume(neo::buffer_size(part));
}
}
}

template <neo::buffer_output Out>
void recv_body_into(const http_response_info& resp, Out&& out) {
auto&& sink = neo::ensure_buffer_sink(out);
auto state = _make_body_reader(resp);
neo::buffer_copy(sink, *state);
_set_ready();
}

void discard_body(const http_response_info&);
};

class http_pool {
friend class http_client;
std::shared_ptr<detail::http_pool_impl> _impl;

public:
http_pool();
http_pool(http_pool&&) = default;
http_pool& operator=(http_pool&&) = default;
~http_pool();

http_client client_for_origin(const network_origin&);

http_response_info request(neo::url_view url) { return request(url, neo::mutable_buffer()); }

template <neo::buffer_output Output>
http_response_info request(neo::url_view url, Output&& out) {
return request(url, neo::const_buffer(), out);
}

template <neo::buffer_input In, neo::buffer_output Out>
http_response_info request(neo::url_view url, In&& in, Out&& out) {
auto origin = network_origin::for_url(url);
auto size = neo::buffer_size(in);
auto client = client_for_origin(origin);
client.send_head(http_request_params{
.method = "GET",
.path = url.path.empty() ? "/" : url.path,
.query = url.query.value_or(""),
.content_length = size,
});
client.send_body(in);
auto resp = client.recv_head();
client.recv_body_into(resp, out);
return resp;
}

std::pair<http_client, http_response_info>
request_with_redirects(http_client& cl, const http_request_params& params);

std::pair<http_client, http_response_info> request_with_redirects(std::string_view method,
const neo::url& url);
};

} // namespace dds

+ 27
- 0
src/dds/util/http/pool.test.cpp View File

@@ -0,0 +1,27 @@
#include "./pool.hpp"

#include <neo/string_io.hpp>
#include <neo/url.hpp>

#include <catch2/catch.hpp>

TEST_CASE("Create an empty pool") { dds::http_pool pool; }

TEST_CASE("Connect to a remote") {
dds::http_pool pool;
// auto client = pool.access();
auto cl = pool.client_for_origin({"https", "www.google.com", 443});
cl.send_head({.method = "GET", .path = "/"});
// cl.send_head({.method = "GET", .path = "/"});
auto resp = cl.recv_head();
CHECK(resp.status == 200);
CHECK(resp.status_message == "OK");
}

TEST_CASE("Issue a request on a pool") {
dds::http_pool pool;
neo::string_dynbuf_io body;
auto resp = pool.request(neo::url_view::split("https://www.google.com"), body);
CHECK(resp.status == 200);
CHECK(body.read_area_view().size() > 5);
}

+ 17
- 0
src/dds/util/http/request.hpp View File

@@ -0,0 +1,17 @@
#pragma once

#include <string_view>

#include <neo/http/headers.hpp>

namespace dds {

struct http_request_params {
std::string_view method;
std::string_view path;
std::string_view query = "";
std::size_t content_length = 0;
neo::http::headers headers{};
};

} // namespace dds

+ 34
- 0
src/dds/util/http/response.cpp View File

@@ -0,0 +1,34 @@
#include "./response.hpp"

#include <dds/util/log.hpp>

#include <neo/http/parse/header.hpp>

#include <charconv>

using namespace dds;

std::optional<int> http_response_info::content_length() const noexcept {
auto cl_str = header_value("Content-Length");
if (!cl_str) {
return {};
}
int clen = 0;
auto conv_res = std::from_chars(cl_str->data(), cl_str->data() + cl_str->size(), clen);
if (conv_res.ec != std::errc{}) {
dds_log(warn,
"The HTTP server returned a non-integral 'Content-Length' header: '{}'. We'll "
"pretend that there is no 'Content-Length' on this message.",
*cl_str);
return {};
}
return clen;
}

std::optional<std::string_view> http_response_info::header_value(std::string_view key) const noexcept {
auto hdr = headers.find(key);
if (!hdr) {
return {};
}
return hdr->value;
}

+ 32
- 0
src/dds/util/http/response.hpp View File

@@ -0,0 +1,32 @@
#pragma once

#include <neo/http/headers.hpp>
#include <neo/http/version.hpp>

#include <string>

namespace dds {

struct http_response_info {
int status;
std::string status_message;
neo::http::version version;
neo::http::headers headers;

std::size_t head_byte_size = 0;

void throw_for_status() const;

bool is_client_error() const noexcept { return status >= 400 && status < 500; }
bool is_server_error() const noexcept { return status >= 500 && status < 600; }
bool is_error() const noexcept { return is_client_error() || is_server_error(); }
bool is_redirect() const noexcept { return status >= 300 && status < 400; }

std::optional<std::string_view> header_value(std::string_view key) const noexcept;
std::optional<int> content_length() const noexcept;

auto location() const noexcept { return header_value("Location"); }
auto transfer_encoding() const noexcept { return header_value("Transfer-Encoding"); }
};

} // namespace dds

Loading…
Cancel
Save