# HG changeset patch # User jbe # Date 1432168839 -7200 # Node ID 831f2d4b2d7335f8305beb4c51d219577366d307 # Parent 08cf8e1865e9e534c64bfa12e84d3eb20b6ce0df Initial work on reimplemented HTTP layer (utilizing non-blocking I/O with coroutines and a cleaner object-oriented structure) diff -r 08cf8e1865e9 -r 831f2d4b2d73 moonbridge_http.lua --- a/moonbridge_http.lua Tue May 19 23:15:22 2015 +0200 +++ b/moonbridge_http.lua Thu May 21 02:40:39 2015 +0200 @@ -83,79 +83,37 @@ ["304"] = true } --- handling of GET/POST param tables: -local new_params_list -- defined later -do - local params_list_mapping = setmetatable({}, {__mode="k"}) - local function nextnonempty(tbl, key) - while true do - key = next(tbl, key) - if key == nil then - return nil - end - local value = tbl[key] - if #value > 0 then - return key, value - end +-- parses URL encoded form data: +local function read_urlencoded_form(data) + local tbl = {} + for rawkey, rawvalue in string.gmatch(data, "([^?=&]*)=([^?=&]*)") do + local key = decode_uri(rawkey) + local value = decode_uri(rawvalue) + local subtbl = tbl[key] + if subtbl then + subtbl[#subtbl+1] = value + else + tbl[key] = {value} end end - local function nextvalue(tbl, key) - key = next(tbl, key) - if key == nil then - return nil - end - return key, tbl[key][1] - end - local params_list_metatable = { - __index = function(self, key) - local tbl = {} - self[key] = tbl - return tbl - end, - __pairs = function(self) - return nextnonempty, self, nil - end - } - local params_metatable = { - __index = function(self, key) - return params_list_mapping[self][key][1] - end, - __newindex = function(self, key, value) - params_list_mapping[self][key] = {value} - end, - __pairs = function(self) - return nextvalue, params_list_mapping[self], nil - end - } - -- returns a table to store key value-list pairs (i.e. multiple values), - -- and a second table automatically mapping keys to the first value - -- using the key value-list pairs in the first table: - new_params_list = function() - local params_list = setmetatable({}, params_list_metatable) - local params = setmetatable({}, params_metatable) - params_list_mapping[params] = params_list - return params_list, params - end -end --- parses URL encoded form data and stores it in --- a key value-list pairs structure that has to be --- previously obtained by calling by new_params_list(): -local function read_urlencoded_form(tbl, data) - for rawkey, rawvalue in string.gmatch(data, "([^?=&]*)=([^?=&]*)") do - local subtbl = tbl[decode_uri(rawkey)] - subtbl[#subtbl+1] = decode_uri(rawvalue) - end + return tbl end --- function creating a HTTP handler: -function generate_handler(handler, options) - -- swap arguments if necessary (for convenience): - if type(handler) ~= "function" and type(options) == "function" then - handler, options = options, handler +-- extracts first value from each subtable: +local function get_first_values(tbl) + local newtbl = {} + for key, subtbl in pairs(tbl) do + newtbl[key] = subtbl[1] end + return newtbl +end + +request_pt = {} +request_mt = { __index = request_pt } + +function request_pt:_init(handler, options) -- process options: options = options or {} - local preamble = "" -- preamble sent with every(!) HTTP response do -- named arg "static_headers" is used to create the preamble: local s = options.static_headers @@ -176,845 +134,338 @@ end end t[#t+1] = "" - preamble = table.concat(t, "\r\n") - end - local input_chunk_size = options.maximum_input_chunk_size or options.chunk_size or 16384 - local output_chunk_size = options.minimum_output_chunk_size or options.chunk_size or 1024 - local request_idle_timeout, request_header_timeout, response_timeout - if options.request_idle_timeout ~= nil then - request_idle_timeout = options.request_idle_timeout or 0 - else - request_idle_timeout = 330 + self._preamble = table.concat(t, "\r\n") -- preamble sent with every(!) HTTP response end - if options.request_header_timeout ~= nil then - request_header_timeout = options.request_header_timeout or 0 - else - request_header_timeout = 30 - end - if options.timeout ~= nil then - response_timeout = options.timeout or 0 - else - response_timeout = 1800 + self._input_chunk_size = options.maximum_input_chunk_size or options.chunk_size or 16384 + self._output_chunk_size = options.minimum_output_chunk_size or options.chunk_size or 1024 + self._header_size_limit = options.header_size_limit or 1024*1024 + local function init_timeout(name, default) + local value = options[name] + if value == nil then + self["_"..name] = default + else + self["_"..name] = value or 0 + end end - -- return connect handler: - return function(socket) - local socket_set = {[socket] = true} -- used for moonbridge_io.poll(...) - local survive = true -- set to false if process shall be terminated later - repeat - -- process named arguments "request_header_size_limit" and "request_body_size_limit": - local remaining_header_size_limit = options.request_header_size_limit or 1024*1024 - local remaining_body_size_limit = options.request_body_size_limit or 64*1024*1024 - -- state variables for request handling: - local output_state = "no_status_sent" -- one of: - -- "no_status_sent" (initial state) - -- "info_status_sent" (1xx status code has been sent) - -- "bodyless_status_sent" (204/304 status code has been sent) - -- "status_sent" (regular status code has been sent) - -- "headers_sent" (headers have been terminated) - -- "finished" (request has been answered completely) - local content_length -- value of Content-Length header sent - local bytes_sent = 0 -- number of bytes sent if Content-Length is set - local chunk_parts = {} -- list of chunks to send - local chunk_bytes = 0 -- sum of lengths of chunks to send - local connection_close_requested = false - local connection_close_responded = false - local headers_value_nil = {} -- header values that are nil - local request_body_content_length -- Content-Length of request body - local input_state = "pending" -- one of: - -- "pending" (request body has not been processed yet) - -- "deferred" (request body processing is deferred) - -- "inprogress" (request body is currently being read) - -- "finished" (request body has been read) - local streamed_post_params = {} -- mapping from POST field name to stream function - local streamed_post_param_patterns = {} -- list of POST field pattern and stream function pairs - -- object passed to handler (with methods, GET/POST params, etc.): - local request - -- handling I/O errors (including protocol violations): - local socket_closed = false - local function assert_output(retval, errmsg) - if retval then - return retval - end - request.faulty = true - errmsg = "Could not send data to client: " .. errmsg - io.stderr:write(errmsg, "\n") - if not socket_closed then - socket_closed = true - socket:reset() - end - error("Could not send data to client: " .. errmsg) + init_timeout("request_idle_timeout", 330) + init_timeout("request_header_timeout", 30) + init_timeout("request_body_timeout", 1800) + init_timeout("response_timeout", 1830) + self._poll = options.poll_function or moonbridge_io.poll + self:_create_closure("_write_yield") + self:_create_closure("_handler") + -- table mapping header field names to value-lists: + self._headers_mt = { + __index = function(tbl, key) + local lowerkey = string.lower(key) + local result = self._headers[lowerkey] + if result == nil then + result = {} end - local function request_error(throw_error, status, text) - local errmsg = "Error while reading request from client. Error response: " .. status - if text then - errmsg = errmsg .. " (" .. text .. ")" - end - io.stderr:write(errmsg, "\n") -- needs to be written now, because of possible timeout error later - if - output_state == "no_status_sent" or - output_state == "info_status_sent" - then - local error_response_status, errmsg2 = pcall(function() - request:defer_reading() -- don't read request body (because of possibly invalid state) - request:close_after_finish() -- send "Connection: close" header - request:send_status(status) - request:send_header("Content-Type", "text/plain") - request:send_data(status, "\n") - if text then - request:send_data("\n", text, "\n") - end - request:finish() - end) - if not error_response_status and not request.faulty then - request.faulty = true - error("Unexpected error while sending error response: " .. errmsg2) - end - else - if not socket_closed then - socket_closed = true - socket:reset() - end - end - if throw_error then - request.faulty = true - error(errmsg) - else - return survive - end - end - local function assert_not_faulty() - assert(not request.faulty, "Tried to use faulty request handle") - end - -- reads a number of bytes from the socket, - -- optionally feeding these bytes chunk-wise - -- into a callback function: - local function read_body_bytes(remaining, callback) - while remaining > 0 do - local limit - if remaining > input_chunk_size then - limit = input_chunk_size - else - limit = remaining - end - local chunk = socket:read(limit) - if not chunk or #chunk ~= limit then - request_error(true, "400 Bad Request", "Unexpected EOF or read error while reading chunk of request body") - end - remaining = remaining - limit - if callback then - callback(chunk) + tbl[lowerkey] = result + tbl[key] = result + return result + end + } + -- table mapping header field names to value-lists + -- (for headers with comma separated values): + self._headers_csv_table_mt = { + __index = function(tbl, key) + local result = {} + for i, line in ipairs(self.headers[key]) do + for entry in string.gmatch(line, "[^,]+") do + local value = string.match(entry, "^[ \t]*(..-)[ \t]*$") + if value then + result[#result+1] = value end end end - -- flushes or closes the socket (depending on - -- whether "Connection: close" header was sent): - local function finish_response() - if connection_close_responded then - -- close output stream: - assert_output(socket:finish()) - -- wait for EOF of peer to avoid immediate TCP RST condition: - do - local start_time = moonbridge_io.timeref() - repeat - socket:drain_nb() - local time_left = 2 - moonbridge_io.timeref(start_time) - until time_left <= 0 or not moonbridge_io.poll(socket_set, nil, time_left) - end - -- fully close socket: - socket_closed = true -- avoid double close on error - assert_output(socket:close()) - else - assert_output(socket:flush()) - request:stream_request_body() + tbl[key] = result + return result + end + } + -- table mapping header field names to a comma separated string + -- (for headers with comma separated values): + self._headers_csv_string_mt = { + __index = function(tbl, key) + local result = {} + for i, line in ipairs(self.headers[key]) do + result[#result+1] = line + end + result = string.concat(result, ", ") + tbl[key] = result + return result + end + } + -- table mapping header field names to a single string value + -- (or false if header has been sent multiple times): + self._headers_value_mt = { + __index = function(tbl, key) + if self._headers_value_nil[key] then + return nil + end + local result = nil + local values = self.headers_csv_table[key] + if #values == 0 then + self._headers_value_nil[key] = true + elseif #values == 1 then + result = values[1] + else + result = false + end + tbl[key] = result + return result + end + } + -- table mapping header field names to a flag table, + -- indicating if the comma separated value contains certain entries: + self._headers_flags_mt = { + __index = function(tbl, key) + local result = setmetatable({}, { + __index = function(tbl, key) + local lowerkey = string.lower(key) + local result = rawget(tbl, lowerkey) or false + tbl[lowerkey] = result + tbl[key] = result + return result end - end - -- writes out buffered chunks (without flushing the socket): - local function send_chunk() - if chunk_bytes > 0 then - assert_output(socket:write(string.format("%x\r\n", chunk_bytes))) - for i = 1, #chunk_parts do - assert_output(socket:write(chunk_parts[i])) - end - chunk_parts = {} - chunk_bytes = 0 - assert_output(socket:write("\r\n")) - end + }) + for i, value in ipairs(self.headers_csv_table[key]) do + result[string.lower(value)] = true end - -- terminate header section in response, optionally flushing: - -- (may be called multiple times unless response is finished) - local function finish_headers(flush) - if output_state == "no_status_sent" then - error("HTTP status has not been sent yet") - elseif output_state == "finished" then - error("Response has already been finished") - elseif output_state == "info_status_sent" then - assert_output(socket:write("\r\n")) - assert_output(socket:flush()) - output_state = "no_status_sent" - elseif output_state == "bodyless_status_sent" then - if connection_close_requested and not connection_close_responded then - request:send_header("Connection", "close") + tbl[key] = result + return result + end + } +end + +function request_pt:_create_closure(name) + self[name.."_closure"] = function(...) + return self[name](self, ...) + end +end + +function request_pt:_create_magictable(name) + self[name] = setmetatable({}, self["_"..name.."_mt"]) +end + +function request_pt:_handler(socket) + self._socket = socket + self._survive = true + self._socket_set = {[socket] = true} + self._faulty = false + self._consume_input = self._drain_input + self._headers = {} + self._headers_value_nil = {} + self:_create_magictable("headers") + self:_create_magictable("headers_csv_table") + self:_create_magictable("headers_csv_string") + self:_create_magictable("headers_value") + self:_create_magictable("headers_flags") + repeat + -- wait for input: + if not moonbridge_io.poll(self._socket_set, nil, self._request_idle_timeout) then + self:_error("408 Request Timeout", "Idle connection timed out") + return self._survive + end + -- read headers (with timeout): + do + local coro = coroutine.wrap(self._read_headers) + local timeout = self._request_header_timeout + local starttime = timeout and moonbridge_io.timeref() + while true do + local status = coro(self) + if status == nil then + local remaining + if timeout then + remaining = timeout - moonbridge_io.timeref(starttime) end - assert_output(socket:write("\r\n")) - finish_response() - output_state = "finished" - elseif output_state == "status_sent" then - if not content_length then - assert_output(socket:write("Transfer-Encoding: chunked\r\n")) - end - if connection_close_requested and not connection_close_responded then - request:send_header("Connection", "close") + if not self._poll(self._socket_set, nil, remaining) then + self:_error("408 Request Timeout", "Timeout while receiving headers") + return self._survive end - assert_output(socket:write("\r\n")) - if request.method == "HEAD" then - finish_response() - elseif flush then - assert_output(socket:flush()) - end - output_state = "headers_sent" - elseif output_state ~= "headers_sent" then - error("Unexpected internal status in HTTP engine") + elseif status == false then + return self._survive + elseif status == true then + break + else + error("Unexpected yield value") end end - -- create request object and set several functions and values: - request = { - -- error state: - faulty = false, - -- allow raw socket access: - socket = socket, - -- parsed cookies: - cookies = {}, - -- send a HTTP response status (e.g. "200 OK"): - send_status = function(self, value) - assert_not_faulty() - if input_state == "pending" then - request:process_request_body() - end - if output_state == "info_status_sent" then - assert_output(socket:write("\r\n")) - assert_output(socket:flush()) - elseif output_state ~= "no_status_sent" then - error("HTTP status has already been sent") - end - local status1 = string.sub(value, 1, 1) - local status3 = string.sub(value, 1, 3) - assert_output(socket:write("HTTP/1.1 ", value, "\r\n", preamble)) - local without_response_body = status_without_response_body[status3] - if without_response_body then - output_state = "bodyless_status_sent" - if without_response_body == "zero_content_length" then - request:send_header("Content-Length", 0) - end - elseif status1 == "1" then - output_state = "info_status_sent" - else - output_state = "status_sent" - end - end, - -- send a HTTP response header - -- (key and value as separate args): - send_header = function(self, key, value) - assert_not_faulty() - if output_state == "no_status_sent" then - error("HTTP status has not been sent yet") - elseif - output_state ~= "info_status_sent" and - output_state ~= "bodyless_status_sent" and - output_state ~= "status_sent" - then - error("All HTTP headers have already been sent") - end - local key_lower = string.lower(key) - if key_lower == "content-length" then - if output_state == "info_status_sent" then - error("Cannot set Content-Length for informational status response") - end - local new_content_length = assert(tonumber(value), "Invalid content-length") - if content_length == nil then - content_length = new_content_length - elseif content_length == new_content_length then - return - else - error("Content-Length has been set multiple times with different values") - end - elseif key_lower == "connection" then - for entry in string.gmatch(string.lower(value), "[^,]+") do - if string.match(entry, "^[ \t]*close[ \t]*$") then - if output_state == "info_status_sent" then - error("Cannot set \"Connection: close\" for informational status response") - end - connection_close_responded = true - break - end - end - end - assert_output(socket:write(key, ": ", value, "\r\n")) - end, - -- method to announce (and enforce) connection close after sending the response: - close_after_finish = function() - assert_not_faulty() - if - output_state == "headers_sent" or - output_state == "finished" - then - error("All HTTP headers have already been sent") - end - connection_close_requested = true - end, - -- method to finish and flush headers: - finish_headers = function() - assert_not_faulty() - finish_headers(true) - end, - -- send data for response body: - send_data = function(self, ...) - assert_not_faulty() - if output_state == "info_status_sent" then - error("No (non-informational) HTTP status has been sent yet") - elseif output_state == "bodyless_status_sent" then - error("Cannot send response data for body-less status message") - end - finish_headers(false) - if output_state ~= "headers_sent" then - error("Unexpected internal status in HTTP engine") - end - if request.method == "HEAD" then - return - end - for i = 1, select("#", ...) do - local str = tostring(select(i, ...)) - if #str > 0 then - if content_length then - local bytes_to_send = #str - if bytes_sent + bytes_to_send > content_length then - assert_output(socket:write(string.sub(str, 1, content_length - bytes_sent))) - bytes_sent = content_length - error("Content length exceeded") - else - assert_output(socket:write(str)) - bytes_sent = bytes_sent + bytes_to_send - end - else - chunk_bytes = chunk_bytes + #str - chunk_parts[#chunk_parts+1] = str - end - end - end - if chunk_bytes >= output_chunk_size then - send_chunk() - end - end, - -- flush output buffer: - flush = function(self) - assert_not_faulty() - send_chunk() - assert_output(socket:flush()) - end, - -- finish response: - finish = function(self) - assert_not_faulty() - if output_state == "finished" then - return - elseif output_state == "info_status_sent" then - error("Informational HTTP response can be finished with :finish_headers() method") - end - finish_headers(false) - if output_state == "headers_sent" then - if request.method ~= "HEAD" then - if content_length then - if bytes_sent ~= content_length then - error("Content length not used") - end - else - send_chunk() - assert_output(socket:write("0\r\n\r\n")) - end - finish_response() - end - output_state = "finished" - elseif output_state ~= "finished" then - error("Unexpected internal status in HTTP engine") - end - end, - -- table mapping header field names to value-lists - -- (raw access): - headers = setmetatable({}, { - __index = function(self, key) - local lowerkey = string.lower(key) - if lowerkey == key then - return - end - local result = rawget(self, lowerkey) - if result == nil then - result = {} - end - self[lowerkey] = result - self[key] = result - return result - end - }), - -- table mapping header field names to value-lists - -- (for headers with comma separated values): - headers_csv_table = setmetatable({}, { - __index = function(self, key) - local result = {} - for i, line in ipairs(request.headers[key]) do - for entry in string.gmatch(line, "[^,]+") do - local value = string.match(entry, "^[ \t]*(..-)[ \t]*$") - if value then - result[#result+1] = value - end - end - end - self[key] = result - return result - end - }), - -- table mapping header field names to a comma separated string - -- (for headers with comma separated values): - headers_csv_string = setmetatable({}, { - __index = function(self, key) - local result = {} - for i, line in ipairs(request.headers[key]) do - result[#result+1] = line - end - result = string.concat(result, ", ") - self[key] = result - return result - end - }), - -- table mapping header field names to a single string value - -- (or false if header has been sent multiple times): - headers_value = setmetatable({}, { - __index = function(self, key) - if headers_value_nil[key] then - return nil - end - local result = nil - local values = request.headers_csv_table[key] - if #values == 0 then - headers_value_nil[key] = true - elseif #values == 1 then - result = values[1] - else - result = false - end - self[key] = result - return result - end - }), - -- table mapping header field names to a flag table, - -- indicating if the comma separated value contains certain entries: - headers_flags = setmetatable({}, { - __index = function(self, key) - local result = setmetatable({}, { - __index = function(self, key) - local lowerkey = string.lower(key) - local result = rawget(self, lowerkey) or false - self[lowerkey] = result - self[key] = result - return result - end - }) - for i, value in ipairs(request.headers_csv_table[key]) do - result[string.lower(value)] = true - end - self[key] = result - return result - end - }), - -- register POST param stream handler for a single field name: - stream_post_param = function(self, field_name, callback) - assert_not_faulty() - if input_state == "inprogress" or input_state == "finished" then - error("Cannot register POST param streaming function if request body is already processed") - end - streamed_post_params[field_name] = callback - end, - -- register POST param stream handler for a field name pattern: - stream_post_params = function(self, pattern, callback) - assert_not_faulty() - if input_state == "inprogress" or input_state == "finished" then - error("Cannot register POST param streaming function if request body is already processed") - end - streamed_post_param_patterns[#streamed_post_param_patterns+1] = {pattern, callback} - end, - -- disables automatic request body processing on write - -- (use with caution): - defer_reading = function(self) - assert_not_faulty() - if input_state == "pending" then - input_state = "deferred" - end - end, - -- processes the request body and sets the request.post_params, - -- request.post_params_list, request.meta_post_params, and - -- request.meta_post_params_list values (can be called manually or - -- automatically if post_params are accessed or data is written out) - process_request_body = function(self) - assert_not_faulty() - if input_state == "finished" then - return - end - local post_params_list, post_params = new_params_list() - local content_type = request.headers_value["Content-Type"] - if content_type then - if - content_type == "application/x-www-form-urlencoded" or - string.match(content_type, "^application/x%-www%-form%-urlencoded *;") - then - read_urlencoded_form(post_params_list, request.body) - else - local boundary = string.match( - content_type, - '^multipart/form%-data[ \t]*[;,][ \t]*boundary="([^"]+)"$' - ) or string.match( - content_type, - '^multipart/form%-data[ \t]*[;,][ \t]*boundary=([^"; \t]+)$' - ) - if boundary then - local post_metadata_list, post_metadata = new_params_list() - boundary = "--" .. boundary - local headerdata = "" - local streamer - local field_name - local metadata = {} - local value_parts - local function default_streamer(chunk) - value_parts[#value_parts+1] = chunk - end - local function stream_part_finish() - if streamer == default_streamer then - local value = table.concat(value_parts) - value_parts = nil - if field_name then - local values = post_params_list[field_name] - values[#values+1] = value - local metadata_entries = post_metadata_list[field_name] - metadata_entries[#metadata_entries+1] = metadata - end - else - streamer() - end - headerdata = "" - streamer = nil - field_name = nil - metadata = {} - end - local function stream_part_chunk(chunk) - if streamer then - streamer(chunk) - else - headerdata = headerdata .. chunk - while true do - local line, remaining = string.match(headerdata, "^(.-)\r?\n(.*)$") - if not line then - break - end - if line == "" then - streamer = streamed_post_params[field_name] - if not streamer then - for i, rule in ipairs(streamed_post_param_patterns) do - if string.match(field_name, rule[1]) then - streamer = rule[2] - break - end - end - end - if not streamer then - value_parts = {} - streamer = default_streamer - end - streamer(remaining, field_name, metadata) - return - end - headerdata = remaining - local header_key, header_value = string.match(line, "^([^:]*):[ \t]*(.-)[ \t]*$") - if not header_key then - request_error(true, "400 Bad Request", "Invalid header in multipart/form-data part") - end - header_key = string.lower(header_key) - if header_key == "content-disposition" then - local escaped_header_value = string.gsub(header_value, '"[^"]*"', function(str) - return string.gsub(str, "=", "==") - end) - field_name = string.match(escaped_header_value, ';[ \t]*name="([^"]*)"') - if field_name then - field_name = string.gsub(field_name, "==", "=") - else - field_name = string.match(header_value, ';[ \t]*name=([^"; \t]+)') - end - metadata.file_name = string.match(escaped_header_value, ';[ \t]*filename="([^"]*)"') - if metadata.file_name then - metadata.file_name = string.gsub(metadata.file_name, "==", "=") - else - string.match(header_value, ';[ \t]*filename=([^"; \t]+)') - end - elseif header_key == "content-type" then - metadata.content_type = header_value - elseif header_key == "content-transfer-encoding" then - request_error(true, "400 Bad Request", "Content-transfer-encoding not supported by multipart/form-data parser") - end - end - end - end - local skippart = true -- ignore data until first boundary - local afterbound = false -- interpret 2 bytes after boundary ("\r\n" or "--") - local terminated = false -- final boundary read - local bigchunk = "" - request:stream_request_body(function(chunk) - if terminated then - return - end - bigchunk = bigchunk .. chunk - while true do - if afterbound then - if #bigchunk <= 2 then - return - end - local terminator = string.sub(bigchunk, 1, 2) - if terminator == "\r\n" then - afterbound = false - bigchunk = string.sub(bigchunk, 3) - elseif terminator == "--" then - terminated = true - bigchunk = nil - return - else - request_error(true, "400 Bad Request", "Error while parsing multipart body (expected CRLF or double minus)") - end - end - local pos1, pos2 = string.find(bigchunk, boundary, 1, true) - if not pos1 then - if not skippart then - local safe = #bigchunk-#boundary - if safe > 0 then - stream_part_chunk(string.sub(bigchunk, 1, safe)) - bigchunk = string.sub(bigchunk, safe+1) - end - end - return - end - if not skippart then - stream_part_chunk(string.sub(bigchunk, 1, pos1 - 1)) - stream_part_finish() - else - boundary = "\r\n" .. boundary - skippart = false - end - bigchunk = string.sub(bigchunk, pos2 + 1) - afterbound = true - end - end) - if not terminated then - request_error(true, "400 Bad Request", "Premature end of multipart/form-data request body") - end - request.post_metadata_list, request.post_metadata = post_metadata_list, post_metadata - else - request_error(true, "415 Unsupported Media Type", "Unknown Content-Type of request body") - end - end - end - request.post_params_list, request.post_params = post_params_list, post_params - end, - -- stream request body to an (optional) callback function - -- without processing it otherwise: - stream_request_body = function(self, callback) - assert_not_faulty() - if input_state ~= "pending" and input_state ~= "deferred" then - if callback then - if input_state == "inprogress" then - error("Request body is currently being processed") - else - error("Request body has already been processed") - end - end - return - end - input_state = "inprogress" - if request.headers_flags["Expect"]["100-continue"] then - request:send_status("100 Continue") - request:finish_headers() - end - if request.headers_flags["Transfer-Encoding"]["chunked"] then - while true do - local line = socket:read(32 + remaining_body_size_limit, "\n") - if not line then - request_error(true, "400 Bad Request", "Unexpected EOF while reading next chunk of request body") - end - local zeros, lenstr = string.match(line, "^(0*)([1-9A-Fa-f]+[0-9A-Fa-f]*)\r?\n$") - local chunkext - if lenstr then - chunkext = "" - else - zeros, lenstr, chunkext = string.match(line, "^(0*)([1-9A-Fa-f]+[0-9A-Fa-f]*)([ \t;].-)\r?\n$") - end - if not lenstr or #lenstr > 13 then - request_error(true, "400 Bad Request", "Encoding error or unexpected EOF or read error while reading chunk of request body") - end - local len = tonumber("0x" .. lenstr) - remaining_body_size_limit = remaining_body_size_limit - (#zeros + #chunkext + len) - if remaining_body_size_limit < 0 then - request_error(true, "413 Request Entity Too Large", "Request body size limit exceeded") - end - if len == 0 then break end - read_body_bytes(len, callback) - local term = socket:read(2, "\n") - if term ~= "\r\n" and term ~= "\n" then - request_error(true, "400 Bad Request", "Encoding error while reading chunk of request body") - end - end - while true do - local line = socket:read(2 + remaining_body_size_limit, "\n") - if line == "\r\n" or line == "\n" then break end - remaining_body_size_limit = remaining_body_size_limit - #line - if remaining_body_size_limit < 0 then - request_error(true, "413 Request Entity Too Large", "Request body size limit exceeded while reading trailer section of chunked request body") - end - end - elseif request_body_content_length then - read_body_bytes(request_body_content_length, callback) - end - input_state = "finished" - end - } - -- initialize tables for GET params in request object: - request.get_params_list, request.get_params = new_params_list() - -- add meta table to request object to allow access to "body" and POST params: - setmetatable(request, { - __index = function(self, key) - if key == "body" then - local chunks = {} - request:stream_request_body(function(chunk) - chunks[#chunks+1] = chunk - end) - self.body = table.concat(chunks) - return self.body - elseif - key == "post_params_list" or key == "post_params" or - key == "post_metadata_list" or key == "post_metadata" - then - request:process_request_body() - return request[key] - end - end - }) - -- wait for input: - if not moonbridge_io.poll({[socket] = true}, nil, request_idle_timeout) then - return request_error(false, "408 Request Timeout") + end + until true +end + +function request_pt:_error(status, explanation) +end + +function request_pt:_read(...) + local line, status = self._socket:read_yield(...) + if line == nil then + self._faulty = true + error(status) + else + return line, status + end +end + +function request_pt:_read_headers() + local remaining = self._header_size_limit + -- read and parse request line: + local target, proto + do + local line, status = self:_read(remaining-2, "\n") + if status == "maxlen" then + self:_error("414 Request-URI Too Long") + return false + elseif status == "eof" then + if line ~= "" then + self:_error("400 Bad Request", "Unexpected EOF in request-URI line") end - -- set timeout for request header processing: - timeout(request_header_timeout) - -- read and parse request line: - local line = socket:read(remaining_header_size_limit, "\n") - if not line then return survive end - remaining_header_size_limit = remaining_header_size_limit - #line - if remaining_header_size_limit == 0 then - return request_error(false, "414 Request-URI Too Long") - end - local target, proto - request.method, target, proto = - line:match("^([^ \t\r]+)[ \t]+([^ \t\r]+)[ \t]*([^ \t\r]*)[ \t]*\r?\n$") - if not request.method then - return request_error(false, "400 Bad Request") - elseif proto ~= "HTTP/1.1" then - return request_error(false, "505 HTTP Version Not Supported") - end - -- read and parse headers: - while true do - local line = socket:read(remaining_header_size_limit, "\n"); - remaining_header_size_limit = remaining_header_size_limit - #line - if not line then - return request_error(false, "400 Bad Request") - end - if line == "\r\n" or line == "\n" then - break - end - if remaining_header_size_limit == 0 then - return request_error(false, "431 Request Header Fields Too Large") - end - local key, value = string.match(line, "^([^ \t\r]+):[ \t]*(.-)[ \t]*\r?\n$") - if not key then - return request_error(false, "400 Bad Request") - end - local values = request.headers[key] - values[#values+1] = value - end - -- process "Connection: close" header if existent: - connection_close_requested = request.headers_flags["Connection"]["close"] - -- process "Content-Length" header if existent: - do - local values = request.headers_csv_table["Content-Length"] - if #values > 0 then - request_body_content_length = tonumber(values[1]) - local proper_value = tostring(request_body_content_length) - for i, value in ipairs(values) do - value = string.match(value, "^0*(.*)") - if value ~= proper_value then - return request_error(false, "400 Bad Request", "Content-Length header(s) invalid") - end - end - if request_body_content_length > remaining_body_size_limit then - return request_error(false, "413 Request Entity Too Large", "Announced request body size is too big") - end + return false + end + remaining = remaining - #line + self.method, target, proto = + line:match("^([^ \t\r]+)[ \t]+([^ \t\r]+)[ \t]*([^ \t\r]*)[ \t]*\r?\n$") + if not request.method then + self:_error("400 Bad Request", "Invalid request-URI line") + return false + elseif proto ~= "HTTP/1.1" then + self:_error("505 HTTP Version Not Supported") + return false + end + end + -- read and parse headers: + while true do + local line, status = self:_read(remaining, "\n"); + if status == "maxlen" then + self:_error("431 Request Header Fields Too Large") + return false + elseif status == "eof" then + self:_error("400 Bad Request", "Unexpected EOF in request headers") + return false + end + remaining = remaining - #line + if line == "\r\n" or line == "\n" then + break + end + local key, value = string.match(line, "^([^ \t\r]+):[ \t]*(.-)[ \t]*\r?\n$") + if not key then + self:_error("400 Bad Request", "Invalid header line") + return false + end + local lowerkey = key:lower() + local values = self._headers[lowerkey] + if values then + values[#values+1] = value + else + self._headers[lowerkey] = {value} + end + end + -- process "Connection: close" header if existent: + self._connection_close_requested = self.headers_flags["Connection"]["close"] + -- process "Content-Length" header if existent: + do + local values = self.headers_csv_table["Content-Length"] + if #values > 0 then + self._request_body_content_length = tonumber(values[1]) + local proper_value = tostring(request_body_content_length) + for i, value in ipairs(values) do + value = string.match(value, "^0*(.*)") + if value ~= proper_value then + self:_error("400 Bad Request", "Content-Length header(s) invalid") + return false end end - -- process "Transfer-Encoding" header if existent: - do - local flag = request.headers_flags["Transfer-Encoding"]["chunked"] - local list = request.headers_csv_table["Transfer-Encoding"] - if (flag and #list ~= 1) or (not flag and #list ~= 0) then - return request_error(false, "400 Bad Request", "Unexpected Transfer-Encoding") - end + if request_body_content_length > self._body_size_limit then + self:_error("413 Request Entity Too Large", "Announced request body size is too big") + return false end - -- process "Expect" header if existent: - for i, value in ipairs(request.headers_csv_table["Expect"]) do - if string.lower(value) ~= "100-continue" then - return request_error(false, "417 Expectation Failed", "Unexpected Expect header") - end - end - -- get mandatory Host header according to RFC 7230: - request.host = request.headers_value["Host"] - if not request.host then - return request_error(false, "400 Bad Request", "No valid host header") + end + end + -- process "Transfer-Encoding" header if existent: + do + local flag = self.headers_flags["Transfer-Encoding"]["chunked"] + local list = self.headers_csv_table["Transfer-Encoding"] + if (flag and #list ~= 1) or (not flag and #list ~= 0) then + self:_error("400 Bad Request", "Unexpected Transfer-Encoding") + return false + end + end + -- process "Expect" header if existent: + for i, value in ipairs(self.headers_csv_table["Expect"]) do + if string.lower(value) ~= "100-continue" then + self:_error("417 Expectation Failed", "Unexpected Expect header") + return false + end + end + -- get mandatory Host header according to RFC 7230: + self.host = self.headers_value["Host"] + if not self.host then + self:_error("400 Bad Request", "No valid host header") + return false + end + -- parse request target: + self.path, self.query = string.match(target, "^/([^?]*)(.*)$") + if not self.path then + local host2 + host2, self.path, self.query = string.match(target, "^[Hh][Tt][Tt][Pp]://([^/?]+)/?([^?]*)(.*)$") + if host2 then + if self.host ~= host2 then + self:_error("400 Bad Request", "No valid host header") + return false end - -- parse request target: - request.path, request.query = string.match(target, "^/([^?]*)(.*)$") - if not request.path then - local host2 - host2, request.path, request.query = string.match(target, "^[Hh][Tt][Tt][Pp]://([^/?]+)/?([^?]*)(.*)$") - if host2 then - if request.host ~= host2 then - return request_error(false, "400 Bad Request", "No valid host header") - end - elseif not (target == "*" and request.method == "OPTIONS") then - return request_error(false, "400 Bad Request", "Invalid request target") - end - end - -- parse GET params: - if request.query then - read_urlencoded_form(request.get_params_list, request.query) - end - -- parse cookies: - for i, line in ipairs(request.headers["Cookie"]) do - for rawkey, rawvalue in - string.gmatch(line, "([^=; ]*)=([^=; ]*)") - do - request.cookies[decode_uri(rawkey)] = decode_uri(rawvalue) - end - end - -- (re)set timeout for handler: - timeout(response_timeout or 0) - -- call underlying handler and remember boolean result: - if handler(request) ~= true then survive = false end - -- finish request (unless already done by underlying handler): - request:finish() - -- stop timeout timer: - timeout(0) - until connection_close_responded - return survive + elseif not (target == "*" and self.method == "OPTIONS") then + self:_error("400 Bad Request", "Invalid request target") + end + end + -- parse GET params: + if self.query then + self.get_params_list = read_urlencoded_form(request.query) + self.get_params = get_first_values(self.get_params_list) + end + -- parse cookies: + for i, line in ipairs(self.headers["Cookie"]) do + for rawkey, rawvalue in + string.gmatch(line, "([^=; ]*)=([^=; ]*)") + do + self.cookies[decode_uri(rawkey)] = decode_uri(rawvalue) + end end end +function request_pt:_assert_not_faulty() + assert(not self._faulty, "Tried to use faulty request handle") +end + +function request_pt:_write_yield() + self:_consume_input() + self._poll(self._socket_set, self._socket_set) +end + +function request_pt:_write(...) + assert(self._socket:write_call(self._write_yield_closure, ...)) +end + +function request_pt:_flush(...) + assert(self._socket:write_call(self._write_yield_closure, ...)) +end + +function request_pt:_drain_input() + socket:drain_nb(self._input_chunk_size) +end + + +-- function creating a HTTP handler: +function generate_handler(handler, options) + -- swap arguments if necessary (for convenience): + if type(handler) ~= "function" and type(options) == "function" then + handler, options = options, handler + end + local request = setmetatable({}, request_mt) + request:_init(handler, options) + return request._handler_closure +end + return _M