Explorar el Código

Fix various bugs and crashes with HTTP client in the unhappy-path. Implement DB caching.

default_compile_flags
vector-of-bool hace 3 años
padre
commit
bc9d2d2fc8
Se han modificado 10 ficheros con 218 adiciones y 104 borrados
  1. +3
    -2
      src/dds/pkg/db.cpp
  2. +1
    -1
      src/dds/pkg/get/http.cpp
  3. +61
    -29
      src/dds/pkg/remote.cpp
  4. +3
    -1
      src/dds/pkg/remote.hpp
  5. +112
    -33
      src/dds/util/http/pool.cpp
  6. +16
    -26
      src/dds/util/http/pool.hpp
  7. +4
    -6
      src/dds/util/http/pool.test.cpp
  8. +8
    -5
      src/dds/util/http/request.hpp
  9. +3
    -0
      src/dds/util/http/response.hpp
  10. +7
    -1
      src/dds/util/signal.cpp

+ 3
- 2
src/dds/pkg/db.cpp Ver fichero

@@ -81,8 +81,9 @@ void migrate_repodb_3(nsql::database& db) {
CREATE TABLE dds_cat_remotes (
remote_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
gen_ident TEXT NOT NULL,
remote_url TEXT NOT NULL
remote_url TEXT NOT NULL,
db_etag TEXT,
db_mtime TEXT
);

CREATE TABLE dds_cat_pkgs_new (

+ 1
- 1
src/dds/pkg/get/http.cpp Ver fichero

@@ -48,7 +48,7 @@ void http_remote_listing::pull_source(path_ref dest) const {
fs::create_directory(dl_path.parent_path());

http_pool pool;
auto [client, resp] = pool.request_with_redirects("GET", url);
auto [client, resp] = pool.request(url);
auto dl_file = neo::file_stream::open(dl_path, neo::open_mode::write);
client.recv_body_into(resp, neo::stream_io_buffers{dl_file});


+ 61
- 29
src/dds/pkg/remote.cpp Ver fichero

@@ -27,12 +27,9 @@ struct remote_db {
temporary_dir _tempdir;
nsql::database db;

static remote_db download_and_open(neo::url const& url) {
http_pool pool;

auto [client, resp] = pool.request_with_redirects("GET", url);
auto tempdir = temporary_dir::create();
auto repo_db_dl = tempdir.path() / "repo.db";
static remote_db download_and_open(http_client& client, const http_response_info& resp) {
auto tempdir = temporary_dir::create();
auto repo_db_dl = tempdir.path() / "repo.db";
fs::create_directories(tempdir.path());
auto outfile = neo::file_stream::open(repo_db_dl, neo::open_mode::write);
client.recv_body_into(resp, neo::stream_io_buffers(outfile));
@@ -40,16 +37,6 @@ struct remote_db {
auto db = nsql::open(repo_db_dl.string());
return {tempdir, std::move(db)};
}

static remote_db download_and_open_for_base(neo::url url) {
auto repo_url = url;
repo_url.path = fs::path(url.path).append("repo.db").generic_string();
return download_and_open(repo_url);
}

static remote_db download_and_open_for_base(std::string_view url_str) {
return download_and_open_for_base(neo::url::parse(url_str));
}
};

} // namespace
@@ -58,7 +45,15 @@ pkg_remote pkg_remote::connect(std::string_view url_str) {
DDS_E_SCOPE(e_url_string{std::string(url_str)});
const auto url = neo::url::parse(url_str);

auto db = remote_db::download_and_open_for_base(url);
auto& pool = http_pool::global_pool();
auto db_url = url;
while (db_url.path.ends_with("/"))
db_url.path.pop_back();
auto full_path = fmt::format("{}/{}", db_url.path, "repo.db");
db_url.path = full_path;
auto [client, resp] = pool.request(db_url, http_request_params{.method = "GET"});
auto db = remote_db::download_and_open(client, resp);

auto name_st = db.db.prepare("SELECT name FROM dds_repo_meta");
auto [name] = nsql::unpack_single<std::string>(name_st);

@@ -67,18 +62,39 @@ pkg_remote pkg_remote::connect(std::string_view url_str) {

void pkg_remote::store(nsql::database_ref db) {
auto st = db.prepare(R"(
INSERT INTO dds_cat_remotes (name, gen_ident, remote_url)
VALUES (?, ?, ?)
INSERT INTO dds_cat_remotes (name, remote_url)
VALUES (?, ?)
ON CONFLICT (name) DO
UPDATE SET gen_ident = ?2, remote_url = ?3
UPDATE SET remote_url = ?2
)");
nsql::exec(st, _name, "[placeholder]", _base_url.to_string());
nsql::exec(st, _name, _base_url.to_string());
}

void pkg_remote::update_pkg_db(nsql::database_ref db) {
void pkg_remote::update_pkg_db(nsql::database_ref db,
std::optional<std::string_view> etag,
std::optional<std::string_view> db_mtime) {
dds_log(info, "Pulling repository contents for {} [{}]", _name, _base_url.to_string());

auto rdb = remote_db::download_and_open_for_base(_base_url);
auto& pool = http_pool::global_pool();
auto url = _base_url;
while (url.path.ends_with("/"))
url.path.pop_back();
auto full_path = fmt::format("{}/{}", url.path, "repo.db");
url.path = full_path;
auto [client, resp] = pool.request(url,
http_request_params{
.method = "GET",
.prior_etag = etag.value_or(""),
.last_modified = db_mtime.value_or(""),
});
if (resp.not_modified()) {
// Cache hit
dds_log(info, "Package database {} is up-to-date", _name);
client.discard_body(resp);
return;
}

auto rdb = remote_db::download_and_open(client, resp);

auto base_url_str = _base_url.to_string();
while (base_url_str.ends_with("/")) {
@@ -140,7 +156,7 @@ void pkg_remote::update_pkg_db(nsql::database_ref db) {
)");
// Validate our database
dds_log(trace, "Running integrity check");
auto fk_check = db.prepare("PRAGMA foreign_key_check");
auto fk_check = db.prepare("PRAGMA foreign_key_check");
auto rows = nsql::iter_tuples<std::string, std::int64_t, std::string, std::string>(fk_check);
bool any_failed = false;
for (auto [child_table, rowid, parent_table, failed_idx] : rows) {
@@ -165,17 +181,33 @@ void pkg_remote::update_pkg_db(nsql::database_ref db) {
throw_external_error<errc::corrupted_catalog_db>(
"Database update failed due to data integrity errors");
}

// Save the cache info for the remote
if (auto new_etag = resp.etag()) {
nsql::exec(db.prepare("UPDATE dds_cat_remotes SET db_etag = ? WHERE name = ?"),
*new_etag,
_name);
}
if (auto mtime = resp.last_modified()) {
nsql::exec(db.prepare("UPDATE dds_cat_remotes SET db_mtime = ? WHERE name = ?"),
*mtime,
_name);
}
}

void dds::update_all_remotes(nsql::database_ref db) {
dds_log(info, "Updating catalog from all remotes");
auto repos_st = db.prepare("SELECT name, remote_url FROM dds_cat_remotes");
auto tups = nsql::iter_tuples<std::string, std::string>(repos_st) | ranges::to_vector;

for (const auto& [name, remote_url] : tups) {
auto repos_st = db.prepare("SELECT name, remote_url, db_etag, db_mtime FROM dds_cat_remotes");
auto tups = nsql::iter_tuples<std::string,
std::string,
std::optional<std::string>,
std::optional<std::string>>(repos_st)
| ranges::to_vector;

for (const auto& [name, remote_url, etag, db_mtime] : tups) {
DDS_E_SCOPE(e_url_string{remote_url});
pkg_remote repo{name, neo::url::parse(remote_url)};
repo.update_pkg_db(db);
repo.update_pkg_db(db, etag, db_mtime);
}

dds_log(info, "Recompacting database...");

+ 3
- 1
src/dds/pkg/remote.hpp Ver fichero

@@ -20,7 +20,9 @@ public:
static pkg_remote connect(std::string_view url);

void store(neo::sqlite3::database_ref);
void update_pkg_db(neo::sqlite3::database_ref);
void update_pkg_db(neo::sqlite3::database_ref,
std::optional<std::string_view> etag = {},
std::optional<std::string_view> last_modified = {});
};

void update_all_remotes(neo::sqlite3::database_ref);

+ 112
- 33
src/dds/util/http/pool.cpp Ver fichero

@@ -1,6 +1,7 @@
#include "./pool.hpp"

#include <dds/error/errors.hpp>
#include <dds/util/log.hpp>
#include <dds/util/result.hpp>

#include <boost/leaf/exception.hpp>
@@ -31,6 +32,8 @@ struct http_client_impl {

_state_t _state = _state_t::ready;

bool _peer_disconnected = false;

neo::socket _conn;

std::string _host_string;
@@ -94,24 +97,34 @@ struct http_client_impl {
.parse_tail = {},
};

auto content_len_str = std::to_string(params.content_length);
auto hostname_port = fmt::format("{}:{}", origin.hostname, origin.port);
dds_log(trace,
" --> HTTP {} {}://{}:{}{}",
origin.protocol,
params.method,
origin.hostname,
origin.port,
params.path);

auto hostname_port = fmt::format("{}:{}", origin.hostname, origin.port);

std::pair<std::string_view, std::string_view> headers[] = {
std::vector<std::pair<std::string_view, std::string_view>> headers = {
{"Host", hostname_port},
{"Accept", "*/*"},
{"Content-Length", content_len_str},
{"Content-Length", "0"},
{"TE", "gzip, chunked, plain"},
{"Connection", "keep-alive"},
};
if (!params.prior_etag.empty()) {
headers.push_back({"If-None-Match", params.prior_etag});
}
if (!params.last_modified.empty()) {
headers.push_back({"If-Modified-Since", params.last_modified});
}

_do_io([&](auto&& sink) {
neo::http::write_request(sink, start_line, headers, neo::const_buffer());
});
_state = _state_t::sent_req_head;
if (params.content_length == 0) {
_state = _state_t::sent_req_body;
}
_state = _state_t::sent_req_body;
}

http_response_info recv_head() {
@@ -130,6 +143,18 @@ struct http_client_impl {
if (clen_hdr && clen_hdr->value == "0") {
_state = _state_t::ready;
}
bool disconnect = false;
if (r.version == neo::http::version::v1_0) {
dds_log(trace, "HTTP/1.0 server will disconnect by default");
disconnect = true;
} else if (r.version == neo::http::version::v1_1) {
disconnect = r.header_value("Connection") == "close";
} else {
// Invalid version??
disconnect = true;
}
_peer_disconnected = disconnect;
dds_log(trace, " <-- HTTP {} {}", r.status, r.status_message);
return r;
}
};
@@ -149,6 +174,8 @@ struct http_pool_impl {

using namespace dds;

using client_impl_ptr = std::shared_ptr<detail::http_client_impl>;

http_pool::~http_pool() = default;

http_pool::http_pool()
@@ -156,8 +183,23 @@ http_pool::http_pool()

http_client::~http_client() {
// When the http_client is dropped, return its impl back to the connection pool for this origin
auto pool = _pool.lock();
if (pool && _impl) {
if (!_impl) {
// We are moved-from
return;
}
neo_assert(expects,
_impl->_state == detail::http_client_impl::_state_t::ready,
"An http_client object was dropped while in a partial-request state. Did you read "
"the response header AND body?",
int(_impl->_state),
_impl->origin.protocol,
_impl->origin.hostname,
_impl->origin.port);
if (_impl->_peer_disconnected) {
// Do not return this connection to the pool. Let it destroy
return;
}
if (auto pool = _pool.lock()) {
pool->_clients.emplace(_impl->origin, _impl);
}
}
@@ -210,24 +252,40 @@ template <typename Stream>
struct recv_chunked_state : erased_message_body {
Stream& _strm;
neo::http::chunked_buffers<Stream&> _chunked{_strm};
client_impl_ptr _client;

explicit recv_chunked_state(Stream& s)
: _strm(s) {}
explicit recv_chunked_state(Stream& s, client_impl_ptr c)
: _strm(s)
, _client(c) {}

neo::const_buffer next(std::size_t n) override { return _chunked.next(n); }
void consume(std::size_t n) override { _chunked.consume(n); }
neo::const_buffer next(std::size_t n) override {
auto part = _chunked.next(n);
if (neo::buffer_is_empty(part)) {
_client->_state = detail::http_client_impl::_state_t::ready;
}
return part;
}
void consume(std::size_t n) override { _chunked.consume(n); }
};

template <typename Stream>
struct recv_gzip_state : erased_message_body {
Stream& _strm;
neo::gzip_source<Stream&> _gzip{_strm};
client_impl_ptr _client;

explicit recv_gzip_state(Stream& s)
: _strm(s) {}
explicit recv_gzip_state(Stream& s, client_impl_ptr c)
: _strm(s)
, _client(c) {}

neo::const_buffer next(std::size_t n) override { return _gzip.next(n); }
void consume(std::size_t n) override { _gzip.consume(n); }
neo::const_buffer next(std::size_t n) override {
auto part = _gzip.next(n);
if (neo::buffer_is_empty(part)) {
_client->_state = detail::http_client_impl::_state_t::ready;
}
return part;
}
void consume(std::size_t n) override { _gzip.consume(n); }
};

template <typename Stream>
@@ -236,12 +294,19 @@ struct recv_plain_state : erased_message_body {
std::size_t _size;
client_impl_ptr _client;

explicit recv_plain_state(Stream& s, std::size_t size)
explicit recv_plain_state(Stream& s, std::size_t size, client_impl_ptr cl)
: _strm(s)
, _size(size) {}
, _size(size)
, _client(cl) {}

neo::const_buffer next(std::size_t n) override { return _strm.next((std::min)(n, _size)); }
void consume(std::size_t n) override {
neo::const_buffer next(std::size_t n) override {
auto part = _strm.next((std::min)(n, _size));
if (neo::buffer_is_empty(part)) {
_client->_state = detail::http_client_impl::_state_t::ready;
}
return part;
}
void consume(std::size_t n) override {
_size -= n;
return _strm.consume(n);
}
@@ -264,13 +329,20 @@ std::unique_ptr<erased_message_body> http_client::_make_body_reader(const http_r
return _impl->_do_io([&](auto&& source) -> std::unique_ptr<erased_message_body> {
using source_type = decltype(source);
if (res.content_length() == 0) {
dds_log(trace, "Empty response body");
_set_ready();
return std::make_unique<recv_none_state>();
} else if (res.transfer_encoding() == "chunked") {
return std::make_unique<recv_chunked_state<source_type>>(source);
dds_log(trace, "Chunked response body");
return std::make_unique<recv_chunked_state<source_type>>(source, _impl);
} else if (res.transfer_encoding() == "gzip") {
return std::make_unique<recv_gzip_state<source_type>>(source);
dds_log(trace, "GZip encoded response body");
return std::make_unique<recv_gzip_state<source_type>>(source, _impl);
} else if (!res.transfer_encoding().has_value() && res.content_length() > 0) {
return std::make_unique<recv_plain_state<source_type>>(source, *res.content_length());
dds_log(trace, "Plain response body");
return std::make_unique<recv_plain_state<source_type>>(source,
*res.content_length(),
_impl);
} else {
neo_assert(invariant,
false,
@@ -297,23 +369,30 @@ void http_client::_set_ready() noexcept {
_impl->_state = detail::http_client_impl::_state_t::ready;
}

std::pair<http_client, http_response_info>
http_pool::request_with_redirects(std::string_view method, const neo::url& url_) {
auto url = url_;
request_result http_pool::request(neo::url url, http_request_params params) {
DDS_E_SCOPE(url);
for (auto i = 0; i <= 100; ++i) {
params.path = url.path;
params.query = url.query.value_or("");

auto origin = network_origin::for_url(url);
auto client = client_for_origin(origin);

http_request_params params{
.method = method,
.path = url.path,
.query = url.query.value_or(""),
};
client.send_head(params);
auto resp = client.recv_head();
DDS_E_SCOPE(resp);

if (dds::log::level_enabled(dds::log::level::trace)) {
for (auto hdr : resp.headers) {
dds_log(trace, " -- {}: {}", hdr.key, hdr.value);
}
}

if (resp.not_modified()) {
// Not Modified, a cache hit
return {std::move(client), std::move(resp)};
}

if (resp.is_error()) {
client.discard_body(resp);
throw boost::leaf::exception(http_status_error("Received an error from HTTP"));

+ 16
- 26
src/dds/util/http/pool.hpp Ver fichero

@@ -99,6 +99,13 @@ public:
void discard_body(const http_response_info&);
};

struct request_result {
http_client client;
http_response_info resp;

void discard_body() { client.discard_body(resp); }
};

class http_pool {
friend class http_client;
std::shared_ptr<detail::http_pool_impl> _impl;
@@ -109,37 +116,20 @@ public:
http_pool& operator=(http_pool&&) = default;
~http_pool();

http_client client_for_origin(const network_origin&);

http_response_info request(neo::url_view url) { return request(url, neo::mutable_buffer()); }

template <neo::buffer_output Output>
http_response_info request(neo::url_view url, Output&& out) {
return request(url, neo::const_buffer(), out);
static http_pool& thread_local_pool() {
thread_local http_pool inst;
return inst;
}

template <neo::buffer_input In, neo::buffer_output Out>
http_response_info request(neo::url_view url, In&& in, Out&& out) {
auto origin = network_origin::for_url(url);
auto size = neo::buffer_size(in);
auto client = client_for_origin(origin);
client.send_head(http_request_params{
.method = "GET",
.path = url.path.empty() ? "/" : url.path,
.query = url.query.value_or(""),
.content_length = size,
});
client.send_body(in);
auto resp = client.recv_head();
client.recv_body_into(resp, out);
return resp;
static http_pool& global_pool() {
static http_pool inst;
return inst;
}

std::pair<http_client, http_response_info>
request_with_redirects(http_client& cl, const http_request_params& params);
http_client client_for_origin(const network_origin&);

std::pair<http_client, http_response_info> request_with_redirects(std::string_view method,
const neo::url& url);
request_result request(neo::url url, http_request_params params);
auto request(neo::url url) { return request(url, http_request_params{}); }
};

} // namespace dds

+ 4
- 6
src/dds/util/http/pool.test.cpp Ver fichero

@@ -12,16 +12,14 @@ TEST_CASE("Connect to a remote") {
// auto client = pool.access();
auto cl = pool.client_for_origin({"https", "www.google.com", 443});
cl.send_head({.method = "GET", .path = "/"});
// cl.send_head({.method = "GET", .path = "/"});
auto resp = cl.recv_head();
CHECK(resp.status == 200);
CHECK(resp.status_message == "OK");
cl.discard_body(resp);
}

TEST_CASE("Issue a request on a pool") {
dds::http_pool pool;
neo::string_dynbuf_io body;
auto resp = pool.request(neo::url_view::split("https://www.google.com"), body);
CHECK(resp.status == 200);
CHECK(body.read_area_view().size() > 5);
dds::http_pool pool;
auto resp = pool.request(neo::url::parse("https://www.google.com"));
resp.discard_body();
}

+ 8
- 5
src/dds/util/http/request.hpp Ver fichero

@@ -7,11 +7,14 @@
namespace dds {

struct http_request_params {
std::string_view method;
std::string_view path;
std::string_view query = "";
std::size_t content_length = 0;
neo::http::headers headers{};
std::string_view method = "GET";
std::string_view path{};
std::string_view query{};

bool follow_redirects = true;

std::string_view prior_etag{};
std::string_view last_modified{};
};

} // namespace dds

+ 3
- 0
src/dds/util/http/response.hpp Ver fichero

@@ -21,12 +21,15 @@ struct http_response_info {
bool is_server_error() const noexcept { return status >= 500 && status < 600; }
bool is_error() const noexcept { return is_client_error() || is_server_error(); }
bool is_redirect() const noexcept { return status >= 300 && status < 400; }
bool not_modified() const noexcept { return status == 304; }

std::optional<std::string_view> header_value(std::string_view key) const noexcept;
std::optional<int> content_length() const noexcept;

auto location() const noexcept { return header_value("Location"); }
auto transfer_encoding() const noexcept { return header_value("Transfer-Encoding"); }
auto etag() const noexcept { return header_value("ETag"); }
auto last_modified() const noexcept { return header_value("Last-Modified"); }
};

} // namespace dds

+ 7
- 1
src/dds/util/signal.cpp Ver fichero

@@ -17,6 +17,12 @@ void dds::notify_cancel() noexcept { got_signal = SIGINT; }
void dds::install_signal_handlers() noexcept {
std::signal(SIGINT, handle_signal);
std::signal(SIGTERM, handle_signal);

#ifdef SIGPIPE
// XXX: neo-io doesn't behave nicely when EOF is hit on sockets. This Isn't
// easily fixed portably without simply blocking SIGPIPE globally.
std::signal(SIGPIPE, SIG_IGN);
#endif
}

bool dds::is_cancelled() noexcept { return got_signal != 0; }
@@ -24,4 +30,4 @@ void dds::cancellation_point() {
if (is_cancelled()) {
throw user_cancelled();
}
}
}

Cargando…
Cancelar
Guardar