# HG changeset patch # User jbe # Date 1449628827 -3600 # Node ID b2d2ff6e43857399c40fd9482b1e86639379abee # Parent c048f0af2497ee9a95bfe0fd63cb96b059eec586 Added :wait(...) method to wait for NOTIFYs diff -r c048f0af2497 -r b2d2ff6e4385 libraries/mondelefant/mondelefant_native.autodoc.lua --- a/libraries/mondelefant/mondelefant_native.autodoc.lua Fri Dec 04 14:04:54 2015 +0100 +++ b/libraries/mondelefant/mondelefant_native.autodoc.lua Wed Dec 09 03:40:27 2015 +0100 @@ -25,6 +25,18 @@ --[[-- +.fd -- file descriptor of underlaying database connection + +The file descriptor number of the underlaying database connection. This value may be used in conjunction with :wait(0) and a select/poll system call to wait for several events at once. + +--]]-- +-- set/unset in mondelefant_native.c in +-- static int mondelefant_connect(lua_State *L) and +-- static int mondelefant_close(lua_State *L) +--//-- + + +--[[-- :close() Closes the database connection. This method may be called multiple times and is called automatically when the database handle is garbage collected. @@ -65,6 +77,22 @@ --[[-- +channel, -- notification channel name +payload, -- notification payload string +pid = -- process ID of notifying server process +:wait( + timeout -- number of seconds to wait, 0 = do not block, nil = wait infinitely +) + +Waits for any NOTIFY event that is being LISTENed for. One or more LISTEN commands must have been sent previously with :query("LISTEN channel_name"). + +--]]-- +-- implemented in mondelefant_native.c as +-- static int mondelefant_conn_wait(lua_State *L) +--//-- + + +--[[-- db_list = -- database result being an empty list :create_list() diff -r c048f0af2497 -r b2d2ff6e4385 libraries/mondelefant/mondelefant_native.c --- a/libraries/mondelefant/mondelefant_native.c Fri Dec 04 14:04:54 2015 +0100 +++ b/libraries/mondelefant/mondelefant_native.c Wed Dec 09 03:40:27 2015 +0100 @@ -4,6 +4,7 @@ #include #include #include +#include // NOTE: Comments with format "// " denote the Lua stack position @@ -310,6 +311,8 @@ lua_getfield(L, LUA_REGISTRYINDEX, MONDELEFANT_CONN_DATA_REGKEY); // 2 lua_pushvalue(L, 1); // 3 lua_newtable(L); // 4 + lua_pushinteger(L, PQsocket(pgconn)); // determine file descriptor + lua_setfield(L, 4, "fd"); // set "fd" attribute lua_settable(L, 2); lua_settop(L, 1); // store key "engine" with value "postgresql" as connection specific data: @@ -402,6 +405,11 @@ conn = mondelefant_get_conn(L, 1); PQfinish(conn->pgconn); conn->pgconn = NULL; + lua_getfield(L, LUA_REGISTRYINDEX, MONDELEFANT_CONN_DATA_REGKEY); // 2 + lua_pushvalue(L, 1); // 3 + lua_gettable(L, 2); // 3 + lua_pushnil(L); // 4 + lua_setfield(L, 3, "fd"); // set "fd" attribute to nil return 0; } @@ -438,6 +446,88 @@ return 1; } +// method "wait" of database handles: +static int mondelefant_conn_wait(lua_State *L) { + mondelefant_conn_t *conn; + int infinite, nonblock = 0; + struct timespec wakeup; + int fd; + fd_set fds; + conn = mondelefant_get_conn(L, 1); + infinite = lua_isnoneornil(L, 2); + if (!infinite) { + lua_Number n; + int isnum; + n = lua_tonumberx(L, 2, &isnum); + if (isnum && n>0 && n<=86400*366) { + if (clock_gettime(CLOCK_MONOTONIC, &wakeup)) { + return luaL_error(L, "Could not access CLOCK_MONOTONIC"); + } + wakeup.tv_sec += n; + wakeup.tv_nsec += 1000000000 * (n - (time_t)n); + if (wakeup.tv_nsec >= 1000000000) { + wakeup.tv_sec += 1; + wakeup.tv_nsec -= 1000000000; + } + } else if (isnum && n==0) { + nonblock = 1; + } else { + luaL_argcheck(L, 0, 2, "not a valid timeout"); + } + } + if (!nonblock) { + fd = PQsocket(conn->pgconn); + FD_ZERO(&fds); + FD_SET(fd, &fds); + } + while (true) { + { + PGnotify *notify; + if (!PQconsumeInput(conn->pgconn)) { + lua_pushnil(L); + mondelefant_push_first_line(L, PQerrorMessage(conn->pgconn)); + return 2; + } + notify = PQnotifies(conn->pgconn); + if (notify) { + lua_pushstring(L, notify->relname); + lua_pushstring(L, notify->extra); + lua_pushinteger(L, notify->be_pid); + PQfreemem(notify); + return 3; + } + } + if (infinite) { + select(fd+1, &fds, NULL, NULL, NULL); + } else if (nonblock) { + break; + } else { + struct timespec tp; + struct timeval timeout = { 0, }; + if (clock_gettime(CLOCK_MONOTONIC, &tp)) { + return luaL_error(L, "Could not access CLOCK_MONOTONIC"); + } + tp.tv_sec = wakeup.tv_sec - tp.tv_sec; + tp.tv_nsec = wakeup.tv_nsec - tp.tv_nsec; + if (tp.tv_nsec < 0) { + tp.tv_sec -= 1; + tp.tv_nsec += 1000000000; + } + timeout.tv_sec = tp.tv_sec; + timeout.tv_usec = (tp.tv_nsec + 500) / 1000; + if ( + timeout.tv_sec < 0 || + (timeout.tv_sec == 0 && timeout.tv_usec == 0) + ) break; + select(fd+1, &fds, NULL, NULL, &timeout); + } + } + lua_pushboolean(L, 0); + if (nonblock) lua_pushliteral(L, "No notification pending"); + else lua_pushliteral(L, "Timeout while waiting for notification"); + return 2; +} + // method "create_list" of database handles: static int mondelefant_conn_create_list(lua_State *L) { // ensure that first argument is a database connection: @@ -1767,6 +1857,7 @@ {"close", mondelefant_conn_close}, {"is_ok", mondelefant_conn_is_ok}, {"get_transaction_status", mondelefant_conn_get_transaction_status}, + {"wait", mondelefant_conn_wait}, {"create_list", mondelefant_conn_create_list}, {"create_object", mondelefant_conn_create_object}, {"quote_string", mondelefant_conn_quote_string},