moonbridge
changeset 149:c820130f55d7
New function moonbridge_io.run(...) as a "coroutine scheduler"
author | jbe |
---|---|
date | Fri May 08 03:18:07 2015 +0200 (2015-05-08) |
parents | c51c38d991df |
children | b0e2fbf9d5a8 |
files | moonbridge_io.c moonbridge_io.h reference.txt |
line diff
1.1 --- a/moonbridge_io.c Thu May 07 22:22:21 2015 +0200 1.2 +++ b/moonbridge_io.c Fri May 08 03:18:07 2015 +0200 1.3 @@ -23,6 +23,8 @@ 1.4 #include <lauxlib.h> 1.5 #include <lualib.h> 1.6 1.7 +#include <assert.h> 1.8 + 1.9 #define MOONBR_IO_MAXSTRERRORLEN 80 1.10 #define MOONBR_IO_READBUFLEN 4096 1.11 #define MOONBR_IO_WRITEBUFLEN 4096 1.12 @@ -38,6 +40,7 @@ 1.13 #define MOONBR_IO_LISTENER_MT_REGKEY "moonbridge_io_listener" 1.14 1.15 char moonbr_io_block_udata = 0; 1.16 +char moonbr_io_multiblock_udata = 0; 1.17 1.18 typedef struct { 1.19 int fd; 1.20 @@ -74,7 +77,7 @@ 1.21 } moonbr_io_listener_t; 1.22 1.23 static int moonbr_io_yield(lua_State *L) { 1.24 - return lua_yield(L, 0); 1.25 + return lua_yield(L, lua_gettop(L)); 1.26 } 1.27 1.28 #if LUA_VERSION_NUM >= 503 1.29 @@ -1334,6 +1337,154 @@ 1.30 return 1; 1.31 } 1.32 1.33 +#define MOONBR_IO_RUN_STACKBASE 7 1.34 + 1.35 +#if LUA_VERSION_NUM >= 503 1.36 +static int moonbr_io_run_cont(lua_State *L, int status, lua_KContext ctx) { 1.37 +#else 1.38 +static int moonbr_io_run_cont(lua_State *L) { 1.39 +#endif 1.40 +#if !(LUA_VERSION_NUM >= 503) 1.41 + int ctx = 0; 1.42 + lua_getctx(L, &ctx); 1.43 +#endif 1.44 + while (1) { 1.45 + int work_to_do = 0; 1.46 + assert(lua_gettop(L) == 7); 1.47 + if (lua_isboolean(L, -1) && !lua_toboolean(L, -1)) return 1; 1.48 + lua_pop(L, 1); 1.49 + if (ctx) { 1.50 + for (lua_pushnil(L); lua_next(L, 4); lua_pop(L, 1)) { 1.51 + lua_pushvalue(L, -2); 1.52 + lua_pushnil(L); 1.53 + lua_rawset(L, 4); 1.54 + } 1.55 + for (lua_pushnil(L); lua_next(L, 5); lua_pop(L, 1)) { 1.56 + lua_pushvalue(L, -2); 1.57 + lua_pushnil(L); 1.58 + lua_rawset(L, 5); 1.59 + } 1.60 + } 1.61 + assert(lua_gettop(L) == 6); 1.62 + while (lua_next(L, 1)) { 1.63 + void *marker; 1.64 + assert(lua_gettop(L) == MOONBR_IO_RUN_STACKBASE); 1.65 + while (1) { 1.66 + lua_pushvalue(L, -2); 1.67 + lua_call(L, 0, LUA_MULTRET); 1.68 + if (!lua_checkstack(L, LUA_MINSTACK)) luaL_error(L, "Lua stack exhausted"); 1.69 + marker = lua_touserdata(L, MOONBR_IO_RUN_STACKBASE+1); 1.70 + if (marker == &moonbr_io_block_udata) { 1.71 + const char *mode = lua_tostring(L, MOONBR_IO_RUN_STACKBASE+3); 1.72 + if (mode && !lua_isnoneornil(L, MOONBR_IO_RUN_STACKBASE+2)) { 1.73 + if (strchr(mode, 'r')) { 1.74 + lua_pushvalue(L, MOONBR_IO_RUN_STACKBASE+2); 1.75 + lua_pushboolean(L, 1); 1.76 + lua_rawset(L, 4); 1.77 + } 1.78 + if (strchr(mode, 'w')) { 1.79 + lua_pushvalue(L, MOONBR_IO_RUN_STACKBASE+2); 1.80 + lua_pushboolean(L, 1); 1.81 + lua_rawset(L, 5); 1.82 + } 1.83 + } 1.84 + work_to_do = 1; 1.85 + break; 1.86 + } else if (marker == &moonbr_io_multiblock_udata) { 1.87 + if (lua_type(L, MOONBR_IO_RUN_STACKBASE+2) == LUA_TTABLE) { 1.88 + for (lua_pushnil(L); lua_next(L, MOONBR_IO_RUN_STACKBASE+2); lua_pop(L, 1)) { 1.89 + if (lua_toboolean(L, -1)) { 1.90 + lua_pushvalue(L, -2); 1.91 + lua_pushboolean(L, 1); 1.92 + lua_rawset(L, 4); 1.93 + } 1.94 + } 1.95 + } 1.96 + if (lua_type(L, MOONBR_IO_RUN_STACKBASE+3) == LUA_TTABLE) { 1.97 + for (lua_pushnil(L); lua_next(L, MOONBR_IO_RUN_STACKBASE+3); lua_pop(L, 1)) { 1.98 + if (lua_toboolean(L, -1)) { 1.99 + lua_pushvalue(L, -2); 1.100 + lua_pushboolean(L, 1); 1.101 + lua_rawset(L, 5); 1.102 + } 1.103 + } 1.104 + } 1.105 + work_to_do = 1; 1.106 + break; 1.107 + } else if (lua_isboolean(L, MOONBR_IO_RUN_STACKBASE)) { 1.108 + lua_pushvalue(L, MOONBR_IO_RUN_STACKBASE-1); 1.109 + lua_pushnil(L); 1.110 + lua_rawset(L, 1); 1.111 + break; 1.112 + } else { 1.113 + lua_pushvalue(L, MOONBR_IO_RUN_STACKBASE); 1.114 + lua_insert(L, MOONBR_IO_RUN_STACKBASE+1); 1.115 + lua_call(L, lua_gettop(L)-1-MOONBR_IO_RUN_STACKBASE, 1); 1.116 + if (lua_toboolean(L, -1)) { 1.117 + lua_pushvalue(L, MOONBR_IO_RUN_STACKBASE-1); 1.118 + lua_pushnil(L); 1.119 + lua_rawset(L, 1); 1.120 + break; 1.121 + } 1.122 + } 1.123 + lua_settop(L, MOONBR_IO_RUN_STACKBASE); 1.124 + } 1.125 + lua_settop(L, MOONBR_IO_RUN_STACKBASE-1); 1.126 + } 1.127 + if (!work_to_do) { 1.128 + lua_pushboolean(L, 1); 1.129 + return 1; 1.130 + } 1.131 + lua_pushnil(L); 1.132 + assert(lua_gettop(L) == 6); 1.133 + ctx = 1; 1.134 + if (lua_isfunction(L, 2)) { 1.135 + lua_pushvalue(L, 2); 1.136 + lua_pushlightuserdata(L, &moonbr_io_multiblock_udata); 1.137 + lua_pushvalue(L, 4); 1.138 + lua_pushvalue(L, 5); 1.139 + lua_callk(L, 3, 1, ctx, moonbr_io_run_cont); 1.140 + } else { 1.141 + lua_pushcfunction(L, moonbr_io_poll); 1.142 + lua_pushvalue(L, 4); 1.143 + lua_pushvalue(L, 5); 1.144 + if (lua_isnil(L, 2)) { 1.145 + lua_call(L, 2, 1); 1.146 + } else { 1.147 + lua_pushvalue(L, 2); 1.148 + lua_pushcfunction(L, moonbr_io_timeref); 1.149 + lua_pushvalue(L, 3); 1.150 + lua_call(L, 1, 1); 1.151 + lua_arith(L, LUA_OPSUB); 1.152 + lua_call(L, 3, 1); 1.153 + } 1.154 + } 1.155 + assert(lua_gettop(L) == 7); 1.156 + } 1.157 +} 1.158 + 1.159 +static int moonbr_io_run(lua_State *L) { 1.160 + lua_settop(L, 2); 1.161 + luaL_checktype(L, 1, LUA_TTABLE); 1.162 + if (lua_isnil(L, 2) || lua_isfunction(L, 2)) { 1.163 + lua_pushnil(L); 1.164 + } else if (!lua_isnil(L, 2)) { 1.165 + luaL_checknumber(L, 2); 1.166 + lua_pushcfunction(L, moonbr_io_timeref); 1.167 + lua_call(L, 0, 1); 1.168 + } 1.169 + assert(lua_gettop(L) == 3); 1.170 + lua_newtable(L); /* read_fds at stack position 4 */ 1.171 + lua_newtable(L); /* write_fds at stack position 5 */ 1.172 + lua_pushnil(L); /* current thread */ 1.173 + lua_pushnil(L); 1.174 +#if LUA_VERSION_NUM >= 503 1.175 + return moonbr_io_run_cont(L, 0, 0); 1.176 +#else 1.177 + return moonbr_io_run_cont(L); 1.178 +#endif 1.179 +} 1.180 + 1.181 static const struct luaL_Reg moonbr_io_handle_methods[] = { 1.182 {"read", moonbr_io_read}, 1.183 {"read_nb", moonbr_io_read_nb}, 1.184 @@ -1385,6 +1536,7 @@ 1.185 {"tcplisten", moonbr_io_tcplisten}, 1.186 {"poll", moonbr_io_poll}, 1.187 {"timeref", moonbr_io_timeref}, 1.188 + {"run", moonbr_io_run}, 1.189 {NULL, NULL} 1.190 }; 1.191 1.192 @@ -1396,6 +1548,8 @@ 1.193 1.194 lua_pushlightuserdata(L, &moonbr_io_block_udata); 1.195 lua_setfield(L, -2, "block"); 1.196 + lua_pushlightuserdata(L, &moonbr_io_multiblock_udata); 1.197 + lua_setfield(L, -2, "multiblock"); 1.198 1.199 lua_newtable(L); // public metatable 1.200 lua_newtable(L); // handle methods
2.1 --- a/moonbridge_io.h Thu May 07 22:22:21 2015 +0200 2.2 +++ b/moonbridge_io.h Fri May 08 03:18:07 2015 +0200 2.3 @@ -1,5 +1,6 @@ 2.4 2.5 char moonbr_io_block_udata; 2.6 +char moonbr_io_multiblock_udata; 2.7 2.8 void moonbr_io_pushhandle(lua_State *L, int fd); 2.9 void moonbr_io_closehandle(lua_State *L, int idx, int reset);
3.1 --- a/reference.txt Thu May 07 22:22:21 2015 +0200 3.2 +++ b/reference.txt Fri May 08 03:18:07 2015 +0200 3.3 @@ -187,8 +187,9 @@ 3.4 3.5 ### socket:read_call(waitfunc, maxlen, terminator) 3.6 3.7 -Same as socket:read(maxlen, terminator), but calls waitfunc(socket, "r") (in an 3.8 -infinite loop) as long as the reading is blocked. 3.9 +Same as socket:read(maxlen, terminator), but calls waitfunc( 3.10 +moonbridge_io.block, socket, "r") (in an infinite loop) as long as the reading 3.11 +is blocked. 3.12 3.13 3.14 ### socket:read_nb(maxlen, terminator) 3.15 @@ -255,8 +256,8 @@ 3.16 3.17 ### socket:write_call(waitfunc, ...) 3.18 3.19 -Same as socket:write(...), but calls waitfunc(socket, "w") (in an infinite 3.20 -loop) as long as the writing is blocked. 3.21 +Same as socket:write(...), but calls waitfunc(moonbridge_io.block, socket, 3.22 +"w") (in an infinite loop) as long as the writing is blocked. 3.23 3.24 3.25 ### socket:write_nb(...) 3.26 @@ -292,6 +293,18 @@ 3.27 listed below. 3.28 3.29 3.30 +### moonbridge_io.block 3.31 + 3.32 +An opaque value (lightuserdata) used by yielding non-blocking I/O functions. 3.33 + 3.34 +When socket:read_yield(...) could not read from a socket, it yields the three 3.35 +values moonbridge_io.block, the socket, and the string "r". 3.36 +When socket:write_yield(...) could not write to a socket, it yields the three 3.37 +values moonbridge_io.block, the socket, and the string "w". 3.38 + 3.39 +See reference for moonbridge_io.run(...) for further information. 3.40 + 3.41 + 3.42 ### moonbridge_io.localconnect(path) 3.43 3.44 Tries to connect to a local socket (also known as Unix Domain Socket). Returns 3.45 @@ -339,6 +352,12 @@ 3.46 nil (as first return value) plus an error message (as second return value). 3.47 3.48 3.49 +### moonbridge_io.multiblock 3.50 + 3.51 +An opaque value (lightuserdata) used by yielding non-blocking I/O functions. 3.52 +See reference for moonbridge_io.run(...) for further information. 3.53 + 3.54 + 3.55 ### moonbridge_io.poll(input_set, output_set, timeout) 3.56 3.57 This function waits for at least one of the given file descriptors and/or 3.58 @@ -352,6 +371,74 @@ 3.59 received. 3.60 3.61 3.62 +### moonbridge_io.run(function_set, timeout_or_poll_func) 3.63 + 3.64 +Executes multiple coroutines (created via coroutine.wrap(...)) in the following 3.65 +way: 3.66 + 3.67 + function moonbridge_io.run(function_set, timeout_or_poll_func) 3.68 + local pairs = function(t) return next, t end -- raw pairs 3.69 + local starttime = moonbridge_io.timeref() 3.70 + local read_fds, write_fds = {}, {} 3.71 + while true do 3.72 + local work_to_do = false 3.73 + for func, result_handler in pairs(function_set) do 3.74 + ::again:: 3.75 + local r1, r2, r3, ... = func() 3.76 + if r1 == moonbridge_io.block then 3.77 + if type(r3) == "string" and string.match(r3, "r") then 3.78 + read_fds[r2] = true 3.79 + end 3.80 + if type(r3) == "string" and string.match(r3, "w") then 3.81 + write_fds[r2] = true 3.82 + end 3.83 + work_to_do = true 3.84 + elseif r1 == moonbridge_io.multiblock then 3.85 + for fd, active in pairs(r2) do 3.86 + if active then read_fds[fd] = true end 3.87 + end 3.88 + for fd, active in pairs(r3) do 3.89 + if active then write_fds[fd] = true end 3.90 + end 3.91 + work_to_do = true 3.92 + else 3.93 + if result_handler == true or result_handler(r1, r2, r3, ...) then 3.94 + function_set[func] = nil 3.95 + else 3.96 + goto again 3.97 + end 3.98 + end 3.99 + end 3.100 + if not work_to_do then 3.101 + return true -- all tasks finished 3.102 + end 3.103 + if type(timeout_or_poll_func) == "function" then 3.104 + local poll_func = timeout_or_poll_func 3.105 + if 3.106 + poll_func(moonbridge_io.multiblock, read_fds, write_fds) == false 3.107 + then 3.108 + return false 3.109 + end 3.110 + elseif type(timeout_or_poll_func) == "number" then 3.111 + local timeout = timeout_or_poll_func 3.112 + if 3.113 + moonbridge_io.poll( 3.114 + read_fds, 3.115 + write_fds, 3.116 + timeout - moonbridge_io.timeref(start) 3.117 + ) == false 3.118 + then 3.119 + return false -- timeout 3.120 + end 3.121 + else 3.122 + moonbridge_io.poll(read_fds, write_fds) 3.123 + end 3.124 + for fd in pairs(read_fds) do read_fds[fd] = nil end 3.125 + for fd in pairs(write_fds) do write_fds[fd] = nil end 3.126 + end 3.127 + end 3.128 + 3.129 + 3.130 ### moonbridge_io.tcpconnect(hostname, port) 3.131 3.132 Tries to open a TCP connection with the given host and TCP port number. Returns