Skip to content

Instantly share code, notes, and snippets.

@dohai2105
Forked from leiless/uv_pipe_ipc_echo.c
Created November 13, 2023 13:25
Show Gist options
  • Select an option

  • Save dohai2105/7f77536f3427b97b2f7153e7b058702c to your computer and use it in GitHub Desktop.

Select an option

Save dohai2105/7f77536f3427b97b2f7153e7b058702c to your computer and use it in GitHub Desktop.
libuv pipe IPC echo client/server example
/*
* Created Jul 4, 2020.
*/
#include <unistd.h>
#include <uv.h>
#include "test.h"
#include "utils.h"
#include "assertf.h"
#include "uv_utils.h"
/* Number of IPC client threads; also sizes threads[] and the server-side ipc_client[] array */
#define NR_THREAD 3
/* Filesystem path the server pipe is bound to and every client connects to */
#define IPC_PATH "/tmp/.uv.ipc.sock"
/* Listening pipe handle; initialized, bound and listened on in test_main() */
static uv_pipe_t *ipc_server = &(uv_pipe_t){};
/* Thread handles created by thread_init() and joined by thread_join() */
static uv_thread_t threads[NR_THREAD];
/* Size in bytes of the buffer ipc_read_cb() reads into */
#define READ_BUFSZ 1024
/**
 * Poll callback of a client-side IPC connection.
 *
 * It is registered by ipc_client_connect_cb() on the loop of the client
 * thread, so it runs once per client thread and may run concurrently in
 * several threads.
 *
 * On UV_DISCONNECT the poll handle and the pipe handle stored in
 * handle->data are closed. Otherwise at most READ_BUFSZ bytes are read from
 * the polled descriptor and echoed to stderr.
 *
 * @param handle    poll handle watching the connection's descriptor
 * @param status    negative libuv error code when polling failed
 * @param events    bitmask of the events that fired
 */
static void ipc_read_cb(uv_poll_t *handle, int status, int events)
{
    if (events & UV_DISCONNECT) {
        LOG_WARN("Connection reset by IPC server");
        int e = uv_poll_stop(handle);
        assert_eqf(e, 0, "%d", "uv_poll_stop() fail uv error: %s", UVE(e));
        /*
         * In order to safely close the loop, you need to uv_close() the
         * uv_pipe_t while the loop is active (before uv_run() returns)
         * see: https://github.com/libuv/help/issues/156
         */
        uv_close((uv_handle_t *) handle, NULL);
        /* handle->data is the `ipc_client' in thread_cb() */
        uv_close((uv_handle_t *) handle->data, NULL);
        return;
    }

    if (status < 0) {
        LOG_ERR("uv poll error: %s", UVE(status));
        return;
    }

    int fd;
    int e = uv_fileno((uv_handle_t *) handle, &fd);
    assert_eq(e, 0, "%d");

    /*
     * Automatic storage on purpose: every client thread runs this callback
     * on its own loop, so a static buffer would be written by several
     * threads at once.
     */
    char buffer[READ_BUFSZ];
    ssize_t nread;

    /* Retry the read when it was merely interrupted by a signal */
    do {
        nread = read(fd, buffer, sizeof(buffer));
    } while (nread < 0 && errno == EINTR);

    if (nread < 0) {
        LOG_ERR("read(2) fail fd: %d errno: %d", fd, errno);
    } else if (nread == 0) {
        LOG_ERR("Connection reset by peer");
    } else {
        (void) fprintf(stderr, "%.*s", (int) nread, buffer);
    }
}
/**
 * Connect callback of a client pipe.
 *
 * On success, starts polling the connected descriptor for UV_READABLE and
 * UV_DISCONNECT with ipc_read_cb() on the loop that owns the pipe.
 * On failure, logs the error and closes the pipe handle.
 *
 * @param req       connect request; req->data is the poll_ctx_t to start
 * @param status    0 on success, negative libuv error code otherwise
 */
static void ipc_client_connect_cb(uv_connect_t *req, int status)
{
    if (status < 0) {
        LOG_ERR("uv_connect() fail uv error: %s", UVE(status));
        /*
         * Nothing else will ever close this pipe. Leaving it open makes
         * uv_loop_close() in thread_cb() fail with UV_EBUSY, so close it
         * while the loop is still running.
         */
        if (!uv_is_closing((uv_handle_t *) req->handle)) {
            uv_close((uv_handle_t *) req->handle, NULL);
        }
        return;
    }

    int fd;
    int e = uv_fileno((uv_handle_t *) req->handle, &fd);
    assert_eq(e, 0, "%d");

    poll_ctx_t *p = req->data;
    /* req->handle->loop is the `thread_loop' */
    e = uv_util_poll_init_start(p, req->handle->loop, fd, UV_READABLE | UV_DISCONNECT, ipc_read_cb);
    assert_eq(e, 0, "%d");
}
/**
 * Entry point of an IPC client thread.
 *
 * Sets up a private event loop, connects a pipe to IPC_PATH and runs the
 * loop until it has nothing left to do, then closes the loop.
 *
 * @param arg   must be NULL
 */
static void thread_cb(void *arg)
{
    assert_null(arg);

    /* Private loop of this thread */
    uv_loop_t loop_storage = {0};
    uv_loop_t *thread_loop = &loop_storage;
    int e = uv_loop_init(thread_loop);
    assert_eqf(e, 0, "%d", "uv_loop_init() fail uv error: %s", UVE(e));

    /* Client end of the IPC pipe */
    uv_pipe_t pipe_storage = {0};
    uv_pipe_t *ipc_client = &pipe_storage;
    e = uv_pipe_init(thread_loop, ipc_client, 0);
    assert_eq(e, 0, "%d");

    /* The poll context carries the pipe so ipc_read_cb() can close it */
    poll_ctx_t poll_ctx;
    poll_ctx.handle.data = ipc_client;

    /* The connect request carries the poll context to the connect callback */
    uv_connect_t connect_req = {0};
    connect_req.data = &poll_ctx;
    uv_pipe_connect(&connect_req, ipc_client, IPC_PATH, ipc_client_connect_cb);

    e = uv_run(thread_loop, UV_RUN_DEFAULT);
    assert_eq(e, 0, "%d");

    e = uv_loop_close(thread_loop);
    assert_eqf(e, 0, "%d", "uv_loop_close() fail error: %s", UVE(e));
}
/**
 * Spawn one IPC client thread per slot of threads[], each running
 * thread_cb() with a NULL argument.
 */
static void thread_init(void)
{
    LOG("Waiting IPC clients...");

    size_t i = 0;
    while (i < ARRAY_SIZE(threads)) {
        int e = uv_thread_create(threads + i, thread_cb, NULL);
        assert_eqf(e, 0, "%d", " uv_thread_create() fail uv error: %s", UVE(e));
        i++;
    }
}
/**
 * Wait for every thread in threads[] to finish, in creation order.
 */
static void thread_join(void)
{
    size_t i = 0;
    while (i < ARRAY_SIZE(threads)) {
        int e = uv_thread_join(threads + i);
        assert_eqf(e, 0, "%d", "uv_thread_join() fail uv error: %s", UVE(e));
        i++;
    }
}
/* Round robin index: slot of ipc_client[] the next write goes to; wraps to 0 at NR_THREAD */
static uint32_t rr_index = 0;
/* Server-side pipe handles, one per accepted connection (filled in by ipc_listen_cb()) */
static uv_pipe_t ipc_client[NR_THREAD];
/* Nonzero: forward stdin lines to the clients (stdin_read_cb()); zero: send "+" in a loop (ipc_echo()) */
#define USE_STDIO 0
#if USE_STDIO
/**
 * fgets(3)-like line reader working on a raw file descriptor.
 *
 * Reads one byte at a time until a newline was stored, the buffer holds
 * size - 1 bytes, end of file is reached or read(2) fails. The result is
 * always NUL-terminated when size is nonzero; the newline is kept.
 *
 * @param s     destination buffer
 * @param size  capacity of s in bytes
 * @param fd    descriptor to read from
 * @return      number of bytes stored (0 at end of file), or the negative
 *              read(2) result when the last read failed
 */
static ssize_t fdgets(char *s, size_t size, int fd)
{
    size_t len = 0;
    ssize_t rc = 0;

    for (;;) {
        char ch;

        /* Always keep one byte free for the terminator */
        if (len + 1 >= size) {
            break;
        }
        rc = read(fd, &ch, 1);
        if (rc <= 0) {
            break;
        }
        s[len] = ch;
        len += (size_t) rc;
        if (ch == '\n') {
            break;
        }
    }

    if (size != 0) {
        s[len] = '\0';
    }
    if (rc < 0) {
        return rc;
    }
    return (ssize_t) len;
}
/*
 * Poll callback for stdin; called in main thread.
 *
 * Reads one line from stdin with fdgets() and writes it to the next client
 * in round robin order. At end of input the poll handle, every client pipe
 * and the listening pipe are closed. Read errors are ignored.
 *
 * NOTE(review): a single static uv_write_t and a single static buffer are
 * reused for every write and no completion callback is given; confirm that
 * a previous write cannot still be pending when the next line arrives.
 */
static void stdin_read_cb(uv_poll_t *handle, int status, int events)
{
    if (status < 0) {
        LOG_ERR("uv poll error: %s", UVE(status));
        return;
    }
    assert_eq(events, UV_READABLE, "%#x");

    int fd;
    int e = uv_fileno((uv_handle_t *) handle, &fd);
    assert_eq(e, 0, "%d");
    assert_eq(fd, STDIN_FILENO, "%d");

    static char buffer[4096];
    ssize_t n = fdgets(buffer, sizeof(buffer), fd);
    if (n > 0) {
        static uv_write_t write_req;
        uv_buf_t buf = uv_buf_init(buffer, n);

        /*
         * Remember the slot being written to before advancing rr_index, so
         * that the error log below names the client that actually failed
         * (also right after the index wrapped around to 0).
         */
        uint32_t target = rr_index;
        uv_stream_t *stream = (uv_stream_t *) &ipc_client[target];
        if (++rr_index == NR_THREAD) {
            rr_index = 0;
        }

        e = uv_write(&write_req, stream, &buf, 1, NULL);
        if (e != 0) {
            if (e == UV_EPIPE) {
                LOG_ERR("Broken pipe i: %u", (unsigned int) target);
            } else {
                panicf("uv_write() fail uv error: %s", UVE(e));
            }
        }
    } else if (n == 0) {
        LOG_DBG("Met EOF in stdin");
        e = uv_poll_stop(handle);
        assert_eqf(e, 0, "%d", "uv_poll_stop() fail uv error: %s", UVE(e));
        uv_close((uv_handle_t *) handle, NULL);
        for (size_t i = 0; i < ARRAY_SIZE(ipc_client); i++) {
            uv_close((uv_handle_t *) &ipc_client[i], NULL);
        }
        uv_close((uv_handle_t *) ipc_server, NULL);
    } else {
        /* Simply ignore if there is any error */
    }
}
#else
/**
 * Send the one-byte message "+" to the accepted clients in round robin
 * order, pausing via ms_sleep(1) between writes.
 *
 * The loop has no exit condition, so this function does not return
 * normally. A broken pipe is logged and skipped; any other uv_write()
 * error goes to panicf().
 *
 * NOTE(review): a single static uv_write_t is reused for every write and no
 * completion callback is given; confirm this is safe for the libuv version
 * in use.
 */
static void ipc_echo(void)
{
    static uv_write_t write_req;
    char *buffer = "+";
    size_t n = strlen(buffer);
    uv_buf_t buf = uv_buf_init(buffer, n);

    while (1) {
        /*
         * Remember the slot being written to before advancing rr_index, so
         * that the error log below names the client that actually failed
         * (also right after the index wrapped around to 0).
         */
        uint32_t target = rr_index;
        uv_stream_t *stream = (uv_stream_t *) &ipc_client[target];
        if (++rr_index == NR_THREAD) {
            rr_index = 0;
        }

        int e = uv_write(&write_req, stream, &buf, 1, NULL);
        if (e != 0) {
            if (e == UV_EPIPE) {
                LOG_ERR("Broken pipe i: %u", (unsigned int) target);
            } else {
                panicf("uv_write() fail uv error: %s", UVE(e));
            }
        }

        /* XXX: In case of IPC client send too soon(need confirmation)? */
        ms_sleep(1);
    }
}
#endif
/**
 * Connection callback of the listening pipe; called in main thread.
 *
 * Accepts the incoming connection into the next free slot of ipc_client[].
 * Once NR_THREAD connections were accepted it either starts polling stdin
 * (USE_STDIO) or enters ipc_echo().
 *
 * @param server    the listening stream, must be ipc_server
 * @param status    0 on success, negative libuv error code otherwise
 */
static void ipc_listen_cb(uv_stream_t *server, int status)
{
    /* Number of connections accepted so far */
    static size_t i = 0;

    assert_lt(i, NR_THREAD, "%zu");
    assert_eq(server, ipc_server, "%p");

    if (status < 0) {
        LOG_ERR("uv listen error: %s", UVE(status));
        return;
    }

    uv_pipe_t *slot = &ipc_client[i];
    int e = uv_pipe_init(server->loop, slot, 0);
    assert_eqf(e, 0, "%d", "uv_pipe_init() fail uv error: %s", UVE(e));

    e = uv_accept(server, (uv_stream_t *) slot);
    assert_eqf(e, 0, "%d", "uv_accept() fail uv error: %s", UVE(e));

    /* Keep waiting until every client thread has connected */
    i++;
    if (i != NR_THREAD) {
        return;
    }

#if USE_STDIO
    LOG("All IPC clients connected, ready to poll stdin input...");
    static poll_ctx_t poll_ctx;
    e = uv_util_poll_init_start(&poll_ctx, server->loop, STDIN_FILENO, UV_READABLE, stdin_read_cb);
    assert_eqf(e, 0, "%d", "uv_util_poll_init_start() fail uv error: %s", UVE(e));
#else
    LOG("All IPC clients connected");
    ipc_echo();
#endif
}
/**
 * Test entry point: IPC echo server running on the default loop.
 *
 * Removes a stale socket file, binds and listens on IPC_PATH, spawns the
 * client threads, runs the main loop until it ends, joins the threads and
 * closes the loop.
 *
 * @return 0
 */
int test_main(void)
{
    LOG("%s", uv_version_string());

    uv_loop_t *main_loop = uv_default_loop();
    assert_nonnull(main_loop);

    int e = uv_pipe_init(main_loop, ipc_server, 0);
    assert_eqf(e, 0, "%d", "uv_pipe_init() fail uv error: %s", UVE(e));

    /* Remove a socket file left over from a previous run; a missing file is fine */
    uv_fs_t *fs = &(uv_fs_t){};
    e = uv_fs_unlink(main_loop, fs, IPC_PATH, NULL);
    /* Release whatever libuv attached to the finished request */
    uv_fs_req_cleanup(fs);
    if (e != 0 && e != UV_ENOENT) {
        panicf("uv_fs_unlink() fail uv error: %s", UVE(e));
    }

    e = uv_pipe_bind(ipc_server, IPC_PATH);
    assert_eqf(e, 0, "%d", "uv_pipe_bind() fail uv error: %s", UVE(e));

    e = uv_listen((uv_stream_t *) ipc_server, SOMAXCONN, ipc_listen_cb);
    assert_eqf(e, 0, "%d", "uv_listen() fail uv error: %s", UVE(e));

    /* The clients are started only after the server is listening */
    thread_init();

    /*
     * [sic]
     * Returns non-zero if uv_stop() was called and there are still active handles or requests.
     * Returns zero in all other cases.
     */
    e = uv_run(main_loop, UV_RUN_DEFAULT);
    assert_eqf(e, 0, "%d", "uv_run() fail uv error: %s", UVE(e));

    LOG("Main thread main_loop terminated, join threads...");
    thread_join();

    e = uv_loop_close(main_loop);
    assert_eqf(e, 0, "%d", "uv_loop_close() fail uv error: %s", UVE(e));

#if UV_VER_REQ(>=, 1, 38, 0)
    /* see: libuv/test/task.h#MAKE_VALGRIND_HAPPY() */
    uv_library_shutdown();
#endif
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment