# HG changeset patch # User jbe # Date 1420396228 -3600 # Node ID f6d3b3f70dab26151071efa7fdd8d13012de2f11 Initial commit diff -r 000000000000 -r f6d3b3f70dab LICENSE --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/LICENSE Sun Jan 04 19:30:28 2015 +0100 @@ -0,0 +1,19 @@ +Copyright (c) 2015 Public Software Group e. V., Berlin, Germany + +Permission is hereby granted, free of charge, to any person obtaining a +copy of this software and associated documentation files (the "Software"), +to deal in the Software without restriction, including without limitation +the rights to use, copy, modify, merge, publish, distribute, sublicense, +and/or sell copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff -r 000000000000 -r f6d3b3f70dab Makefile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Makefile Sun Jan 04 19:30:28 2015 +0100 @@ -0,0 +1,34 @@ +# BSD Makefile +# On GNU systems, use bmake. + +.if $(:!uname!) == "FreeBSD" +# Default configuration for FreeBSD +LUA_INCLUDE ?= /usr/local/include/lua52 +LUA_LIBDIR ?= /usr/local/lib +LUA_LIBRARY ?= lua-5.2 +UTIL_FLAGS ?= -lutil + +.elif $(:!uname!) == "Linux" +# Default configuration for Linux +LUA_INCLUDE ?= /usr/include +LUA_LIBDIR ?= /usr/lib +LUA_LIBRARY ?= lua +UTIL_FLAGS ?= -ldl -lbsd + +.else +# Default configuration for other systems +LUA_INCLUDE ?= /usr/include +LUA_LIBDIR ?= /usr/lib +LUA_LIBRARY ?= lua +UTIL_FLAGS ?= -lutil + +.endif + +all:: moonbridge + +moonbridge: moonbridge.c + cc -Wall -O2 -Wl,-E -I $(LUA_INCLUDE) -L $(LUA_LIBDIR) -o moonbridge moonbridge.c -lm -l$(LUA_LIBRARY) $(UTIL_FLAGS) + +clean:: + rm -f moonbridge + diff -r 000000000000 -r f6d3b3f70dab README --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/README Sun Jan 04 19:30:28 2015 +0100 @@ -0,0 +1,11 @@ +Quickstart guide: + +$ make # hint: use bmake on GNU systems +$ ./moonbridge example_application.lua + +Then connect to http://localhost:8080/ + +To learn more, check example_application.lua and reference.txt files. If you +experence any touble during compilation, please edit the Makefile to match +your system. + diff -r 000000000000 -r f6d3b3f70dab example_application.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/example_application.lua Sun Jan 04 19:30:28 2015 +0100 @@ -0,0 +1,150 @@ +-- Moonbridge example application +-- invoke with ./moonbridge example_application.lua + +local http = require "http" + +local documents = {"example_webpage.html", "example_webpage.css"} + +listen{ + -- listen to a tcp version 4 socket + { proto = "tcp4", port = 8080, localhost = true }, + + -- listen to a tcp version 6 socket + { proto = "tcp6", port = 8080, localhost = true }, + + -- listen to a unix domain socket + --{ proto = "local", path = 'socket' }, + + -- execute the listener regularly (without incoming connection) + --{ proto = "interval", name = "myint", delay = 10, strict = false }, + + -- desired number of spare (idle) processes + pre_fork = 1, -- number of forks + + -- minimum number of processes + min_fork = 2, -- number of forks + + -- maximum number of processes (hard limit) + max_fork = 16, -- number of forks + + -- delay between creation of spare processes + fork_delay = 1, -- seconds + + -- delay before retry of failed process creation + fork_error_delay = 2, -- seconds + + -- delay between destruction of excessive spare processes + exit_delay = 60, -- seconds + + -- idle time after a fork gets terminated + idle_timeout = 0, -- seconds (0 for no timeout) + + -- maximum memory consumption before process gets terminated + memory_limit = 1024*1024, -- bytes + + -- preparation of process (executed before fork) + prepare = function() + for i, document in ipairs(documents) do + local file = assert(io.open(document)) + documents[document] = file:read("*a") + file:close() + end + end, + + -- connection handler + connect = http.generate_handler( + { + static_headers = {"Server: Moonbridge Example Server"}, + request_body_size_limit = 16*1024*1024*1024 -- allow big file uploads + }, + function(request) + + if request.method == "GET" or request.method == "HEAD" then + + if request.path == "/" then + request:send_status("303 See Other") + request:send_header("Location", "http://" .. request.headers_value.host .. "/example_webpage.html") + + else + local document_name = string.match(request.path, "^/(.*)$") + local document_extension = string.match(document_name, "%.([^.])$") + local document = documents[string.match(request.path, "^/(.*)$")] + if document then + request:send_status("200 OK") + + if document_extension == "html" then + request:send_header("Content-Type", "text/html; charset=UTF-8") + elseif document_extension == "css" then + request:send_header("Content-Type", "text/css; charset=UTF-8") + end + request:send_data(document) + else + request:send_status("404 Not Found") + request:send_header("Content-Type", "text/html; chatset=UTF-8") + request:send_data("404 Not Found

404 Not Found

") + end + + end + + elseif request.method == "POST" then + + if request.path == "/post_example" then + local files = {} + do + local file + request:stream_post_param("files", function(chunk, field_name, meta) + if meta then + file = { + file_name = meta.file_name, + content_type = meta.content_type, + length = 0 + } + end + if chunk then + file.length = file.length + #chunk + else + files[#files+1] = file + end + end) + end + + request:send_status("200 OK") + request:send_header("Content-Type", "text/html; chatset=UTF-8") + request:send_data("\n\n") + request:send_data('\n') + request:send_data("Moonbridge Network Server for Lua Applications – Example Application\n") + request:send_data("\n\n") + request:send_data("

Moonbridge Network Server for Lua – Example Application

\n") + request:send_data("

POST request successful

\n") + request:send_data('\n\n\n') + for i, file in ipairs(files) do + request:send_data("") + request:send_data("") + request:send_data("") + request:send_data('") + request:send_data("\n") + end + request:send_data("\n
File nameContent typeBytes received
", http.encode_html(file.file_name or "(unknown)"), "", http.encode_html(file.content_type or "(unknown)"), "', http.encode_html(tostring(file.length)), "
\n") + request:send_data("

Submitted comment: ", http.encode_html(request.post_params.comment), "

\n") + request:send_data("\n\n") + + else + request:send_status("404 Not Found") + request:send_data("404 Not Found

404 Not Found

") + + end + + else + request:send_status("405 Method not allowed") + + end + + -- returning false causes termination of current process (and re-forking) + return true + end), + + -- executed on process termination + finish = function() + end +} + diff -r 000000000000 -r f6d3b3f70dab example_webpage.css --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/example_webpage.css Sun Jan 04 19:30:28 2015 +0100 @@ -0,0 +1,31 @@ +body { + background-color: #bfc; + font-family: sans-serif; +} + +table { + border-spacing: 4px; +} + +table th { + text-align: left; +} + +table th, table td { + padding: 1ex 0.5em; + background: #f0fff4; +} + +table .numeric { + text-align: right; +} + +input[type=text] { + background: #f0fff4; + border: none; +} + +input { + padding: 0.5ex 0.5em; +} + diff -r 000000000000 -r f6d3b3f70dab example_webpage.html --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/example_webpage.html Sun Jan 04 19:30:28 2015 +0100 @@ -0,0 +1,16 @@ + + + + + Moonbridge Network Server for Lua Applications – Example Application + + +

Moonbridge Network Server for Lua – Example Application

+

Test POST request with file upload

+
+ Files: + Comment: + +
+ + diff -r 000000000000 -r f6d3b3f70dab http.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/http.lua Sun Jan 04 19:30:28 2015 +0100 @@ -0,0 +1,883 @@ +#!/usr/bin/env lua + +-- module preamble +local _G, _M = _ENV, {} +_ENV = setmetatable({}, { + __index = function(self, key) + local value = _M[key]; if value ~= nil then return value end + return _G[key] + end, + __newindex = function(self, key, value) _M[key] = value end +}) + +-- function that encodes certain HTML entities: +-- (not used by the library itself) +function encode_html(text) + return ( + string.gsub( + text, '[<>&"]', + function(char) + if char == '<' then + return "<" + elseif char == '>' then + return ">" + elseif char == '&' then + return "&" + elseif char == '"' then + return """ + end + end + ) + ) + +end + +-- function that encodes special characters for URIs: +-- (not used by the library itself) +function encode_uri(text) + return ( + string.gsub(text, "[^0-9A-Za-z_%.~-]", + function (char) + return string.format("%%%02x", string.byte(char)) + end + ) + ) +end + +-- function undoing URL encoding: +do + local b0 = string.byte("0") + local b9 = string.byte("9") + local bA = string.byte("A") + local bF = string.byte("F") + local ba = string.byte("a") + local bf = string.byte("f") + function decode_uri(str) + return ( + string.gsub( + string.gsub(str, "%+", " "), + "%%([0-9A-Fa-f][0-9A-Fa-f])", + function(hex) + local n1, n2 = string.byte(hex, 1, 2) + if n1 >= b0 and n1 <= b9 then n1 = n1 - b0 + elseif n1 >= bA and n1 <= bF then n1 = n1 - bA + 10 + elseif n1 >= ba and n1 <= bf then n1 = n1 - ba + 10 + else error("Assertion failed") end + if n2 >= b0 and n2 <= b9 then n2 = n2 - b0 + elseif n2 >= bA and n2 <= bF then n2 = n2 - bA + 10 + elseif n2 >= ba and n2 <= bf then n2 = n2 - ba + 10 + else error("Assertion failed") end + return string.char(n1 * 16 + n2) + end + ) + ) + end +end + +-- status codes that carry no response body (in addition to 1xx): +-- (set to "zero_content_length" if Content-Length header is required) +status_without_response_body = { + ["101"] = true, + ["204"] = true, + ["205"] = "zero_content_length", + ["304"] = true +} + +-- handling of GET/POST param tables: +local new_params_list -- defined later +do + local params_list_mapping = setmetatable({}, {__mode="k"}) + local params_list_metatable = { + __index = function(self, key) + local tbl = {} + self[key] = tbl + return tbl + end + } + local params_metatable = { + __index = function(self, key) + local value = params_list_mapping[self][key][1] + self[key] = value + return value + end + } + -- returns a table to store key value-list pairs (i.e. multiple values), + -- and a second table automatically mapping keys to the first value + -- using the key value-list pairs in the first table: + new_params_list = function() + local params_list = setmetatable({}, params_list_metatable) + local params = setmetatable({}, params_metatable) + params_list_mapping[params] = params_list + return params_list, params + end +end +-- parses URL encoded form data and stores it in +-- a key value-list pairs structure that has to be +-- previously obtained by calling by new_params_list(): +local function read_urlencoded_form(tbl, data) + for rawkey, rawvalue in string.gmatch(data, "([^=&]*)=([^=&]*)") do + local subtbl = tbl[decode_uri(rawkey)] + subtbl[#subtbl+1] = decode_uri(rawvalue) + end +end + +-- function creating a HTTP handler: +function generate_handler(handler, options) + -- swap arguments if necessary (for convenience): + if type(handler) ~= "function" and type(options) == "function" then + handler, options = options, handler + end + -- process options: + options = options or {} + local preamble = "" -- preamble sent with every(!) HTTP response + do + -- named arg "static_headers" is used to create the preamble: + local s = options.static_headers + local t = {} + if s then + if type(s) == "string" then + for line in string.gmatch(s, "[^\r\n]+") do + t[#t+1] = line + end + else + for i, kv in ipairs(options.static_headers) do + if type(kv) == "string" then + t[#t+1] = kv + else + t[#t+1] = kv[1] .. ": " .. kv[2] + end + end + end + end + t[#t+1] = "" + preamble = table.concat(t, "\r\n") + end + -- return connect handler: + return function(socket) + local survive = true -- set to false if process shall be terminated later + while true do + -- desired chunk sizes: + local input_chunk_size = options.maximum_input_chunk_size or options.chunk_size or 16384 + local output_chunk_size = options.minimum_output_chunk_size or options.chunk_size or 1024 + -- process named arguments "request_header_size_limit" and "request_body_size_limit": + local remaining_header_size_limit = options.request_header_size_limit or 1024*1024 + local remaining_body_size_limit = options.request_body_size_limit or 64*1024*1024 + -- state variables for request handling: + local output_state = "no_status_sent" -- one of: + -- "no_status_sent" (initial state) + -- "info_status_sent" (1xx status code has been sent) + -- "bodyless_status_sent" (204/304 status code has been sent) + -- "status_sent" (regular status code has been sent) + -- "headers_sent" (headers have been terminated) + -- "finished" (request has been answered completely) + local content_length -- value of Content-Length header sent + local bytes_sent = 0 -- number of bytes sent if Content-Length is set + local chunk_parts = {} -- list of chunks to send + local chunk_bytes = 0 -- sum of lengths of chunks to send + local connection_close_requested = false + local connection_close_responded = false + local headers_value_nil = {} -- header values that are nil + local request_body_content_length -- Content-Length of request body + local input_state = "pending" -- one of: + -- "pending" (request body has not been processed yet) + -- "deferred" (request body processing is deferred) + -- "reading" (request body is currently being read) + -- "finished" (request body has been read) + local streamed_post_params = {} -- mapping from POST field name to stream function + local streamed_post_param_patterns = {} -- list of POST field pattern and stream function pairs + -- object passed to handler (with methods, GET/POST params, etc.): + local request + -- reads a number of bytes from the socket, + -- optionally feeding these bytes chunk-wise + -- into a callback function: + local function read_body_bytes(remaining, callback) + while remaining > 0 do + local limit + if remaining > input_chunk_size then + limit = input_chunk_size + else + limit = remaining + end + local chunk = socket:read(limit) + if not chunk or #chunk ~= limit then + error("Unexpected EOF while reading chunk of request body") + end + remaining = remaining - limit + if callback then + callback(chunk) + end + end + end + -- flushes or closes the socket (depending on + -- whether "Connection: close" header was sent): + local function finish_response() + if connection_close_responded then + -- close output stream: + socket.output:close() + -- wait for EOF of peer to avoid immediate TCP RST condition: + timeout(2, function() + while socket.input:read(input_chunk_size) do end + end) + -- fully close socket: + socket:close() + else + socket:flush() + request:stream_request_body() + end + end + -- writes out buffered chunks (without flushing the socket): + local function send_chunk() + if chunk_bytes > 0 then + socket:write(string.format("%x\r\n", chunk_bytes)) + for i = 1, #chunk_parts do + socket:write(chunk_parts[i]) + end + chunk_parts = {} + chunk_bytes = 0 + socket:write("\r\n") + end + end + -- terminate header section in response, optionally flushing: + -- (may be called multiple times unless response is finished) + local function finish_headers(flush) + if output_state == "no_status_sent" then + error("HTTP status has not been sent yet") + elseif output_state == "finished" then + error("Response has already been finished") + elseif output_state == "info_status_sent" then + socket:write("\r\n") + socket:flush() + output_state = "no_status_sent" + elseif output_state == "bodyless_status_sent" then + if connection_close_requested and not connection_close_responded then + request:send_header("Connection", "close") + end + socket:write("\r\n") + finish_response() + output_state = "finished" + elseif output_state == "status_sent" then + if not content_length then + socket:write("Transfer-Encoding: chunked\r\n") + end + if connection_close_requested and not connection_close_responded then + request:send_header("Connection", "close") + end + socket:write("\r\n") + if request.method == "HEAD" then + finish_response() + elseif flush then + socket:flush() + end + output_state = "headers_sent" + elseif output_state ~= "headers_sent" then + error("Unexpected internal status in HTTP engine") + end + end + -- create request object and set several functions and values: + request = { + -- allow raw socket access: + socket = socket, + -- parsed cookies: + cookies = {}, + -- send a HTTP response status (e.g. "200 OK"): + send_status = function(self, value) + if input_state == "pending" then + request:process_request_body() + end + if output_state == "info_status_sent" then + socket:write("\r\n") + socket:flush() + elseif output_state ~= "no_status_sent" then + error("HTTP status has already been sent") + end + local status1 = string.sub(value, 1, 1) + local status3 = string.sub(value, 1, 3) + socket:write("HTTP/1.1 ", value, "\r\n", preamble) + local without_response_body = status_without_response_body[status3] + if without_response_body then + output_state = "bodyless_status_sent" + if without_response_body == "zero_content_length" then + request:send_header("Content-Length", 0) + end + elseif status1 == "1" then + output_state = "info_status_sent" + else + output_state = "status_sent" + end + end, + -- send a HTTP response header + -- (key and value as separate args): + send_header = function(self, key, value) + if output_state == "no_status_sent" then + error("HTTP status has not been sent yet") + elseif + output_state ~= "info_status_sent" and + output_state ~= "bodyless_status_sent" and + output_state ~= "status_sent" + then + error("All HTTP headers have already been sent") + end + local key_lower = string.lower(key) + if key_lower == "content-length" then + if output_state == "info_status_sent" then + error("Cannot set Content-Length for informational status response") + end + local new_content_length = assert(tonumber(value), "Invalid content-length") + if content_length == nil then + content_length = new_content_length + elseif content_length == new_content_length then + return + else + error("Content-Length has been set multiple times with different values") + end + elseif key_lower == "connection" then + for entry in string.gmatch(string.lower(value), "[^,]+") do + if string.match(entry, "^[ \t]*close[ \t]*$") then + if output_state == "info_status_sent" then + error("Cannot set \"Connection: close\" for informational status response") + end + if connection_close_responded then + return + else + connection_close_responded = true + break + end + end + end + end + socket:write(key, ": ", value, "\r\n") + end, + -- method to finish and flush headers: + finish_headers = function() + finish_headers(true) + end, + -- send data for response body: + send_data = function(self, ...) + if output_state == "info_status_sent" then + error("No (non-informational) HTTP status has been sent yet") + elseif output_state == "bodyless_status_sent" then + error("Cannot send response data for body-less status message") + end + finish_headers(false) + if output_state ~= "headers_sent" then + error("Unexpected internal status in HTTP engine") + end + if request.method == "HEAD" then + return + end + for i = 1, select("#", ...) do + local str = tostring(select(i, ...)) + if #str > 0 then + if content_length then + local bytes_to_send = #str + if bytes_sent + bytes_to_send > content_length then + socket:write(string.sub(str, 1, content_length - bytes_sent)) + bytes_sent = content_length + error("Content length exceeded") + else + socket:write(str) + bytes_sent = bytes_sent + bytes_to_send + end + else + chunk_bytes = chunk_bytes + #str + chunk_parts[#chunk_parts+1] = str + end + end + end + if chunk_bytes >= output_chunk_size then + send_chunk() + end + end, + -- flush output buffer: + flush = function(self) + send_chunk() + socket:flush() + end, + -- finish response: + finish = function(self) + if output_state == "finished" then + return + elseif output_state == "info_status_sent" then + error("Informational HTTP response can be finished with :finish_headers() method") + end + finish_headers(false) + if output_state == "headers_sent" then + if request.method ~= "HEAD" then + if content_length then + if bytes_sent ~= content_length then + error("Content length not used") + end + else + send_chunk() + socket:write("0\r\n\r\n") + end + finish_response() + end + output_state = "finished" + elseif output_state ~= finished then + error("Unexpected internal status in HTTP engine") + end + end, + -- table mapping header field names to value-lists + -- (raw access): + headers = setmetatable({}, { + __index = function(self, key) + local lowerkey = string.lower(key) + if lowerkey == key then + return + end + local result = rawget(self, lowerkey) + if result == nil then + result = {} + end + self[lowerkey] = result + self[key] = result + return result + end + }), + -- table mapping header field names to value-lists + -- (for headers with comma separated values): + headers_csv_table = setmetatable({}, { + __index = function(self, key) + local result = {} + for i, line in ipairs(request.headers[key]) do + for entry in string.gmatch(line, "[^,]+") do + local value = string.match(entry, "^[ \t]*(..-)[ \t]*$") + if value then + result[#result+1] = value + end + end + end + self[key] = result + return result + end + }), + -- table mapping header field names to a comma separated string + -- (for headers with comma separated values): + headers_csv_string = setmetatable({}, { + __index = function(self, key) + local result = {} + for i, line in ipairs(request.headers[key]) do + result[#result+1] = line + end + result = string.concat(result, ", ") + self[key] = result + return result + end + }), + -- table mapping header field names to a single string value + -- (or false if header has been sent multiple times): + headers_value = setmetatable({}, { + __index = function(self, key) + if headers_value_nil[key] then + return nil + end + local result = nil + local values = request.headers_csv_table[key] + if #values == 0 then + headers_value_nil[key] = true + elseif #values == 1 then + result = values[1] + else + result = false + end + self[key] = result + return result + end + }), + -- table mapping header field names to a flag table, + -- indicating if the comma separated value contains certain entries: + headers_flags = setmetatable({}, { + __index = function(self, key) + local result = setmetatable({}, { + __index = function(self, key) + local lowerkey = string.lower(key) + local result = rawget(self, lowerkey) or false + self[lowerkey] = result + self[key] = result + return result + end + }) + for i, value in ipairs(request.headers_csv_table[key]) do + result[string.lower(value)] = true + end + self[key] = result + return result + end + }), + -- register POST param stream handler for a single field name: + stream_post_param = function(self, field_name, callback) + if input_state == "inprogress" or input_state == "finished" then + error("Cannot register POST param streaming function if request body is already processed") + end + streamed_post_params[field_name] = callback + end, + -- register POST param stream handler for a field name pattern: + stream_post_params = function(self, pattern, callback) + if input_state == "inprogress" or input_state == "finished" then + error("Cannot register POST param streaming function if request body is already processed") + end + streamed_post_param_patterns[#streamed_post_param_patterns+1] = {pattern, callback} + end, + -- disables automatic request body processing on write + -- (use with caution): + defer_reading = function(self) + if input_state == "pending" then + input_state = "deferred" + end + end, + -- processes the request body and sets the request.post_params, + -- request.post_params_list, request.meta_post_params, and + -- request.meta_post_params_list values (can be called manually or + -- automatically if post_params are accessed or data is written out) + process_request_body = function(self) + if input_state == "finished" then + return + end + local post_params_list, post_params = new_params_list() + local content_type = request.headers_value["Content-Type"] + if content_type then + if + content_type == "application/x-www-form-urlencoded" or + string.match(content_type, "^application/x%-www%-form%-urlencoded *;") + then + read_urlencoded_form(post_params_list, request.body) + else + local boundary = string.match( + content_type, + '^multipart/form%-data[ \t]*[;,][ \t]*boundary="([^"]+)"$' + ) or string.match( + content_type, + '^multipart/form%-data[ \t]*[;,][ \t]*boundary=([^"; \t]+)$' + ) + if boundary then + local post_metadata_list, post_metadata = new_params_list() + boundary = "--" .. boundary + local headerdata = "" + local streamer + local field_name + local metadata = {} + local value_parts + local function default_streamer(chunk) + value_parts[#value_parts+1] = chunk + end + local function stream_part_finish() + if streamer == default_streamer then + local value = table.concat(value_parts) + value_parts = nil + if field_name then + local values = post_params_list[field_name] + values[#values+1] = value + local metadata_entries = post_metadata_list[field_name] + metadata_entries[#metadata_entries+1] = metadata + end + else + streamer() + end + headerdata = "" + streamer = nil + field_name = nil + metadata = {} + end + local function stream_part_chunk(chunk) + if streamer then + streamer(chunk) + else + headerdata = headerdata .. chunk + while true do + local line, remaining = string.match(headerdata, "^(.-)\r?\n(.*)$") + if not line then + break + end + if line == "" then + streamer = streamed_post_params[field_name] + if not streamer then + for i, rule in ipairs(streamed_post_param_patterns) do + if string.match(field_name, rule[1]) then + streamer = rule[2] + break + end + end + end + if not streamer then + value_parts = {} + streamer = default_streamer + end + streamer(remaining, field_name, metadata) + return + end + headerdata = remaining + local header_key, header_value = string.match(line, "^([^:]*):[ \t]*(.-)[ \t]*$") + if not header_key then + error("Invalid header in multipart/form-data part") + end + header_key = string.lower(header_key) + if header_key == "content-disposition" then + local escaped_header_value = string.gsub(header_value, '"[^"]*"', function(str) + return string.gsub(str, "=", "==") + end) + field_name = string.match(escaped_header_value, ';[ \t]*name="([^"]*)"') + if field_name then + field_name = string.gsub(field_name, "==", "=") + else + field_name = string.match(header_value, ';[ \t]*name=([^"; \t]+)') + end + metadata.file_name = string.match(escaped_header_value, ';[ \t]*filename="([^"]*)"') + if metadata.file_name then + metadata.file_name = string.gsub(metadata.file_name, "==", "=") + else + string.match(header_value, ';[ \t]*filename=([^"; \t]+)') + end + elseif header_key == "content-type" then + metadata.content_type = header_value + elseif header_key == "content-transfer-encoding" then + error("Content-transfer-encoding not supported by multipart/form-data parser") + end + end + end + end + local skippart = true -- ignore data until first boundary + local afterbound = false -- interpret 2 bytes after boundary ("\r\n" or "--") + local terminated = false -- final boundary read + local bigchunk = "" + request:stream_request_body(function(chunk) + if terminated then + return + end + bigchunk = bigchunk .. chunk + while true do + if afterbound then + if #bigchunk <= 2 then + return + end + local terminator = string.sub(bigchunk, 1, 2) + if terminator == "\r\n" then + afterbound = false + bigchunk = string.sub(bigchunk, 3) + elseif terminator == "--" then + terminated = true + bigchunk = nil + return + else + error("Error while parsing multipart body (expected CRLF or double minus)") + end + end + local pos1, pos2 = string.find(bigchunk, boundary, 1, true) + if not pos1 then + if not skippart then + local safe = #bigchunk-#boundary + if safe > 0 then + stream_part_chunk(string.sub(bigchunk, 1, safe)) + bigchunk = string.sub(bigchunk, safe+1) + end + end + return + end + if not skippart then + stream_part_chunk(string.sub(bigchunk, 1, pos1 - 1)) + stream_part_finish() + else + boundary = "\r\n" .. boundary + skippart = false + end + bigchunk = string.sub(bigchunk, pos2 + 1) + afterbound = true + end + end) + if not terminated then + error("Premature end of multipart/form-data request body") + end + request.post_metadata_list, request.post_metadata = post_metadata_list, post_metadata + else + error("Unknown Content-Type of request body") + end + end + end + request.post_params_list, request.post_params = post_params_list, post_params + end, + -- stream request body to an (optional) callback function + -- without processing it otherwise: + stream_request_body = function(self, callback) + if input_state ~= "pending" and input_state ~= "deferred" then + if callback then + if input_state == "inprogress" then + error("Request body is already being processed") + else + error("Request body has already been processed") + end + end + return + end + input_state = "inprogress" + if request.headers_flags["Expect"]["100-continue"] then + request:send_status("100 Continue") + request:finish_headers() + end + if request.headers_flags["Transfer-Encoding"]["chunked"] then + while true do + local line = socket:readuntil("\n", 32 + remaining_body_size_limit) + if not line then + error("Unexpected EOF while reading next chunk of request body") + end + local zeros, lenstr = string.match(line, "^(0*)([1-9A-Fa-f]+[0-9A-Fa-f]*)\r?\n$") + local chunkext + if lenstr then + chunkext = "" + else + zeros, lenstr, chunkext = string.match(line, "^(0*)([1-9A-Fa-f]+[0-9A-Fa-f]*)([ \t;].-)\r?\n$") + end + if not lenstr or #lenstr > 13 then + error("Encoding error or unexpected EOF while reading chunk of request body") + end + local len = tonumber("0x" .. lenstr) + remaining_body_size_limit = remaining_body_size_limit - (#zeros + #chunkext + len) + if remaining_body_size_limit < 0 then + error("Request body size limit exceeded") + end + if len == 0 then break end + read_body_bytes(len, callback) + local term = socket:readuntil("\n", 2) + if term ~= "\r\n" and term ~= "\n" then + error("Encoding error while reading chunk of request body") + end + end + while true do + local line = socket:readuntil("\n", 2 + remaining_body_size_limit) + if line == "\r\n" or line == "\n" then break end + remaining_body_size_limit = remaining_body_size_limit - #line + if remaining_body_size_limit < 0 then + error("Request body size limit exceeded while reading trailer section of chunked request body") + end + end + elseif request_body_content_length then + read_body_bytes(request_body_content_length, callback) + end + input_state = "finished" + end + } + -- initialize tables for GET params in request object: + request.get_params_list, request.get_params = new_params_list() + -- add meta table to request object to allow access to "body" and POST params: + setmetatable(request, { + __index = function(self, key) + if key == "body" then + local chunks = {} + request:stream_request_body(function(chunk) + chunks[#chunks+1] = chunk + end) + self.body = table.concat(chunks) + return self.body + elseif + key == "post_params_list" or key == "post_params" or + key == "post_metadata_list" or key == "post_metadata" + then + request:process_request_body() + return request[key] + end + end + }) + -- low level HTTP error response (for malformed requests, etc.): + local function error_response(status, text) + request:send_status(status) + request:send_header("Content-Type", "text/plain") + if not connection_close_responded then + request:send_header("Connection", "close") + end + request:send_data(status, "\n") + if text then + request:send_data("\n", text, "\n") + end + request:finish() + return survive + end + -- read and parse request line: + local line = socket:readuntil("\n", remaining_header_size_limit) + if not line then return survive end + remaining_header_size_limit = remaining_header_size_limit - #line + if remaining_header_size_limit == 0 then + return error_response("413 Request Entity Too Large", "Request line too long") + end + local proto + request.method, request.url, proto = + line:match("^([^ \t\r]+)[ \t]+([^ \t\r]+)[ \t]*([^ \t\r]*)[ \t]*\r?\n$") + if not request.method then + return error_response("400 Bad Request") + elseif proto ~= "HTTP/1.1" then + return error_response("505 HTTP Version Not Supported") + else + -- read and parse headers: + while true do + local line = socket:readuntil("\n", remaining_header_size_limit); + remaining_header_size_limit = remaining_header_size_limit - #line + if not line then + return error_response("400 Bad Request") + end + if line == "\r\n" or line == "\n" then + break + end + if remaining_header_size_limit == 0 then + return error_response("413 Request Entity Too Large", "Headers too long") + end + local key, value = string.match(line, "^([^ \t\r]+):[ \t]*(.-)[ \t]*\r?\n$") + if not key then + return error_response("400 Bad Request") + end + local values = request.headers[key] + values[#values+1] = value + end + -- process "Connection: close" header if existent: + connection_close_requested = request.headers_flags["Connection"]["close"] + -- process "Content-Length" header if existent: + do + local values = request.headers_csv_table["Content-Length"] + if #values > 0 then + request_body_content_length = tonumber(values[1]) + local proper_value = tostring(request_body_content_length) + for i, value in ipairs(values) do + value = string.match(value, "^0*(.*)") + if value ~= proper_value then + return error_response("400 Bad Request", "Content-Length header(s) invalid") + end + end + if request_body_content_length > remaining_body_size_limit then + return error_response("413 Request Entity Too Large", "Request body too big") + end + end + end + -- process "Transfer-Encoding" header if existent: + do + local flag = request.headers_flags["Transfer-Encoding"]["chunked"] + local list = request.headers_csv_table["Transfer-Encoding"] + if (flag and #list ~= 1) or (not flag and #list ~= 0) then + return error_response("400 Bad Request", "Unexpected Transfer-Encoding") + end + end + -- process "Expect" header if existent: + for i, value in ipairs(request.headers_csv_table["Expect"]) do + if string.lower(value) ~= "100-continue" then + return error_response("417 Expectation Failed", "Unexpected Expect header") + end + end + -- parse GET params: + request.path, request.query = string.match(request.url, "^([^?]*)%??(.*)$") + read_urlencoded_form(request.get_params_list, request.query) + -- parse cookies: + for i, line in ipairs(request.headers["Cookie"]) do + for rawkey, rawvalue in + string.gmatch(line, "([^=; ]*)=([^=; ]*)") + do + request.cookies[decode_uri(rawkey)] = decode_uri(rawvalue) + end + end + -- call underlying handler and remember boolean result: + if handler(request) ~= true then survive = false end + -- finish request (unless already done by underlying handler): + request:finish() + end + end + return survive + end +end + +return _M + diff -r 000000000000 -r f6d3b3f70dab moonbridge.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/moonbridge.c Sun Jan 04 19:30:28 2015 +0100 @@ -0,0 +1,2715 @@ + +/*** Compile-time configuration ***/ + +#define MOONBR_LUA_PANIC_BUG_WORKAROUND 1 + + +/*** C preprocessor macros for portability support ***/ + +#ifndef __has_include +#define __has_include(x) 0 +#endif + + +/*** Include directives for used system libraries ***/ + +#if defined(__linux__) +#define _GNU_SOURCE +#endif +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#if defined(__FreeBSD__) || __has_include() +#include +#endif +#if defined(__linux__) || __has_include() +#include +#endif +#if defined(__linux__) || __has_include() +#include +#endif + + +/*** Fallback definitions for missing constants on some platforms ***/ + +/* INFTIM is used as timeout parameter for poll() */ +#ifndef INFTIM +#define INFTIM -1 +#endif + + +/*** Include directives for Lua ***/ + +#include +#include +#include + + +/*** Constants ***/ + +/* Backlog option for listen() call */ +#define MOONBR_LISTEN_BACKLOG 1024 + +/* Maximum length of a timestamp used for strftime() */ +#define MOONBR_LOG_MAXTIMELEN 40 + +/* Maximum length of a log message */ +#define MOONBR_LOG_MAXMSGLEN 4095 + +/* Exitcodes passed to exit() call */ +#define MOONBR_EXITCODE_GRACEFUL 0 +#define MOONBR_EXITCODE_CMDLINEERROR 1 +#define MOONBR_EXITCODE_ALREADYRUNNING 2 +#define MOONBR_EXITCODE_STARTUPERROR 3 +#define MOONBR_EXITCODE_RUNTIMEERROR 4 + +/* Maximum length of a line sent to stderr by child processes */ +#define MOONBR_MAXERRORLINELEN 1024 + +/* Maximum length of an error string returned by strerror() */ +#define MOONBR_MAXSTRERRORLEN 80 + +/* Status bytes exchanged between master and child processes */ +#define MOONBR_SOCKETTYPE_INTERVAL 'I' +#define MOONBR_SOCKETTYPE_LOCAL 'L' +#define MOONBR_SOCKETTYPE_NETWORK 'N' +#define MOONBR_STATUS_IDLE '1' +#define MOONBR_COMMAND_TERMINATE '2' +#define MOONBR_STATUS_GOODBYE '3' + +/* Constant file descriptors */ +#define MOONBR_FD_STDERR 2 +#define MOONBR_FD_CONTROL 3 +#define MOONBR_FD_END 4 + +/* Return values of moonbr_try_destroy_worker() */ +#define MOONBR_DESTROY_NONE 0 +#define MOONBR_DESTROY_PREPARE 1 +#define MOONBR_DESTROY_IDLE_OR_ASSIGNED 2 + + +/*** Types ***/ + +/* Enum for 'moonbr_pstate' */ +#define MOONBR_PSTATE_STARTUP 0 +#define MOONBR_PSTATE_RUNNING 1 +#define MOONBR_PSTATE_FORKED 2 + +/* Enum for 'proto' field of struct moonbr_listener */ +#define MOONBR_PROTO_INTERVAL 1 +#define MOONBR_PROTO_LOCAL 2 +#define MOONBR_PROTO_TCP6 3 +#define MOONBR_PROTO_TCP4 4 + +/* Data structure for a pool's listener that can accept incoming connections */ +struct moonbr_listener { + struct moonbr_pool *pool; + struct moonbr_listener *prev_listener; /* previous idle or(!) connected listener */ + struct moonbr_listener *next_listener; /* next idle or(!) connected listener */ + int proto; + union { + struct { + char *name; /* name of interval passed to 'connect' function as 'interval' field in table */ + int strict; /* nonzero = runtime of 'connect' function does not delay interval */ + struct timeval delay; /* interval between invocations of 'connect' function */ + struct timeval wakeup; /* point in time of next invocation */ + } interval; + struct { + char *path; /* full path name (i.e. filename with path) of UNIX domain socket */ + } local; + struct { + int port; /* port number to listen on (in host endianess) */ + int localhost_only; /* nonzero = listen on localhost only */ + } tcp; + } proto_specific; + int listenfd; /* -1 = none */ + int pollidx; /* -1 = none */ +}; + +/* Data structure for a child process that is handling incoming connections */ +struct moonbr_worker { + struct moonbr_pool *pool; + struct moonbr_worker *prev_worker; + struct moonbr_worker *next_worker; + struct moonbr_worker *prev_idle_worker; + struct moonbr_worker *next_idle_worker; + int idle; /* nonzero = waiting for command from parent process */ + int assigned; /* nonzero = currently handling a connection */ + pid_t pid; + int controlfd; /* socket to send/receive control message to/from child process */ + int errorfd; /* socket to receive error output from child process' stderr */ + char *errorlinebuf; /* optional buffer for collecting stderr data from child process */ + int errorlinelen; /* number of bytes stored in 'errorlinebuf' */ + int errorlineovf; /* nonzero = line length overflow */ + struct timeval idle_expiration; /* point in time until child process may stay in idle state */ + struct moonbr_listener *restart_interval_listener; /* set while interval listener is assigned */ +}; + +/* Data structure for a pool of workers and listeners */ +struct moonbr_pool { + int poolnum; /* number of pool for log output */ + struct moonbr_pool *next_pool; /* next entry in linked list starting with 'moonbr_first_pool' */ + struct moonbr_worker *first_worker; /* first worker of pool */ + struct moonbr_worker *last_worker; /* last worker of pool */ + struct moonbr_worker *first_idle_worker; /* first idle worker of pool */ + struct moonbr_worker *last_idle_worker; /* last idle worker of pool */ + int idle_worker_count; + int unassigned_worker_count; + int total_worker_count; + int worker_count_stat; /* only needed for statistics */ + int pre_fork; /* desired minimum number of unassigned workers */ + int min_fork; /* desired minimum number of workers in total */ + int max_fork; /* maximum number of workers */ + struct timeval fork_delay; /* delay after each fork() until a fork may happen again */ + struct timeval fork_wakeup; /* point in time when a fork may happen again (unless a worker terminates before) */ + struct timeval fork_error_delay; /* delay between fork()s when an error during fork or preparation occurred */ + struct timeval fork_error_wakeup; /* point in time when fork may happen again if an error in preparation occurred */ + int use_fork_error_wakeup; /* nonzero = error in preparation occured; gets reset on next fork */ + struct timeval exit_delay; /* delay for terminating excessive workers (unassigned_worker_count > pre_fork) */ + struct timeval exit_wakeup; /* point in time when terminating an excessive worker */ + struct timeval idle_timeout; /* delay before an idle worker is terminated */ + size_t memory_limit; /* maximum bytes of memory that the Lua machine may allocate */ + int listener_count; /* total number of listeners of pool (and size of 'listener' array at end of this struct) */ + struct moonbr_listener *first_idle_listener; /* first listener that is idle (i.e. has no waiting connection) */ + struct moonbr_listener *last_idle_listener; /* last listener that is idle (i.e. has no waiting connection) */ + struct moonbr_listener *first_connected_listener; /* first listener that has a pending connection */ + struct moonbr_listener *last_connected_listener; /* last listener that has a pending connection */ + struct moonbr_listener listener[1]; /* static array of variable(!) size to contain 'listener' structures */ +}; + +/* Enum for 'channel' field of struct moonbr_poll_worker */ +#define MOONBR_POLL_WORKER_CONTROLCHANNEL 1 +#define MOONBR_POLL_WORKER_ERRORCHANNEL 2 + +/* Structure to refer from 'moonbr_poll_worker_fds' entry to worker structure */ +struct moonbr_poll_worker { + struct moonbr_worker *worker; + int channel; /* field indicating whether file descriptor is 'controlfd' or 'errorfd' */ +}; + +/* Variable indicating that clean shutdown was requested */ +static int moonbr_shutdown_in_progress = 0; + + +/*** Macros for Lua registry ***/ + +/* Lightuserdata keys for Lua registry to store 'prepare', 'connect', and 'finish' functions */ +#define moonbr_luakey_prepare_func(pool) ((void *)(intptr_t)(pool) + 0) +#define moonbr_luakey_connect_func(pool) ((void *)(intptr_t)(pool) + 1) +#define moonbr_luakey_finish_func(pool) ((void *)(intptr_t)(pool) + 2) + + +/*** Global variables ***/ + +/* State of process execution */ +static int moonbr_pstate = MOONBR_PSTATE_STARTUP; + +/* Process ID of the main process */ +static pid_t moonbr_masterpid; + +/* Condition variables set by the signal handler */ +static volatile sig_atomic_t moonbr_cond_poll = 0; +static volatile sig_atomic_t moonbr_cond_terminate = 0; +static volatile sig_atomic_t moonbr_cond_interrupt = 0; +static volatile sig_atomic_t moonbr_cond_child = 0; + +/* Socket pair to denote signal delivery when signal handler was called just before poll() */ +static int moonbr_poll_signalfds[2]; +#define moonbr_poll_signalfd_read moonbr_poll_signalfds[0] +#define moonbr_poll_signalfd_write moonbr_poll_signalfds[1] + +/* Global variables for pidfile and logging */ +static struct pidfh *moonbr_pidfh = NULL; +static FILE *moonbr_logfile = NULL; +static int moonbr_use_syslog = 0; + +/* First and last entry of linked list of all created pools during initialization */ +static struct moonbr_pool *moonbr_first_pool = NULL; +static struct moonbr_pool *moonbr_last_pool = NULL; + +/* Total count of pools */ +static int moonbr_pool_count = 0; + +/* Set to a nonzero value if dynamic part of 'moonbr_poll_fds' ('moonbr_poll_worker_fds') needs an update */ +static int moonbr_poll_refresh_needed = 0; + +/* Array passed to poll(), consisting of static part and dynamic part ('moonbr_poll_worker_fds') */ +static struct pollfd *moonbr_poll_fds = NULL; /* the array */ +static int moonbr_poll_fds_bufsize = 0; /* memory allocated for this number of elements */ +static int moonbr_poll_fds_count = 0; /* total number of elements */ +static int moonbr_poll_fds_static_count; /* number of elements in static part */ + +/* Dynamic part of 'moonbr_poll_fds' array */ +#define moonbr_poll_worker_fds (moonbr_poll_fds+moonbr_poll_fds_static_count) + +/* Additional information for dynamic part of 'moonbr_poll_fds' array */ +struct moonbr_poll_worker *moonbr_poll_workers; /* the array */ +static int moonbr_poll_workers_bufsize = 0; /* memory allocated for this number of elements */ +static int moonbr_poll_worker_count = 0; /* number of elements in array */ + +/* Variable set to nonzero value to disallow further calls of 'listen' function */ +static int moonbr_booted = 0; + +/* Global variables to store information on connection socket in child process */ +static int moonbr_child_peersocket_type; /* type of socket by MOONBR_SOCKETTYPE constant */ +static int moonbr_child_peersocket_fd; /* Original file descriptor of peer socket */ +static luaL_Stream *moonbr_child_peersocket_inputstream; /* Lua input stream of socket */ +static luaL_Stream *moonbr_child_peersocket_outputstream; /* Lua output stream of socket */ + +/* Verbosity settings */ +static int moonbr_debug = 0; +static int moonbr_stat = 0; + +/* Memory consumption by Lua machine */ +static size_t moonbr_memory_usage = 0; +static size_t moonbr_memory_limit = 0; + + +/*** Functions for signal handling ***/ + +/* Signal handler for master and child processes */ +static void moonbr_signal(int sig) { + if (getpid() == moonbr_masterpid) { + /* master process */ + switch (sig) { + case SIGHUP: + case SIGINT: + /* fast shutdown requested */ + moonbr_cond_interrupt = 1; + break; + case SIGTERM: + /* clean shutdown requested */ + moonbr_cond_terminate = 1; + break; + case SIGCHLD: + /* child process terminated */ + moonbr_cond_child = 1; + break; + } + if (moonbr_cond_poll) { + /* avoid race condition if signal handler is invoked right before poll() */ + char buf[1] = {0}; + write(moonbr_poll_signalfd_write, buf, 1); + } + } else { + /* child process forwards certain signals to parent process */ + switch (sig) { + case SIGHUP: + case SIGINT: + case SIGTERM: + kill(moonbr_masterpid, sig); + } + } +} + +/* Initialize signal handling */ +static void moonbr_signal_init(){ + moonbr_masterpid = getpid(); + signal(SIGHUP, moonbr_signal); + signal(SIGINT, moonbr_signal); + signal(SIGTERM, moonbr_signal); + signal(SIGCHLD, moonbr_signal); +} + + +/*** Functions for logging in master process ***/ + +/* Logs a pre-formatted message with given syslog() priority */ +static void moonbr_log_msg(int priority, const char *msg) { + if (moonbr_logfile) { + /* logging to logfile desired (timestamp is prepended in that case) */ + time_t now_time = 0; + struct tm now_tmstruct; + char timestr[MOONBR_LOG_MAXTIMELEN+1]; + time(&now_time); + localtime_r(&now_time, &now_tmstruct); + if (!strftime( + timestr, MOONBR_LOG_MAXTIMELEN+1, "%Y-%m-%d %H:%M:%S %Z: ", &now_tmstruct + )) timestr[0] = 0; + fprintf(moonbr_logfile, "%s%s\n", timestr, msg); + } + if (moonbr_use_syslog) { + /* logging through syslog desired */ + syslog(priority, "%s", msg); + } +} + +/* Formats a message via vsnprintf() and logs it with given syslog() priority */ +static void moonbr_log(int priority, const char *message, ...) { + char msgbuf[MOONBR_LOG_MAXMSGLEN+1]; /* buffer of static size to store formatted message */ + int msglen; /* length of full message (may exceed MOONBR_LOG_MAXMSGLEN) */ + { + /* pass variable arguments to vsnprintf() to format message */ + va_list ap; + va_start(ap, message); + msglen = vsnprintf(msgbuf, MOONBR_LOG_MAXMSGLEN+1, message, ap); + va_end(ap); + } + { + /* split and log message line by line */ + char *line = msgbuf; + while (1) { + char *endptr = strchr(line, '\n'); + if (endptr) { + /* terminate string where newline character is found */ + *endptr = 0; + } else if (line != msgbuf && msglen > MOONBR_LOG_MAXMSGLEN) { + /* break if line is incomplete and not the first line */ + break; + } + moonbr_log_msg(priority, line); + if (!endptr) break; /* break if end of formatted message is reached */ + line = endptr+1; /* otherwise continue with remaining message */ + } + } + if (msglen > MOONBR_LOG_MAXMSGLEN) { + /* print warning if message was truncated */ + moonbr_log_msg(priority, "Previous log message has been truncated due to excessive length"); + } +} + + +/*** Termination function ***/ + +/* Kill all child processes, remove PID file (if existent), and exit master process with given exitcode */ +static void moonbr_terminate(int exitcode) { + { + struct moonbr_pool *pool; + for (pool=moonbr_first_pool; pool; pool=pool->next_pool) { + { + struct moonbr_worker *worker; + for (worker=pool->first_worker; worker; worker=worker->next_worker) { + moonbr_log(LOG_INFO, "Sending SIGKILL to child with PID %i", (int)worker->pid); + if (kill(worker->pid, SIGKILL)) { + moonbr_log(LOG_ERR, "Error while killing child process: %s", strerror(errno)); + } + } + } + { + int i; + for (i=0; ilistener_count; i++) { + struct moonbr_listener *listener = &pool->listener[i]; + if (listener->proto == MOONBR_PROTO_LOCAL) { + moonbr_log(LOG_INFO, "Unlinking local socket \"%s\"", listener->proto_specific.local.path); + if (unlink(listener->proto_specific.local.path)) { + moonbr_log(LOG_ERR, "Error while unlinking local socket: %s", strerror(errno)); + } + } + } + } + } + } + moonbr_log(exitcode ? LOG_ERR : LOG_NOTICE, "Terminating with exit code %i", exitcode); + if (moonbr_pidfh && pidfile_remove(moonbr_pidfh)) { + moonbr_log(LOG_ERR, "Error while removing PID file: %s", strerror(errno)); + } + exit(exitcode); +} + +/* Terminate with either MOONBR_EXITCODE_STARTUPERROR or MOONBR_EXITCODE_RUNTIMEERROR */ +#define moonbr_terminate_error() \ + moonbr_terminate( \ + moonbr_pstate == MOONBR_PSTATE_STARTUP ? \ + MOONBR_EXITCODE_STARTUPERROR : \ + MOONBR_EXITCODE_RUNTIMEERROR \ + ) + + +/*** Helper functions ***/ + +/* Fills a 'struct timeval' structure with the current time (using CLOCK_MONOTONIC) */ +static void moonbr_now(struct timeval *now) { + struct timespec ts = {0, }; + if (clock_gettime(CLOCK_MONOTONIC, &ts)) { + moonbr_log(LOG_CRIT, "Error in clock_gettime() call: %s", strerror(errno)); + moonbr_terminate_error(); + } + *now = (struct timeval){ .tv_sec = ts.tv_sec, .tv_usec = ts.tv_nsec / 1000 }; +} + +/* Formats a 'struct timeval' value (not thread-safe) */ +static char *moonbr_format_timeval(struct timeval *t) { + static char buf[32]; + snprintf(buf, 32, "%ji.%06ji seconds", (intmax_t)t->tv_sec, (intmax_t)t->tv_usec); + return buf; +} + + +/*** Functions for pool creation and startup ***/ + +/* Creates a 'struct moonbr_pool' structure with a given number of listeners */ +static struct moonbr_pool *moonbr_create_pool(int listener_count) { + struct moonbr_pool *pool; + pool = calloc(1, + sizeof(struct moonbr_pool) + /* size of 'struct moonbr_pool' with one listener */ + (listener_count-1) * sizeof(struct moonbr_listener) /* size of extra listeners */ + ); + if (!pool) { + moonbr_log(LOG_CRIT, "Memory allocation error"); + moonbr_terminate_error(); + } + pool->listener_count = listener_count; + { + /* initialization of listeners */ + int i; + for (i=0; ilistener[i]; + listener->pool = pool; + listener->listenfd = -1; + listener->pollidx = -1; + } + } + return pool; +} + +/* Destroys a 'struct moonbr_pool' structure before it has been started */ +static void moonbr_destroy_pool(struct moonbr_pool *pool) { + int i; + for (i=0; ilistener_count; i++) { + struct moonbr_listener *listener = &pool->listener[i]; + if ( + listener->proto == MOONBR_PROTO_INTERVAL && + listener->proto_specific.interval.name + ) { + free(listener->proto_specific.interval.name); + } + if ( + listener->proto == MOONBR_PROTO_LOCAL && + listener->proto_specific.local.path + ) { + free(listener->proto_specific.local.path); + } + } + free(pool); +} + +/* Starts a all listeners in a pool */ +static int moonbr_start_pool(struct moonbr_pool *pool) { + moonbr_log(LOG_INFO, "Creating pool", pool->poolnum); + { + int i; + for (i=0; ilistener_count; i++) { + struct moonbr_listener *listener = &pool->listener[i]; + switch (listener->proto) { + case MOONBR_PROTO_INTERVAL: + // nothing to do here: starting intervals is performed in moonbr_run() function + if (!listener->proto_specific.interval.name) { + moonbr_log(LOG_INFO, "Adding unnamed interval listener"); + } else { + moonbr_log(LOG_INFO, "Adding interval listener \"%s\"", listener->proto_specific.interval.name); + } + break; + case MOONBR_PROTO_LOCAL: + moonbr_log(LOG_INFO, "Adding local socket listener for path \"%s\"", listener->proto_specific.local.path); + { + struct sockaddr_un servaddr = { .sun_family = AF_UNIX }; + const int path_maxlen = sizeof(struct sockaddr_un) - ( + (void *)&servaddr.sun_path - (void *)&servaddr + ); + if ( + snprintf( + servaddr.sun_path, + path_maxlen, + "%s", + listener->proto_specific.local.path + ) >= path_maxlen + ) { + errno = ENAMETOOLONG; + }; + listener->listenfd = socket(PF_LOCAL, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); + if (listener->listenfd == -1) goto moonbr_start_pool_error; + if (!unlink(listener->proto_specific.local.path)) { + moonbr_log(LOG_WARNING, "Unlinked named socket \"%s\" prior to listening", listener->proto_specific.local.path); + } else { + if (errno != ENOENT) { + moonbr_log(LOG_ERR, "Could not unlink named socket \"%s\" prior to listening: %s", listener->proto_specific.local.path, strerror(errno)); + } + } + if ( + bind(listener->listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) + ) goto moonbr_start_pool_error; + if (listen(listener->listenfd, MOONBR_LISTEN_BACKLOG)) goto moonbr_start_pool_error; + } + break; + case MOONBR_PROTO_TCP6: + if (listener->proto_specific.tcp.localhost_only) { + moonbr_log(LOG_INFO, "Adding localhost TCP/IPv6 listener on port %i", listener->proto_specific.tcp.port); + } else { + moonbr_log(LOG_INFO, "Adding public TCP/IPv6 listener on port %i", listener->proto_specific.tcp.port); + } + { + struct sockaddr_in6 servaddr = { + .sin6_family = AF_INET6, + .sin6_port = htons(listener->proto_specific.tcp.port) + }; + listener->listenfd = socket(PF_INET6, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); + if (listener->listenfd == -1) goto moonbr_start_pool_error; + { + /* avoid "Address already in use" error when restarting service */ + static const int reuseval = 1; + if (setsockopt( + listener->listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseval, sizeof(reuseval) + )) goto moonbr_start_pool_error; + } + { + /* default to send TCP RST when process terminates unexpectedly */ + static const struct linger lingerval = { + .l_onoff = 1, + .l_linger = 0 + }; + if (setsockopt( + listener->listenfd, SOL_SOCKET, SO_LINGER, &lingerval, sizeof(lingerval) + )) goto moonbr_start_pool_error; + } + if (listener->proto_specific.tcp.localhost_only) { + servaddr.sin6_addr.s6_addr[15] = 1; + } + if ( + bind(listener->listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) + ) goto moonbr_start_pool_error; + if (listen(listener->listenfd, MOONBR_LISTEN_BACKLOG)) goto moonbr_start_pool_error; + } + break; + case MOONBR_PROTO_TCP4: + if (listener->proto_specific.tcp.localhost_only) { + moonbr_log(LOG_INFO, "Adding localhost TCP/IPv4 listener on port %i", listener->proto_specific.tcp.port); + } else { + moonbr_log(LOG_INFO, "Adding public TCP/IPv4 listener on port %i", listener->proto_specific.tcp.port); + } + { + struct sockaddr_in servaddr = { + .sin_family = AF_INET, + .sin_port = htons(listener->proto_specific.tcp.port) + }; + listener->listenfd = socket(PF_INET, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); + if (listener->listenfd == -1) goto moonbr_start_pool_error; + { + /* avoid "Address already in use" error when restarting service */ + static const int reuseval = 1; + if (setsockopt( + listener->listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseval, sizeof(reuseval) + )) goto moonbr_start_pool_error; + } + { + /* default to send TCP RST when process terminates unexpectedly */ + static const struct linger lingerval = { + .l_onoff = 1, + .l_linger = 0 + }; + if (setsockopt( + listener->listenfd, SOL_SOCKET, SO_LINGER, &lingerval, sizeof(lingerval) + )) goto moonbr_start_pool_error; + } + if (listener->proto_specific.tcp.localhost_only) { + ((uint8_t *)&servaddr.sin_addr.s_addr)[0] = 127; + ((uint8_t *)&servaddr.sin_addr.s_addr)[3] = 1; + } + if ( + bind(listener->listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) + ) goto moonbr_start_pool_error; + if (listen(listener->listenfd, MOONBR_LISTEN_BACKLOG)) goto moonbr_start_pool_error; + } + break; + default: + moonbr_log(LOG_CRIT, "Internal error (should not happen): Unexpected value in listener.proto field"); + moonbr_terminate_error(); + } + } + goto moonbr_start_pool_ok; + moonbr_start_pool_error: + { + int j = i; + int errno2 = errno; + for (; i>=0; i--) { + struct moonbr_listener *listener = &pool->listener[i]; + if (listener->listenfd != -1) close(listener->listenfd); + } + errno = errno2; + return j; + } + } + moonbr_start_pool_ok: + pool->poolnum = ++moonbr_pool_count; + moonbr_log(LOG_INFO, "Pool #%i created", pool->poolnum); + if (moonbr_last_pool) moonbr_last_pool->next_pool = pool; + else moonbr_first_pool = pool; + moonbr_last_pool = pool; + return -1; +} + + +/*** Function to send data and a file descriptor to child process */ + +/* Sends control message of one bye plus optional file descriptor plus optional pointer to child process */ +static void moonbr_send_control_message(struct moonbr_worker *worker, char status, int fd, void *ptr) { + { + struct iovec iovector = { .iov_base = &status, .iov_len = 1 }; /* carrying status byte */ + char control_message_buffer[CMSG_SPACE(sizeof(int))] = {0, }; /* used to transfer file descriptor */ + struct msghdr message = { .msg_iov = &iovector, .msg_iovlen = 1 }; /* data structure passed to sendmsg() call */ + if (moonbr_debug) { + if (fd == -1) { + moonbr_log(LOG_DEBUG, "Sending control message \"%c\" to child process in pool #%i (PID %i)", (int)status, worker->pool->poolnum, (int)worker->pid); + } else { + moonbr_log(LOG_DEBUG, "Sending control message \"%c\" with file descriptor #%i to child process in pool #%i (PID %i)", (int)status, fd, worker->pool->poolnum, (int)worker->pid); + } + } + if (fd != -1) { + /* attach control message with file descriptor */ + message.msg_control = control_message_buffer; + message.msg_controllen = CMSG_SPACE(sizeof(int)); + { + struct cmsghdr *control_message = CMSG_FIRSTHDR(&message); + control_message->cmsg_level = SOL_SOCKET; + control_message->cmsg_type = SCM_RIGHTS; + control_message->cmsg_len = CMSG_LEN(sizeof(int)); + *((int *)CMSG_DATA(control_message)) = fd; + } + } + while (sendmsg(worker->controlfd, &message, MSG_NOSIGNAL) < 0) { + if (errno == EPIPE) { + moonbr_log(LOG_ERR, "Error while communicating with idle child process in pool #%i (PID %i): %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); + return; /* do not close socket; socket is closed when reading from it */ + } + if (errno != EINTR) { + moonbr_log(LOG_CRIT, "Unexpected error while communicating with idle child process in pool #%i (PID %i): %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); + moonbr_terminate_error(); + } + } + } + if (ptr) { + char buf[sizeof(void *)]; + char *pos = buf; + int len = sizeof(void *); + ssize_t written; + if (moonbr_debug) { + moonbr_log(LOG_DEBUG, "Sending memory pointer to child process in pool #%i (PID %i)", (int)status, worker->pool->poolnum, (int)worker->pid); + } + *((intptr_t *)buf) = (intptr_t)ptr; + while (len) { + written = send(worker->controlfd, pos, len, MSG_NOSIGNAL); + if (written > 0) { + pos += written; + len -= written; + } else if (errno == EPIPE) { + moonbr_log(LOG_ERR, "Error while communicating with idle child process in pool #%i (PID %i): %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); + return; /* do not close socket; socket is closed when reading from it */ + } else if (errno != EINTR) { + moonbr_log(LOG_CRIT, "Unexpected error while communicating with idle child process in pool #%i (PID %i): %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); + moonbr_terminate_error(); + } + } + } +} + + +/*** Functions running in child process ***/ + +/* Logs an error in child process */ +static void moonbr_child_log(const char *message) { + fprintf(stderr, "%s\n", message); +} + +/* Logs a fatal error in child process and terminates process with error status */ +static void moonbr_child_log_fatal(const char *message) { + moonbr_child_log(message); + exit(1); +} + +/* Logs an error in child process while appending error string for global errno variable */ +static void moonbr_child_log_errno(const char *message) { + char errmsg[MOONBR_MAXSTRERRORLEN]; + strerror_r(errno, errmsg, MOONBR_MAXSTRERRORLEN); /* use thread-safe call in case child created threads */ + fprintf(stderr, "%s: %s\n", message, errmsg); +} + +/* Logs a fatal error in child process while appending error string for errno and terminating process */ +static void moonbr_child_log_errno_fatal(const char *message) { + moonbr_child_log_errno(message); + exit(1); +} + +/* Receives a control message consisting of one character plus an optional file descriptor from parent process */ +static void moonbr_child_receive_control_message(int socketfd, char *status, int *fd) { + struct iovec iovector = { .iov_base = status, .iov_len = 1 }; /* reference to status byte variable */ + char control_message_buffer[CMSG_SPACE(sizeof(int))] = {0, }; /* used to receive file descriptor */ + struct msghdr message = { /* data structure passed to recvmsg() call */ + .msg_iov = &iovector, + .msg_iovlen = 1, + .msg_control = control_message_buffer, + .msg_controllen = CMSG_SPACE(sizeof(int)) + }; + { + int received; + while ((received = recvmsg(socketfd, &message, MSG_CMSG_CLOEXEC)) < 0) { + if (errno != EINTR) { + moonbr_child_log_errno_fatal("Error while trying to receive connection socket from parent process"); + } + } + if (!received) { + moonbr_child_log_fatal("Unexpected EOF while trying to receive connection socket from parent process"); + } + } + { + struct cmsghdr *control_message = CMSG_FIRSTHDR(&message); + if (control_message) { + if (control_message->cmsg_level != SOL_SOCKET) { + moonbr_child_log_fatal("Received control message with cmsg_level not equal to SOL_SOCKET"); + } + if (control_message->cmsg_type != SCM_RIGHTS) { + moonbr_child_log_fatal("Received control message with cmsg_type not equal to SCM_RIGHTS"); + } + *fd = *((int *)CMSG_DATA(control_message)); + } else { + *fd = -1; + } + } +} + +/* Receives a pointer from parent process */ +static void *moonbr_child_receive_pointer(int socketfd) { + char buf[sizeof(void *)]; + char *pos = buf; + int len = sizeof(void *); + ssize_t bytes_read; + while (len) { + bytes_read = recv(socketfd, pos, len, 0); + if (bytes_read > 0) { + pos += bytes_read; + len -= bytes_read; + } else if (!bytes_read) { + moonbr_child_log_fatal("Unexpected EOF while trying to receive memory pointer from parent process"); + } else if (errno != EINTR) { + moonbr_child_log_errno_fatal("Error while trying to receive memory pointer from parent process"); + } + } + return (void *)*(intptr_t *)buf; +} + +/* Throws a Lua error message with an error string for errno appended to it */ +static void moonbr_child_lua_errno_error(lua_State *L, char *message) { + char errmsg[MOONBR_MAXSTRERRORLEN]; + strerror_r(errno, errmsg, MOONBR_MAXSTRERRORLEN); /* use thread-safe call in case child created threads */ + luaL_error(L, "%s: %s", message, errmsg); +} + +/* Closes the input stream from peer unless it has already been closed */ +static int moonbr_child_close_peersocket_inputstream( + int cleanshut, /* nonzero = use shutdown() if applicable */ + int mark /* nonzero = mark the stream as closed for Lua */ +) { + int err = 0; /* nonzero = error occurred */ + int errno2; /* stores previous errno values that take precedence */ + if (moonbr_child_peersocket_inputstream->f) { + if (cleanshut && moonbr_child_peersocket_type == MOONBR_SOCKETTYPE_NETWORK) { + if (shutdown(moonbr_child_peersocket_fd, SHUT_RD)) { + errno2 = errno; + err = -1; + } + } + if (fclose(moonbr_child_peersocket_inputstream->f)) { + if (!err) errno2 = errno; + err = -1; + } + moonbr_child_peersocket_inputstream->f = NULL; + } + if (mark) moonbr_child_peersocket_inputstream->closef = NULL; + if (err) errno = errno2; + return err; +} + +/* Closes the output stream to peer unless it has already been closed */ +static int moonbr_child_close_peersocket_outputstream( + int cleanshut, /* nonzero = use fflush() and shutdown() if applicable */ + int mark /* nonzero = mark the stream as closed for Lua */ +) { + int err = 0; /* nonzero = error occurred */ + int errno2; /* stores previous errno values that take precedence */ + if (moonbr_child_peersocket_outputstream->f) { + if (moonbr_child_peersocket_type == MOONBR_SOCKETTYPE_NETWORK) { + if (cleanshut) { + if (fflush(moonbr_child_peersocket_outputstream->f)) { + errno2 = errno; + err = -1; + } else { + if (shutdown(moonbr_child_peersocket_fd, SHUT_WR)) { + errno2 = errno; + err = -1; + } + } + } else { + fpurge(moonbr_child_peersocket_outputstream->f); + } + } + if (fclose(moonbr_child_peersocket_outputstream->f)) { + if (!err) errno2 = errno; + err = -1; + } + moonbr_child_peersocket_outputstream->f = NULL; + } + if (mark) moonbr_child_peersocket_outputstream->closef = NULL; + if (err) errno = errno2; + return err; +} + +/* Perform a clean shutdown of input and output stream (may be called multiple times) */ +static int moonbr_child_close_peersocket(int timeout) { + int errprio = 0; + int errno2; + if (moonbr_child_peersocket_fd == -1) return 0; + if (moonbr_child_close_peersocket_inputstream(1, 1)) { + errprio = 1; + errno2 = errno; + } + if (moonbr_child_close_peersocket_outputstream(1, 1)) { + errprio = 4; + errno2 = errno; + } + if (moonbr_child_peersocket_type == MOONBR_SOCKETTYPE_NETWORK) { + struct linger lingerval = { 0, }; + if (timeout && !errprio) { + lingerval.l_onoff = 1; + lingerval.l_linger = timeout; + } + if (setsockopt(moonbr_child_peersocket_fd, SOL_SOCKET, SO_LINGER, &lingerval, sizeof(lingerval))) { + if (errprio < 2) { + errprio = 2; + errno2 = errno; + } + } + } + if (close(moonbr_child_peersocket_fd)) { + if (errprio < 3) { + errprio = 3; + errno2 = errno; + } + } + moonbr_child_peersocket_fd = -1; + if (errprio) { + errno = errno2; + return -1; + } + return 0; +} + +/* Close socket and cause reset of TCP connection (TCP RST aka "Connection reset by peer") if possible */ +static int moonbr_child_cancel_peersocket() { + int err = 0; + if (moonbr_child_close_peersocket_inputstream(0, 1)) err = -1; + if (moonbr_child_close_peersocket_outputstream(0, 1)) err = -1; + if (close(moonbr_child_peersocket_fd)) err = -1; + moonbr_child_peersocket_fd = -1; + return err; +} + +/* Lua method for socket object to read from input stream */ +static int moonbr_child_lua_read_stream(lua_State *L) { + lua_getfield(L, 1, "input"); + lua_getfield(L, -1, "read"); + lua_insert(L, 1); + lua_replace(L, 2); + lua_call(L, lua_gettop(L) - 1, LUA_MULTRET); + return lua_gettop(L); +} + +/* Lua method for socket object to read from input stream until terminator */ +static int moonbr_child_lua_readuntil_stream(lua_State *L) { + lua_getfield(L, 1, "input"); + lua_getfield(L, -1, "readuntil"); + lua_insert(L, 1); + lua_replace(L, 2); + lua_call(L, lua_gettop(L) - 1, LUA_MULTRET); + return lua_gettop(L); +} + +/* Lua method for socket object to iterate over input stream */ +static int moonbr_child_lua_lines_stream(lua_State *L) { + lua_getfield(L, 1, "input"); + lua_getfield(L, -1, "lines"); + lua_insert(L, 1); + lua_replace(L, 2); + lua_call(L, lua_gettop(L) - 1, LUA_MULTRET); + return lua_gettop(L); +} + +/* Lua method for socket object to write to output stream */ +static int moonbr_child_lua_write_stream(lua_State *L) { + lua_getfield(L, 1, "output"); + lua_getfield(L, -1, "write"); + lua_insert(L, 1); + lua_replace(L, 2); + lua_call(L, lua_gettop(L) - 1, LUA_MULTRET); + return lua_gettop(L); +} + +/* Lua method for socket object to flush the output stream */ +static int moonbr_child_lua_flush_stream(lua_State *L) { + lua_getfield(L, 1, "output"); + lua_getfield(L, -1, "flush"); + lua_insert(L, 1); + lua_replace(L, 2); + lua_call(L, lua_gettop(L) - 1, LUA_MULTRET); + return lua_gettop(L); +} + +/* Lua function to close a single stream (input or output) from/to peer */ +static int moonbr_child_lua_close_stream(lua_State *L) { + luaL_Stream *stream = lua_touserdata(L, 1); + if (stream == moonbr_child_peersocket_inputstream) { + if (moonbr_child_close_peersocket_inputstream(1, 0)) { /* don't mark as closed as it's done by Lua */ + moonbr_child_lua_errno_error(L, "Could not close input stream"); + } + } else if (stream == moonbr_child_peersocket_outputstream) { + if (moonbr_child_close_peersocket_outputstream(1, 0)) { /* don't mark as closed as it's done by Lua */ + moonbr_child_lua_errno_error(L, "Could not close output stream"); + } + } else { + luaL_argerror(L, 1, "Not a connection socket"); + } + return 0; +} + +/* Lua function to close both input and output stream from/to peer */ +static int moonbr_child_lua_close_both_streams(lua_State *L) { + int timeout = 0; + if (!lua_isnoneornil(L, 2)) timeout = luaL_checkint(L, 2); + if (moonbr_child_peersocket_fd == -1) { + luaL_error(L, "Connection with peer has already been explicitly closed"); + } + if (moonbr_child_close_peersocket(timeout)) { + moonbr_child_lua_errno_error(L, "Could not close socket connection with peer"); + } + return 0; +} + +/* Lua function to close both input and output stream from/to peer */ +static int moonbr_child_lua_cancel_both_streams(lua_State *L) { + if (moonbr_child_peersocket_fd == -1) { + luaL_error(L, "Connection with peer has already been explicitly closed"); + } + if (moonbr_child_cancel_peersocket()) { + moonbr_child_lua_errno_error(L, "Could not cancel socket connection with peer"); + } + return 0; +} + +/* Methods of (bidirectional) socket object passed to handler */ +static luaL_Reg moonbr_child_lua_socket_functions[] = { + {"read", moonbr_child_lua_read_stream}, + {"readuntil", moonbr_child_lua_readuntil_stream}, + {"lines", moonbr_child_lua_lines_stream}, + {"write", moonbr_child_lua_write_stream}, + {"flush", moonbr_child_lua_flush_stream}, + {"close", moonbr_child_lua_close_both_streams}, + {"cancel", moonbr_child_lua_cancel_both_streams}, + {NULL, NULL} +}; + +/* Main function of child process to be called after fork() and file descriptor rearrangement */ +void moonbr_child_run(struct moonbr_pool *pool, lua_State *L) { + char controlmsg; + struct itimerval notimer = { { 0, }, { 0, } }; + lua_rawgetp(L, LUA_REGISTRYINDEX, moonbr_luakey_prepare_func(pool)); + if (lua_isnil(L, -1)) lua_pop(L, 1); + else if (lua_pcall(L, 0, 0, 1)) { + fprintf(stderr, "Error in \"prepare\" function: %s\n", lua_tostring(L, -1)); + exit(1); + } + while (1) { + struct moonbr_listener *listener; + if (setitimer(ITIMER_REAL, ¬imer, NULL)) { + moonbr_child_log_errno_fatal("Could not reset ITIMER_REAL via setitimer()"); + } + controlmsg = MOONBR_STATUS_IDLE; + if (write(MOONBR_FD_CONTROL, &controlmsg, 1) <= 0) { + moonbr_child_log_errno_fatal("Error while sending ready message to parent process"); + } + moonbr_child_receive_control_message( + MOONBR_FD_CONTROL, + &controlmsg, + &moonbr_child_peersocket_fd + ); + if (!( + (controlmsg == MOONBR_COMMAND_TERMINATE && moonbr_child_peersocket_fd == -1) || + (controlmsg == MOONBR_SOCKETTYPE_INTERVAL && moonbr_child_peersocket_fd == -1) || + (controlmsg == MOONBR_SOCKETTYPE_LOCAL && moonbr_child_peersocket_fd != -1) || + (controlmsg == MOONBR_SOCKETTYPE_NETWORK && moonbr_child_peersocket_fd != -1) + )) { + moonbr_child_log_fatal("Received illegal control message from parent process"); + } + if (controlmsg == MOONBR_COMMAND_TERMINATE) break; + listener = moonbr_child_receive_pointer(MOONBR_FD_CONTROL); + moonbr_child_peersocket_type = controlmsg; + if (moonbr_child_peersocket_fd != -1) { + { + int clonedfd; + clonedfd = dup(moonbr_child_peersocket_fd); + if (!clonedfd) { + moonbr_child_log_errno_fatal("Could not duplicate file descriptor for input stream"); + } + moonbr_child_peersocket_inputstream = lua_newuserdata(L, sizeof(luaL_Stream)); + if (!moonbr_child_peersocket_inputstream) { + moonbr_child_log_fatal("Memory allocation error"); + } + moonbr_child_peersocket_inputstream->f = fdopen(clonedfd, "rb"); + if (!moonbr_child_peersocket_inputstream->f) { + moonbr_child_log_errno_fatal("Could not open input stream for remote connection"); + } + moonbr_child_peersocket_inputstream->closef = moonbr_child_lua_close_stream; + if (luaL_newmetatable(L, LUA_FILEHANDLE)) { + moonbr_child_log_fatal("Lua metatable LUA_FILEHANDLE does not exist"); + } + lua_setmetatable(L, -2); + } + { + int clonedfd; + clonedfd = dup(moonbr_child_peersocket_fd); + if (!clonedfd) { + moonbr_child_log_errno_fatal("Could not duplicate file descriptor for output stream"); + } + moonbr_child_peersocket_outputstream = lua_newuserdata(L, sizeof(luaL_Stream)); + if (!moonbr_child_peersocket_outputstream) { + moonbr_child_log_fatal("Memory allocation error"); + } + moonbr_child_peersocket_outputstream->f = fdopen(clonedfd, "wb"); + if (!moonbr_child_peersocket_outputstream->f) { + moonbr_child_log_errno_fatal("Could not open output stream for remote connection"); + } + moonbr_child_peersocket_outputstream->closef = moonbr_child_lua_close_stream; + if (luaL_newmetatable(L, LUA_FILEHANDLE)) { + moonbr_child_log_fatal("Lua metatable LUA_FILEHANDLE does not exist"); + } + lua_setmetatable(L, -2); + } + } + lua_rawgetp(L, LUA_REGISTRYINDEX, moonbr_luakey_connect_func(pool)); + if (listener->proto == MOONBR_PROTO_INTERVAL) { + lua_newtable(L); + lua_pushstring(L, + listener->proto_specific.interval.name ? + listener->proto_specific.interval.name : "" + ); + lua_setfield(L, -2, "interval"); + } else { + lua_newtable(L); + lua_pushvalue(L, -4); + lua_setfield(L, -2, "input"); + lua_pushvalue(L, -3); + lua_setfield(L, -2, "output"); + luaL_setfuncs(L, moonbr_child_lua_socket_functions, 0); + if (listener->proto == MOONBR_PROTO_TCP6) { + struct sockaddr_in6 addr; + socklen_t addr_len = sizeof(struct sockaddr_in6); + if (getsockname(moonbr_child_peersocket_fd, (struct sockaddr *)&addr, &addr_len)) { + moonbr_child_log_errno("Could not get local IP address/port"); + } else { + lua_pushlstring(L, (char *)addr.sin6_addr.s6_addr, 16); + lua_setfield(L, -2, "local_ip6"); + lua_pushinteger(L, ntohs(addr.sin6_port)); + lua_setfield(L, -2, "local_tcpport"); + } + if (getpeername(moonbr_child_peersocket_fd, (struct sockaddr *)&addr, &addr_len)) { + moonbr_child_log_errno("Could not get remote IP address/port"); + } else { + lua_pushlstring(L, (char *)addr.sin6_addr.s6_addr, 16); + lua_setfield(L, -2, "remote_ip6"); + lua_pushinteger(L, ntohs(addr.sin6_port)); + lua_setfield(L, -2, "remote_tcpport"); + } + } else if (listener->proto == MOONBR_PROTO_TCP4) { + struct sockaddr_in addr; + socklen_t addr_len = sizeof(struct sockaddr_in); + if (getsockname(moonbr_child_peersocket_fd, (struct sockaddr *)&addr, &addr_len)) { + moonbr_child_log_errno("Could not get local IP address/port"); + } else { + lua_pushlstring(L, (char *)&addr.sin_addr.s_addr, 4); + lua_setfield(L, -2, "local_ip4"); + lua_pushinteger(L, ntohs(addr.sin_port)); + lua_setfield(L, -2, "local_tcpport"); + } + if (getpeername(moonbr_child_peersocket_fd, (struct sockaddr *)&addr, &addr_len)) { + moonbr_child_log_errno("Could not get remote IP address/port"); + } else { + lua_pushlstring(L, (char *)&addr.sin_addr.s_addr, 4); + lua_setfield(L, -2, "remote_ip4"); + lua_pushinteger(L, ntohs(addr.sin_port)); + lua_setfield(L, -2, "remote_tcpport"); + } + } + } + if (lua_pcall(L, 1, 1, 1)) { + fprintf(stderr, "Error in \"connect\" function: %s\n", lua_tostring(L, -1)); + exit(1); + } + if (moonbr_child_close_peersocket(0)) { + moonbr_child_log_errno("Could not close socket connection with peer"); + } + if (lua_type(L, -1) != LUA_TBOOLEAN || !lua_toboolean(L, -1)) break; +#ifdef MOONBR_LUA_PANIC_BUG_WORKAROUND + lua_settop(L, 2); +#else + lua_settop(L, 1); +#endif + } + controlmsg = MOONBR_STATUS_GOODBYE; + if (write(MOONBR_FD_CONTROL, &controlmsg, 1) <= 0) { + moonbr_child_log_errno_fatal("Error while sending goodbye message to parent process"); + } + if (close(MOONBR_FD_CONTROL) && errno != EINTR) { + moonbr_child_log_errno("Error while closing control socket"); + } + lua_rawgetp(L, LUA_REGISTRYINDEX, moonbr_luakey_finish_func(pool)); + if (lua_isnil(L, -1)) lua_pop(L, 1); + else if (lua_pcall(L, 0, 0, 1)) { + fprintf(stderr, "Error in \"finish\" function: %s\n", lua_tostring(L, -1)); + exit(1); + } + lua_close(L); + exit(0); +} + + +/*** Functions to spawn child process ***/ + +/* Helper function to send an error message to a file descriptor (not needing a file stream) */ +static void moonbr_child_emergency_print(int fd, char *message) { + size_t len = strlen(message); + ssize_t written; + while (len) { + written = write(fd, message, len); + if (written > 0) { + message += written; + len -= written; + } else { + if (written != -1 || errno != EINTR) break; + } + } +} + +/* Helper function to send an error message plus a text for errno to a file descriptor and terminate the process */ +static void moonbr_child_emergency_error(int fd, char *message) { + int errno2 = errno; + moonbr_child_emergency_print(fd, message); + moonbr_child_emergency_print(fd, ": "); + moonbr_child_emergency_print(fd, strerror(errno2)); + moonbr_child_emergency_print(fd, "\n"); + exit(1); +} + +/* Creates a child process and (in case of success) registers it in the 'struct moonbr_pool' structure */ +static int moonbr_create_worker(struct moonbr_pool *pool, lua_State *L) { + struct moonbr_worker *worker; + worker = calloc(1, sizeof(struct moonbr_worker)); + if (!worker) { + moonbr_log(LOG_CRIT, "Memory allocation error"); + return -1; + } + worker->pool = pool; + { + int controlfds[2]; + int errorfds[2]; + if (socketpair(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC, 0, controlfds)) { + moonbr_log(LOG_ERR, "Could not create control socket pair for communcation with child process: %s", strerror(errno)); + free(worker); + return -1; + } + if (socketpair(PF_LOCAL, SOCK_STREAM | SOCK_CLOEXEC, 0, errorfds)) { + moonbr_log(LOG_ERR, "Could not create socket pair to redirect stderr of child process: %s", strerror(errno)); + close(controlfds[0]); + close(controlfds[1]); + free(worker); + return -1; + } + if (moonbr_logfile && fflush(moonbr_logfile)) { + moonbr_log(LOG_CRIT, "Could not flush log file prior to forking: %s", strerror(errno)); + moonbr_terminate_error(); + } + worker->pid = fork(); + if (worker->pid == -1) { + moonbr_log(LOG_ERR, "Could not fork: %s", strerror(errno)); + close(controlfds[0]); + close(controlfds[1]); + close(errorfds[0]); + close(errorfds[1]); + free(worker); + return -1; + } else if (!worker->pid) { + moonbr_pstate = MOONBR_PSTATE_FORKED; +#ifdef MOONBR_LUA_PANIC_BUG_WORKAROUND + lua_pushliteral(L, "Failed to pass error message due to bug in Lua panic handler (hint: not enough memory?)"); +#endif + moonbr_memory_limit = pool->memory_limit; + if (moonbr_pidfh && pidfile_close(moonbr_pidfh)) { + moonbr_child_emergency_error(errorfds[1], "Could not close PID file in forked child process"); + } + if (moonbr_logfile && moonbr_logfile != stderr && fclose(moonbr_logfile)) { + moonbr_child_emergency_error(errorfds[1], "Could not close log file in forked child process"); + } + if (dup2(errorfds[1], MOONBR_FD_STDERR) == -1) { + moonbr_child_emergency_error(errorfds[1], "Could not duplicate socket to stderr file descriptor"); + } + if (dup2(controlfds[1], MOONBR_FD_CONTROL) == -1) { + moonbr_child_emergency_error(errorfds[1], "Could not duplicate control socket"); + } + closefrom(MOONBR_FD_END); + moonbr_child_run(pool, L); + } + if (moonbr_stat) { + moonbr_log(LOG_INFO, "Created new worker in pool #%i with PID %i", worker->pool->poolnum, (int)worker->pid); + } + worker->controlfd = controlfds[0]; + worker->errorfd = errorfds[0]; + if (close(controlfds[1]) && errno != EINTR) { + moonbr_log(LOG_CRIT, "Could not close opposite end of control file descriptor after forking"); + moonbr_terminate_error(); + } + if (close(errorfds[1]) && errno != EINTR) { + moonbr_log(LOG_CRIT, "Could not close opposite end of control file descriptor after forking"); + moonbr_terminate_error(); + } + } + worker->prev_worker = pool->last_worker; + if (worker->prev_worker) worker->prev_worker->next_worker = worker; + else pool->first_worker = worker; + pool->last_worker = worker; + pool->unassigned_worker_count++; + pool->total_worker_count++; + pool->worker_count_stat = 1; + moonbr_poll_refresh_needed = 1; + return 0; /* return zero only in case of success */ +} + + +/*** Functions to handle previously created 'struct moonbr_worker' structures ***/ + +#define moonbr_try_destroy_worker_stat(str, field) \ + moonbr_log(LOG_INFO, "Resource usage in pool #%i for PID %i: " str " %li", worker->pool->poolnum, (int)worker->pid, (long)childusage.field); + +/* Destroys a worker structure if socket connections have been closed and child process has terminated */ +static int moonbr_try_destroy_worker(struct moonbr_worker *worker) { + if (worker->controlfd != -1 || worker->errorfd != -1) return MOONBR_DESTROY_NONE; + { + int childstatus; + struct rusage childusage; + { + pid_t waitedpid; + while ( + (waitedpid = wait4(worker->pid, &childstatus, WNOHANG, &childusage)) == -1 + ) { + if (errno != EINTR) { + moonbr_log(LOG_CRIT, "Error in wait4() call: %s", strerror(errno)); + moonbr_terminate_error(); + } + } + if (!waitedpid) return 0; /* return 0 if worker couldn't be destroyed */ + if (waitedpid != worker->pid) { + moonbr_log(LOG_CRIT, "Wrong PID returned by wait4() call"); + moonbr_terminate_error(); + } + } + if (WIFEXITED(childstatus)) { + if (WEXITSTATUS(childstatus) || moonbr_stat) { + moonbr_log( + WEXITSTATUS(childstatus) ? LOG_WARNING : LOG_INFO, + "Child process in pool #%i with PID %i returned with exit code %i", worker->pool->poolnum, (int)worker->pid, WEXITSTATUS(childstatus) + ); + } + } else if (WIFSIGNALED(childstatus)) { + if (WCOREDUMP(childstatus)) { + moonbr_log(LOG_ERR, "Child process in pool #%i with PID %i died from signal %i (core dump was created)", worker->pool->poolnum, (int)worker->pid, WTERMSIG(childstatus)); + } else if (WTERMSIG(childstatus) == SIGALRM) { + moonbr_log(LOG_WARNING, "Child process in pool #%i with PID %i exited prematurely due to timeout", worker->pool->poolnum, (int)worker->pid); + } else { + moonbr_log(LOG_ERR, "Child process in pool #%i with PID %i died from signal %i", worker->pool->poolnum, (int)worker->pid, WTERMSIG(childstatus)); + } + } else { + moonbr_log(LOG_CRIT, "Illegal exit status from child process in pool #%i with PID %i", worker->pool->poolnum, (int)worker->pid); + moonbr_terminate_error(); + } + if (moonbr_stat) { + moonbr_log(LOG_INFO, "Resource usage in pool #%i for PID %i: user time %s", worker->pool->poolnum, (int)worker->pid, moonbr_format_timeval(&childusage.ru_utime)); + moonbr_log(LOG_INFO, "Resource usage in pool #%i for PID %i: system time %s", worker->pool->poolnum, (int)worker->pid, moonbr_format_timeval(&childusage.ru_stime)); + moonbr_try_destroy_worker_stat("max resident set size", ru_maxrss); + moonbr_try_destroy_worker_stat("integral shared memory size", ru_ixrss); + moonbr_try_destroy_worker_stat("integral unshared data", ru_idrss); + moonbr_try_destroy_worker_stat("integral unshared stack", ru_isrss); + moonbr_try_destroy_worker_stat("page replaims", ru_minflt); + moonbr_try_destroy_worker_stat("page faults", ru_majflt); + moonbr_try_destroy_worker_stat("swaps", ru_nswap); + moonbr_try_destroy_worker_stat("block input operations", ru_inblock); + moonbr_try_destroy_worker_stat("block output operations", ru_oublock); + moonbr_try_destroy_worker_stat("messages sent", ru_msgsnd); + moonbr_try_destroy_worker_stat("messages received", ru_msgrcv); + moonbr_try_destroy_worker_stat("signals received", ru_nsignals); + moonbr_try_destroy_worker_stat("voluntary context switches", ru_nvcsw); + moonbr_try_destroy_worker_stat("involuntary context switches", ru_nivcsw); + } + } + { + int retval = ( + (worker->idle || worker->assigned) ? + MOONBR_DESTROY_IDLE_OR_ASSIGNED : + MOONBR_DESTROY_PREPARE + ); + if (worker->prev_worker) worker->prev_worker->next_worker = worker->next_worker; + else worker->pool->first_worker = worker->next_worker; + if (worker->next_worker) worker->next_worker->prev_worker = worker->prev_worker; + else worker->pool->last_worker = worker->prev_worker; + if (worker->idle) { + if (worker->prev_idle_worker) worker->prev_idle_worker->next_idle_worker = worker->next_idle_worker; + else worker->pool->first_idle_worker = worker->next_idle_worker; + if (worker->next_idle_worker) worker->next_idle_worker->prev_idle_worker = worker->prev_idle_worker; + else worker->pool->last_idle_worker = worker->prev_idle_worker; + worker->pool->idle_worker_count--; + } + if (!worker->assigned) worker->pool->unassigned_worker_count--; + worker->pool->total_worker_count--; + worker->pool->worker_count_stat = 1; + if (worker->errorlinebuf) free(worker->errorlinebuf); + free(worker); + return retval; + } +} + +/* Marks a worker as idle and stores it in a queue, optionally setting 'idle_expiration' value */ +static void moonbr_add_idle_worker(struct moonbr_worker *worker) { + worker->prev_idle_worker = worker->pool->last_idle_worker; + if (worker->prev_idle_worker) worker->prev_idle_worker->next_idle_worker = worker; + else worker->pool->first_idle_worker = worker; + worker->pool->last_idle_worker = worker; + worker->idle = 1; + worker->pool->idle_worker_count++; + if (worker->assigned) { + worker->assigned = 0; + worker->pool->unassigned_worker_count++; + } + worker->pool->worker_count_stat = 1; + if (timerisset(&worker->pool->idle_timeout)) { + struct timeval now; + moonbr_now(&now); + timeradd(&now, &worker->pool->idle_timeout, &worker->idle_expiration); + } +} + +/* Pops a worker from the queue of idle workers (idle queue must not be empty) */ +static struct moonbr_worker *moonbr_pop_idle_worker(struct moonbr_pool *pool) { + struct moonbr_worker *worker; + worker = pool->first_idle_worker; + pool->first_idle_worker = worker->next_idle_worker; + if (pool->first_idle_worker) pool->first_idle_worker->prev_idle_worker = NULL; + else pool->last_idle_worker = NULL; + worker->next_idle_worker = NULL; + worker->idle = 0; + worker->pool->idle_worker_count--; + worker->assigned = 1; + worker->pool->unassigned_worker_count--; + worker->pool->worker_count_stat = 1; + return worker; +} + + +/*** Functions for queues of 'struct moonbr_listener' ***/ + +/* Appends a 'struct moonbr_listener' to the queue of idle listeners and registers it for poll() */ +static void moonbr_add_idle_listener(struct moonbr_listener *listener) { + listener->prev_listener = listener->pool->last_idle_listener; + if (listener->prev_listener) listener->prev_listener->next_listener = listener; + else listener->pool->first_idle_listener = listener; + listener->pool->last_idle_listener = listener; + if (listener->pollidx != -1) moonbr_poll_fds[listener->pollidx].events |= POLLIN; +} + +/* Removes a 'struct moonbr_listener' from the queue of idle listeners and unregisters it from poll() */ +static void moonbr_remove_idle_listener(struct moonbr_listener *listener) { + if (listener->prev_listener) listener->prev_listener->next_listener = listener->next_listener; + else listener->pool->first_idle_listener = listener->next_listener; + if (listener->next_listener) listener->next_listener->prev_listener = listener->prev_listener; + else listener->pool->last_idle_listener = listener->prev_listener; + listener->prev_listener = NULL; + listener->next_listener = NULL; + if (listener->pollidx != -1) moonbr_poll_fds[listener->pollidx].events &= ~POLLIN; +} + +/* Adds a listener to the queue of connected listeners (i.e. waiting to have their incoming connection accepted) */ +static void moonbr_add_connected_listener(struct moonbr_listener *listener) { + listener->prev_listener = listener->pool->last_connected_listener; + if (listener->prev_listener) listener->prev_listener->next_listener = listener; + else listener->pool->first_connected_listener = listener; + listener->pool->last_connected_listener = listener; +} + +/* Removes and returns the first connected listener in the queue */ +static struct moonbr_listener *moonbr_pop_connected_listener(struct moonbr_pool *pool) { + struct moonbr_listener *listener = pool->first_connected_listener; + listener->pool->first_connected_listener = listener->next_listener; + if (listener->pool->first_connected_listener) listener->pool->first_connected_listener->prev_listener = NULL; + else listener->pool->last_connected_listener = NULL; + listener->next_listener = NULL; + return listener; +} + + +/*** Functions to handle polling ***/ + +/* Returns an index to a new initialized entry in moonbr_poll_fds[] */ +int moonbr_poll_fds_nextindex() { + if (moonbr_poll_fds_count >= moonbr_poll_fds_bufsize) { + if (moonbr_poll_fds_bufsize) moonbr_poll_fds_bufsize *= 2; + else moonbr_poll_fds_bufsize = 1; + moonbr_poll_fds = realloc( + moonbr_poll_fds, moonbr_poll_fds_bufsize * sizeof(struct pollfd) + ); + if (!moonbr_poll_fds) { + moonbr_log(LOG_CRIT, "Memory allocation error"); + moonbr_terminate_error(); + } + } + moonbr_poll_fds[moonbr_poll_fds_count] = (struct pollfd){0, }; + return moonbr_poll_fds_count++; +} + +/* Returns an index to a new initialized entry in moonbr_poll_workers[] */ +int moonbr_poll_workers_nextindex() { + if (moonbr_poll_worker_count >= moonbr_poll_workers_bufsize) { + if (moonbr_poll_workers_bufsize) moonbr_poll_workers_bufsize *= 2; + else moonbr_poll_workers_bufsize = 1; + moonbr_poll_workers = realloc( + moonbr_poll_workers, moonbr_poll_workers_bufsize * sizeof(struct moonbr_poll_worker) + ); + if (!moonbr_poll_workers) { + moonbr_log(LOG_CRIT, "Memory allocation error"); + moonbr_terminate_error(); + } + } + moonbr_poll_workers[moonbr_poll_worker_count] = (struct moonbr_poll_worker){0, }; + return moonbr_poll_worker_count++; +} + +/* Queues all listeners as idle, and initializes static part of moonbr_poll_fds[], which is passed to poll() */ +static void moonbr_poll_init() { + if (socketpair( + PF_LOCAL, + SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, + 0, + moonbr_poll_signalfds + )) { + moonbr_log(LOG_CRIT, "Could not create socket pair for signal delivery during polling: %s", strerror(errno)); + moonbr_terminate_error(); + } + { + int j = moonbr_poll_fds_nextindex(); + struct pollfd *pollfd = &moonbr_poll_fds[j]; + pollfd->fd = moonbr_poll_signalfd_read; + pollfd->events = POLLIN; + } + { + struct moonbr_pool *pool; + for (pool=moonbr_first_pool; pool; pool=pool->next_pool) { + int i; + for (i=0; ilistener_count; i++) { + struct moonbr_listener *listener = &pool->listener[i]; + if (listener->listenfd != -1) { + int j = moonbr_poll_fds_nextindex(); + listener->pollidx = j; + moonbr_poll_fds[j].fd = listener->listenfd; + } + moonbr_add_idle_listener(listener); + } + } + } + moonbr_poll_fds_static_count = moonbr_poll_fds_count; /* remember size of static part of array */ +} + +/* Disables polling of all listeners (required for clean shutdown) */ +static void moonbr_poll_shutdown() { + int i; + for (i=1; inext_pool) { + struct moonbr_worker *worker; + for (worker=pool->first_worker; worker; worker=worker->next_worker) { + if (worker->controlfd != -1) { + int j = moonbr_poll_fds_nextindex(); + int k = moonbr_poll_workers_nextindex(); + struct pollfd *pollfd = &moonbr_poll_fds[j]; + struct moonbr_poll_worker *poll_worker = &moonbr_poll_workers[k]; + pollfd->fd = worker->controlfd; + pollfd->events = POLLIN; + poll_worker->channel = MOONBR_POLL_WORKER_CONTROLCHANNEL; + poll_worker->worker = worker; + } + if (worker->errorfd != -1) { + int j = moonbr_poll_fds_nextindex(); + int k = moonbr_poll_workers_nextindex(); + struct pollfd *pollfd = &moonbr_poll_fds[j]; + struct moonbr_poll_worker *poll_worker = &moonbr_poll_workers[k]; + pollfd->fd = worker->errorfd; + pollfd->events = POLLIN; + poll_worker->channel = MOONBR_POLL_WORKER_ERRORCHANNEL; + poll_worker->worker = worker; + } + } + } + } +} + +/* resets socket and 'revents' field of moonbr_poll_fds[] for signal delivery just before poll() is called */ +static void moonbr_poll_reset_signal() { + ssize_t readcount; + char buf[1]; + moonbr_poll_fds[0].revents = 0; + while ((readcount = read(moonbr_poll_signalfd_read, buf, 1)) < 0) { + if (errno == EAGAIN) break; + if (errno != EINTR) { + moonbr_log(LOG_CRIT, "Error while reading from signal delivery socket: %s", strerror(errno)); + moonbr_terminate_error(); + } + } + if (!readcount) { + moonbr_log(LOG_CRIT, "Unexpected EOF when reading from signal delivery socket: %s", strerror(errno)); + moonbr_terminate_error(); + } +} + + +/*** Shutdown initiation ***/ + +/* Sets global variable 'moonbr_shutdown_in_progress', closes listeners, and demands worker termination */ +static void moonbr_initiate_shutdown() { + struct moonbr_pool *pool; + int i; + if (moonbr_shutdown_in_progress) { + moonbr_log(LOG_NOTICE, "Shutdown already in progress"); + return; + } + moonbr_shutdown_in_progress = 1; + moonbr_log(LOG_NOTICE, "Initiate shutdown"); + for (pool = moonbr_first_pool; pool; pool = pool->next_pool) { + for (i=0; ilistener_count; i++) { + struct moonbr_listener *listener = &pool->listener[i]; + if (listener->listenfd != -1) { + if (close(listener->listenfd) && errno != EINTR) { + moonbr_log(LOG_CRIT, "Could not close listening socket: %s", strerror(errno)); + moonbr_terminate_error(); + } + } + } + pool->pre_fork = 0; + pool->min_fork = 0; + pool->max_fork = 0; + timerclear(&pool->exit_delay); + } + moonbr_poll_shutdown(); /* avoids loops due to error condition when polling closed listeners */ +} + + +/*** Functions to communicate with child processes ***/ + +/* Tells child process to terminate */ +static void moonbr_terminate_idle_worker(struct moonbr_worker *worker) { + moonbr_send_control_message(worker, MOONBR_COMMAND_TERMINATE, -1, NULL); +} + +/* Handles status messages from child process */ +static void moonbr_read_controlchannel(struct moonbr_worker *worker) { + char controlmsg; + { + ssize_t bytes_read; + while ((bytes_read = read(worker->controlfd, &controlmsg, 1)) <= 0) { + if (bytes_read == 0 || errno == ECONNRESET) { + moonbr_log(LOG_WARNING, "Child process in pool #%i with PID %i unexpectedly closed control socket", worker->pool->poolnum, (int)worker->pid); + if (close(worker->controlfd) && errno != EINTR) { + moonbr_log(LOG_CRIT, "Error while closing control socket to child process in pool #%i with PID %i: %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); + moonbr_terminate_error(); + } + worker->controlfd = -1; + moonbr_poll_refresh_needed = 1; + return; + } + if (errno != EINTR) { + moonbr_log(LOG_CRIT, "Unexpected error while reading control socket from child process in pool #%i with PID %i: %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); + moonbr_terminate_error(); + } + } + } + if (worker->idle) { + moonbr_log(LOG_CRIT, "Unexpected data from supposedly idle child process in pool #%i with PID %i", worker->pool->poolnum, (int)worker->pid); + moonbr_terminate_error(); + } + if (moonbr_debug) { + moonbr_log(LOG_DEBUG, "Received control message from child in pool #%i with PID %i: \"%c\"", worker->pool->poolnum, (int)worker->pid, (int)controlmsg); + } + switch (controlmsg) { + case MOONBR_STATUS_IDLE: + if (moonbr_stat) { + moonbr_log(LOG_INFO, "Child process in pool #%i with PID %i reports as idle", worker->pool->poolnum, (int)worker->pid); + } + moonbr_add_idle_worker(worker); + break; + case MOONBR_STATUS_GOODBYE: + if (moonbr_stat) { + moonbr_log(LOG_INFO, "Child process in pool #%i with PID %i announced termination", worker->pool->poolnum, (int)worker->pid); + } + if (close(worker->controlfd) && errno != EINTR) { + moonbr_log(LOG_CRIT, "Error while closing control socket to child process in pool #%i with PID %i: %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); + moonbr_terminate_error(); + } + worker->controlfd = -1; + moonbr_poll_refresh_needed = 1; + break; + default: + moonbr_log(LOG_CRIT, "Received illegal data (\"%c\") while reading control socket from child process in pool #%i with PID %i", (int)controlmsg, worker->pool->poolnum, (int)worker->pid); + moonbr_terminate_error(); + } +} + +/* Handles stderr stream from child process */ +static void moonbr_read_errorchannel(struct moonbr_worker *worker) { + char staticbuf[MOONBR_MAXERRORLINELEN+1]; + char *buf = worker->errorlinebuf; + if (!buf) buf = staticbuf; + { + ssize_t bytes_read; + while ( + (bytes_read = read( + worker->errorfd, + buf + worker->errorlinelen, + MOONBR_MAXERRORLINELEN+1 - worker->errorlinelen + )) <= 0 + ) { + if (bytes_read == 0 || errno == ECONNRESET) { + if (moonbr_debug) { + moonbr_log(LOG_DEBUG, "Child process in pool #%i with PID %i closed stderr socket", worker->pool->poolnum, (int)worker->pid); + } + if (close(worker->errorfd) && errno != EINTR) { + moonbr_log(LOG_CRIT, "Error while closing stderr socket to child process in pool #%i with PID %i: %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); + moonbr_terminate_error(); + } + worker->errorfd = -1; + moonbr_poll_refresh_needed = 1; + break; + } + if (errno != EINTR) { + moonbr_log(LOG_CRIT, "Unexpected error while reading stderr from child process in pool #%i with PID %i: %s", worker->pool->poolnum, (int)worker->pid, strerror(errno)); + moonbr_terminate_error(); + } + } + worker->errorlinelen += bytes_read; + } + { + int i; + for (i=0; ierrorlinelen; i++) { + if (buf[i] == '\n') buf[i] = 0; + if (!buf[i]) { + if (worker->errorlineovf) { + worker->errorlineovf = 0; + } else { + moonbr_log(LOG_WARNING, "Error log from process in pool #%i with PID %i: %s", worker->pool->poolnum, (int)worker->pid, buf); + } + worker->errorlinelen -= i+1; + memmove(buf, buf+i+1, worker->errorlinelen); + i = -1; + } + } + if (i > MOONBR_MAXERRORLINELEN) { + buf[MOONBR_MAXERRORLINELEN] = 0; + if (!worker->errorlineovf) { + moonbr_log(LOG_WARNING, "Error log from process in pool #%i with PID %i (line has been truncated): %s", worker->pool->poolnum, (int)worker->pid, buf); + } + worker->errorlinelen = 0; + worker->errorlineovf = 1; + } + } + if (!worker->errorlinebuf && worker->errorlinelen) { /* allocate buffer on heap only if necessary */ + worker->errorlinebuf = malloc((MOONBR_MAXERRORLINELEN+1) * sizeof(char)); + if (!worker->errorlinebuf) { + moonbr_log(LOG_CRIT, "Memory allocation error"); + moonbr_terminate_error(); + } + memcpy(worker->errorlinebuf, staticbuf, worker->errorlinelen); + } +} + + +/*** Handler for incoming connections ***/ + +/* Accepts one or more incoming connections on listener socket and passes it to worker(s) popped from idle queue */ +static void moonbr_connect(struct moonbr_pool *pool) { + struct moonbr_listener *listener = moonbr_pop_connected_listener(pool); + struct moonbr_worker *worker; + switch (listener->proto) { + case MOONBR_PROTO_INTERVAL: + worker = moonbr_pop_idle_worker(pool); + if (moonbr_stat) { + moonbr_log(LOG_INFO, "Dispatching interval timer \"%s\" of pool #%i to PID %i", listener->proto_specific.interval.name, listener->pool->poolnum, (int)worker->pid); + } + worker->restart_interval_listener = listener; + moonbr_send_control_message(worker, MOONBR_SOCKETTYPE_INTERVAL, -1, listener); + /* do not push listener to queue of idle listeners yet */ + break; + case MOONBR_PROTO_LOCAL: + do { + int peerfd; + struct sockaddr_un peeraddr; + socklen_t peeraddr_len = sizeof(struct sockaddr_un); + peerfd = accept4( + listener->listenfd, + (struct sockaddr *)&peeraddr, + &peeraddr_len, + SOCK_CLOEXEC + ); + if (peerfd == -1) { + if (errno == EWOULDBLOCK) { + break; + } else if (errno == ECONNABORTED) { + moonbr_log(LOG_WARNING, "Connection aborted before accepting it (proto=\"local\", path=\"%s\")", listener->proto_specific.local.path); + break; + } else if (errno != EINTR) { + moonbr_log(LOG_ERR, "Could not accept socket connection: %s", strerror(errno)); + moonbr_terminate_error(); + } + } else { + worker = moonbr_pop_idle_worker(pool); + if (moonbr_stat) { + moonbr_log(LOG_INFO, "Dispatching local socket connection on path \"%s\" for pool #%i to PID %i", listener->proto_specific.local.path, listener->pool->poolnum, (int)worker->pid); + } + moonbr_send_control_message(worker, MOONBR_SOCKETTYPE_LOCAL, peerfd, listener); + if (close(peerfd) && errno != EINTR) { + moonbr_log(LOG_ERR, "Could not close incoming socket connection in parent process: %s", strerror(errno)); + moonbr_terminate_error(); + } + } + } while (pool->first_idle_worker); + moonbr_add_idle_listener(listener); + break; + case MOONBR_PROTO_TCP6: + do { + int peerfd; + struct sockaddr_in6 peeraddr; + socklen_t peeraddr_len = sizeof(struct sockaddr_in6); + peerfd = accept4( + listener->listenfd, + (struct sockaddr *)&peeraddr, + &peeraddr_len, + SOCK_CLOEXEC + ); + if (peerfd == -1) { + if (errno == EWOULDBLOCK) { + break; + } else if (errno == ECONNABORTED) { + moonbr_log(LOG_WARNING, "Connection aborted before accepting it (proto=\"tcp6\", port=%i)", listener->proto_specific.tcp.port); + break; + } else if (errno != EINTR) { + moonbr_log(LOG_ERR, "Could not accept socket connection: %s", strerror(errno)); + moonbr_terminate_error(); + } + } else { + worker = moonbr_pop_idle_worker(pool); + if (moonbr_stat) { + moonbr_log(LOG_INFO, "Dispatching TCP/IPv6 connection for pool #%i on port %i to PID %i", listener->pool->poolnum, listener->proto_specific.tcp.port, (int)worker->pid); + } + moonbr_send_control_message(worker, MOONBR_SOCKETTYPE_NETWORK, peerfd, listener); + if (close(peerfd) && errno != EINTR) { + moonbr_log(LOG_ERR, "Could not close incoming socket connection in parent process: %s", strerror(errno)); + moonbr_terminate_error(); + } + } + } while (pool->first_idle_worker); + moonbr_add_idle_listener(listener); + break; + case MOONBR_PROTO_TCP4: + do { + int peerfd; + struct sockaddr_in peeraddr; + socklen_t peeraddr_len = sizeof(struct sockaddr_in); + peerfd = accept4( + listener->listenfd, + (struct sockaddr *)&peeraddr, + &peeraddr_len, + SOCK_CLOEXEC + ); + if (peerfd == -1) { + if (errno == EWOULDBLOCK) { + break; + } else if (errno == ECONNABORTED) { + moonbr_log(LOG_WARNING, "Connection aborted before accepting it (proto=\"tcp4\", port=%i)", listener->proto_specific.tcp.port); + break; + } else if (errno != EINTR) { + moonbr_log(LOG_ERR, "Could not accept socket connection: %s", strerror(errno)); + moonbr_terminate_error(); + } + } else { + worker = moonbr_pop_idle_worker(pool); + if (moonbr_stat) { + moonbr_log(LOG_INFO, "Dispatching TCP/IPv4 connection for pool #%i on port %i to PID %i", listener->pool->poolnum, listener->proto_specific.tcp.port, (int)worker->pid); + } + moonbr_send_control_message(worker, MOONBR_SOCKETTYPE_NETWORK, peerfd, listener); + if (close(peerfd) && errno != EINTR) { + moonbr_log(LOG_ERR, "Could not close incoming socket connection in parent process: %s", strerror(errno)); + moonbr_terminate_error(); + } + } + } while (pool->first_idle_worker); + moonbr_add_idle_listener(listener); + break; + default: + moonbr_log(LOG_ERR, "Internal error (should not happen): Unexpected value in listener.proto field"); + moonbr_terminate_error(); + } +} + + +/*** Functions to initialize and restart interval timers ***/ + +/* Initializes all interval timers */ +static void moonbr_interval_initialize() { + struct timeval now; + struct moonbr_pool *pool; + moonbr_now(&now); + for (pool=moonbr_first_pool; pool; pool=pool->next_pool) { + int i; + for (i=0; ilistener_count; i++) { + struct moonbr_listener *listener = &pool->listener[i]; + if (listener->proto == MOONBR_PROTO_INTERVAL) { + timeradd( + &now, + &listener->proto_specific.interval.delay, + &listener->proto_specific.interval.wakeup + ); + } + } + } +} + +/* If necessary, restarts interval timers and queues interval listener as idle after a worker changed status */ +static void moonbr_interval_restart( + struct moonbr_worker *worker, + struct timeval *now /* passed to synchronize with moonbr_run() function */ +) { + struct moonbr_listener *listener = worker->restart_interval_listener; + if (listener) { + moonbr_add_idle_listener(listener); + worker->restart_interval_listener = NULL; + if (listener->proto_specific.interval.strict) { + timeradd( + &listener->proto_specific.interval.wakeup, + &listener->proto_specific.interval.delay, + &listener->proto_specific.interval.wakeup + ); + if (timercmp(&listener->proto_specific.interval.wakeup, now, <)) { + listener->proto_specific.interval.wakeup = *now; + } + } else { + timeradd( + now, + &listener->proto_specific.interval.delay, + &listener->proto_specific.interval.wakeup + ); + } + } +} + + +/*** Main loop and helper functions ***/ + +/* Stores the earliest required wakeup time in 'wait' variable */ +static void moonbr_calc_wait(struct timeval *wait, struct timeval *wakeup) { + if (!timerisset(wait) || timercmp(wakeup, wait, <)) *wait = *wakeup; +} + +/* Main loop of Moonbridge system (including initialization of signal handlers and polling structures) */ +static void moonbr_run(lua_State *L) { + struct timeval now; + struct moonbr_pool *pool; + struct moonbr_worker *worker; + struct moonbr_worker *next_worker; /* needed when worker is removed during iteration of workers */ + struct moonbr_listener *listener; + struct moonbr_listener *next_listener; /* needed when listener is removed during iteration of listeners */ + int i; + moonbr_poll_init(); /* must be executed before moonbr_signal_init() */ + moonbr_signal_init(); + moonbr_interval_initialize(); + moonbr_pstate = MOONBR_PSTATE_RUNNING; + while (1) { + struct timeval wait = {0, }; /* point in time when premature wakeup of poll() is required */ + if (moonbr_cond_interrupt) { + moonbr_log(LOG_WARNING, "Fast shutdown requested"); + moonbr_terminate(MOONBR_EXITCODE_GRACEFUL); + } + if (moonbr_cond_terminate) { + moonbr_initiate_shutdown(); + moonbr_cond_terminate = 0; + } + moonbr_cond_child = 0; /* must not be reset between moonbr_try_destroy_worker() and poll() */ + moonbr_now(&now); + for (pool=moonbr_first_pool; pool; pool=pool->next_pool) { + int terminated_worker_count = 0; /* allows shortcut for new worker creation */ + /* terminate idle workers when expired */ + if (timerisset(&pool->idle_timeout)) { + while ((worker = pool->first_idle_worker) != NULL) { + if (timercmp(&worker->idle_expiration, &now, >)) break; + moonbr_pop_idle_worker(pool); + moonbr_terminate_idle_worker(worker); + } + } + /* mark listeners as connected when incoming connection is pending */ + for (listener=pool->first_idle_listener; listener; listener=next_listener) { + next_listener = listener->next_listener; /* extra variable necessary due to changing list */ + if (listener->pollidx != -1) { + if (moonbr_poll_fds[listener->pollidx].revents) { + moonbr_poll_fds[listener->pollidx].revents = 0; + moonbr_remove_idle_listener(listener); + moonbr_add_connected_listener(listener); + } + } else if (listener->proto == MOONBR_PROTO_INTERVAL) { + if (!timercmp(&listener->proto_specific.interval.wakeup, &now, >)) { + moonbr_remove_idle_listener(listener); + moonbr_add_connected_listener(listener); + } + } else { + moonbr_log(LOG_CRIT, "Internal error (should not happen): Listener is neither an interval timer nor has the 'pollidx' value set"); + moonbr_terminate_error(); + } + } + /* process input from child processes */ + for (i=0; ichannel) { + case MOONBR_POLL_WORKER_CONTROLCHANNEL: + moonbr_read_controlchannel(poll_worker->worker); + moonbr_interval_restart(poll_worker->worker, &now); + break; + case MOONBR_POLL_WORKER_ERRORCHANNEL: + moonbr_read_errorchannel(poll_worker->worker); + break; + } + } + } + /* collect dead child processes */ + for (worker=pool->first_worker; worker; worker=next_worker) { + next_worker = worker->next_worker; /* extra variable necessary due to changing list */ + switch (moonbr_try_destroy_worker(worker)) { + case MOONBR_DESTROY_PREPARE: + pool->use_fork_error_wakeup = 1; + break; + case MOONBR_DESTROY_IDLE_OR_ASSIGNED: + terminated_worker_count++; + break; + } + } + /* connect listeners with idle workers */ + if (!moonbr_shutdown_in_progress) { + while (pool->first_connected_listener && pool->first_idle_worker) { + moonbr_connect(pool); + } + } + /* create new worker processes */ + while ( + pool->total_worker_count < pool->max_fork && ( + pool->unassigned_worker_count < pool->pre_fork || + pool->total_worker_count < pool->min_fork + ) + ) { + if (pool->use_fork_error_wakeup) { + if (timercmp(&pool->fork_error_wakeup, &now, >)) { + moonbr_calc_wait(&wait, &pool->fork_error_wakeup); + break; + } + } else { + if (terminated_worker_count) { + terminated_worker_count--; + } else if (timercmp(&pool->fork_wakeup, &now, >)) { + moonbr_calc_wait(&wait, &pool->fork_wakeup); + break; + } + } + if (moonbr_create_worker(pool, L)) { + /* on error, enforce error delay */ + timeradd(&now, &pool->fork_error_delay, &pool->fork_error_wakeup); + pool->use_fork_error_wakeup = 1; + moonbr_calc_wait(&wait, &pool->fork_error_wakeup); + break; + } else { + /* normal fork delay on success */ + timeradd(&now, &pool->fork_delay, &pool->fork_wakeup); + timeradd(&now, &pool->fork_error_delay, &pool->fork_error_wakeup); + pool->use_fork_error_wakeup = 0; /* gets set later if error occures during preparation */ + } + } + /* terminate excessive worker processes */ + while ( + pool->total_worker_count > pool->min_fork && + pool->idle_worker_count > pool->pre_fork + ) { + if (timerisset(&pool->exit_wakeup)) { + if (timercmp(&pool->exit_wakeup, &now, >)) { + moonbr_calc_wait(&wait, &pool->exit_wakeup); + break; + } + moonbr_terminate_idle_worker(moonbr_pop_idle_worker(pool)); + timeradd(&now, &pool->exit_delay, &pool->exit_wakeup); + } else { + timeradd(&now, &pool->exit_delay, &pool->exit_wakeup); + break; + } + } + if (!( + pool->total_worker_count > pool->min_fork && + pool->idle_worker_count > pool->pre_fork + )) { + timerclear(&pool->exit_wakeup); /* timer gets restarted later when there are excessive workers */ + } + /* optionally output worker count stats */ + if (moonbr_stat && pool->worker_count_stat) { + pool->worker_count_stat = 0; + moonbr_log( + LOG_INFO, + "Worker count for pool #%i: %i idle, %i assigned, %i total", + pool->poolnum, pool->idle_worker_count, + pool->total_worker_count - pool->unassigned_worker_count, + pool->total_worker_count); + } + /* calculate wakeup time for interval listeners */ + for (listener=pool->first_idle_listener; listener; listener=listener->next_listener) { + if (listener->proto == MOONBR_PROTO_INTERVAL) { + moonbr_calc_wait(&wait, &listener->proto_specific.interval.wakeup); + } + } + /* calculate wakeup time for idle workers (only first idle worker is significant) */ + if (timerisset(&pool->idle_timeout) && pool->first_idle_worker) { + moonbr_calc_wait(&wait, &pool->first_idle_worker->idle_expiration); + } + } + /* check if shutdown is complete */ + if (moonbr_shutdown_in_progress) { + for (pool=moonbr_first_pool; pool; pool=pool->next_pool) { + if (pool->first_worker) break; + } + if (!pool) { + moonbr_log(LOG_INFO, "All worker threads have terminated"); + moonbr_terminate(MOONBR_EXITCODE_GRACEFUL); + } + } + if (moonbr_poll_refresh_needed) moonbr_poll_refresh(); + moonbr_cond_poll = 1; + if (!moonbr_cond_child && !moonbr_cond_terminate && !moonbr_cond_interrupt) { + int timeout; + if (timerisset(&wait)) { + if (timercmp(&wait, &now, <)) { + moonbr_log(LOG_CRIT, "Internal error (should not happen): Future is in the past"); + moonbr_terminate_error(); + } + timersub(&wait, &now, &wait); + timeout = wait.tv_sec * 1000 + wait.tv_usec / 1000; + } else { + timeout = INFTIM; + } + if (moonbr_debug) { + moonbr_log(LOG_DEBUG, "Waiting for I/O"); + } + poll(moonbr_poll_fds, moonbr_poll_fds_count, timeout); + } else { + if (moonbr_debug) { + moonbr_log(LOG_DEBUG, "Do not wait for I/O"); + } + } + moonbr_cond_poll = 0; + moonbr_poll_reset_signal(); + } +} + + +/*** Lua interface ***/ + +static int moonbr_lua_panic(lua_State *L) { + const char *errmsg; + errmsg = lua_tostring(L, -1); + if (!errmsg) { + if (lua_isnoneornil(L, -1)) errmsg = "(error message is nil)"; + else errmsg = "(error message is not a string)"; + } + if (moonbr_pstate == MOONBR_PSTATE_FORKED) { + fprintf(stderr, "Uncaught Lua error: %s\n", errmsg); + exit(1); + } else { + moonbr_log(LOG_CRIT, "Uncaught Lua error: %s", errmsg); + moonbr_terminate_error(); + } + return 0; +} + +static int moonbr_addtraceback(lua_State *L) { + luaL_traceback(L, L, luaL_tolstring(L, 1, NULL), 1); + return 1; +} + +/* Memory allocator that allows limiting memory consumption */ +static void *moonbr_alloc (void *ud, void *ptr, size_t osize, size_t nsize) { + (void)ud; /* not used */ + if (nsize == 0) { + if (ptr) { + moonbr_memory_usage -= osize; + free(ptr); + } + return NULL; + } else if (ptr) { + if ( + moonbr_memory_limit && + nsize > osize && + moonbr_memory_usage + (nsize - osize) > moonbr_memory_limit + ) { + return NULL; + } else { + ptr = realloc(ptr, nsize); + if (ptr) moonbr_memory_usage += nsize - osize; + } + } else { + if ( + moonbr_memory_limit && + moonbr_memory_usage + nsize > moonbr_memory_limit + ) { + return NULL; + } else { + ptr = realloc(ptr, nsize); + if (ptr) moonbr_memory_usage += nsize; + } + } + return ptr; +} + +/* New method for Lua file objects: read until terminator or length exceeded */ +static int moonbr_readuntil(lua_State *L) { + luaL_Stream *stream; + FILE *file; + const char *terminatorstr; + size_t terminatorlen; + luaL_Buffer buf; + lua_Integer maxlen; + char terminator; + int byte; + stream = luaL_checkudata(L, 1, LUA_FILEHANDLE); + terminatorstr = luaL_checklstring(L, 2, &terminatorlen); + luaL_argcheck(L, terminatorlen == 1, 2, "single byte expected"); + maxlen = luaL_optinteger(L, 3, 0); + if (!stream->closef) luaL_error(L, "attempt to use a closed file"); + file = stream->f; + luaL_buffinit(L, &buf); + if (!maxlen) maxlen = -1; + terminator = terminatorstr[0]; + while (maxlen > 0 ? maxlen-- : maxlen) { + byte = fgetc(file); + if (byte == EOF) { + if (ferror(file)) { + char errmsg[MOONBR_MAXSTRERRORLEN]; + strerror_r(errno, errmsg, MOONBR_MAXSTRERRORLEN); /* use thread-safe call in case child created threads */ + luaL_error(L, "%s", errmsg); + } else { + break; + } + } + luaL_addchar(&buf, byte); + if (byte == terminator) break; + } + luaL_pushresult(&buf); + if (!lua_rawlen(L, -1)) lua_pushnil(L); + return 1; +} + +static int moonbr_lua_tonatural(lua_State *L, int idx) { + int isnum; + lua_Number n; + n = lua_tonumberx(L, idx, &isnum); + if (isnum && n>=0 && n=0 && n<=100000000) { + value->tv_sec = n; + value->tv_usec = 1e6 * (n - value->tv_sec); + return 1; + } else { + return 0; + } +} + +static int moonbr_timeout(lua_State *L) { + struct itimerval oldval; + if (lua_isnoneornil(L, 1) && lua_isnoneornil(L, 2)) { + getitimer(ITIMER_REAL, &oldval); + } else { + struct itimerval newval = {}; + if (lua_toboolean(L, 1)) { + luaL_argcheck( + L, moonbr_lua_totimeval(L, 1, &newval.it_value), 1, + "interval in seconds expected" + ); + } + if (lua_isnoneornil(L, 2)) { + if (setitimer(ITIMER_REAL, &newval, &oldval)) { + moonbr_log(LOG_CRIT, "Could not set ITIMER_REAL via setitimer()"); + moonbr_terminate_error(); + } + } else { + getitimer(ITIMER_REAL, &oldval); + if (timercmp(&newval.it_value, &oldval.it_value, <)) { + struct itimerval remval; + if (setitimer(ITIMER_REAL, &newval, NULL)) { + moonbr_log(LOG_CRIT, "Could not set ITIMER_REAL via setitimer()"); + moonbr_terminate_error(); + } + lua_call(L, lua_gettop(L) - 2, LUA_MULTRET); + getitimer(ITIMER_REAL, &remval); + timersub(&oldval.it_value, &newval.it_value, &newval.it_value); + timeradd(&newval.it_value, &remval.it_value, &newval.it_value); + if (setitimer(ITIMER_REAL, &newval, NULL)) { + moonbr_log(LOG_CRIT, "Could not set ITIMER_REAL via setitimer()"); + moonbr_terminate_error(); + } + } else { + lua_call(L, lua_gettop(L) - 2, LUA_MULTRET); + } + return lua_gettop(L) - 1; + } + } + lua_pushnumber(L, oldval.it_value.tv_sec + 1e-6 * oldval.it_value.tv_usec); + return 1; +} + +#define moonbr_listen_init_pool_forkoption(luaname, cname, defval) { \ + lua_getfield(L, 2, luaname); \ + pool->cname = lua_isnil(L, -1) ? (defval) : moonbr_lua_tonatural(L, -1); \ +} while(0) + +#define moonbr_listen_init_pool_timeoption(luaname, cname, defval, defvalu) ( \ + lua_getfield(L, 2, luaname), \ + lua_isnil(L, -1) ? ( \ + pool->cname.tv_sec = (defval), pool->cname.tv_usec = (defvalu), \ + 1 \ + ) : ( \ + (lua_isboolean(L, -1) && !lua_toboolean(L, -1)) ? ( \ + pool->cname.tv_sec = 0, pool->cname.tv_usec = 0, \ + 1 \ + ) : ( \ + moonbr_lua_totimeval(L, -1, &pool->cname) \ + ) \ + ) \ +) + +static int moonbr_listen_init_pool(lua_State *L) { + struct moonbr_pool *pool; + const char *proto; + int i; + pool = lua_touserdata(L, 1); + for (i=0; ilistener_count; i++) { + struct moonbr_listener *listener = &pool->listener[i]; + lua_settop(L, 2); + lua_pushinteger(L, i+1); + lua_gettable(L, 2); + lua_getfield(L, 3, "proto"); + proto = lua_tostring(L, -1); + if (proto && !strcmp(proto, "interval")) { + listener->proto = MOONBR_PROTO_INTERVAL; + lua_getfield(L, 3, "name"); + { + const char *name = lua_tostring(L, -1); + if (name) { + if (asprintf(&listener->proto_specific.interval.name, "%s", name) < 0) { + moonbr_log(LOG_CRIT, "Memory allocation_error"); + moonbr_terminate_error(); + } + } + } + lua_getfield(L, 3, "delay"); + if ( + !moonbr_lua_totimeval(L, -1, &listener->proto_specific.interval.delay) || + !timerisset(&listener->proto_specific.interval.delay) + ) { + luaL_error(L, "No valid interval delay specified; use listen{{proto=\"interval\", delay=...}, ...}"); + } + lua_getfield(L, 3, "strict"); + if (!lua_isnil(L, -1)) { + if (lua_isboolean(L, -1)) { + if (lua_toboolean(L, -1)) listener->proto_specific.interval.strict = 1; + } else { + luaL_error(L, "Option \"strict\" must be a boolean if set; use listen{{proto=\"interval\", strict=true, ...}, ...}"); + } + } + } else if (proto && !strcmp(proto, "local")) { + listener->proto = MOONBR_PROTO_LOCAL; + lua_getfield(L, 3, "path"); + { + const char *path = lua_tostring(L, -1); + if (!path) { + luaL_error(L, "No valid path specified for local socket; use listen{{proto=\"local\", path=...}, ...}"); + } + if (asprintf(&listener->proto_specific.local.path, "%s", path) < 0) { + moonbr_log(LOG_CRIT, "Memory allocation_error"); + moonbr_terminate_error(); + } + } + } else if (proto && !strcmp(proto, "tcp6")) { + listener->proto = MOONBR_PROTO_TCP6; + lua_getfield(L, 3, "port"); + listener->proto_specific.tcp.port = lua_tointeger(L, -1); + if ( + listener->proto_specific.tcp.port < 1 || + listener->proto_specific.tcp.port > 65535 + ) { + luaL_error(L, "No valid port number specified; use listen{{proto=\"tcp6\", port=...}, ...}"); + } + lua_getfield(L, 3, "localhost"); + if (!lua_isnil(L, -1)) { + if (lua_isboolean(L, -1)) { + if (lua_toboolean(L, -1)) listener->proto_specific.tcp.localhost_only = 1; + } else { + luaL_error(L, "Option \"localhost\" must be a boolean if set; use listen{{proto=\"tcp6\", localhost=true, ...}, ...}"); + } + } + } else if (proto && !strcmp(proto, "tcp4")) { + listener->proto = MOONBR_PROTO_TCP4; + lua_getfield(L, 3, "port"); + listener->proto_specific.tcp.port = lua_tointeger(L, -1); + if ( + listener->proto_specific.tcp.port < 1 || + listener->proto_specific.tcp.port > 65535 + ) { + luaL_error(L, "No valid port number specified; use listen{{proto=\"tcp4\", port=...}, ...}"); + } + lua_getfield(L, 3, "localhost"); + if (!lua_isnil(L, -1)) { + if (lua_isboolean(L, -1)) { + if (lua_toboolean(L, -1)) listener->proto_specific.tcp.localhost_only = 1; + } else { + luaL_error(L, "Option \"localhost\" must be a boolean if set; use listen{{proto=\"tcp4\", localhost=true, ...}, ...}"); + } + } + } + } + lua_settop(L, 2); + moonbr_listen_init_pool_forkoption("pre_fork", pre_fork, 1); + moonbr_listen_init_pool_forkoption("min_fork", min_fork, pool->pre_fork > 2 ? pool->pre_fork : 2); + moonbr_listen_init_pool_forkoption("max_fork", max_fork, pool->min_fork > 16 ? pool->min_fork : 16); + if (!moonbr_listen_init_pool_timeoption("fork_delay", fork_delay, 1, 0)) { + luaL_error(L, "Option \"fork_delay\" is expected to be a non-negative number"); + } + if (!moonbr_listen_init_pool_timeoption("fork_error_delay", fork_error_delay, 2, 0)) { + luaL_error(L, "Option \"fork_error_delay\" is expected to be a non-negative number"); + } + if (!moonbr_listen_init_pool_timeoption("exit_delay", exit_delay, 60, 0)) { + luaL_error(L, "Option \"exit_delay\" is expected to be a non-negative number"); + } + if (timercmp(&pool->fork_error_delay, &pool->fork_delay, <)) { + pool->fork_error_delay = pool->fork_delay; + } + if (!moonbr_listen_init_pool_timeoption("idle_timeout", idle_timeout, 0, 0)) { + luaL_error(L, "Option \"idle_timeout\" is expected to be a non-negative number"); + } + lua_getfield(L, 2, "memory_limit"); + if (!lua_isnil(L, -1)) { + int isnum; + lua_Number n; + n = lua_tonumberx(L, -1, &isnum); + if (n < 0 || !isnum) { + luaL_error(L, "Option \"memory_limit\" is expected to be a non-negative number"); + } + pool->memory_limit = n; + } + lua_settop(L, 2); + lua_getfield(L, 2, "prepare"); + if (!lua_isnil(L, -1) && !lua_isfunction(L, -1)) { + luaL_error(L, "Option \"prepare\" must be nil or a function"); + } + lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_prepare_func(pool)); + lua_getfield(L, 2, "connect"); + if (!lua_isfunction(L, -1)) { + luaL_error(L, "Option \"connect\" must be a function; use listen{{...}, {...}, connect=function(socket) ... end, ...}"); + } + lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_connect_func(pool)); + lua_getfield(L, 2, "finish"); + if (!lua_isnil(L, -1) && !lua_isfunction(L, -1)) { + luaL_error(L, "Option \"finish\" must be nil or a function"); + } + lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_finish_func(pool)); + return 0; +} + +static int moonbr_listen(lua_State *L) { + struct moonbr_pool *pool; + lua_Integer listener_count; + if (moonbr_booted) luaL_error(L, "Moonbridge bootup is already complete"); + luaL_checktype(L, 1, LUA_TTABLE); + listener_count = luaL_len(L, 1); + if (!listener_count) luaL_error(L, "No listen ports specified; use listen{{proto=..., port=...},...}"); + if (listener_count > 100) luaL_error(L, "Too many listeners"); + pool = moonbr_create_pool(listener_count); + lua_pushcfunction(L, moonbr_listen_init_pool); + lua_pushlightuserdata(L, pool); + lua_pushvalue(L, 1); + if (lua_pcall(L, 2, 0, 0)) goto moonbr_listen_error; + { + int i; + i = moonbr_start_pool(pool); + if (i >= 0) { + struct moonbr_listener *listener = &pool->listener[i]; + switch (listener->proto) { + case MOONBR_PROTO_INTERVAL: + lua_pushfstring(L, "Could not initialize listener #%d (proto=\"interval\"): %s", i+1, strerror(errno)); + break; + case MOONBR_PROTO_LOCAL: + lua_pushfstring(L, "Could not initialize listener #%d (proto=\"local\", path=\"%s\"): %s", i+1, listener->proto_specific.local.path, strerror(errno)); + break; + case MOONBR_PROTO_TCP6: + lua_pushfstring(L, "Could not initialize listener #%d (proto=\"tcp6\", port=%d): %s", i+1, listener->proto_specific.tcp.port, strerror(errno)); + break; + case MOONBR_PROTO_TCP4: + lua_pushfstring(L, "Could not initialize listener #%d (proto=\"tcp4\", port=%d): %s", i+1, listener->proto_specific.tcp.port, strerror(errno)); + break; + default: + moonbr_log(LOG_ERR, "Internal error (should not happen): Unexpected value in listener.proto field"); + moonbr_terminate_error(); + } + goto moonbr_listen_error; + } + } + return 0; + moonbr_listen_error: + moonbr_destroy_pool(pool); + lua_pushnil(L); + lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_prepare_func(pool)); + lua_pushnil(L); + lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_connect_func(pool)); + lua_pushnil(L); + lua_rawsetp(L, LUA_REGISTRYINDEX, moonbr_luakey_finish_func(pool)); + lua_error(L); + return 0; // avoid compiler warning +} + + +/*** Main function and command line invokation ***/ + +static void moonbr_usage(int err, const char *cmd) { + FILE *out; + out = err ? stderr : stdout; + if (!cmd) cmd = "moonbridge"; + fprintf(out, "Get this help message: %s {-h|--help}\n", cmd); + fprintf(out, "Usage: %s \\\n", cmd); + fprintf(out, " [-b|--background] \\\n"); + fprintf(out, " [-d|--debug] \\\n"); + fprintf(out, " [-f|--logfacility {DAEMON|USER|0|1|...|7}] \\\n"); + fprintf(out, " [-i|--logident \\\n"); + fprintf(out, " [-l|--logfile ] \\\n"); + fprintf(out, " [-p|--pidfile ] \\\n"); + fprintf(out, " [-s|--stats] \\\n"); + fprintf(out, " [ ...]\n"); + exit(err); +} + +#define moonbr_usage_error() moonbr_usage(MOONBR_EXITCODE_CMDLINEERROR, argc ? argv[0] : NULL) + +int main(int argc, char **argv) { + { + int daemonize = 0; + int log_facility = LOG_USER; + const char *log_ident = "moonbridge"; + const char *log_filename = NULL; + const char *pid_filename = NULL; + int option; + struct option longopts[] = { + { "background", no_argument, NULL, 'b' }, + { "debug", no_argument, NULL, 'd' }, + { "logfacility", required_argument, NULL, 'f' }, + { "help", no_argument, NULL, 'h' }, + { "logident", required_argument, NULL, 'i' }, + { "logfile", required_argument, NULL, 'l' }, + { "pidfile", required_argument, NULL, 'p' }, + { "stats", no_argument, NULL, 's' } + }; + while ((option = getopt_long(argc, argv, "bdf:hi:l:p:s", longopts, NULL)) != -1) { + switch (option) { + case 'b': + daemonize = 1; + break; + case 'd': + moonbr_debug = 1; + moonbr_stat = 1; + break; + case 'f': + if (!strcmp(optarg, "DAEMON")) { + log_facility = LOG_DAEMON; + } else if (!strcmp(optarg, "USER")) { + log_facility = LOG_USER; + } else if (!strcmp(optarg, "0")) { + log_facility = LOG_LOCAL0; + } else if (!strcmp(optarg, "1")) { + log_facility = LOG_LOCAL1; + } else if (!strcmp(optarg, "2")) { + log_facility = LOG_LOCAL2; + } else if (!strcmp(optarg, "3")) { + log_facility = LOG_LOCAL3; + } else if (!strcmp(optarg, "4")) { + log_facility = LOG_LOCAL4; + } else if (!strcmp(optarg, "5")) { + log_facility = LOG_LOCAL5; + } else if (!strcmp(optarg, "6")) { + log_facility = LOG_LOCAL6; + } else if (!strcmp(optarg, "7")) { + log_facility = LOG_LOCAL7; + } else { + moonbr_usage_error(); + } + moonbr_use_syslog = 1; + break; + case 'h': + moonbr_usage(MOONBR_EXITCODE_GRACEFUL, argv[0]); + break; + case 'i': + log_ident = optarg; + moonbr_use_syslog = 1; + break; + case 'l': + log_filename = optarg; + break; + case 'p': + pid_filename = optarg; + break; + case 's': + moonbr_stat = 1; + break; + default: + moonbr_usage_error(); + } + } + if (argc - optind <= 0) moonbr_usage_error(); + if (pid_filename) { + pid_t otherpid; + while ((moonbr_pidfh = pidfile_open(pid_filename, 0644, &otherpid)) == NULL) { + if (errno == EEXIST) { + if (otherpid == -1) { + fprintf(stderr, "PID file \"%s\" is already locked\n", pid_filename); + } else { + fprintf(stderr, "PID file \"%s\" is already locked by process with PID: %i\n", pid_filename, (int)otherpid); + } + exit(MOONBR_EXITCODE_ALREADYRUNNING); + } else if (errno != EINTR) { + fprintf(stderr, "Could not write PID file \"%s\": %s\n", pid_filename, strerror(errno)); + exit(MOONBR_EXITCODE_STARTUPERROR); + } + } + } + if (log_filename) { + int logfd; + while ( + ( logfd = flopen( + log_filename, + O_WRONLY|O_NONBLOCK|O_CREAT|O_APPEND|O_CLOEXEC, + 0640 + ) + ) < 0 + ) { + if (errno == EWOULDBLOCK) { + fprintf(stderr, "Logfile \"%s\" is locked\n", log_filename); + exit(MOONBR_EXITCODE_ALREADYRUNNING); + } else if (errno != EINTR) { + fprintf(stderr, "Could not open logfile \"%s\": %s\n", log_filename, strerror(errno)); + exit(MOONBR_EXITCODE_STARTUPERROR); + } + } + moonbr_logfile = fdopen(logfd, "a"); + if (!moonbr_logfile) { + fprintf(stderr, "Could not open write stream to logfile \"%s\": %s\n", log_filename, strerror(errno)); + exit(MOONBR_EXITCODE_STARTUPERROR); + } + } + if (daemonize == 0 && !moonbr_logfile) moonbr_logfile = stderr; + if (moonbr_logfile) setlinebuf(moonbr_logfile); + else moonbr_use_syslog = 1; + if (moonbr_use_syslog) openlog(log_ident, LOG_NDELAY | LOG_PID, log_facility); + if (daemonize) { + if (daemon(1, 0)) { + moonbr_log(LOG_ERR, "Could not daemonize moonbridge process"); + moonbr_terminate_error(); + } + } + } + moonbr_log(LOG_NOTICE, "Starting moonbridge server"); + if (moonbr_pidfh && pidfile_write(moonbr_pidfh)) { + moonbr_log(LOG_ERR, "Could not write pidfile (after locking)"); + } + { + lua_State *L; + L = lua_newstate(moonbr_alloc, NULL); + if (!L) { + moonbr_log(LOG_CRIT, "Could not initialize Lua state"); + moonbr_terminate_error(); + } + lua_atpanic(L, moonbr_lua_panic); + luaL_openlibs(L); + if (luaL_newmetatable(L, LUA_FILEHANDLE)) { + moonbr_log(LOG_CRIT, "Lua metatable LUA_FILEHANDLE does not exist"); + moonbr_terminate_error(); + } + lua_getfield(L, -1, "__index"); + lua_pushcfunction(L, moonbr_readuntil); + lua_setfield(L, -2, "readuntil"); + lua_pop(L, 2); + lua_pushcfunction(L, moonbr_timeout); + lua_setglobal(L, "timeout"); + lua_pushcfunction(L, moonbr_listen); + lua_setglobal(L, "listen"); + lua_pushcfunction(L, moonbr_addtraceback); // on stack position 1 + { + int i; + for (i=optind; i