webmcp
diff libraries/mondelefant/mondelefant_native.c @ 393:b2d2ff6e4385
Added <db_handle>:wait(...) method to wait for NOTIFYs
author | jbe |
---|---|
date | Wed Dec 09 03:40:27 2015 +0100 (2015-12-09) |
parents | 3fffd1ae5a68 |
children | 762ab1e87702 |
line diff
1.1 --- a/libraries/mondelefant/mondelefant_native.c Fri Dec 04 14:04:54 2015 +0100 1.2 +++ b/libraries/mondelefant/mondelefant_native.c Wed Dec 09 03:40:27 2015 +0100 1.3 @@ -4,6 +4,7 @@ 1.4 #include <postgres.h> 1.5 #include <catalog/pg_type.h> 1.6 #include <stdint.h> 1.7 +#include <time.h> 1.8 1.9 // NOTE: Comments with format "// <number>" denote the Lua stack position 1.10 1.11 @@ -310,6 +311,8 @@ 1.12 lua_getfield(L, LUA_REGISTRYINDEX, MONDELEFANT_CONN_DATA_REGKEY); // 2 1.13 lua_pushvalue(L, 1); // 3 1.14 lua_newtable(L); // 4 1.15 + lua_pushinteger(L, PQsocket(pgconn)); // determine file descriptor 1.16 + lua_setfield(L, 4, "fd"); // set "fd" attribute 1.17 lua_settable(L, 2); 1.18 lua_settop(L, 1); 1.19 // store key "engine" with value "postgresql" as connection specific data: 1.20 @@ -402,6 +405,11 @@ 1.21 conn = mondelefant_get_conn(L, 1); 1.22 PQfinish(conn->pgconn); 1.23 conn->pgconn = NULL; 1.24 + lua_getfield(L, LUA_REGISTRYINDEX, MONDELEFANT_CONN_DATA_REGKEY); // 2 1.25 + lua_pushvalue(L, 1); // 3 1.26 + lua_gettable(L, 2); // 3 1.27 + lua_pushnil(L); // 4 1.28 + lua_setfield(L, 3, "fd"); // set "fd" attribute to nil 1.29 return 0; 1.30 } 1.31 1.32 @@ -438,6 +446,88 @@ 1.33 return 1; 1.34 } 1.35 1.36 +// method "wait" of database handles: 1.37 +static int mondelefant_conn_wait(lua_State *L) { 1.38 + mondelefant_conn_t *conn; 1.39 + int infinite, nonblock = 0; 1.40 + struct timespec wakeup; 1.41 + int fd; 1.42 + fd_set fds; 1.43 + conn = mondelefant_get_conn(L, 1); 1.44 + infinite = lua_isnoneornil(L, 2); 1.45 + if (!infinite) { 1.46 + lua_Number n; 1.47 + int isnum; 1.48 + n = lua_tonumberx(L, 2, &isnum); 1.49 + if (isnum && n>0 && n<=86400*366) { 1.50 + if (clock_gettime(CLOCK_MONOTONIC, &wakeup)) { 1.51 + return luaL_error(L, "Could not access CLOCK_MONOTONIC"); 1.52 + } 1.53 + wakeup.tv_sec += n; 1.54 + wakeup.tv_nsec += 1000000000 * (n - (time_t)n); 1.55 + if (wakeup.tv_nsec >= 1000000000) { 1.56 + wakeup.tv_sec += 1; 1.57 + wakeup.tv_nsec -= 1000000000; 1.58 + } 1.59 + } else if (isnum && n==0) { 1.60 + nonblock = 1; 1.61 + } else { 1.62 + luaL_argcheck(L, 0, 2, "not a valid timeout"); 1.63 + } 1.64 + } 1.65 + if (!nonblock) { 1.66 + fd = PQsocket(conn->pgconn); 1.67 + FD_ZERO(&fds); 1.68 + FD_SET(fd, &fds); 1.69 + } 1.70 + while (true) { 1.71 + { 1.72 + PGnotify *notify; 1.73 + if (!PQconsumeInput(conn->pgconn)) { 1.74 + lua_pushnil(L); 1.75 + mondelefant_push_first_line(L, PQerrorMessage(conn->pgconn)); 1.76 + return 2; 1.77 + } 1.78 + notify = PQnotifies(conn->pgconn); 1.79 + if (notify) { 1.80 + lua_pushstring(L, notify->relname); 1.81 + lua_pushstring(L, notify->extra); 1.82 + lua_pushinteger(L, notify->be_pid); 1.83 + PQfreemem(notify); 1.84 + return 3; 1.85 + } 1.86 + } 1.87 + if (infinite) { 1.88 + select(fd+1, &fds, NULL, NULL, NULL); 1.89 + } else if (nonblock) { 1.90 + break; 1.91 + } else { 1.92 + struct timespec tp; 1.93 + struct timeval timeout = { 0, }; 1.94 + if (clock_gettime(CLOCK_MONOTONIC, &tp)) { 1.95 + return luaL_error(L, "Could not access CLOCK_MONOTONIC"); 1.96 + } 1.97 + tp.tv_sec = wakeup.tv_sec - tp.tv_sec; 1.98 + tp.tv_nsec = wakeup.tv_nsec - tp.tv_nsec; 1.99 + if (tp.tv_nsec < 0) { 1.100 + tp.tv_sec -= 1; 1.101 + tp.tv_nsec += 1000000000; 1.102 + } 1.103 + timeout.tv_sec = tp.tv_sec; 1.104 + timeout.tv_usec = (tp.tv_nsec + 500) / 1000; 1.105 + if ( 1.106 + timeout.tv_sec < 0 || 1.107 + (timeout.tv_sec == 0 && timeout.tv_usec == 0) 1.108 + ) break; 1.109 + select(fd+1, &fds, NULL, NULL, &timeout); 1.110 + } 1.111 + } 1.112 + lua_pushboolean(L, 0); 1.113 + if (nonblock) lua_pushliteral(L, "No notification pending"); 1.114 + else lua_pushliteral(L, "Timeout while waiting for notification"); 1.115 + return 2; 1.116 +} 1.117 + 1.118 // method "create_list" of database handles: 1.119 static int mondelefant_conn_create_list(lua_State *L) { 1.120 // ensure that first argument is a database connection: 1.121 @@ -1767,6 +1857,7 @@ 1.122 {"close", mondelefant_conn_close}, 1.123 {"is_ok", mondelefant_conn_is_ok}, 1.124 {"get_transaction_status", mondelefant_conn_get_transaction_status}, 1.125 + {"wait", mondelefant_conn_wait}, 1.126 {"create_list", mondelefant_conn_create_list}, 1.127 {"create_object", mondelefant_conn_create_object}, 1.128 {"quote_string", mondelefant_conn_quote_string},