# HG changeset patch # User jbe # Date 1428365717 -7200 # Node ID fca51922b7081c5b916db1377997ea20bb9a363b # Parent 1d91c6eedf18429970b1b324107998e60af67fe8 Extended I/O library; Integrated new I/O library into moonbridge.c and moonbridge_http.lua diff -r 1d91c6eedf18 -r fca51922b708 moonbridge.c --- a/moonbridge.c Tue Apr 07 01:17:55 2015 +0200 +++ b/moonbridge.c Tue Apr 07 02:15:17 2015 +0200 @@ -277,12 +277,6 @@ /* Variable set to nonzero value to disallow further calls of 'listen' function */ static int moonbr_booted = 0; -/* Global variables to store information on connection socket in child process */ -static int moonbr_child_peersocket_type; /* type of socket by MOONBR_SOCKETTYPE constant */ -static int moonbr_child_peersocket_fd; /* Original file descriptor of peer socket */ -static luaL_Stream *moonbr_child_peersocket_inputstream; /* Lua input stream of socket */ -static luaL_Stream *moonbr_child_peersocket_outputstream; /* Lua output stream of socket */ - /* Verbosity settings */ static int moonbr_debug = 0; static int moonbr_stat = 0; @@ -817,249 +811,10 @@ } } -/* Closes the input stream from peer unless it has already been closed */ -static int moonbr_child_close_peersocket_inputstream( - int cleanshut, /* nonzero = use shutdown() if applicable */ - int mark /* nonzero = mark the stream as closed for Lua */ -) { - int err = 0; /* nonzero = error occurred */ - int errno2; /* stores previous errno values that take precedence */ - if (moonbr_child_peersocket_inputstream->f) { - /* NOTE: shutdown() with SHUT_RD shows different behavior on different - * operating systems and particularly causes problems with Linux. Hence it - * is disabled here. */ - /* - if (cleanshut && moonbr_child_peersocket_type == MOONBR_SOCKETTYPE_NETWORK) { - if (shutdown(moonbr_child_peersocket_fd, SHUT_RD)) { - errno2 = errno; - err = -1; - } - } - */ - if (fclose(moonbr_child_peersocket_inputstream->f)) { - if (!err) errno2 = errno; - err = -1; - } - moonbr_child_peersocket_inputstream->f = NULL; - } - if (mark) moonbr_child_peersocket_inputstream->closef = NULL; - if (err) errno = errno2; - return err; -} - -/* Closes the output stream to peer unless it has already been closed */ -static int moonbr_child_close_peersocket_outputstream( - int cleanshut, /* nonzero = use fflush() and shutdown() if applicable */ - int mark /* nonzero = mark the stream as closed for Lua */ -) { - int err = 0; /* nonzero = error occurred */ - int errno2; /* stores previous errno values that take precedence */ - if (moonbr_child_peersocket_outputstream->f) { - if (moonbr_child_peersocket_type == MOONBR_SOCKETTYPE_NETWORK) { - if (cleanshut) { - if (fflush(moonbr_child_peersocket_outputstream->f)) { - errno2 = errno; - err = -1; - } else { - if (shutdown(moonbr_child_peersocket_fd, SHUT_WR)) { - errno2 = errno; - err = -1; - } - } - } else { - fpurge(moonbr_child_peersocket_outputstream->f); - } - } - if (fclose(moonbr_child_peersocket_outputstream->f)) { - if (!err) errno2 = errno; - err = -1; - } - moonbr_child_peersocket_outputstream->f = NULL; - } - if (mark) moonbr_child_peersocket_outputstream->closef = NULL; - if (err) errno = errno2; - return err; -} - -/* Perform a clean shutdown of input and output stream (may be called multiple times) */ -static int moonbr_child_close_peersocket(int timeout) { - int errprio = 0; - int errno2; - if (moonbr_child_peersocket_fd == -1) return 0; - if (moonbr_child_close_peersocket_inputstream(1, 1)) { - errprio = 1; - errno2 = errno; - } - if (moonbr_child_close_peersocket_outputstream(1, 1)) { - errprio = 4; - errno2 = errno; - } - if (moonbr_child_peersocket_type == MOONBR_SOCKETTYPE_NETWORK) { - struct linger lingerval = { 0, }; - if (timeout && !errprio) { - lingerval.l_onoff = 1; - lingerval.l_linger = timeout; - } - if (setsockopt(moonbr_child_peersocket_fd, SOL_SOCKET, SO_LINGER, &lingerval, sizeof(lingerval))) { - if (errprio < 2) { - errprio = 2; - errno2 = errno; - } - } - } - if (close(moonbr_child_peersocket_fd)) { - if (errprio < 3) { - errprio = 3; - errno2 = errno; - } - } - moonbr_child_peersocket_fd = -1; - if (errprio) { - errno = errno2; - return -1; - } - return 0; -} - -/* Close socket and cause reset of TCP connection (TCP RST aka "Connection reset by peer") if possible */ -static int moonbr_child_cancel_peersocket() { - int err = 0; - if (moonbr_child_close_peersocket_inputstream(0, 1)) err = -1; - if (moonbr_child_close_peersocket_outputstream(0, 1)) err = -1; - if (close(moonbr_child_peersocket_fd)) err = -1; - moonbr_child_peersocket_fd = -1; - return err; -} - -/* Lua method for socket object to read from input stream */ -static int moonbr_child_lua_read_stream(lua_State *L) { - lua_getfield(L, 1, "input"); - lua_getfield(L, -1, "read"); - lua_insert(L, 1); - lua_replace(L, 2); - lua_call(L, lua_gettop(L) - 1, LUA_MULTRET); - return lua_gettop(L); -} - -/* Lua method for socket object to read from input stream until terminator */ -static int moonbr_child_lua_readuntil_stream(lua_State *L) { - lua_getfield(L, 1, "input"); - lua_getfield(L, -1, "readuntil"); - lua_insert(L, 1); - lua_replace(L, 2); - lua_call(L, lua_gettop(L) - 1, LUA_MULTRET); - return lua_gettop(L); -} - -/* Lua method for socket object to iterate over input stream */ -static int moonbr_child_lua_lines_stream(lua_State *L) { - lua_getfield(L, 1, "input"); - lua_getfield(L, -1, "lines"); - lua_insert(L, 1); - lua_replace(L, 2); - lua_call(L, lua_gettop(L) - 1, LUA_MULTRET); - return lua_gettop(L); -} - -/* Lua method for socket object to write to output stream */ -static int moonbr_child_lua_write_stream(lua_State *L) { - lua_getfield(L, 1, "output"); - lua_getfield(L, -1, "write"); - lua_insert(L, 1); - lua_replace(L, 2); - lua_call(L, lua_gettop(L) - 1, LUA_MULTRET); - return lua_gettop(L); -} - -/* Lua method for socket object to flush the output stream */ -static int moonbr_child_lua_flush_stream(lua_State *L) { - lua_getfield(L, 1, "output"); - lua_getfield(L, -1, "flush"); - lua_insert(L, 1); - lua_replace(L, 2); - lua_call(L, lua_gettop(L) - 1, LUA_MULTRET); - return lua_gettop(L); -} - -/* Lua function to close a single stream (input or output) from/to peer */ -static int moonbr_child_lua_close_stream(lua_State *L) { - luaL_Stream *stream = lua_touserdata(L, 1); - if (stream == moonbr_child_peersocket_inputstream) { - if (moonbr_child_close_peersocket_inputstream(1, 0)) { /* don't mark as closed as it's done by Lua */ - char errmsg[MOONBR_MAXSTRERRORLEN]; - strerror_r(errno, errmsg, MOONBR_MAXSTRERRORLEN); /* use thread-safe call in case child created threads */ - lua_pushnil(L); - lua_pushfstring(L, "Could not close input stream from peer: %s", errmsg); - return 2; - } - } else if (stream == moonbr_child_peersocket_outputstream) { - if (moonbr_child_close_peersocket_outputstream(1, 0)) { /* don't mark as closed as it's done by Lua */ - char errmsg[MOONBR_MAXSTRERRORLEN]; - strerror_r(errno, errmsg, MOONBR_MAXSTRERRORLEN); /* use thread-safe call in case child created threads */ - lua_pushnil(L); - lua_pushfstring(L, "Could not close output stream to peer: %s", errmsg); - return 2; - } - } else { - luaL_argerror(L, 1, "Not a connection socket"); - } - lua_pushboolean(L, 1); - return 1; -} - -/* Lua function to close both input and output stream from/to peer */ -static int moonbr_child_lua_close_both_streams(lua_State *L) { - int timeout = 0; - if (!lua_isnoneornil(L, 2)) { - lua_Integer n = luaL_checkinteger(L, 2); - luaL_argcheck(L, n >= 0 && n <= INT_MAX, 2, "out of range"); - timeout = n; - } - if (moonbr_child_peersocket_fd == -1) { - luaL_error(L, "Connection with peer has already been explicitly closed"); - } - if (moonbr_child_close_peersocket(timeout)) { - char errmsg[MOONBR_MAXSTRERRORLEN]; - strerror_r(errno, errmsg, MOONBR_MAXSTRERRORLEN); /* use thread-safe call in case child created threads */ - lua_pushnil(L); - lua_pushfstring(L, "Could not close socket connection with peer: %s", errmsg); - return 2; - } - lua_pushboolean(L, 1); - return 1; -} - -/* Lua function to close both input and output stream from/to peer */ -static int moonbr_child_lua_cancel_both_streams(lua_State *L) { - if (moonbr_child_peersocket_fd == -1) { - luaL_error(L, "Connection with peer has already been explicitly closed"); - } - if (moonbr_child_cancel_peersocket()) { - char errmsg[MOONBR_MAXSTRERRORLEN]; - strerror_r(errno, errmsg, MOONBR_MAXSTRERRORLEN); /* use thread-safe call in case child created threads */ - lua_pushnil(L); - lua_pushfstring(L, "Could not cancel socket connection with peer: %s", errmsg); - return 2; - } - lua_pushboolean(L, 1); - return 1; -} - -/* Methods of (bidirectional) socket object passed to handler */ -static luaL_Reg moonbr_child_lua_socket_functions[] = { - {"read", moonbr_child_lua_read_stream}, - {"readuntil", moonbr_child_lua_readuntil_stream}, - {"lines", moonbr_child_lua_lines_stream}, - {"write", moonbr_child_lua_write_stream}, - {"flush", moonbr_child_lua_flush_stream}, - {"close", moonbr_child_lua_close_both_streams}, - {"cancel", moonbr_child_lua_cancel_both_streams}, - {NULL, NULL} -}; - /* Main function of child process to be called after fork() and file descriptor rearrangement */ void moonbr_child_run(struct moonbr_pool *pool, lua_State *L) { char controlmsg; + int fd; struct itimerval notimer = { { 0, }, { 0, } }; lua_rawgetp(L, LUA_REGISTRYINDEX, moonbr_luakey_prepare_func(pool)); if (lua_isnil(L, -1)) lua_pop(L, 1); @@ -1076,66 +831,20 @@ if (write(MOONBR_FD_CONTROL, &controlmsg, 1) <= 0) { moonbr_child_log_errno_fatal("Error while sending ready message to parent process"); } - moonbr_child_receive_control_message( - MOONBR_FD_CONTROL, - &controlmsg, - &moonbr_child_peersocket_fd - ); + moonbr_child_receive_control_message(MOONBR_FD_CONTROL, &controlmsg, &fd); if (!( - (controlmsg == MOONBR_COMMAND_TERMINATE && moonbr_child_peersocket_fd == -1) || - (controlmsg == MOONBR_SOCKETTYPE_INTERVAL && moonbr_child_peersocket_fd == -1) || - (controlmsg == MOONBR_SOCKETTYPE_LOCAL && moonbr_child_peersocket_fd != -1) || - (controlmsg == MOONBR_SOCKETTYPE_NETWORK && moonbr_child_peersocket_fd != -1) + (controlmsg == MOONBR_COMMAND_TERMINATE && fd == -1) || + (controlmsg == MOONBR_SOCKETTYPE_INTERVAL && fd == -1) || + (controlmsg == MOONBR_SOCKETTYPE_LOCAL && fd != -1) || + (controlmsg == MOONBR_SOCKETTYPE_NETWORK && fd != -1) )) { moonbr_child_log_fatal("Received illegal control message from parent process"); } if (controlmsg == MOONBR_COMMAND_TERMINATE) break; listener = moonbr_child_receive_pointer(MOONBR_FD_CONTROL); - moonbr_child_peersocket_type = controlmsg; - if (moonbr_child_peersocket_fd != -1) { - { - int clonedfd; - clonedfd = dup(moonbr_child_peersocket_fd); - if (!clonedfd) { - moonbr_child_log_errno_fatal("Could not duplicate file descriptor for input stream"); - } - moonbr_child_peersocket_inputstream = lua_newuserdata(L, sizeof(luaL_Stream)); - if (!moonbr_child_peersocket_inputstream) { - moonbr_child_log_fatal("Memory allocation error"); - } - moonbr_child_peersocket_inputstream->f = fdopen(clonedfd, "rb"); - if (!moonbr_child_peersocket_inputstream->f) { - moonbr_child_log_errno_fatal("Could not open input stream for remote connection"); - } - moonbr_child_peersocket_inputstream->closef = moonbr_child_lua_close_stream; - if (luaL_newmetatable(L, LUA_FILEHANDLE)) { - moonbr_child_log_fatal("Lua metatable LUA_FILEHANDLE does not exist"); - } - lua_setmetatable(L, -2); - } - { - int clonedfd; - clonedfd = dup(moonbr_child_peersocket_fd); - if (!clonedfd) { - moonbr_child_log_errno_fatal("Could not duplicate file descriptor for output stream"); - } - moonbr_child_peersocket_outputstream = lua_newuserdata(L, sizeof(luaL_Stream)); - if (!moonbr_child_peersocket_outputstream) { - moonbr_child_log_fatal("Memory allocation error"); - } - moonbr_child_peersocket_outputstream->f = fdopen(clonedfd, "wb"); - if (!moonbr_child_peersocket_outputstream->f) { - moonbr_child_log_errno_fatal("Could not open output stream for remote connection"); - } - moonbr_child_peersocket_outputstream->closef = moonbr_child_lua_close_stream; - if (luaL_newmetatable(L, LUA_FILEHANDLE)) { - moonbr_child_log_fatal("Lua metatable LUA_FILEHANDLE does not exist"); - } - lua_setmetatable(L, -2); - } - } + if (fd) moonbr_io_pushhandle(L, fd, 1, controlmsg == MOONBR_SOCKETTYPE_NETWORK); lua_rawgetp(L, LUA_REGISTRYINDEX, moonbr_luakey_connect_func(pool)); - if (listener->proto == MOONBR_PROTO_INTERVAL) { + if (!fd) { lua_newtable(L); lua_pushstring(L, listener->proto_specific.interval.name ? @@ -1143,16 +852,11 @@ ); lua_setfield(L, -2, "interval"); } else { - lua_newtable(L); - lua_pushvalue(L, -4); - lua_setfield(L, -2, "input"); - lua_pushvalue(L, -3); - lua_setfield(L, -2, "output"); - luaL_setfuncs(L, moonbr_child_lua_socket_functions, 0); + lua_pushvalue(L, -2); if (listener->proto == MOONBR_PROTO_TCP6) { struct sockaddr_in6 addr; socklen_t addr_len = sizeof(struct sockaddr_in6); - if (getsockname(moonbr_child_peersocket_fd, (struct sockaddr *)&addr, &addr_len)) { + if (getsockname(fd, (struct sockaddr *)&addr, &addr_len)) { moonbr_child_log_errno("Could not get local IP address/port"); } else { lua_pushlstring(L, (char *)addr.sin6_addr.s6_addr, 16); @@ -1160,7 +864,7 @@ lua_pushinteger(L, ntohs(addr.sin6_port)); lua_setfield(L, -2, "local_tcpport"); } - if (getpeername(moonbr_child_peersocket_fd, (struct sockaddr *)&addr, &addr_len)) { + if (getpeername(fd, (struct sockaddr *)&addr, &addr_len)) { moonbr_child_log_errno("Could not get remote IP address/port"); } else { lua_pushlstring(L, (char *)addr.sin6_addr.s6_addr, 16); @@ -1171,7 +875,7 @@ } else if (listener->proto == MOONBR_PROTO_TCP4) { struct sockaddr_in addr; socklen_t addr_len = sizeof(struct sockaddr_in); - if (getsockname(moonbr_child_peersocket_fd, (struct sockaddr *)&addr, &addr_len)) { + if (getsockname(fd, (struct sockaddr *)&addr, &addr_len)) { moonbr_child_log_errno("Could not get local IP address/port"); } else { lua_pushlstring(L, (char *)&addr.sin_addr.s_addr, 4); @@ -1179,7 +883,7 @@ lua_pushinteger(L, ntohs(addr.sin_port)); lua_setfield(L, -2, "local_tcpport"); } - if (getpeername(moonbr_child_peersocket_fd, (struct sockaddr *)&addr, &addr_len)) { + if (getpeername(fd, (struct sockaddr *)&addr, &addr_len)) { moonbr_child_log_errno("Could not get remote IP address/port"); } else { lua_pushlstring(L, (char *)&addr.sin_addr.s_addr, 4); @@ -1193,9 +897,7 @@ fprintf(stderr, "Error in \"connect\" function: %s\n", lua_tostring(L, -1)); exit(1); } - if (moonbr_child_close_peersocket(0)) { - moonbr_child_log_errno("Could not close socket connection with peer"); - } + if (fd) moonbr_io_closehandle(L, -2, -1); /* attemt clean close */ if (lua_type(L, -1) != LUA_TBOOLEAN || !lua_toboolean(L, -1)) break; #ifdef MOONBR_LUA_PANIC_BUG_WORKAROUND lua_settop(L, 2); diff -r 1d91c6eedf18 -r fca51922b708 moonbridge_http.lua --- a/moonbridge_http.lua Tue Apr 07 01:17:55 2015 +0200 +++ b/moonbridge_http.lua Tue Apr 07 02:15:17 2015 +0200 @@ -238,7 +238,7 @@ io.stderr:write(errmsg, "\n") if not socket_closed then socket_closed = true - socket:cancel() + socket:reset() end error("Could not send data to client: " .. errmsg) end @@ -270,7 +270,7 @@ else if not socket_closed then socket_closed = true - socket:cancel() + socket:reset() end end if throw_error then @@ -309,10 +309,10 @@ local function finish_response() if connection_close_responded then -- close output stream: - assert_output(socket.output:close()) + assert_output(socket:finish()) -- wait for EOF of peer to avoid immediate TCP RST condition: timeout(2, function() - while socket.input:read(input_chunk_size) do end + socket:drain() end) -- fully close socket: socket_closed = true -- avoid double close on error @@ -832,7 +832,7 @@ end if request.headers_flags["Transfer-Encoding"]["chunked"] then while true do - local line = socket:readuntil("\n", 32 + remaining_body_size_limit) + local line = socket:read(32 + remaining_body_size_limit, "\n") if not line then request_error(true, "400 Bad Request", "Unexpected EOF while reading next chunk of request body") end @@ -853,13 +853,13 @@ end if len == 0 then break end read_body_bytes(len, callback) - local term = socket:readuntil("\n", 2) + local term = socket:read(2, "\n") if term ~= "\r\n" and term ~= "\n" then request_error(true, "400 Bad Request", "Encoding error while reading chunk of request body") end end while true do - local line = socket:readuntil("\n", 2 + remaining_body_size_limit) + local line = socket:read(2 + remaining_body_size_limit, "\n") if line == "\r\n" or line == "\n" then break end remaining_body_size_limit = remaining_body_size_limit - #line if remaining_body_size_limit < 0 then @@ -894,7 +894,7 @@ end }) -- read and parse request line: - local line = socket:readuntil("\n", remaining_header_size_limit) + local line = socket:read(remaining_header_size_limit, "\n") if not line then return survive end remaining_header_size_limit = remaining_header_size_limit - #line if remaining_header_size_limit == 0 then @@ -910,7 +910,7 @@ end -- read and parse headers: while true do - local line = socket:readuntil("\n", remaining_header_size_limit); + local line = socket:read(remaining_header_size_limit, "\n"); remaining_header_size_limit = remaining_header_size_limit - #line if not line then return request_error(false, "400 Bad Request") diff -r 1d91c6eedf18 -r fca51922b708 moonbridge_io.c --- a/moonbridge_io.c Tue Apr 07 01:17:55 2015 +0200 +++ b/moonbridge_io.c Tue Apr 07 02:15:17 2015 +0200 @@ -26,6 +26,8 @@ typedef struct { int fd; int issocket; + int canshutdown; + int shutwr; int nonblocking; int readerr; int readbufcnt; @@ -167,6 +169,7 @@ ssize_t result; handle = luaL_checkudata(L, 1, MOONBR_IO_HANDLE_MT_REGKEY); if (handle->fd < 0) luaL_error(L, "Attempt to write to a closed I/O handle"); + if (handle->shutwr) luaL_error(L, "Attempt to write to a finished I/O handle"); if (handle->writeerr) { lua_pushnil(L); lua_pushliteral(L, "Previous write error"); @@ -277,6 +280,30 @@ return moonbr_io_write_impl(L, 1, 1); } +static int moonbr_io_finish(lua_State *L) { + moonbr_io_handle_t *handle; + handle = luaL_checkudata(L, 1, MOONBR_IO_HANDLE_MT_REGKEY); + if (handle->fd < 0) luaL_error(L, "Attempt to finish a closed I/O handle"); + if (handle->shutwr) luaL_error(L, "Attempt to finish a finished I/O handle"); + handle->shutwr = 1; + if (handle->canshutdown) { + if (handle->writeleft) { + lua_pushcfunction(L, moonbr_io_flush); + lua_pushvalue(L, 1); + lua_call(L, 1, 2); + if (!lua_toboolean(L, -2)) return 2; + } + if (shutdown(handle->fd, SHUT_WR)) { + moonbr_io_errmsg(); + lua_pushnil(L); + lua_pushstring(L, errmsg); + return 2; + } + } + lua_pushboolean(L, 1); + return 1; +} + static int moonbr_io_close(lua_State *L) { moonbr_io_handle_t *handle; int timeout; @@ -319,11 +346,36 @@ return lua_gettop(L); } -void moonbr_io_pushhandle(lua_State *L, int fd, int issocket) { +static int moonbr_io_gc(lua_State *L) { + moonbr_io_handle_t *handle; + handle = luaL_checkudata(L, 1, MOONBR_IO_HANDLE_MT_REGKEY); + if (handle->fd >= 0) { + lua_pushcfunction(L, moonbr_io_close); + lua_pushvalue(L, 1); + lua_pushinteger(L, 0); + lua_call(L, 2, 0); + } + return 0; +} + +void moonbr_io_closehandle(lua_State *L, int idx, int timeout) { + moonbr_io_handle_t *handle; + handle = luaL_checkudata(L, idx, MOONBR_IO_HANDLE_MT_REGKEY); + if (handle->fd >= 0) { + lua_pushcfunction(L, moonbr_io_close); + lua_pushvalue(L, idx); + lua_pushinteger(L, timeout); + lua_call(L, 2, 0); + } +} + +void moonbr_io_pushhandle(lua_State *L, int fd, int issocket, int canshutdown) { moonbr_io_handle_t *handle; handle = lua_newuserdata(L, sizeof(moonbr_io_handle_t)); handle->fd = fd; handle->issocket = issocket; + handle->canshutdown = canshutdown; + handle->shutwr = 0; handle->nonblocking = -1; handle->readerr = 0; handle->readbufcnt = 0; @@ -374,6 +426,7 @@ {"write_nb", moonbr_io_write_nb}, {"flush", moonbr_io_flush}, {"flush_nb", moonbr_io_flush_nb}, + {"finish", moonbr_io_finish}, {"close", moonbr_io_close}, {"reset", moonbr_io_reset}, {NULL, NULL} @@ -382,7 +435,7 @@ static const struct luaL_Reg moonbr_io_handle_metamethods[] = { {"__index", moonbr_io_handleindex}, {"__newindex", moonbr_io_handlenewindex}, - {"__gc", moonbr_io_reset}, + {"__gc", moonbr_io_gc}, {NULL, NULL} }; diff -r 1d91c6eedf18 -r fca51922b708 moonbridge_io.h --- a/moonbridge_io.h Tue Apr 07 01:17:55 2015 +0200 +++ b/moonbridge_io.h Tue Apr 07 02:15:17 2015 +0200 @@ -1,4 +1,5 @@ -void moonbr_io_pushhandle(lua_State *L, int fd, int issocket); +void moonbr_io_pushhandle(lua_State *L, int fd, int issocket, int canshutdown); +void moonbr_io_closehandle(lua_State *L, int idx, int timeout); int luaopen_moonbridge_io(lua_State *L);