当前位置: 首页 > 知识库问答 >
问题:

nghttp2:使用服务器发送的事件供EventSource使用

公良阳波
2023-03-14

我使用nghttp2实现了一个REST服务器,它应该使用HTTP/2和服务器发送的事件(由浏览器中的EventSource使用)。然而,基于这些例子,我不清楚如何实现SSE。像在< code>asio-sv.cc中那样使用res.push()似乎不是正确的方法。

做这件事的正确方法是什么?我更喜欢使用nghttp2的C API,但是C API也可以。

共有1个答案

甄阳朔
2023-03-14

是的,我在2018年做过类似的事情。文档相当稀疏:)。

首先,忽略<code>response::push

关键的一点是,无论何时您的end()回调都必须返回NGHTTP2_ERR_DEFERRED。当应用程序以某种方式获得更多要发送的数据时,调用<code>http::response::resume()。

下面是一个简单的代码。构建它为 g -std=c 17 -Wall -O3 -ggdb clock.cpp -lssl -lcrypto -pthread -lnghttp2_asio -lspdlog -lfmt.请注意,现代浏览器不会在明文套接字上执行HTTP / 2,因此您需要通过nghttpx -f '*,8080;no-tls' -b '::1,10080;;proto=h2'

#include <boost/asio/io_service.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/signals2.hpp>
#include <chrono>
#include <list>
#include <nghttp2/asio_http2_server.h>
#define SPDLOG_FMT_EXTERNAL
#include <spdlog/spdlog.h>
#include <thread>

using namespace nghttp2::asio_http2;
using namespace std::literals;

using Signal = boost::signals2::signal<void(const std::string& message)>;

class Client {
    const server::response& res;
    enum State {
        HasEvents,
        WaitingForEvents,
    };
    std::atomic<State> state;

    std::list<std::string> queue;
    mutable std::mutex mtx;
    boost::signals2::scoped_connection subscription;

    size_t send_chunk(uint8_t* destination, std::size_t len, uint32_t* data_flags [[maybe_unused]])
    {
        std::size_t written{0};
        std::lock_guard lock{mtx};
        if (state != HasEvents) throw std::logic_error{std::to_string(__LINE__)};
        while (!queue.empty()) {
            auto num = std::min(queue.front().size(), len - written);
            std::copy_n(queue.front().begin(), num, destination + written);
            written += num;
            if (num < queue.front().size()) {
                queue.front() = queue.front().substr(num);
                spdlog::debug("{} send_chunk: partial write", (void*)this);
                return written;
            }
            queue.pop_front();
            spdlog::debug("{} send_chunk: sent one event", (void*)this);
        }
        state = WaitingForEvents;
        return written;
    }

public:
    Client(const server::request& req, const server::response& res, Signal& signal)
    : res{res}
    , state{WaitingForEvents}
    , subscription{signal.connect([this](const auto& msg) {
        enqueue(msg);
    })}
    {
        spdlog::warn("{}: {} {} {}", (void*)this, boost::lexical_cast<std::string>(req.remote_endpoint()), req.method(), req.uri().raw_path);
        res.write_head(200, {{"content-type", {"text/event-stream", false}}});
    }

    void onClose(const uint32_t ec)
    {
        spdlog::error("{} onClose", (void*)this);
        subscription.disconnect();
    }

    ssize_t process(uint8_t* destination, std::size_t len, uint32_t* data_flags)
    {
        spdlog::trace("{} process", (void*)this);
        switch (state) {
        case HasEvents:
            return send_chunk(destination, len, data_flags);
        case WaitingForEvents:
            return NGHTTP2_ERR_DEFERRED;
        }
        __builtin_unreachable();
    }

    void enqueue(const std::string& what)
    {
        {
            std::lock_guard lock{mtx};
            queue.push_back("data: " + what + "\n\n");
        }
        state = HasEvents;
        res.resume();
    }
};

int main(int argc [[maybe_unused]], char** argv [[maybe_unused]])
{
    spdlog::set_level(spdlog::level::trace);

    Signal sig;
    std::thread timer{[&sig]() {
        for (int i = 0; /* forever */; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds{666});
            spdlog::info("tick: {}", i);
            sig("ping #" + std::to_string(i));
        }
    }};

    server::http2 server;
    server.num_threads(4);

    server.handle("/events", [&sig](const server::request& req, const server::response& res) {
        auto client = std::make_shared<Client>(req, res, sig);

        res.on_close([client](const auto ec) {
            client->onClose(ec);
        });
        res.end([client](uint8_t* destination, std::size_t len, uint32_t* data_flags) {
            return client->process(destination, len, data_flags);
        });
    });

    server.handle("/", [](const auto& req, const auto& resp) {
        spdlog::warn("{} {} {}", boost::lexical_cast<std::string>(req.remote_endpoint()), req.method(), req.uri().raw_path);
        resp.write_head(200, {{"content-type", {"text/html", false}}});
        resp.end(R"(<html><head><title>nghttp2 event stream</title></head>
<body><h1>events</h1><ul id="x"></ul>
<script type="text/javascript">
const ev = new EventSource("/events");
ev.onmessage = function(event) {
  const li = document.createElement("li");
  li.textContent = event.data;
  document.getElementById("x").appendChild(li);
};
</script>
</body>
</html>)");
    });

    boost::system::error_code ec;
    if (server.listen_and_serve(ec, "::", "10080")) {
        return 1;
    }
    return 0;
}

我觉得我的队列处理可能太复杂了。当通过curl进行测试时,我似乎从未用完缓冲区空间。换句话说,即使客户端没有从套接字读取任何数据,库也会不断调用<code>send_chunk</code>,一次为我请求最多16kB的数据。奇怪。我不知道在更大力度地推送更多数据时它是如何工作的。

我的“真实代码”曾经有第三种状态,< code>Closed,但是我认为通过< code>on_close阻塞事件在这里已经足够了。然而,我认为如果客户端已经断开连接,但在析构函数被调用之前,你永远不要输入< code>send_chunk。

 类似资料:
  • 我想为我的网站创建实时通知。 参考链接:http://sinhamohit.com/writing/spring-boot-reactive-sse 我该怎么做呢?参考,链接和例子将是伟大的。

  • 问题内容: 双方的WebSockets和服务器发送的事件能够将数据推送到浏览器。在我看来,它们似乎是竞争技术。它们之间有什么区别?您何时会选择一个? 问题答案: Websocket和SSE(服务器发送事件)都能够将数据推送到浏览器,但是它们不是竞争技术。 Websockets连接既可以将数据发送到浏览器,也可以从浏览器接收数据。可以使用websockets的应用程序的一个很好的例子是聊天应用程序。

  • 我正在尝试在Ruby Grape API上创建服务器发送的事件。问题是,连接似乎总是很快关闭,因为我一直在测试网页上看到连接关闭事件。 客户端连接到服务器,因为我可以看到正在调用的方法,但我想知道为什么连接不是恒定的,为什么我没有收到我使用Thread发送的数据。 以下是我的Ruby代码: 以下是我的Javascript: 编辑:我使用GET方法实现了它(我将Grape::API更改为Sinatr

  • 并在config/bootstrap.php中添加了以下行: 在config/events.php中 在lib/event/mylistener.php中 PS:我使用Cakephp事件是因为它允许我在一个地方从不同的控制器收集所需的数据,然后从那里,我可以创建事件流(服务器发送的事件)。如果有更好的选择,请分享。

  • 问题内容: 我正在尝试使用SSE将JSON数据发送到浏览器,但似乎无法正确处理,而且我也不知道为什么。 服务器端看起来像这样: 如您所见,我已经注释掉了帖子内容,但最终我希望将testdata用作JSON本身,如下所示: 客户端看起来像这样: 我看到控制台日志,但 没有看到警报。 问题答案: 尝试发送适当的JSON(输出中未引用): 但最好:

  • 概述 客户端代码 概述 建立连接 open事件 message事件 error事件 自定义事件 close方法 数据格式 概述 data:数据栏 id:数据标识符 event栏:自定义信息类型 retry:最大间隔时间 服务器代码 参考链接 概述 传统的网页都是浏览器向服务器“查询”数据,但是很多场合,最有效的方式是服务器向浏览器“发送”数据。比如,每当收到新的电子邮件,服务器就向浏览器发送一个“