feat(api): "broadcast" events to ALL channels

Problem:
`vim.rpcnotify(0)` and `rpcnotify(0)` are documented as follows:

    If {channel} is 0, the event is broadcast to all channels.

But that's not actually true. Channels must call `nvim_subscribe` to
receive "broadcast" events, so it's actually "multicast".

- Assuming there is a use-case for "broadcast", the current model adds
  an extra step for broadcasting: all channels need to "subscribe".
- The presence of `nvim_subscribe` is a source of confusion for users,
  because its name implies something more generally useful than what it
  does.

Presumably the use-case of `nvim_subscribe` is to avoid "noise" on RPC
channels not expected a broadcast notification, and potentially an error
if the channel client reports an unknown event.

Solution:
- Deprecate `nvim_subscribe`/`nvim_unsubscribe`.
  - If applications want to multicast, they can keep their own multicast
    list. Or they can use `nvim_list_chans()` and `nvim_get_chan_info()`
    to enumerate and filter the clients they want to target.
- Always send "broadcast" events to ALL channels. Don't require channels
  to "subscribe" to receive broadcasts. This matches the documented
  behavior of `rpcnotify()`.
This commit is contained in:
Justin M. Keyes 2024-04-24 13:36:47 +02:00
parent a1550dbf0a
commit 091bba54ef
7 changed files with 37 additions and 133 deletions

View File

@ -1547,24 +1547,6 @@ nvim_strwidth({text}) *nvim_strwidth()*
Return: ~
Number of cells
nvim_subscribe({event}) *nvim_subscribe()*
Subscribes to event broadcasts.
Attributes: ~
|RPC| only
Parameters: ~
• {event} Event type string
nvim_unsubscribe({event}) *nvim_unsubscribe()*
Unsubscribes to event broadcasts.
Attributes: ~
|RPC| only
Parameters: ~
• {event} Event type string
==============================================================================
Vimscript Functions *api-vimscript*

View File

@ -786,3 +786,23 @@ theend:
api_clear_error(&nested_error);
return rv;
}
/// @deprecated
///
/// @param channel_id Channel id (passed automatically by the dispatcher)
/// @param event Event type string
void nvim_subscribe(uint64_t channel_id, String event)
FUNC_API_SINCE(1) FUNC_API_REMOTE_ONLY
{
// Does nothing. `rpcnotify(0,…)` broadcasts to all channels, there are no "subscriptions".
}
/// @deprecated
///
/// @param channel_id Channel id (passed automatically by the dispatcher)
/// @param event Event type string
void nvim_unsubscribe(uint64_t channel_id, String event)
FUNC_API_SINCE(1) FUNC_API_REMOTE_ONLY
{
// Does nothing. `rpcnotify(0,…)` broadcasts to all channels, there are no "subscriptions".
}

View File

@ -1332,36 +1332,6 @@ void nvim_put(ArrayOf(String) lines, String type, Boolean after, Boolean follow,
});
}
/// Subscribes to event broadcasts.
///
/// @param channel_id Channel id (passed automatically by the dispatcher)
/// @param event Event type string
void nvim_subscribe(uint64_t channel_id, String event)
FUNC_API_SINCE(1) FUNC_API_REMOTE_ONLY
{
size_t length = (event.size < METHOD_MAXLEN ? event.size : METHOD_MAXLEN);
char e[METHOD_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
rpc_subscribe(channel_id, e);
}
/// Unsubscribes to event broadcasts.
///
/// @param channel_id Channel id (passed automatically by the dispatcher)
/// @param event Event type string
void nvim_unsubscribe(uint64_t channel_id, String event)
FUNC_API_SINCE(1) FUNC_API_REMOTE_ONLY
{
size_t length = (event.size < METHOD_MAXLEN
? event.size
: METHOD_MAXLEN);
char e[METHOD_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
rpc_unsubscribe(channel_id, e);
}
/// Returns the 24-bit RGB value of a |nvim_get_color_map()| color name or
/// "#rrggbb" hexadecimal string.
///

View File

@ -67,8 +67,6 @@ static void log_notify(char *dir, uint64_t channel_id, const char *name)
# define log_notify(...)
#endif
static Set(cstr_t) event_strings = SET_INIT;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/channel.c.generated.h"
#endif
@ -111,9 +109,9 @@ static Channel *find_rpc_channel(uint64_t id)
return chan;
}
/// Publishes an event to a channel.
/// Publishes an event to a channel (emits a notification to method `name`).
///
/// @param id Channel id. 0 means "broadcast to all subscribed channels"
/// @param id Channel id, or 0 to broadcast to all RPC channels.
/// @param name Event name (application-defined)
/// @param args Array of event arguments
/// @return True if the event was sent successfully, false otherwise.
@ -204,41 +202,6 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem
return frame.errored ? NIL : frame.result;
}
/// Subscribes to event broadcasts
///
/// @param id The channel id
/// @param event The event type string
void rpc_subscribe(uint64_t id, char *event)
{
Channel *channel;
if (!(channel = find_rpc_channel(id))) {
abort();
}
const char **key_alloc = NULL;
if (set_put_ref(cstr_t, &event_strings, event, &key_alloc)) {
*key_alloc = xstrdup(event);
}
set_put(cstr_t, channel->rpc.subscribed_events, *key_alloc);
}
/// Unsubscribes to event broadcasts
///
/// @param id The channel id
/// @param event The event type string
void rpc_unsubscribe(uint64_t id, char *event)
{
Channel *channel;
if (!(channel = find_rpc_channel(id))) {
abort();
}
unsubscribe(channel, event);
}
static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof)
{
Channel *channel = data;
@ -494,34 +457,24 @@ static void send_error(Channel *chan, MsgpackRpcRequestHandler handler, MessageT
api_clear_error(&e);
}
/// Broadcasts a notification to all RPC channels.
static void broadcast_event(const char *name, Array args)
{
kvec_withinit_t(Channel *, 4) subscribed = KV_INITIAL_VALUE;
kvi_init(subscribed);
kvec_withinit_t(Channel *, 4) chans = KV_INITIAL_VALUE;
kvi_init(chans);
Channel *channel;
map_foreach_value(&channels, channel, {
if (channel->is_rpc
&& set_has(cstr_t, channel->rpc.subscribed_events, name)) {
kv_push(subscribed, channel);
if (channel->is_rpc) {
kv_push(chans, channel);
}
});
if (kv_size(subscribed)) {
serialize_request(subscribed.items, kv_size(subscribed), 0, name, args);
if (kv_size(chans)) {
serialize_request(chans.items, kv_size(chans), 0, name, args);
}
kvi_destroy(subscribed);
}
static void unsubscribe(Channel *channel, char *event)
{
if (!set_has(cstr_t, &event_strings, event)) {
WLOG("RPC: ch %" PRIu64 ": tried to unsubscribe unknown event '%s'",
channel->id, event);
return;
}
set_del(cstr_t, channel->rpc.subscribed_events, event);
kvi_destroy(chans);
}
/// Mark rpc state as closed, and release its reference to the channel.
@ -551,7 +504,6 @@ void rpc_free(Channel *channel)
unpacker_teardown(channel->rpc.unpacker);
xfree(channel->rpc.unpacker);
set_destroy(cstr_t, channel->rpc.subscribed_events);
kv_destroy(channel->rpc.call_stack);
api_free_dictionary(channel->rpc.info);
}
@ -719,12 +671,6 @@ const char *get_client_info(Channel *chan, const char *key)
#ifdef EXITFREE
void rpc_free_all_mem(void)
{
cstr_t key;
set_foreach(&event_strings, key, {
xfree((void *)key);
});
set_destroy(cstr_t, &event_strings);
multiqueue_free(ch_before_blocking_events);
}
#endif

View File

@ -37,7 +37,6 @@ typedef struct {
} RequestEvent;
typedef struct {
Set(cstr_t) subscribed_events[1];
bool closed;
Unpacker *unpacker;
uint32_t next_request_id;

View File

@ -1,7 +1,6 @@
local t = require('test.testutil')
local n = require('test.functional.testnvim')()
local assert_log = t.assert_log
local eq, clear, eval, command, next_msg = t.eq, n.clear, n.eval, n.command, n.next_msg
local api = n.api
local exec_lua = n.exec_lua
@ -34,18 +33,18 @@ describe('notify', function()
end)
end)
describe('passing 0 as the channel id', function()
it('sends the notification/args to all subscribed channels', function()
api.nvim_subscribe('event2')
describe('channel id 0', function()
it('broadcasts the notification/args to all channels', function()
eval('rpcnotify(0, "event1", 1, 2, 3)')
eval('rpcnotify(0, "event2", 4, 5, 6)')
eval('rpcnotify(0, "event2", 7, 8, 9)')
eq({ 'notification', 'event1', { 1, 2, 3 } }, next_msg())
eq({ 'notification', 'event2', { 4, 5, 6 } }, next_msg())
eq({ 'notification', 'event2', { 7, 8, 9 } }, next_msg())
api.nvim_unsubscribe('event2')
api.nvim_subscribe('event1')
eval('rpcnotify(0, "event2", 10, 11, 12)')
eval('rpcnotify(0, "event1", 13, 14, 15)')
eq({ 'notification', 'event2', { 10, 11, 12 } }, next_msg())
eq({ 'notification', 'event1', { 13, 14, 15 } }, next_msg())
end)
@ -78,17 +77,6 @@ describe('notify', function()
end)
end)
it('unsubscribe non-existing event #8745', function()
clear { env = {
NVIM_LOG_FILE = testlog,
} }
api.nvim_subscribe('event1')
api.nvim_unsubscribe('doesnotexist')
assert_log("tried to unsubscribe unknown event 'doesnotexist'", testlog, 10)
api.nvim_unsubscribe('event1')
assert_alive()
end)
it('cancels stale events on channel close', function()
local catchan = eval("jobstart(['cat'], {'rpc': v:true})")
local catpath = eval('exepath("cat")')
@ -97,7 +85,6 @@ describe('notify', function()
exec_lua(
[[
vim.rpcnotify(..., "nvim_call_function", 'chanclose', {..., 'rpc'})
vim.rpcnotify(..., "nvim_subscribe", "daily_rant")
return vim.api.nvim_get_chan_info(...)
]],
catchan

View File

@ -181,8 +181,8 @@ describe('eval-API', function()
eq('Vim(call):E117: Unknown function: buffer_get_line', err)
-- some api functions are only useful from a msgpack-rpc channel
err = exc_exec('call nvim_subscribe("fancyevent")')
eq('Vim(call):E117: Unknown function: nvim_subscribe', err)
err = exc_exec('call nvim_set_client_info()')
eq('Vim(call):E117: Unknown function: nvim_set_client_info', err)
end)
it('have metadata accessible with api_info()', function()