# HG changeset patch # User jbe # Date 1431047887 -7200 # Node ID c820130f55d7a7df6bf54aba4dc52aa1a843c89c # Parent c51c38d991df8a35ed447722167b89bb8bd8ba55 New function moonbridge_io.run(...) as a "coroutine scheduler" diff -r c51c38d991df -r c820130f55d7 moonbridge_io.c --- a/moonbridge_io.c Thu May 07 22:22:21 2015 +0200 +++ b/moonbridge_io.c Fri May 08 03:18:07 2015 +0200 @@ -23,6 +23,8 @@ #include #include +#include + #define MOONBR_IO_MAXSTRERRORLEN 80 #define MOONBR_IO_READBUFLEN 4096 #define MOONBR_IO_WRITEBUFLEN 4096 @@ -38,6 +40,7 @@ #define MOONBR_IO_LISTENER_MT_REGKEY "moonbridge_io_listener" char moonbr_io_block_udata = 0; +char moonbr_io_multiblock_udata = 0; typedef struct { int fd; @@ -74,7 +77,7 @@ } moonbr_io_listener_t; static int moonbr_io_yield(lua_State *L) { - return lua_yield(L, 0); + return lua_yield(L, lua_gettop(L)); } #if LUA_VERSION_NUM >= 503 @@ -1334,6 +1337,154 @@ return 1; } +#define MOONBR_IO_RUN_STACKBASE 7 + +#if LUA_VERSION_NUM >= 503 +static int moonbr_io_run_cont(lua_State *L, int status, lua_KContext ctx) { +#else +static int moonbr_io_run_cont(lua_State *L) { +#endif +#if !(LUA_VERSION_NUM >= 503) + int ctx = 0; + lua_getctx(L, &ctx); +#endif + while (1) { + int work_to_do = 0; + assert(lua_gettop(L) == 7); + if (lua_isboolean(L, -1) && !lua_toboolean(L, -1)) return 1; + lua_pop(L, 1); + if (ctx) { + for (lua_pushnil(L); lua_next(L, 4); lua_pop(L, 1)) { + lua_pushvalue(L, -2); + lua_pushnil(L); + lua_rawset(L, 4); + } + for (lua_pushnil(L); lua_next(L, 5); lua_pop(L, 1)) { + lua_pushvalue(L, -2); + lua_pushnil(L); + lua_rawset(L, 5); + } + } + assert(lua_gettop(L) == 6); + while (lua_next(L, 1)) { + void *marker; + assert(lua_gettop(L) == MOONBR_IO_RUN_STACKBASE); + while (1) { + lua_pushvalue(L, -2); + lua_call(L, 0, LUA_MULTRET); + if (!lua_checkstack(L, LUA_MINSTACK)) luaL_error(L, "Lua stack exhausted"); + marker = lua_touserdata(L, MOONBR_IO_RUN_STACKBASE+1); + if (marker == &moonbr_io_block_udata) { + const char *mode = lua_tostring(L, MOONBR_IO_RUN_STACKBASE+3); + if (mode && !lua_isnoneornil(L, MOONBR_IO_RUN_STACKBASE+2)) { + if (strchr(mode, 'r')) { + lua_pushvalue(L, MOONBR_IO_RUN_STACKBASE+2); + lua_pushboolean(L, 1); + lua_rawset(L, 4); + } + if (strchr(mode, 'w')) { + lua_pushvalue(L, MOONBR_IO_RUN_STACKBASE+2); + lua_pushboolean(L, 1); + lua_rawset(L, 5); + } + } + work_to_do = 1; + break; + } else if (marker == &moonbr_io_multiblock_udata) { + if (lua_type(L, MOONBR_IO_RUN_STACKBASE+2) == LUA_TTABLE) { + for (lua_pushnil(L); lua_next(L, MOONBR_IO_RUN_STACKBASE+2); lua_pop(L, 1)) { + if (lua_toboolean(L, -1)) { + lua_pushvalue(L, -2); + lua_pushboolean(L, 1); + lua_rawset(L, 4); + } + } + } + if (lua_type(L, MOONBR_IO_RUN_STACKBASE+3) == LUA_TTABLE) { + for (lua_pushnil(L); lua_next(L, MOONBR_IO_RUN_STACKBASE+3); lua_pop(L, 1)) { + if (lua_toboolean(L, -1)) { + lua_pushvalue(L, -2); + lua_pushboolean(L, 1); + lua_rawset(L, 5); + } + } + } + work_to_do = 1; + break; + } else if (lua_isboolean(L, MOONBR_IO_RUN_STACKBASE)) { + lua_pushvalue(L, MOONBR_IO_RUN_STACKBASE-1); + lua_pushnil(L); + lua_rawset(L, 1); + break; + } else { + lua_pushvalue(L, MOONBR_IO_RUN_STACKBASE); + lua_insert(L, MOONBR_IO_RUN_STACKBASE+1); + lua_call(L, lua_gettop(L)-1-MOONBR_IO_RUN_STACKBASE, 1); + if (lua_toboolean(L, -1)) { + lua_pushvalue(L, MOONBR_IO_RUN_STACKBASE-1); + lua_pushnil(L); + lua_rawset(L, 1); + break; + } + } + lua_settop(L, MOONBR_IO_RUN_STACKBASE); + } + lua_settop(L, MOONBR_IO_RUN_STACKBASE-1); + } + if (!work_to_do) { + lua_pushboolean(L, 1); + return 1; + } + lua_pushnil(L); + assert(lua_gettop(L) == 6); + ctx = 1; + if (lua_isfunction(L, 2)) { + lua_pushvalue(L, 2); + lua_pushlightuserdata(L, &moonbr_io_multiblock_udata); + lua_pushvalue(L, 4); + lua_pushvalue(L, 5); + lua_callk(L, 3, 1, ctx, moonbr_io_run_cont); + } else { + lua_pushcfunction(L, moonbr_io_poll); + lua_pushvalue(L, 4); + lua_pushvalue(L, 5); + if (lua_isnil(L, 2)) { + lua_call(L, 2, 1); + } else { + lua_pushvalue(L, 2); + lua_pushcfunction(L, moonbr_io_timeref); + lua_pushvalue(L, 3); + lua_call(L, 1, 1); + lua_arith(L, LUA_OPSUB); + lua_call(L, 3, 1); + } + } + assert(lua_gettop(L) == 7); + } +} + +static int moonbr_io_run(lua_State *L) { + lua_settop(L, 2); + luaL_checktype(L, 1, LUA_TTABLE); + if (lua_isnil(L, 2) || lua_isfunction(L, 2)) { + lua_pushnil(L); + } else if (!lua_isnil(L, 2)) { + luaL_checknumber(L, 2); + lua_pushcfunction(L, moonbr_io_timeref); + lua_call(L, 0, 1); + } + assert(lua_gettop(L) == 3); + lua_newtable(L); /* read_fds at stack position 4 */ + lua_newtable(L); /* write_fds at stack position 5 */ + lua_pushnil(L); /* current thread */ + lua_pushnil(L); +#if LUA_VERSION_NUM >= 503 + return moonbr_io_run_cont(L, 0, 0); +#else + return moonbr_io_run_cont(L); +#endif +} + static const struct luaL_Reg moonbr_io_handle_methods[] = { {"read", moonbr_io_read}, {"read_nb", moonbr_io_read_nb}, @@ -1385,6 +1536,7 @@ {"tcplisten", moonbr_io_tcplisten}, {"poll", moonbr_io_poll}, {"timeref", moonbr_io_timeref}, + {"run", moonbr_io_run}, {NULL, NULL} }; @@ -1396,6 +1548,8 @@ lua_pushlightuserdata(L, &moonbr_io_block_udata); lua_setfield(L, -2, "block"); + lua_pushlightuserdata(L, &moonbr_io_multiblock_udata); + lua_setfield(L, -2, "multiblock"); lua_newtable(L); // public metatable lua_newtable(L); // handle methods diff -r c51c38d991df -r c820130f55d7 moonbridge_io.h --- a/moonbridge_io.h Thu May 07 22:22:21 2015 +0200 +++ b/moonbridge_io.h Fri May 08 03:18:07 2015 +0200 @@ -1,5 +1,6 @@ char moonbr_io_block_udata; +char moonbr_io_multiblock_udata; void moonbr_io_pushhandle(lua_State *L, int fd); void moonbr_io_closehandle(lua_State *L, int idx, int reset); diff -r c51c38d991df -r c820130f55d7 reference.txt --- a/reference.txt Thu May 07 22:22:21 2015 +0200 +++ b/reference.txt Fri May 08 03:18:07 2015 +0200 @@ -187,8 +187,9 @@ ### socket:read_call(waitfunc, maxlen, terminator) -Same as socket:read(maxlen, terminator), but calls waitfunc(socket, "r") (in an -infinite loop) as long as the reading is blocked. +Same as socket:read(maxlen, terminator), but calls waitfunc( +moonbridge_io.block, socket, "r") (in an infinite loop) as long as the reading +is blocked. ### socket:read_nb(maxlen, terminator) @@ -255,8 +256,8 @@ ### socket:write_call(waitfunc, ...) -Same as socket:write(...), but calls waitfunc(socket, "w") (in an infinite -loop) as long as the writing is blocked. +Same as socket:write(...), but calls waitfunc(moonbridge_io.block, socket, +"w") (in an infinite loop) as long as the writing is blocked. ### socket:write_nb(...) @@ -292,6 +293,18 @@ listed below. +### moonbridge_io.block + +An opaque value (lightuserdata) used by yielding non-blocking I/O functions. + +When socket:read_yield(...) could not read from a socket, it yields the three +values moonbridge_io.block, the socket, and the string "r". +When socket:write_yield(...) could not write to a socket, it yields the three +values moonbridge_io.block, the socket, and the string "w". + +See reference for moonbridge_io.run(...) for further information. + + ### moonbridge_io.localconnect(path) Tries to connect to a local socket (also known as Unix Domain Socket). Returns @@ -339,6 +352,12 @@ nil (as first return value) plus an error message (as second return value). +### moonbridge_io.multiblock + +An opaque value (lightuserdata) used by yielding non-blocking I/O functions. +See reference for moonbridge_io.run(...) for further information. + + ### moonbridge_io.poll(input_set, output_set, timeout) This function waits for at least one of the given file descriptors and/or @@ -352,6 +371,74 @@ received. +### moonbridge_io.run(function_set, timeout_or_poll_func) + +Executes multiple coroutines (created via coroutine.wrap(...)) in the following +way: + + function moonbridge_io.run(function_set, timeout_or_poll_func) + local pairs = function(t) return next, t end -- raw pairs + local starttime = moonbridge_io.timeref() + local read_fds, write_fds = {}, {} + while true do + local work_to_do = false + for func, result_handler in pairs(function_set) do + ::again:: + local r1, r2, r3, ... = func() + if r1 == moonbridge_io.block then + if type(r3) == "string" and string.match(r3, "r") then + read_fds[r2] = true + end + if type(r3) == "string" and string.match(r3, "w") then + write_fds[r2] = true + end + work_to_do = true + elseif r1 == moonbridge_io.multiblock then + for fd, active in pairs(r2) do + if active then read_fds[fd] = true end + end + for fd, active in pairs(r3) do + if active then write_fds[fd] = true end + end + work_to_do = true + else + if result_handler == true or result_handler(r1, r2, r3, ...) then + function_set[func] = nil + else + goto again + end + end + end + if not work_to_do then + return true -- all tasks finished + end + if type(timeout_or_poll_func) == "function" then + local poll_func = timeout_or_poll_func + if + poll_func(moonbridge_io.multiblock, read_fds, write_fds) == false + then + return false + end + elseif type(timeout_or_poll_func) == "number" then + local timeout = timeout_or_poll_func + if + moonbridge_io.poll( + read_fds, + write_fds, + timeout - moonbridge_io.timeref(start) + ) == false + then + return false -- timeout + end + else + moonbridge_io.poll(read_fds, write_fds) + end + for fd in pairs(read_fds) do read_fds[fd] = nil end + for fd in pairs(write_fds) do write_fds[fd] = nil end + end + end + + ### moonbridge_io.tcpconnect(hostname, port) Tries to open a TCP connection with the given host and TCP port number. Returns